You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2010/10/25 03:21:52 UTC

svn commit: r1026932 - in /incubator/hama/trunk: ./ src/examples/org/apache/hama/examples/ src/java/org/apache/hama/bsp/ src/test/org/apache/hama/bsp/

Author: edwardyoon
Date: Mon Oct 25 01:21:52 2010
New Revision: 1026932

URL: http://svn.apache.org/viewvc?rev=1026932&view=rev
Log:
Renaming and Refactoring methods in BSPPeerInterface

Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
    incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/ClusterStatus.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerStatus.java
    incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java
    incubator/hama/trunk/src/test/org/apache/hama/bsp/TestClusterStatus.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1026932&r1=1026931&r2=1026932&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Mon Oct 25 01:21:52 2010
@@ -172,6 +172,7 @@ Trunk (unreleased changes)
 
   BUG FIXES
   
+    HAMA-316: Renaming and Refactoring methods in BSPPeerInterface (Filipe Manana)
     HAMA-319: groom servers Map in HeartbeatResponse not correctly serialized
                 (Filipe Manana)
     HAMA-317: Remove HBASE_CONF_DIR and HBASE_HEAPSIZE properties (edwardyoon)

Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java?rev=1026932&r1=1026931&r2=1026932&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java Mon Oct 25 01:21:52 2010
@@ -54,11 +54,11 @@ public class PiEstimator {
         }
       }
 
-      byte[] tagName = Bytes.toBytes(bspPeer.getHostName());
+      byte[] tagName = Bytes.toBytes(bspPeer.getPeerName());
       byte[] myData = Bytes.toBytes(4.0 * (double) in / (double) iterations);
       BSPMessage estimate = new BSPMessage(tagName, myData);
 
-      bspPeer.send(bspPeer.getAddress(masterTask), estimate);
+      bspPeer.send(masterTask, estimate);
       bspPeer.sync();
 
       double pi = 0.0;
