You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2010/10/25 03:21:52 UTC
svn commit: r1026932 - in /incubator/hama/trunk: ./
src/examples/org/apache/hama/examples/ src/java/org/apache/hama/bsp/
src/test/org/apache/hama/bsp/
Author: edwardyoon
Date: Mon Oct 25 01:21:52 2010
New Revision: 1026932
URL: http://svn.apache.org/viewvc?rev=1026932&view=rev
Log:
Renaming and Refactoring methods in BSPPeerInterface
Modified:
incubator/hama/trunk/CHANGES.txt
incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/ClusterStatus.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerStatus.java
incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java
incubator/hama/trunk/src/test/org/apache/hama/bsp/TestClusterStatus.java
Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1026932&r1=1026931&r2=1026932&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Mon Oct 25 01:21:52 2010
@@ -172,6 +172,7 @@ Trunk (unreleased changes)
BUG FIXES
+ HAMA-316: Renaming and Refactoring methods in BSPPeerInterface (Filipe Manana)
HAMA-319: groom servers Map in HeartbeatResponse not correctly serialized
(Filipe Manana)
HAMA-317: Remove HBASE_CONF_DIR and HBASE_HEAPSIZE properties (edwardyoon)
Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java?rev=1026932&r1=1026931&r2=1026932&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java Mon Oct 25 01:21:52 2010
@@ -54,11 +54,11 @@ public class PiEstimator {
}
}
- byte[] tagName = Bytes.toBytes(bspPeer.getHostName());
+ byte[] tagName = Bytes.toBytes(bspPeer.getPeerName());
byte[] myData = Bytes.toBytes(4.0 * (double) in / (double) iterations);
BSPMessage estimate = new BSPMessage(tagName, myData);
- bspPeer.send(bspPeer.getAddress(masterTask), estimate);
+ bspPeer.send(masterTask, estimate);
bspPeer.sync();
double pi = 0.0;
@@ -105,8 +105,8 @@ public class PiEstimator {
BSPJobClient jobClient = new BSPJobClient(conf);
ClusterStatus cluster = jobClient.getClusterStatus(true);
// Choose one as a master
- for (String name : cluster.getActiveGroomNames()) {
- conf.set(MASTER_TASK, name);
+ for (String peerName : cluster.getActiveGroomNames().values()) {
+ conf.set(MASTER_TASK, peerName);
break;
}
Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java?rev=1026932&r1=1026931&r2=1026932&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java Mon Oct 25 01:21:52 2010
@@ -18,7 +18,6 @@
package org.apache.hama.examples;
import java.io.IOException;
-import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -42,10 +41,10 @@ public class SerializePrinting {
int num = Integer.parseInt(conf.get("bsp.peers.num"));
int i = 0;
- for(Map.Entry<String, String> e : bspPeer.getAllPeers().entrySet()) {
- if(bspPeer.getHostName().equals(e.getValue())) {
+ for (String otherPeer : bspPeer.getAllPeerNames()) {
+ if (bspPeer.getPeerName().equals(otherPeer)) {
LOG.info("Hello BSP from " + i + " of " + num + ": "
- + bspPeer.getHostName());
+ + bspPeer.getPeerName());
}
Thread.sleep(200);
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java?rev=1026932&r1=1026931&r2=1026932&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java Mon Oct 25 01:21:52 2010
@@ -20,6 +20,7 @@ package org.apache.hama.bsp;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collection;
+import java.util.Map;
import javax.security.auth.login.LoginException;
@@ -551,8 +552,8 @@ public class BSPJobClient extends Config
*/
private void listActiveTrackers() throws IOException {
ClusterStatus c = jobSubmitClient.getClusterStatus(true);
- Collection<String> trackers = c.getActiveGroomNames();
- for (String trackerName : trackers) {
+ Map<String, String> trackers = c.getActiveGroomNames();
+ for (String trackerName : trackers.keySet()) {
System.out.println(trackerName);
}
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java?rev=1026932&r1=1026931&r2=1026932&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java Mon Oct 25 01:21:52 2010
@@ -88,8 +88,8 @@ public class BSPMaster implements JobSub
// (groom name --> last sent HeartBeatResponse)
Map<String, HeartbeatResponse> groomToHeartbeatResponseMap = new TreeMap<String, HeartbeatResponse>();
private HashMap<String, GroomServerStatus> groomServers = new HashMap<String, GroomServerStatus>();
- // maps groom server names to hosts (hostname:port)
- private HashMap<String, String> groomServerHosts = new HashMap<String, String>();
+ // maps groom server names to peer names
+ private HashMap<String, String> groomServerPeers = new HashMap<String, String>();
// Jobs' Meta Data
private Integer nextJobId = Integer.valueOf(1);
@@ -366,7 +366,7 @@ public class BSPMaster implements JobSub
Collections.<String, String>emptyMap());
}
- HeartbeatResponse response = new HeartbeatResponse(newResponseId, null, groomServerHosts);
+ HeartbeatResponse response = new HeartbeatResponse(newResponseId, null, groomServerPeers);
List<GroomServerAction> actions = new ArrayList<GroomServerAction>();
updateTaskStatuses(status);
@@ -514,7 +514,7 @@ public class BSPMaster implements JobSub
}
if (initialContact) {
- groomServerHosts.put(groomStatus.getGroomName(), groomStatus.getHost());
+ groomServerPeers.put(groomStatus.getGroomName(), groomStatus.getPeerName());
}
return true;
@@ -550,16 +550,24 @@ public class BSPMaster implements JobSub
@Override
public ClusterStatus getClusterStatus(boolean detailed) {
- synchronized (groomServers) {
+ int numGroomServers;
+ Map<String, String> groomPeersMap = null;
+
+ // give the caller a snapshot of the cluster status
+ synchronized (this) {
+ numGroomServers = groomServerPeers.size();
if (detailed) {
- List<String> groomNames = groomServerNames();
- return new ClusterStatus(groomNames, totalTasks, totalTaskCapacity,
- state);
- } else {
- return new ClusterStatus(groomServers.size(), totalTasks,
- totalTaskCapacity, state);
+ groomPeersMap = new HashMap<String, String>(groomServerPeers);
}
}
+
+ if (detailed) {
+ return new ClusterStatus(groomPeersMap, totalTasks, totalTaskCapacity,
+ state);
+ } else {
+ return new ClusterStatus(numGroomServers, totalTasks, totalTaskCapacity,
+ state);
+ }
}
/**
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java?rev=1026932&r1=1026931&r2=1026932&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java Mon Oct 25 01:21:52 2010
@@ -19,11 +19,13 @@ package org.apache.hama.bsp;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.HashMap;
+import java.util.Collection;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -51,16 +53,13 @@ public class BSPPeer implements Watcher,
protected ZooKeeper zk = null;
protected volatile Integer mutex = 0;
- protected final String peerAddr;
- protected final String bindAddress;
- protected final int bindPort;
protected final String bspRoot;
protected final String zookeeperAddr;
protected final Map<InetSocketAddress, BSPPeerInterface> peers = new ConcurrentHashMap<InetSocketAddress, BSPPeerInterface>();
protected final Map<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> outgoingQueues = new ConcurrentHashMap<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>();
protected final ConcurrentLinkedQueue<BSPMessage> localQueue = new ConcurrentLinkedQueue<BSPMessage>();
- protected Map<String, String> allPeers = new HashMap<String, String>();
+ protected Set<String> allPeerNames = new HashSet<String>();
protected InetSocketAddress peerAddress;
/**
@@ -69,10 +68,8 @@ public class BSPPeer implements Watcher,
public BSPPeer(Configuration conf) throws IOException {
this.conf = conf;
- peerAddr = conf.get(Constants.PEER_HOST, Constants.DEFAULT_PEER_HOST)
- + ":" + conf.getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
- bindAddress = conf.get(Constants.PEER_HOST, Constants.DEFAULT_PEER_HOST);
- bindPort = conf.getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
+ String bindAddress = conf.get(Constants.PEER_HOST, Constants.DEFAULT_PEER_HOST);
+ int bindPort = conf.getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
bspRoot = conf.get(Constants.ZOOKEEPER_ROOT,
Constants.DEFAULT_ZOOKEEPER_ROOT);
zookeeperAddr = conf.get(Constants.ZOOKEEPER_QUORUM)
@@ -86,8 +83,8 @@ public class BSPPeer implements Watcher,
public void reinitialize() {
try {
- LOG.debug("reinitialize(): " + bindAddress + ":" + bindPort);
- server = RPC.getServer(this, bindAddress, bindPort, conf);
+ LOG.debug("reinitialize(): " + getPeerName());
+ server = RPC.getServer(this, peerAddress.getHostName(), peerAddress.getPort(), conf);
server.start();
} catch (IOException e) {
e.printStackTrace();
@@ -132,13 +129,13 @@ public class BSPPeer implements Watcher,
* org.apache.hadoop.io.Writable, org.apache.hadoop.io.Writable)
*/
@Override
- public void send(InetSocketAddress hostname, BSPMessage msg)
+ public void send(String peerName, BSPMessage msg)
throws IOException {
- LOG.debug("Send bytes (" + msg.getData().toString() + ") to " + hostname.getHostName());
- ConcurrentLinkedQueue<BSPMessage> queue = outgoingQueues.get(hostname);
+ LOG.debug("Send bytes (" + msg.getData().toString() + ") to " + peerName);
+ ConcurrentLinkedQueue<BSPMessage> queue = outgoingQueues.get(peerName);
if (queue == null) {
queue = new ConcurrentLinkedQueue<BSPMessage>();
- outgoingQueues.put(hostname, queue);
+ outgoingQueues.put(getAddress(peerName), queue);
}
queue.add(msg);
}
@@ -186,9 +183,9 @@ public class BSPPeer implements Watcher,
}
protected boolean enterBarrier() throws KeeperException, InterruptedException {
- LOG.debug("[" + peerAddr + "] enter the enterbarrier");
+ LOG.debug("[" + getPeerName() + "] enter the enterbarrier");
try {
- zk.create(bspRoot + "/" + peerAddr, new byte[0], Ids.OPEN_ACL_UNSAFE,
+ zk.create(bspRoot + "/" + getPeerName(), new byte[0], Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
} catch (KeeperException e) {
e.printStackTrace();
@@ -211,7 +208,7 @@ public class BSPPeer implements Watcher,
}
protected boolean leaveBarrier() throws KeeperException, InterruptedException {
- zk.delete(bspRoot + "/" + peerAddr, 0);
+ zk.delete(bspRoot + "/" + getPeerName(), 0);
while (true) {
synchronized (mutex) {
@@ -219,7 +216,7 @@ public class BSPPeer implements Watcher,
if (list.size() > 0) {
mutex.wait();
} else {
- LOG.debug("[" + peerAddr + "] leave from the leaveBarrier");
+ LOG.debug("[" + getPeerName() + "] leave from the leaveBarrier");
return true;
}
}
@@ -239,11 +236,6 @@ public class BSPPeer implements Watcher,
}
@Override
- public boolean isRunning() {
- return true;
- }
-
- @Override
public void put(BSPMessage msg) throws IOException {
this.localQueue.add(msg);
}
@@ -275,29 +267,19 @@ public class BSPPeer implements Watcher,
/**
* @return the string as host:port of this Peer
*/
- public String getHostName() {
- return this.peerAddr;
- }
-
- /**
- * @return The address of this peer.
- */
- public InetSocketAddress getAddress() {
- return peerAddress;
+ public String getPeerName() {
+ return peerAddress.getHostName() + ":" + peerAddress.getPort();
}
- public InetSocketAddress getAddress(String hostname) {
- String peerAddr = allPeers.get(hostname);
- String[] peerAddrParts = peerAddr.split(":");
+ private InetSocketAddress getAddress(String peerName) {
+ String[] peerAddrParts = peerName.split(":");
return new InetSocketAddress(peerAddrParts[0], Integer
.parseInt(peerAddrParts[1]));
}
- /**
- * @return all the other peers executing tasks from the same job.
- */
- public Map<String, String> getAllPeers() {
- return allPeers;
+ @Override
+ public Set<String> getAllPeerNames() {
+ return allPeerNames;
}
/**
@@ -306,8 +288,13 @@ public class BSPPeer implements Watcher,
*
* @param allPeers
*/
- void setPeers(Map<String, String> allPeers) {
- this.allPeers = allPeers;
+ void setAllPeerNames(Collection<String> allPeerNames) {
+ this.allPeerNames = new HashSet<String>(allPeerNames);
+ }
+
+ @Override
+ public int getNumCurrentMessages() {
+ return localQueue.size();
}
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java?rev=1026932&r1=1026931&r2=1026932&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java Mon Oct 25 01:21:52 2010
@@ -19,7 +19,7 @@ package org.apache.hama.bsp;
import java.io.Closeable;
import java.io.IOException;
-import java.net.InetSocketAddress;
+import java.util.Set;
import org.apache.hama.Constants;
import org.apache.zookeeper.KeeperException;
@@ -35,28 +35,37 @@ public interface BSPPeerInterface extend
* @param msg
* @throws IOException
*/
- public void send(InetSocketAddress hostname, BSPMessage msg)
+ public void send(String peerName, BSPMessage msg)
throws IOException;
public void put(BSPMessage msg) throws IOException;
/**
- * @return the current message
+ * @return A message from the peer's received messages queue (a FIFO).
* @throws IOException
*/
public BSPMessage getCurrentMessage() throws IOException;
/**
- * Synchronize all of the data in the local queue to other BSP Peers.
+ * @return The number of messages in the peer's received messages queue.
+ */
+ public int getNumCurrentMessages();
+
+ /**
+ * Sends all the messages in the outgoing message queues to the corresponding remote peers.
*
* @throws InterruptedException
* @throws KeeperException
*/
public void sync() throws IOException, KeeperException, InterruptedException;
+
+ /**
+ * @return The name of this peer in the format "hostname:port".
+ */
+ public String getPeerName();
- public boolean isRunning();
-
- public InetSocketAddress getAddress();
-
- public String getHostName();
+ /**
+ * @return The names of all the peers executing tasks from the same job (including this peer).
+ */
+ public Set<String> getAllPeerNames();
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/ClusterStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/ClusterStatus.java?rev=1026932&r1=1026931&r2=1026932&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/ClusterStatus.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/ClusterStatus.java Mon Oct 25 01:21:52 2010
@@ -21,9 +21,10 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
@@ -57,7 +58,7 @@ import org.apache.hadoop.io.WritableUtil
public class ClusterStatus implements Writable {
private int numActiveGrooms;
- private Collection<String> activeGrooms = new ArrayList<String>();
+ private Map<String, String> activeGrooms = new HashMap<String, String>();
private int tasks;
private int maxTasks;
private BSPMaster.State state;
@@ -74,7 +75,7 @@ public class ClusterStatus implements Wr
this.state = state;
}
- public ClusterStatus(Collection<String> activeGrooms, int tasks, int maxTasks,
+ public ClusterStatus(Map<String, String> activeGrooms, int tasks, int maxTasks,
BSPMaster.State state) {
this(activeGrooms.size(), tasks, maxTasks, state);
this.activeGrooms = activeGrooms;
@@ -90,11 +91,11 @@ public class ClusterStatus implements Wr
}
/**
- * Get the names of groom servers in the cluster.
+ * Get the names of groom servers, and their peers, in the cluster.
*
* @return the active groom servers in the cluster.
*/
- public Collection<String> getActiveGroomNames() {
+ public Map<String, String> getActiveGroomNames() {
return activeGrooms;
}
@@ -131,16 +132,23 @@ public class ClusterStatus implements Wr
//////////////////////////////////////////////
@Override
public void write(DataOutput out) throws IOException {
- if(activeGrooms.size() == 0) {
+ if (activeGrooms.isEmpty()) {
out.writeInt(numActiveGrooms);
- out.writeInt(0);
+ out.writeBoolean(false);
} else {
out.writeInt(activeGrooms.size());
- out.writeInt(activeGrooms.size());
- for(String groom: activeGrooms) {
- Text.writeString(out, groom);
+ out.writeBoolean(true);
+
+ String[] groomNames = activeGrooms.keySet().toArray(new String[0]);
+ List<String> peerNames = new ArrayList<String>();
+
+ for (String groomName : groomNames) {
+ peerNames.add(activeGrooms.get(groomName));
}
- }
+
+ WritableUtils.writeCompressedStringArray(out, groomNames);
+ WritableUtils.writeCompressedStringArray(out, peerNames.toArray(new String[0]));
+ }
out.writeInt(tasks);
out.writeInt(maxTasks);
WritableUtils.writeEnum(out, state);
@@ -149,14 +157,18 @@ public class ClusterStatus implements Wr
@Override
public void readFields(DataInput in) throws IOException {
numActiveGrooms = in.readInt();
- int numGroomNames = in.readInt();
- String name;
- if (numGroomNames > 0) {
- for(int i=0; i < numGroomNames; i++) {
- name = Text.readString(in);
- activeGrooms.add(name);
+ boolean groomListFollows = in.readBoolean();
+
+ if (groomListFollows) {
+ String[] groomNames = WritableUtils.readCompressedStringArray(in);
+ String[] peerNames = WritableUtils.readCompressedStringArray(in);
+ activeGrooms = new HashMap<String, String>(groomNames.length);
+
+ for (int i = 0; i < groomNames.length; i++) {
+ activeGrooms.put(groomNames[i], peerNames[i]);
}
}
+
tasks = in.readInt();
maxTasks = in.readInt();
state = WritableUtils.readEnum(in, BSPMaster.State.class);
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java?rev=1026932&r1=1026931&r2=1026932&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java Mon Oct 25 01:21:52 2010
@@ -128,7 +128,7 @@ public class GroomServer implements Runn
this.conf.set(Constants.PEER_HOST, localHostname);
bspPeer = new BSPPeer(conf);
- this.groomServerName = "groomd_" + bspPeer.getHostName().replace(':', '_');
+ this.groomServerName = "groomd_" + bspPeer.getPeerName().replace(':', '_');
LOG.info("Starting groom: " + this.groomServerName);
DistributedCache.purgeCache(this.conf);
@@ -218,11 +218,11 @@ public class GroomServer implements Runn
HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);
if (acceptNewTasks) {
- bspPeer.setPeers(heartbeatResponse.getGroomServers());
+ bspPeer.setAllPeerNames(heartbeatResponse.getGroomServers().values());
}
- for (Map.Entry<String, String> peer : bspPeer.getAllPeers().entrySet()) {
- LOG.debug("Remote peer, host:port is " + peer.getKey());
+ for (String peer : bspPeer.getAllPeerNames()) {
+ LOG.debug("Remote peer, host:port is " + peer);
}
GroomServerAction[] actions = heartbeatResponse.getActions();
@@ -400,7 +400,7 @@ public class GroomServer implements Runn
//
if (status == null) {
synchronized (this) {
- status = new GroomServerStatus(groomServerName, bspPeer.getHostName(),
+ status = new GroomServerStatus(groomServerName, bspPeer.getPeerName(),
cloneAndResetRunningTaskStatuses(), failures, maxCurrentTasks);
}
} else {
@@ -606,8 +606,4 @@ public class GroomServer implements Runn
+ groomServerClass.toString(), e);
}
}
-
- public String getServerName() {
- return bspPeer.getHostName();
- }
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerStatus.java?rev=1026932&r1=1026931&r2=1026932&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerStatus.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerStatus.java Mon Oct 25 01:21:52 2010
@@ -46,7 +46,7 @@ public class GroomServerStatus implement
}
String groomName;
- String host;
+ String peerName;
int failures;
List<TaskStatus> taskReports;
@@ -57,10 +57,10 @@ public class GroomServerStatus implement
taskReports = new ArrayList<TaskStatus>();
}
- public GroomServerStatus(String groomName, String host,
+ public GroomServerStatus(String groomName, String peerName,
List<TaskStatus> taskReports, int failures, int maxTasks) {
this.groomName = groomName;
- this.host = host;
+ this.peerName = peerName;
this.taskReports = new ArrayList<TaskStatus>(taskReports);
this.failures = failures;
this.maxTasks = maxTasks;
@@ -75,8 +75,8 @@ public class GroomServerStatus implement
*
* @return The groom server address in the format hostname:port
*/
- public String getHost() {
- return host;
+ public String getPeerName() {
+ return peerName;
}
/**
@@ -129,7 +129,7 @@ public class GroomServerStatus implement
@Override
public void readFields(DataInput in) throws IOException {
this.groomName = Text.readString(in);
- this.host = Text.readString(in);
+ this.peerName = Text.readString(in);
this.failures = in.readInt();
this.maxTasks = in.readInt();
taskReports.clear();
@@ -149,7 +149,7 @@ public class GroomServerStatus implement
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, groomName);
- Text.writeString(out, host);
+ Text.writeString(out, peerName);
out.writeInt(failures);
out.writeInt(maxTasks);
out.writeInt(taskReports.size());
Modified: incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java?rev=1026932&r1=1026931&r2=1026932&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java Mon Oct 25 01:21:52 2010
@@ -20,10 +20,11 @@
package org.apache.hama.bsp;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Random;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -84,6 +85,11 @@ public class TestBSPPeer extends HamaClu
public BSPPeerThread(Configuration conf) throws IOException {
this.peer = new BSPPeer(conf);
+ Set<String> peerNames = new HashSet<String>(NUM_PEER);
+ for (int i = 0; i < NUM_PEER; i++) {
+ peerNames.add("localhost_" + (30000 + i));
+ }
+ peer.setAllPeerNames(peerNames);
}
@Override
@@ -91,7 +97,6 @@ public class TestBSPPeer extends HamaClu
int randomTime;
byte[] dummyData = new byte[PAYLOAD];
BSPMessage msg = null;
- InetSocketAddress addr = null;
for (int i = 0; i < ROUND; i++) {
randomTime = r.nextInt(MAXIMUM_DURATION) + 5;
@@ -99,9 +104,9 @@ public class TestBSPPeer extends HamaClu
for (int j = 0; j < 10; j++) {
r.nextBytes(dummyData);
msg = new BSPMessage(Bytes.tail(dummyData, 128), dummyData);
- addr = new InetSocketAddress("localhost", 30000 + j);
+ String peerName = "localhost_" + (30000 + j);
try {
- peer.send(addr, msg);
+ peer.send(peerName, msg);
} catch (IOException e) {
LOG.info(e);
}
@@ -123,6 +128,7 @@ public class TestBSPPeer extends HamaClu
e.printStackTrace();
}
+ assertEquals(peer.getNumCurrentMessages(), 1);
verifyPayload();
}
}
Modified: incubator/hama/trunk/src/test/org/apache/hama/bsp/TestClusterStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/bsp/TestClusterStatus.java?rev=1026932&r1=1026931&r2=1026932&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/bsp/TestClusterStatus.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/bsp/TestClusterStatus.java Mon Oct 25 01:21:52 2010
@@ -18,11 +18,9 @@
package org.apache.hama.bsp;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Random;
-import java.util.Set;
import junit.framework.TestCase;
@@ -43,10 +41,13 @@ public class TestClusterStatus extends T
DataInputBuffer in = new DataInputBuffer();
ClusterStatus status1;
- List<String> grooms = new ArrayList<String>();
+ Map<String, String> grooms = new HashMap<String, String>();
- for(int i=0;i< 10;i++) {
- grooms.add("groom_"+rnd.nextInt());
+ for (int i = 0; i < 10; i++) {
+ int num = rnd.nextInt();
+ String groomName = "groom_" + num;
+ String peerName = "peerhost:" + num;
+ grooms.put(groomName, peerName);
}
int tasks = rnd.nextInt(100);
@@ -61,14 +62,14 @@ public class TestClusterStatus extends T
ClusterStatus status2 = new ClusterStatus();
status2.readFields(in);
- Set<String> grooms_s = new HashSet<String>(status1.getActiveGroomNames());
- Set<String> grooms_o = new HashSet<String>(status2.getActiveGroomNames());
+ Map<String, String> grooms_s = new HashMap<String, String>(status1.getActiveGroomNames());
+ Map<String, String> grooms_o = new HashMap<String, String>(status2.getActiveGroomNames());
assertEquals(status1.getGroomServers(), status2.getGroomServers());
-
- assertTrue(grooms_s.containsAll(grooms_o));
- assertTrue(grooms_o.containsAll(grooms_s));
-
+
+ assertTrue(grooms_s.entrySet().containsAll(grooms_o.entrySet()));
+ assertTrue(grooms_o.entrySet().containsAll(grooms_s.entrySet()));
+
assertEquals(status1.getTasks(),status2.getTasks());
assertEquals(status1.getMaxTasks(), status2.getMaxTasks());
}