You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2014/04/29 02:14:08 UTC

svn commit: r1590846 - in /hama/trunk: ./ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/bsp/sync/ core/src/test/java/org/apache/hama/bsp/

Author: edwardyoon
Date: Tue Apr 29 00:14:07 2014
New Revision: 1590846

URL: http://svn.apache.org/r1590846
Log:
HAMA-899: Add getAdjacentPeerNames() that returns the names of locally adjacent peers

Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/BSPPeerSyncClient.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/PeerSyncClient.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1590846&r1=1590845&r2=1590846&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Tue Apr 29 00:14:07 2014
@@ -3,6 +3,7 @@ Hama Change Log
 Release 0.7.0 (unreleased changes)
 
   NEW FEATURES
+
    HAMA-863: Implement SparseVector (Yexi Jiang)
 
   BUG FIXES
@@ -13,6 +14,7 @@ Release 0.7.0 (unreleased changes)
 
   IMPROVEMENTS
   
+   HAMA-899: Add getAdjacentPeerNames() that returns the names of locally adjacent peers (edwardyoon)
    HAMA-847: Vertex should provide Counters (edwardyoon)
    HAMA-568: Add faster synchronized collections for message queues (edwardyoon)
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java?rev=1590846&r1=1590845&r2=1590846&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java Tue Apr 29 00:14:07 2014
@@ -78,22 +78,28 @@ public interface BSPPeer<K1, V1, K2, V2,
   public String getPeerName();
 
   /**
-   * @return the name of n-th peer from sorted array by name.
+   * @return the name of n-th peer from sorted array by taskId.
    */
   public String getPeerName(int index);
 
   /**
-   * @return the index of this peer from sorted array by name.
+   * @return the index of this peer from sorted array by taskId.
    */
   public int getPeerIndex();
 
   /**
-   * @return the names of all the peers executing tasks from the same job
-   *         (including this peer).
+   * @return the names of all the peers executing tasks from the same job in
+   *         ascending taskId order (including this peer).
    */
   public String[] getAllPeerNames();
 
   /**
+   * @return the names of locally adjacent peers in ascending taskId order
+   *         (including this peer).
+   */
+  public String[] getAdjacentPeerNames();
+
+  /**
    * @return the number of peers
    */
   public int getNumPeers();

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1590846&r1=1590845&r2=1590846&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Tue Apr 29 00:14:07 2014
@@ -21,7 +21,9 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
@@ -164,7 +166,7 @@ public final class BSPPeerImpl<K1, V1, K
         Constants.DEFAULT_PEER_HOST);
     int bindPort = conf
         .getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
-    
+
     peerAddress = new InetSocketAddress(bindAddress, bindPort);
 
     // This function call may change the current peer address
@@ -374,7 +376,7 @@ public final class BSPPeerImpl<K1, V1, K
       final InetSocketAddress addr = entry.getKey();
 
       final BSPMessageBundle<M> bundle = entry.getValue();
-      
+
       // remove this message during runtime to save a bit of memory
       it.remove();
       try {
@@ -523,6 +525,19 @@ public final class BSPPeerImpl<K1, V1, K
   }
 
   @Override
+  public final String[] getAdjacentPeerNames() {
+    initPeerNames();
+    List<String> localPeers = new ArrayList<String>();
+    for (String peerName : allPeers) {
+      if (peerName.startsWith(peerAddress.getHostName() + ":")) {
+        localPeers.add(peerName);
+      }
+    }
+
+    return localPeers.toArray(new String[localPeers.size()]);
+  }
+
+  @Override
   public final String getPeerName(int index) {
     initPeerNames();
     return allPeers[index];
@@ -541,7 +556,7 @@ public final class BSPPeerImpl<K1, V1, K
 
   private final void initPeerNames() {
     if (allPeers == null) {
-      allPeers = syncClient.getAllPeerNames(taskId);
+      allPeers = syncClient.getAllPeerNames(taskId.getJobID());
     }
   }
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1590846&r1=1590845&r2=1590846&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Tue Apr 29 00:14:07 2014
@@ -469,11 +469,6 @@ public class LocalBSPRunner implements J
     }
 
     @Override
