You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2014/04/29 02:14:08 UTC
svn commit: r1590846 - in /hama/trunk: ./
core/src/main/java/org/apache/hama/bsp/
core/src/main/java/org/apache/hama/bsp/sync/
core/src/test/java/org/apache/hama/bsp/
Author: edwardyoon
Date: Tue Apr 29 00:14:07 2014
New Revision: 1590846
URL: http://svn.apache.org/r1590846
Log:
HAMA-899: Add getAdjacentPeerNames() that returns the names of locally adjacent peers
Modified:
hama/trunk/CHANGES.txt
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/BSPPeerSyncClient.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/PeerSyncClient.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java
Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1590846&r1=1590845&r2=1590846&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Tue Apr 29 00:14:07 2014
@@ -3,6 +3,7 @@ Hama Change Log
Release 0.7.0 (unreleased changes)
NEW FEATURES
+
HAMA-863: Implement SparseVector (Yexi Jiang)
BUG FIXES
@@ -13,6 +14,7 @@ Release 0.7.0 (unreleased changes)
IMPROVEMENTS
+ HAMA-899: Add getAdjacentPeerNames() that returns the names of locally adjacent peers (edwardyoon)
HAMA-847: Vertex should provide Counters (edwardyoon)
HAMA-568: Add faster synchronized collections for message queues (edwardyoon)
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java?rev=1590846&r1=1590845&r2=1590846&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java Tue Apr 29 00:14:07 2014
@@ -78,22 +78,28 @@ public interface BSPPeer<K1, V1, K2, V2,
public String getPeerName();
/**
- * @return the name of n-th peer from sorted array by name.
+ * @return the name of n-th peer from sorted array by taskId.
*/
public String getPeerName(int index);
/**
- * @return the index of this peer from sorted array by name.
+ * @return the index of this peer from sorted array by taskId.
*/
public int getPeerIndex();
/**
- * @return the names of all the peers executing tasks from the same job
- * (including this peer).
+ * @return the names of all the peers executing tasks from the same job in
+ * ascending taskId order (including this peer).
*/
public String[] getAllPeerNames();
/**
+ * @return the names of locally adjacent peers in ascending taskId order.
+ * (including this peer).
+ */
+ public String[] getAdjacentPeerNames();
+
+ /**
* @return the number of peers
*/
public int getNumPeers();
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1590846&r1=1590845&r2=1590846&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Tue Apr 29 00:14:07 2014
@@ -21,7 +21,9 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URL;
import java.net.URLClassLoader;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
import java.util.Map.Entry;
import org.apache.commons.logging.Log;
@@ -164,7 +166,7 @@ public final class BSPPeerImpl<K1, V1, K
Constants.DEFAULT_PEER_HOST);
int bindPort = conf
.getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
-
+
peerAddress = new InetSocketAddress(bindAddress, bindPort);
// This function call may change the current peer address
@@ -374,7 +376,7 @@ public final class BSPPeerImpl<K1, V1, K
final InetSocketAddress addr = entry.getKey();
final BSPMessageBundle<M> bundle = entry.getValue();
-
+
// remove this message during runtime to save a bit of memory
it.remove();
try {
@@ -523,6 +525,19 @@ public final class BSPPeerImpl<K1, V1, K
}
@Override
+ public final String[] getAdjacentPeerNames() {
+ initPeerNames();
+ List<String> localPeers = new ArrayList<String>();
+ for (String peerName : allPeers) {
+ if (peerName.startsWith(peerAddress.getHostName() + ":")) {
+ localPeers.add(peerName);
+ }
+ }
+
+ return localPeers.toArray(new String[localPeers.size()]);
+ }
+
+ @Override
public final String getPeerName(int index) {
initPeerNames();
return allPeers[index];
@@ -541,7 +556,7 @@ public final class BSPPeerImpl<K1, V1, K
private final void initPeerNames() {
if (allPeers == null) {
- allPeers = syncClient.getAllPeerNames(taskId);
+ allPeers = syncClient.getAllPeerNames(taskId.getJobID());
}
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1590846&r1=1590845&r2=1590846&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Tue Apr 29 00:14:07 2014
@@ -469,11 +469,6 @@ public class LocalBSPRunner implements J
}
@Override
- public String[] getAllPeerNames(TaskAttemptID taskId) {
- return peerNames;
- }
-
- @Override
public void deregisterFromBarrier(BSPJobID jobId, TaskAttemptID taskId,
String hostAddress, long port) {
@@ -531,6 +526,11 @@ public class LocalBSPRunner implements J
public boolean remove(String key, SyncEventListener listener) {
return false;
}
+
+ @Override
+ public String[] getAllPeerNames(BSPJobID jobID) {
+ return peerNames;
+ }
}
@Override
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/BSPPeerSyncClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/BSPPeerSyncClient.java?rev=1590846&r1=1590845&r2=1590846&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/BSPPeerSyncClient.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/BSPPeerSyncClient.java Tue Apr 29 00:14:07 2014
@@ -73,11 +73,11 @@ public abstract class BSPPeerSyncClient
* Returns all registered tasks within the sync daemon. They have to be
* ordered ascending by their task id.
*
- * @param taskId the tasks ID
+ * @param jobID the job ID
* @return an <b>ordered</b> string array of host:port pairs of all tasks
* connected to the daemon.
*/
- public abstract String[] getAllPeerNames(TaskAttemptID taskId);
+ public abstract String[] getAllPeerNames(BSPJobID jobID);
/**
* TODO this has currently no use. Could later be used to deregister tasks
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/PeerSyncClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/PeerSyncClient.java?rev=1590846&r1=1590845&r2=1590846&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/PeerSyncClient.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/PeerSyncClient.java Tue Apr 29 00:14:07 2014
@@ -76,11 +76,11 @@ public interface PeerSyncClient extends
* Returns all registered tasks within the sync daemon. They have to be
* ordered ascending by their task id.
*
- * @param taskId the tasks ID
+ * @param jobID the job ID
* @return an <b>ordered</b> string array of host:port pairs of all tasks
* connected to the daemon.
*/
- public String[] getAllPeerNames(TaskAttemptID taskId);
+ public String[] getAllPeerNames(BSPJobID jobID);
/**
* TODO this has currently no use. Could later be used to deregister tasks
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java?rev=1590846&r1=1590845&r2=1590846&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java Tue Apr 29 00:14:07 2014
@@ -273,18 +273,16 @@ public class ZooKeeperSyncClientImpl ext
}
@Override
- public String[] getAllPeerNames(TaskAttemptID taskId) {
+ public String[] getAllPeerNames(BSPJobID jobID) {
if (allPeers == null) {
TreeMap<Integer, String> sortedMap = new TreeMap<Integer, String>();
try {
- List<String> var = zk.getChildren(
- constructKey(taskId.getJobID(), "peers"), this);
+ List<String> var = zk.getChildren(constructKey(jobID, "peers"), this);
allPeers = var.toArray(new String[var.size()]);
TreeMap<TaskAttemptID, String> taskAttemptSortedMap = new TreeMap<TaskAttemptID, String>();
for (String s : allPeers) {
- byte[] data = zk.getData(constructKey(taskId.getJobID(), "peers", s),
- this, null);
+ byte[] data = zk.getData(constructKey(jobID, "peers", s), this, null);
TaskAttemptID thatTask = new TaskAttemptID();
boolean result = getValueFromBytes(data, thatTask);
Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java?rev=1590846&r1=1590845&r2=1590846&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java Tue Apr 29 00:14:07 2014
@@ -301,6 +301,12 @@ public class TestCheckpoint extends Test
return null;
}
+ @Override
+ public String[] getAdjacentPeerNames() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
}
public static class TempSyncClient extends BSPPeerSyncClient {
@@ -423,7 +429,7 @@ public class TestCheckpoint extends Test
}
@Override
- public String[] getAllPeerNames(TaskAttemptID taskId) {
+ public String[] getAllPeerNames(BSPJobID jobID) {
return null;
}
Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java?rev=1590846&r1=1590845&r2=1590846&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java Tue Apr 29 00:14:07 2014
@@ -119,7 +119,7 @@ public class TestZooKeeper extends TestC
peerClient.constructKey(jobID, "info", "level2"), new IntWritable(5),
true, null);
- String[] names = peerClient.getAllPeerNames(task1);
+ String[] names = peerClient.getAllPeerNames(task1.getJobID());
Log.info("Found child count = " + names.length);