You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2010/10/12 03:27:29 UTC

svn commit: r1021571 - in /incubator/hama/trunk: ./ src/examples/org/apache/hama/examples/ src/java/org/apache/hama/bsp/

Author: edwardyoon
Date: Tue Oct 12 01:27:28 2010
New Revision: 1021571

URL: http://svn.apache.org/viewvc?rev=1021571&view=rev
Log:
Allow BSP peers to get a list of all other peers executing tasks of the same job

Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerStatus.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/HeartbeatResponse.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1021571&r1=1021570&r2=1021571&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Tue Oct 12 01:27:28 2010
@@ -4,7 +4,10 @@ Trunk (unreleased changes)
 
   NEW FEATURES
 
-    HAMA-267: Add command-line interface for job management (hyunsik via edwardyoon)
+    HAMA-304: Allow BSP peers to get a list of all other peers 
+                executing tasks of the same job (Filipe Manana via edwardyoon)
+    HAMA-267: Add command-line interface for job management
+                (hyunsik via edwardyoon)
     HAMA-279: Add "serialize printing" to examples (edwardyoon)
     HAMA-272: Hama/Zookeeper Integration (edwardyoon)
     HAMA-265: Add example Pi estimatior based on BSP (edwardyoon)
@@ -58,8 +61,8 @@ Trunk (unreleased changes)
     HAMA-293: Add starter classes for ZooKeeper, BSPMaster
                 and GroomServer (Filipe Manana via edwardyoon)
     HAMA-286: Task progress should be monitored (eddieyoon)
-    HAMA-287: BSPMaster should use the bsp.master.port config property
-                when creating its InetSocketAddr instance (Filipe Manana via edwardyoon)
+    HAMA-287: BSPMaster should use the bsp.master.port property when creating
+                its InetSocketAddr instance (Filipe Manana via edwardyoon)
     HAMA-283: Removing duplicate code (Filipe Manana via edwardyoon)
     HAMA-277: Add default number of bsp task (edwardyoon)
     HAMA-273: Implement killJob() method for local job (edwardyoon)
@@ -81,7 +84,7 @@ Trunk (unreleased changes)
     HAMA-217: Replacement of Block Multiplication Map/Reduce (edwardyoon)
     HAMA-205: Replacement of NormMap/Reduce (edwardyoon)
     HAMA-207: Replacement of Mat-Mat addition Map/Reduce (edwardyoon)
-    HAMA-208: Replacement of vector-matrix multiplication Map/Reduce (edwardyoon)
+    HAMA-208: Replacement of vector-matrix multiplication MapReduce (edwardyoon)
     HAMA-216: Removing JobManager in util package (edwardyoon)
     HAMA-215: Removing hama shell from version 0.2 (edwardyoon)
     HAMA-204: Replacement of TransposeMap/Reduce (edwardyoon)
@@ -174,10 +177,14 @@ Trunk (unreleased changes)
                        to groom servers (Filipe Manana via edwardyoon)
     HAMA-280: Fix warnings (Filipe Manana via edwardyoon)
     HAMA-270: wrong sequence of readFields() of ClusterStatus (hyunsik)