-    public String[] getAllPeerNames(TaskAttemptID taskId) {
-      return peerNames;
-    }
-
-    @Override
     public void deregisterFromBarrier(BSPJobID jobId, TaskAttemptID taskId,
         String hostAddress, long port) {
 
@@ -531,6 +526,11 @@ public class LocalBSPRunner implements J
     public boolean remove(String key, SyncEventListener listener) {
       return false;
     }
+
+    @Override
+    public String[] getAllPeerNames(BSPJobID jobID) {
+      return peerNames;
+    }
   }
 
   @Override

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/BSPPeerSyncClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/BSPPeerSyncClient.java?rev=1590846&r1=1590845&r2=1590846&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/BSPPeerSyncClient.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/BSPPeerSyncClient.java Tue Apr 29 00:14:07 2014
@@ -73,11 +73,11 @@ public abstract class BSPPeerSyncClient 
    * Returns all registered tasks within the sync daemon. They have to be
    * ordered ascending by their task id.
    * 
-   * @param taskId the tasks ID
+   * @param jobID the job ID
    * @return an <b>ordered</b> string array of host:port pairs of all tasks
    *         connected to the daemon.
    */
-  public abstract String[] getAllPeerNames(TaskAttemptID taskId);
+  public abstract String[] getAllPeerNames(BSPJobID jobID);
 
   /**
    * TODO this has currently no use. Could later be used to deregister tasks

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/PeerSyncClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/PeerSyncClient.java?rev=1590846&r1=1590845&r2=1590846&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/PeerSyncClient.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/PeerSyncClient.java Tue Apr 29 00:14:07 2014
@@ -76,11 +76,11 @@ public interface PeerSyncClient extends 
    * Returns all registered tasks within the sync daemon. They have to be
    * ordered ascending by their task id.
    * 
-   * @param taskId the tasks ID
+   * @param jobID the job ID
    * @return an <b>ordered</b> string array of host:port pairs of all tasks
    *         connected to the daemon.
    */
-  public String[] getAllPeerNames(TaskAttemptID taskId);
+  public String[] getAllPeerNames(BSPJobID jobID);
 
   /**
    * TODO this has currently no use. Could later be used to deregister tasks

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java?rev=1590846&r1=1590845&r2=1590846&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java Tue Apr 29 00:14:07 2014
@@ -273,18 +273,16 @@ public class ZooKeeperSyncClientImpl ext
   }
 
   @Override
-  public String[] getAllPeerNames(TaskAttemptID taskId) {
+  public String[] getAllPeerNames(BSPJobID jobID) {
     if (allPeers == null) {
       TreeMap<Integer, String> sortedMap = new TreeMap<Integer, String>();
       try {
-        List<String> var = zk.getChildren(
-            constructKey(taskId.getJobID(), "peers"), this);
+        List<String> var = zk.getChildren(constructKey(jobID, "peers"), this);
         allPeers = var.toArray(new String[var.size()]);
 
         TreeMap<TaskAttemptID, String> taskAttemptSortedMap = new TreeMap<TaskAttemptID, String>();
         for (String s : allPeers) {
-          byte[] data = zk.getData(constructKey(taskId.getJobID(), "peers", s),
-              this, null);
+          byte[] data = zk.getData(constructKey(jobID, "peers", s), this, null);
           TaskAttemptID thatTask = new TaskAttemptID();
           boolean result = getValueFromBytes(data, thatTask);
 

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java?rev=1590846&r1=1590845&r2=1590846&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java Tue Apr 29 00:14:07 2014
@@ -301,6 +301,12 @@ public class TestCheckpoint extends Test
       return null;
     }
 
+    @Override
+    public String[] getAdjacentPeerNames() {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
   }
 
   public static class TempSyncClient extends BSPPeerSyncClient {
@@ -423,7 +429,7 @@ public class TestCheckpoint extends Test
     }
 
     @Override
-    public String[] getAllPeerNames(TaskAttemptID taskId) {
+    public String[] getAllPeerNames(BSPJobID jobID) {
       return null;
     }
 

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java?rev=1590846&r1=1590845&r2=1590846&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java Tue Apr 29 00:14:07 2014
@@ -119,7 +119,7 @@ public class TestZooKeeper extends TestC
           peerClient.constructKey(jobID, "info", "level2"), new IntWritable(5),
           true, null);
 
-      String[] names = peerClient.getAllPeerNames(task1);
+      String[] names = peerClient.getAllPeerNames(task1.getJobID());
 
       Log.info("Found child count = " + names.length);