@@ -105,8 +105,8 @@ public class PiEstimator {
     BSPJobClient jobClient = new BSPJobClient(conf);
     ClusterStatus cluster = jobClient.getClusterStatus(true);
     // Choose one as a master
-    for (String name : cluster.getActiveGroomNames()) {
-      conf.set(MASTER_TASK, name);
+    for (String peerName : cluster.getActiveGroomNames().values()) {
+      conf.set(MASTER_TASK, peerName);
       break;
     }
 

Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java?rev=1026932&r1=1026931&r2=1026932&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java Mon Oct 25 01:21:52 2010
@@ -18,7 +18,6 @@
 package org.apache.hama.examples;
 
 import java.io.IOException;
-import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -42,10 +41,10 @@ public class SerializePrinting {
       int num = Integer.parseInt(conf.get("bsp.peers.num"));
 
       int i = 0;
-      for(Map.Entry<String, String> e : bspPeer.getAllPeers().entrySet()) {
-        if(bspPeer.getHostName().equals(e.getValue())) {
+      for (String otherPeer : bspPeer.getAllPeerNames()) {
+        if (bspPeer.getPeerName().equals(otherPeer)) {
           LOG.info("Hello BSP from " + i + " of " + num + ": "
-              + bspPeer.getHostName());
+              + bspPeer.getPeerName());
         }
         
         Thread.sleep(200);

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java?rev=1026932&r1=1026931&r2=1026932&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java Mon Oct 25 01:21:52 2010
@@ -20,6 +20,7 @@ package org.apache.hama.bsp;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Map;
 
 import javax.security.auth.login.LoginException;
 
@@ -551,8 +552,8 @@ public class BSPJobClient extends Config
    */
   private void listActiveTrackers() throws IOException {
     ClusterStatus c = jobSubmitClient.getClusterStatus(true);
-    Collection<String> trackers = c.getActiveGroomNames();
-    for (String trackerName : trackers) {
+    Map<String, String> trackers = c.getActiveGroomNames();
+    for (String trackerName : trackers.keySet()) {
       System.out.println(trackerName);
     }
   }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java?rev=1026932&r1=1026931&r2=1026932&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java Mon Oct 25 01:21:52 2010
@@ -88,8 +88,8 @@ public class BSPMaster implements JobSub
   // (groom name --> last sent HeartBeatResponse)
   Map<String, HeartbeatResponse> groomToHeartbeatResponseMap = new TreeMap<String, HeartbeatResponse>();
   private HashMap<String, GroomServerStatus> groomServers = new HashMap<String, GroomServerStatus>();
-  // maps groom server names to hosts (hostname:port)
-  private HashMap<String, String> groomServerHosts = new HashMap<String, String>();
+  // maps groom server names to peer names
+  private HashMap<String, String> groomServerPeers = new HashMap<String, String>();
 
   // Jobs' Meta Data
   private Integer nextJobId = Integer.valueOf(1);
@@ -366,7 +366,7 @@ public class BSPMaster implements JobSub
           Collections.<String, String>emptyMap());
     }
 
-    HeartbeatResponse response = new HeartbeatResponse(newResponseId, null, groomServerHosts);
+    HeartbeatResponse response = new HeartbeatResponse(newResponseId, null, groomServerPeers);
     List<GroomServerAction> actions = new ArrayList<GroomServerAction>();
 
     updateTaskStatuses(status);
@@ -514,7 +514,7 @@ public class BSPMaster implements JobSub
     }
 
     if (initialContact) {
-      groomServerHosts.put(groomStatus.getGroomName(), groomStatus.getHost());
+      groomServerPeers.put(groomStatus.getGroomName(), groomStatus.getPeerName());
     }
 
     return true;
@@ -550,16 +550,24 @@ public class BSPMaster implements JobSub
 
   @Override
   public ClusterStatus getClusterStatus(boolean detailed) {
-    synchronized (groomServers) {
+    int numGroomServers;
+    Map<String, String> groomPeersMap = null;
+
+    // give the caller a snapshot of the cluster status
+    synchronized (this) {
+      numGroomServers = groomServerPeers.size();
       if (detailed) {
-        List<String> groomNames = groomServerNames();
-        return new ClusterStatus(groomNames, totalTasks, totalTaskCapacity,
-            state);
-      } else {
-        return new ClusterStatus(groomServers.size(), totalTasks,
-            totalTaskCapacity, state);
+        groomPeersMap = new HashMap<String, String>(groomServerPeers);
       }
     }
+
+    if (detailed) {
+      return new ClusterStatus(groomPeersMap, totalTasks, totalTaskCapacity,
+          state);
+    } else {
+      return new ClusterStatus(numGroomServers, totalTasks, totalTaskCapacity,
+          state);
+    }
   }
 
   /**

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java?rev=1026932&r1=1026931&r2=1026932&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java Mon Oct 25 01:21:52 2010
@@ -19,11 +19,13 @@ package org.apache.hama.bsp;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.HashMap;
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
@@ -51,16 +53,13 @@ public class BSPPeer implements Watcher,
   protected ZooKeeper zk = null;
   protected volatile Integer mutex = 0;
 
-  protected final String peerAddr;
-  protected final String bindAddress;
-  protected final int bindPort;
   protected final String bspRoot;
   protected final String zookeeperAddr;
 
   protected final Map<InetSocketAddress, BSPPeerInterface> peers = new ConcurrentHashMap<InetSocketAddress, BSPPeerInterface>();
   protected final Map<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> outgoingQueues = new ConcurrentHashMap<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>();
   protected final ConcurrentLinkedQueue<BSPMessage> localQueue = new ConcurrentLinkedQueue<BSPMessage>();
-  protected Map<String, String> allPeers = new HashMap<String, String>();
+  protected Set<String> allPeerNames = new HashSet<String>();
   protected InetSocketAddress peerAddress;
 
   /**
@@ -69,10 +68,8 @@ public class BSPPeer implements Watcher,
   public BSPPeer(Configuration conf) throws IOException {
     this.conf = conf;
 
-    peerAddr = conf.get(Constants.PEER_HOST, Constants.DEFAULT_PEER_HOST)
-        + ":" + conf.getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
-    bindAddress = conf.get(Constants.PEER_HOST, Constants.DEFAULT_PEER_HOST);
-    bindPort = conf.getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
+    String bindAddress = conf.get(Constants.PEER_HOST, Constants.DEFAULT_PEER_HOST);
+    int bindPort = conf.getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
     bspRoot = conf.get(Constants.ZOOKEEPER_ROOT,
         Constants.DEFAULT_ZOOKEEPER_ROOT);
     zookeeperAddr = conf.get(Constants.ZOOKEEPER_QUORUM)
@@ -86,8 +83,8 @@ public class BSPPeer implements Watcher,
 
   public void reinitialize() {
     try {
-      LOG.debug("reinitialize(): " + bindAddress + ":" + bindPort);
-      server = RPC.getServer(this, bindAddress, bindPort, conf);
+      LOG.debug("reinitialize(): " + getPeerName());
+      server = RPC.getServer(this, peerAddress.getHostName(), peerAddress.getPort(), conf);
       server.start();
     } catch (IOException e) {
       e.printStackTrace();
@@ -132,13 +129,13 @@ public class BSPPeer implements Watcher,
    * org.apache.hadoop.io.Writable, org.apache.hadoop.io.Writable)
    */
   @Override
-  public void send(InetSocketAddress hostname, BSPMessage msg)
+  public void send(String peerName, BSPMessage msg)
       throws IOException {
-    LOG.debug("Send bytes (" + msg.getData().toString() + ") to " + hostname.getHostName());
-    ConcurrentLinkedQueue<BSPMessage> queue = outgoingQueues.get(hostname);
+    LOG.debug("Send bytes (" + msg.getData().toString() + ") to " + peerName);
+    ConcurrentLinkedQueue<BSPMessage> queue = outgoingQueues.get(peerName);
     if (queue == null) {
       queue = new ConcurrentLinkedQueue<BSPMessage>();
-      outgoingQueues.put(hostname, queue);
+      outgoingQueues.put(getAddress(peerName), queue);
     }
     queue.add(msg);
   }
@@ -186,9 +183,9 @@ public class BSPPeer implements Watcher,
   }
 
   protected boolean enterBarrier() throws KeeperException, InterruptedException {
-    LOG.debug("[" + peerAddr + "] enter the enterbarrier");
+    LOG.debug("[" + getPeerName() + "] enter the enterbarrier");
     try {
-      zk.create(bspRoot + "/" + peerAddr, new byte[0], Ids.OPEN_ACL_UNSAFE,
+      zk.create(bspRoot + "/" + getPeerName(), new byte[0], Ids.OPEN_ACL_UNSAFE,
           CreateMode.EPHEMERAL);
     } catch (KeeperException e) {
       e.printStackTrace();
@@ -211,7 +208,7 @@ public class BSPPeer implements Watcher,
   }
 
   protected boolean leaveBarrier() throws KeeperException, InterruptedException {
-    zk.delete(bspRoot + "/" + peerAddr, 0);
+    zk.delete(bspRoot + "/" + getPeerName(), 0);
 
     while (true) {
       synchronized (mutex) {
@@ -219,7 +216,7 @@ public class BSPPeer implements Watcher,
         if (list.size() > 0) {
           mutex.wait();
         } else {
-          LOG.debug("[" + peerAddr + "] leave from the leaveBarrier");
+          LOG.debug("[" + getPeerName() + "] leave from the leaveBarrier");
           return true;
         }
       }
@@ -239,11 +236,6 @@ public class BSPPeer implements Watcher,
   }
 
   @Override
-  public boolean isRunning() {
-    return true;
-  }
-
-  @Override
   public void put(BSPMessage msg) throws IOException {
     this.localQueue.add(msg);
   }
@@ -275,29 +267,19 @@ public class BSPPeer implements Watcher,
   /**
    * @return the string as host:port of this Peer 
    */
-  public String getHostName() {
-    return this.peerAddr;
-  }
-
-  /**
-   * @return The address of this peer.
-   */
-  public InetSocketAddress getAddress() {
-    return peerAddress;
+  public String getPeerName() {
+    return peerAddress.getHostName() + ":" + peerAddress.getPort();
   }
 
-  public InetSocketAddress getAddress(String hostname) {
-    String peerAddr = allPeers.get(hostname);
-    String[] peerAddrParts = peerAddr.split(":");
+  private InetSocketAddress getAddress(String peerName) {
+    String[] peerAddrParts = peerName.split(":");
     return new InetSocketAddress(peerAddrParts[0], Integer
         .parseInt(peerAddrParts[1]));
   }
 
-  /**
-   * @return all the other peers executing tasks from the same job.
-   */
-  public Map<String, String> getAllPeers() {
-    return allPeers;
+  @Override
+  public Set<String> getAllPeerNames() {
+    return allPeerNames;
   }
 
   /**
@@ -306,8 +288,13 @@ public class BSPPeer implements Watcher,
    * 
    * @param allPeers
    */
-  void setPeers(Map<String, String> allPeers) {
-    this.allPeers = allPeers;
+  void setAllPeerNames(Collection<String> allPeerNames) {
+    this.allPeerNames = new HashSet<String>(allPeerNames);
+  }
+
+  @Override
+  public int getNumCurrentMessages() {
+    return localQueue.size();
   }
 
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java?rev=1026932&r1=1026931&r2=1026932&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java Mon Oct 25 01:21:52 2010
@@ -19,7 +19,7 @@ package org.apache.hama.bsp;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.net.InetSocketAddress;
+import java.util.Set;
 
 import org.apache.hama.Constants;
 import org.apache.zookeeper.KeeperException;
@@ -35,28 +35,37 @@ public interface BSPPeerInterface extend
    * @param msg
    * @throws IOException
    */
-  public void send(InetSocketAddress hostname, BSPMessage msg)
+  public void send(String peerName, BSPMessage msg)
       throws IOException;
 
   public void put(BSPMessage msg) throws IOException;
   
   /**
-   * @return the current message
+   * @return A message from the peer's received messages queue (a FIFO).
    * @throws IOException
    */
   public BSPMessage getCurrentMessage() throws IOException;
 
   /**
-   * Synchronize all of the data in the local queue to other BSP Peers.
+   * @return The number of messages in the peer's received messages queue.
+   */
+  public int getNumCurrentMessages();
+
+  /**
+   * Sends all the messages in the outgoing message queues to the corresponding remote peers.
    * 
    * @throws InterruptedException
    * @throws KeeperException
    */
   public void sync() throws IOException, KeeperException, InterruptedException;
+
+  /**
+   * @return The name of this peer in the format "hostname:port".
+   */
+  public String getPeerName();
   
-  public boolean isRunning();
-  
-  public InetSocketAddress getAddress();
-  
-  public String getHostName();
+  /**
+   * @return The names of all the peers executing tasks from the same job (including this peer).
+   */
+  public Set<String> getAllPeerNames();
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/ClusterStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/ClusterStatus.java?rev=1026932&r1=1026931&r2=1026932&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/ClusterStatus.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/ClusterStatus.java Mon Oct 25 01:21:52 2010
@@ -21,9 +21,10 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 
@@ -57,7 +58,7 @@ import org.apache.hadoop.io.WritableUtil
 public class ClusterStatus implements Writable {
 
   private int numActiveGrooms;
-  private Collection<String> activeGrooms = new ArrayList<String>();
+  private Map<String, String> activeGrooms = new HashMap<String, String>();
   private int tasks;
   private int maxTasks;
   private BSPMaster.State state;
@@ -74,7 +75,7 @@ public class ClusterStatus implements Wr
     this.state = state;
   }
   
-  public ClusterStatus(Collection<String> activeGrooms, int tasks, int maxTasks,
+  public ClusterStatus(Map<String, String> activeGrooms, int tasks, int maxTasks,
       BSPMaster.State state) {
     this(activeGrooms.size(), tasks, maxTasks, state);
     this.activeGrooms = activeGrooms;
@@ -90,11 +91,11 @@ public class ClusterStatus implements Wr
   }
   
   /**
-   * Get the names of groom servers in the cluster.
+   * Get the names of groom servers, and their peers, in the cluster.
    * 
    * @return the active groom servers in the cluster.
    */  
-  public Collection<String> getActiveGroomNames() {
+  public Map<String, String> getActiveGroomNames() {
     return activeGrooms;
   }
   
@@ -131,16 +132,23 @@ public class ClusterStatus implements Wr
   //////////////////////////////////////////////
   @Override
   public void write(DataOutput out) throws IOException {
-    if(activeGrooms.size() == 0) {
+    if (activeGrooms.isEmpty()) {
       out.writeInt(numActiveGrooms);
-      out.writeInt(0);
+      out.writeBoolean(false);
     } else {
       out.writeInt(activeGrooms.size());
-      out.writeInt(activeGrooms.size());
-      for(String groom: activeGrooms) {
-        Text.writeString(out, groom);
+      out.writeBoolean(true);
+
+      String[] groomNames = activeGrooms.keySet().toArray(new String[0]);
+      List<String> peerNames = new ArrayList<String>();
+
+      for (String groomName : groomNames) {
+        peerNames.add(activeGrooms.get(groomName));
       }
-    }    
+
+      WritableUtils.writeCompressedStringArray(out, groomNames);
+      WritableUtils.writeCompressedStringArray(out, peerNames.toArray(new String[0]));
+    }
     out.writeInt(tasks);
     out.writeInt(maxTasks);
     WritableUtils.writeEnum(out, state);
@@ -149,14 +157,18 @@ public class ClusterStatus implements Wr
   @Override
   public void readFields(DataInput in) throws IOException {
     numActiveGrooms = in.readInt();
-    int numGroomNames = in.readInt();
-    String name;
-    if (numGroomNames > 0) {
-      for(int i=0; i < numGroomNames; i++) {
-        name = Text.readString(in);
-        activeGrooms.add(name);
+    boolean groomListFollows = in.readBoolean();
+
+    if (groomListFollows) {
+      String[] groomNames = WritableUtils.readCompressedStringArray(in);
+      String[] peerNames = WritableUtils.readCompressedStringArray(in);
+      activeGrooms = new HashMap<String, String>(groomNames.length);
+
+      for (int i = 0; i < groomNames.length; i++) {
+        activeGrooms.put(groomNames[i], peerNames[i]);
       }
     }
+
     tasks = in.readInt();
     maxTasks = in.readInt();
     state = WritableUtils.readEnum(in, BSPMaster.State.class);

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java?rev=1026932&r1=1026931&r2=1026932&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java Mon Oct 25 01:21:52 2010
@@ -128,7 +128,7 @@ public class GroomServer implements Runn
     this.conf.set(Constants.PEER_HOST, localHostname);
     bspPeer = new BSPPeer(conf);
 
-    this.groomServerName = "groomd_" + bspPeer.getHostName().replace(':', '_');
+    this.groomServerName = "groomd_" + bspPeer.getPeerName().replace(':', '_');
     LOG.info("Starting groom: " + this.groomServerName);
 
     DistributedCache.purgeCache(this.conf);
@@ -218,11 +218,11 @@ public class GroomServer implements Runn
         HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);
 
         if (acceptNewTasks) {
-          bspPeer.setPeers(heartbeatResponse.getGroomServers());
+          bspPeer.setAllPeerNames(heartbeatResponse.getGroomServers().values());
         }
 
-        for (Map.Entry<String, String> peer : bspPeer.getAllPeers().entrySet()) {
-          LOG.debug("Remote peer, host:port is " + peer.getKey());
+        for (String peer : bspPeer.getAllPeerNames()) {
+          LOG.debug("Remote peer, host:port is " + peer);
         }
 
         GroomServerAction[] actions = heartbeatResponse.getActions();
@@ -400,7 +400,7 @@ public class GroomServer implements Runn
     //
     if (status == null) {
       synchronized (this) {
-        status = new GroomServerStatus(groomServerName, bspPeer.getHostName(),
+        status = new GroomServerStatus(groomServerName, bspPeer.getPeerName(),
             cloneAndResetRunningTaskStatuses(), failures, maxCurrentTasks);
       }
     } else {
@@ -606,8 +606,4 @@ public class GroomServer implements Runn
           + groomServerClass.toString(), e);
     }
   }
-
-  public String getServerName() {
-    return bspPeer.getHostName();
-  }
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerStatus.java?rev=1026932&r1=1026931&r2=1026932&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerStatus.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerStatus.java Mon Oct 25 01:21:52 2010
@@ -46,7 +46,7 @@ public class GroomServerStatus implement
   }
   
   String groomName;
-  String host;
+  String peerName;
   int failures;
   List<TaskStatus> taskReports;
   
@@ -57,10 +57,10 @@ public class GroomServerStatus implement
     taskReports = new ArrayList<TaskStatus>();
   }
   
-  public GroomServerStatus(String groomName, String host, 
+  public GroomServerStatus(String groomName, String peerName,
       List<TaskStatus> taskReports, int failures, int maxTasks) {
     this.groomName = groomName;
-    this.host = host;
+    this.peerName = peerName;
     this.taskReports = new ArrayList<TaskStatus>(taskReports);
     this.failures = failures;
     this.maxTasks = maxTasks;
@@ -75,8 +75,8 @@ public class GroomServerStatus implement
    *
    * @return The groom server address in the format hostname:port
    */
-  public String getHost() {
-    return host;
+  public String getPeerName() {
+    return peerName;
   }
   
   /**
@@ -129,7 +129,7 @@ public class GroomServerStatus implement
   @Override
   public void readFields(DataInput in) throws IOException {
     this.groomName = Text.readString(in);
-    this.host = Text.readString(in);
+    this.peerName = Text.readString(in);
     this.failures = in.readInt();
     this.maxTasks = in.readInt();
     taskReports.clear();
@@ -149,7 +149,7 @@ public class GroomServerStatus implement
   @Override
   public void write(DataOutput out) throws IOException {
     Text.writeString(out, groomName);
-    Text.writeString(out, host);
+    Text.writeString(out, peerName);
     out.writeInt(failures);
     out.writeInt(maxTasks);
     out.writeInt(taskReports.size());

Modified: incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java?rev=1026932&r1=1026931&r2=1026932&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java Mon Oct 25 01:21:52 2010
@@ -20,10 +20,11 @@
 package org.apache.hama.bsp;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Random;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -84,6 +85,11 @@ public class TestBSPPeer extends HamaClu
 
     public BSPPeerThread(Configuration conf) throws IOException {
       this.peer = new BSPPeer(conf);
+      Set<String> peerNames = new HashSet<String>(NUM_PEER);
+      for (int i = 0; i < NUM_PEER; i++) {
+        peerNames.add("localhost_" + (30000 + i));
+      }
+      peer.setAllPeerNames(peerNames);
     }
 
     @Override
@@ -91,7 +97,6 @@ public class TestBSPPeer extends HamaClu
       int randomTime;
       byte[] dummyData = new byte[PAYLOAD];
       BSPMessage msg = null;
-      InetSocketAddress addr = null;
 
       for (int i = 0; i < ROUND; i++) {
         randomTime = r.nextInt(MAXIMUM_DURATION) + 5;
@@ -99,9 +104,9 @@ public class TestBSPPeer extends HamaClu
         for (int j = 0; j < 10; j++) {
           r.nextBytes(dummyData);
           msg = new BSPMessage(Bytes.tail(dummyData, 128), dummyData);
-          addr = new InetSocketAddress("localhost", 30000 + j);
+          String peerName = "localhost_" + (30000 + j);
           try {
-            peer.send(addr, msg);
+            peer.send(peerName, msg);
           } catch (IOException e) {
             LOG.info(e);
           }
@@ -123,6 +128,7 @@ public class TestBSPPeer extends HamaClu
           e.printStackTrace();
         }
 
+        assertEquals(peer.getNumCurrentMessages(), 1);
         verifyPayload();
       }
     }

Modified: incubator/hama/trunk/src/test/org/apache/hama/bsp/TestClusterStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/bsp/TestClusterStatus.java?rev=1026932&r1=1026931&r2=1026932&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/bsp/TestClusterStatus.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/bsp/TestClusterStatus.java Mon Oct 25 01:21:52 2010
@@ -18,11 +18,9 @@
 package org.apache.hama.bsp;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Random;
-import java.util.Set;
 
 import junit.framework.TestCase;
 
@@ -43,10 +41,13 @@ public class TestClusterStatus extends T
     DataInputBuffer in = new DataInputBuffer();
     
     ClusterStatus status1;
-    List<String> grooms = new ArrayList<String>();
+    Map<String, String> grooms = new HashMap<String, String>();
         
-    for(int i=0;i< 10;i++) {
-      grooms.add("groom_"+rnd.nextInt());
+    for (int i = 0; i < 10; i++) {
+      int num = rnd.nextInt();
+      String groomName = "groom_" + num;
+      String peerName = "peerhost:" + num;
+      grooms.put(groomName, peerName);
     }
     
     int tasks = rnd.nextInt(100);
@@ -61,14 +62,14 @@ public class TestClusterStatus extends T
     ClusterStatus status2 = new ClusterStatus();
     status2.readFields(in);
     
-    Set<String> grooms_s = new HashSet<String>(status1.getActiveGroomNames());
-    Set<String> grooms_o = new HashSet<String>(status2.getActiveGroomNames());     
+    Map<String, String> grooms_s = new HashMap<String, String>(status1.getActiveGroomNames());
+    Map<String, String> grooms_o = new HashMap<String, String>(status2.getActiveGroomNames());
     
     assertEquals(status1.getGroomServers(), status2.getGroomServers());
-    
-    assertTrue(grooms_s.containsAll(grooms_o));
-    assertTrue(grooms_o.containsAll(grooms_s));    
-    
+
+    assertTrue(grooms_s.entrySet().containsAll(grooms_o.entrySet()));
+    assertTrue(grooms_o.entrySet().containsAll(grooms_s.entrySet()));
+
     assertEquals(status1.getTasks(),status2.getTasks());
     assertEquals(status1.getMaxTasks(), status2.getMaxTasks());
   }