-    HAMA-260: Current version command of script is linked unknown class (edwardyoon)
-    HAMA-250: Add --no-check-certificate option to 'wget' command lines (edwardyoon)
-    HAMA-242: Wrong assignment of default setting inside BSPPeer constructor (hyunsik)
-    HAMA-246: Current shell scripts for bsp daemon have several problems. (hyunsik)
+    HAMA-260: Current version command of script is linked unknown class 
+                (edwardyoon)
+    HAMA-250: Add --no-check-certificate option to 'wget' command lines 
+                (edwardyoon)
+    HAMA-242: Wrong assignment of default setting inside BSPPeer constructor 
+                (hyunsik)
+    HAMA-246: Current shell scripts for bsp daemon have several problems. 
+                (hyunsik)
     HAMA-238: Example fail when performing sparse matrices addition (edwardyoon)
     HAMA-225: Jacobi iteration is in a infinite loop (edwardyoon)
     HAMA-214: Can't run the examples (edwardyoon)

Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java?rev=1021571&r1=1021570&r2=1021571&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java Tue Oct 12 01:27:28 2010
@@ -18,7 +18,6 @@
 package org.apache.hama.examples;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -29,15 +28,19 @@ import org.apache.hama.bsp.BSPJob;
 import org.apache.hama.bsp.BSPJobClient;
 import org.apache.hama.bsp.BSPMessage;
 import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.ClusterStatus;
 import org.apache.hama.util.Bytes;
 import org.apache.zookeeper.KeeperException;
 
 public class PiEstimator {
+  private static String MASTER_TASK = "master.task.";
+
   public static class MyEstimator extends BSP {
     public static final Log LOG = LogFactory.getLog(MyEstimator.class);
     private Configuration conf;
+    private String masterTask;
     private static final int iterations = 10000;
-    
+
     @Override
     public void bsp(BSPPeer bspPeer) throws IOException, KeeperException,
         InterruptedException {
@@ -55,7 +58,7 @@ public class PiEstimator {
       byte[] myData = Bytes.toBytes(4.0 * (double) in / (double) iterations);
       BSPMessage estimate = new BSPMessage(tagName, myData);
 
-      bspPeer.send(new InetSocketAddress("localhost", 61000), estimate);
+      bspPeer.send(bspPeer.getAddress(masterTask), estimate);
       bspPeer.sync();
 
       double pi = 0.0;
@@ -78,8 +81,9 @@ public class PiEstimator {
     @Override
     public void setConf(Configuration conf) {
       this.conf = conf;
+      this.masterTask = conf.get(MASTER_TASK);
     }
-    
+
   }
 
   public static void main(String[] args) throws InterruptedException,
@@ -87,13 +91,21 @@ public class PiEstimator {
     // BSP job configuration
     HamaConfiguration conf = new HamaConfiguration();
     // Execute locally
-    //conf.set("bsp.master.address", "local");
+    // conf.set("bsp.master.address", "local");
 
     BSPJob bsp = new BSPJob(conf, PiEstimator.class);
     // Set the job name
     bsp.setJobName("pi estimation example");
     bsp.setBspClass(MyEstimator.class);
 
+    BSPJobClient jobClient = new BSPJobClient(conf);
+    ClusterStatus cluster = jobClient.getClusterStatus(true);
+    // Choose one as a master
+    for (String name : cluster.getActiveGroomNames()) {
+      conf.set(MASTER_TASK, name);
+      break;
+    }
+
     BSPJobClient.runJob(bsp);
   }
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java?rev=1021571&r1=1021570&r2=1021571&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java Tue Oct 12 01:27:28 2010
@@ -23,6 +23,7 @@ import java.net.InetSocketAddress;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -87,6 +88,8 @@ public class BSPMaster implements JobSub
   // (groom name --> last sent HeartBeatResponse)
   Map<String, HeartbeatResponse> groomToHeartbeatResponseMap = new TreeMap<String, HeartbeatResponse>();
   private HashMap<String, GroomServerStatus> groomServers = new HashMap<String, GroomServerStatus>();
+  // maps groom server names to hosts (hostname:port)
+  private HashMap<String, String> groomServerHosts = new HashMap<String, String>();
 
   // Jobs' Meta Data
   private int nextJobId = 1;
@@ -359,10 +362,11 @@ public class BSPMaster implements JobSub
         groomToHeartbeatResponseMap.remove(groomName);
       }
       return new HeartbeatResponse(newResponseId,
-          new GroomServerAction[] { new ReinitTrackerAction() });
+          new GroomServerAction[] { new ReinitTrackerAction() },
+          Collections.<String, String>emptyMap());
     }
 
-    HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
+    HeartbeatResponse response = new HeartbeatResponse(newResponseId, null, groomServerHosts);
     List<GroomServerAction> actions = new ArrayList<GroomServerAction>();
 
     updateTaskStatuses(status);
@@ -509,6 +513,10 @@ public class BSPMaster implements JobSub
       }
     }
 
+    if (initialContact) {
+      groomServerHosts.put(groomStatus.getGroomName(), groomStatus.getHost());
+    }
+
     return true;
   }
 

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java?rev=1021571&r1=1021570&r2=1021571&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java Tue Oct 12 01:27:28 2010
@@ -19,6 +19,7 @@ package org.apache.hama.bsp;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -49,7 +50,7 @@ public class BSPPeer implements Watcher,
   protected Server server = null;
   protected ZooKeeper zk = null;
   protected volatile Integer mutex = 0;
-  
+
   protected final String serverName;
   protected final String bindAddress;
   protected final int bindPort;
@@ -60,6 +61,8 @@ public class BSPPeer implements Watcher,
   protected final Map<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> outgoingQueues = new ConcurrentHashMap<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>();
   protected final ConcurrentLinkedQueue<BSPMessage> localQueue = new ConcurrentLinkedQueue<BSPMessage>();
   protected int id;
+  protected Map<String, String> allPeers = new HashMap<String, String>();
+  protected InetSocketAddress peerAddress;
 
   /**
    * 
@@ -78,6 +81,7 @@ public class BSPPeer implements Watcher,
         + ":"
         + conf.getInt(Constants.ZOOKEPER_CLIENT_PORT,
             Constants.DEFAULT_ZOOKEPER_CLIENT_PORT);
+    peerAddress = new InetSocketAddress(bindAddress, bindPort);
 
     reinitialize();
   }
@@ -96,7 +100,7 @@ public class BSPPeer implements Watcher,
     } catch (IOException e) {
       e.printStackTrace();
     }
-    
+
     Stat s = null;
     if (zk != null) {
       try {
@@ -115,8 +119,8 @@ public class BSPPeer implements Watcher,
           LOG.error(e);
         }
       }
-    }      
-    
+    }
+
   }
 
   @Override
@@ -170,13 +174,13 @@ public class BSPPeer implements Watcher,
         peer.put(messages.next());
       }
     }
-    
+
     // Should we clearing outgoingQueues?
     this.outgoingQueues.clear();
-    
+
     enterBarrier();
     Thread.sleep(Constants.ATLEAST_WAIT_TIME); // TODO - This is temporary work
-                                               // because
+    // because
     // it can be affected by network condition,
     // the number of peers, and the load of zookeeper.
     // It should fixed to some flawless way.
@@ -198,7 +202,7 @@ public class BSPPeer implements Watcher,
       synchronized (mutex) {
         List<String> list = zk.getChildren(bspRoot, true);
 
-        // TODO it must be same with the number of slave nodes, at this time. 
+        // TODO it must be same with the number of slave nodes, at this time.
         if (list.size() < conf.getInt("bsp.peers.num", 0)) {
           mutex.wait();
         } else {
@@ -279,4 +283,35 @@ public class BSPPeer implements Watcher,
     return this.id;
   }
 
+  /**
+   * @return The address of this peer.
+   */
+  public InetSocketAddress getAddress() {
+    return peerAddress;
+  }
+
+  public InetSocketAddress getAddress(String hostname) {
+    String peerAddr = allPeers.get(hostname);
+    String[] peerAddrParts = peerAddr.split(":");
+    return new InetSocketAddress(peerAddrParts[0], Integer
+        .parseInt(peerAddrParts[1]));
+  }
+
+  /**
+   * @return all the other peers executing tasks from the same job.
+   */
+  public Map<String, String> getAllPeers() {
+    return allPeers;
+  }
+
+  /**
+   * To be invoked by the Groom Server with a list of peers received from a
+   * heartbeat response (BSPMaster).
+   * 
+   * @param allPeers map of groom server names to hostname:port addresses
+   */
+  void setPeers(Map<String, String> allPeers) {
+    this.allPeers = allPeers;
+  }
+
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java?rev=1021571&r1=1021570&r2=1021571&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java Tue Oct 12 01:27:28 2010
@@ -122,7 +122,10 @@ public class GroomServer implements Runn
     this.runningTasks = new LinkedHashMap<String, TaskInProgress>();
     this.acceptNewTasks = true;
 
-    this.groomServerName = "groomd_" + localHostname;
+    this.conf.set(Constants.PEER_HOST, localHostname);
+    bspPeer = new BSPPeer(conf);
+
+    this.groomServerName = "groomd_" + bspPeer.getServerName().replace(':', '_');
     LOG.info("Starting groom: " + this.groomServerName);
 
     DistributedCache.purgeCache(this.conf);
@@ -131,9 +134,6 @@ public class GroomServer implements Runn
         InterTrackerProtocol.class, InterTrackerProtocol.versionID,
         bspMasterAddr, conf);
     this.running = true;
-    
-    this.conf.set(Constants.PEER_HOST, localHostname);
-    bspPeer = new BSPPeer(conf);
   }
 
   private static void checkLocalDirs(String[] localDirs)
@@ -214,6 +214,14 @@ public class GroomServer implements Runn
         // Send the heartbeat and process the bspmaster's directives
         HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);
 
+        if (acceptNewTasks) {
+          bspPeer.setPeers(heartbeatResponse.getGroomServers());
+        }
+
+        for (Map.Entry<String, String> peer : bspPeer.getAllPeers().entrySet()) {
+          LOG.debug("Remote peer, host:port is " + peer.getKey());
+        }
+
         GroomServerAction[] actions = heartbeatResponse.getActions();
         LOG.debug("Got heartbeatResponse from BSPMaster with responseId: "
             + heartbeatResponse.getResponseId() + " and "
@@ -285,7 +293,7 @@ public class GroomServer implements Runn
     //
     if (status == null) {
       synchronized (this) {
-        status = new GroomServerStatus(groomServerName, localHostname,
+        status = new GroomServerStatus(groomServerName, bspPeer.getServerName(),
             cloneAndResetRunningTaskStatuses(), failures, maxCurrentTasks);
       }
     } else {

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerStatus.java?rev=1021571&r1=1021570&r2=1021571&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerStatus.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerStatus.java Tue Oct 12 01:27:28 2010
@@ -70,6 +70,11 @@ public class GroomServerStatus implement
     return groomName;
   }
   
+  /**
+   * The host (and port) at which the groom server can be reached.
+   *
+   * @return The groom server address in the format hostname:port
+   */
   public String getHost() {
     return host;
   }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/HeartbeatResponse.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/HeartbeatResponse.java?rev=1021571&r1=1021570&r2=1021571&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/HeartbeatResponse.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/HeartbeatResponse.java Tue Oct 12 01:27:28 2010
@@ -20,6 +20,8 @@ package org.apache.hama.bsp;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
@@ -30,12 +32,15 @@ public class HeartbeatResponse implement
   private Configuration conf;
   short responseId;
   private GroomServerAction [] actions; 
+  private Map<String, String> groomServers;
 
   public HeartbeatResponse() {}
   
-  public HeartbeatResponse(short responseId, GroomServerAction [] actions) {
+  public HeartbeatResponse(short responseId, GroomServerAction [] actions,
+      Map<String, String> groomServers) {
     this.responseId = responseId;
     this.actions = actions;
+    this.groomServers = groomServers;
   }
 
   public void setResponseId(short responseId) {
@@ -54,6 +59,14 @@ public class HeartbeatResponse implement
     return actions;
   }
 
+  public void setGroomServers(Map<String, String> groomServers) {
+    this.groomServers = groomServers;
+  }
+
+  public Map<String, String> getGroomServers() {
+    return groomServers;
+  }
+
   @Override
   public void readFields(DataInput in) throws IOException {
     this.responseId = in.readShort();
@@ -69,6 +82,14 @@ public class HeartbeatResponse implement
     } else {
       actions = null;
     }
+
+    String[] groomServerNames = WritableUtils.readCompressedStringArray(in);
+    String[] groomServerAddresses = WritableUtils.readCompressedStringArray(in);
+    groomServers = new HashMap<String, String>(groomServerNames.length);
+
+    for (int i = 0; i < groomServerNames.length; i++) {
+      groomServers.put(groomServerNames[i], groomServerAddresses[i]);
+    }
   }
 
   @Override
@@ -83,6 +104,10 @@ public class HeartbeatResponse implement
         action.write(out);
       }
     }
+    String[] groomServerNames = groomServers.keySet().toArray(new String[0]);
+    WritableUtils.writeCompressedStringArray(out, groomServerNames);
+    String[] groomServerAddresses = groomServers.values().toArray(new String[0]);
+    WritableUtils.writeCompressedStringArray(out, groomServerAddresses);
   }
 
   @Override