You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2010/11/10 02:29:33 UTC

svn commit: r1033317 - in /incubator/hama/trunk: ./ src/examples/org/apache/hama/examples/ src/java/org/apache/hama/bsp/ src/test/org/apache/hama/bsp/

Author: edwardyoon
Date: Wed Nov 10 01:29:33 2010
New Revision: 1033317

URL: http://svn.apache.org/viewvc?rev=1033317&view=rev
Log:
Allows user to set the number of tasks

Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
    incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
    incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1033317&r1=1033316&r2=1033317&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Wed Nov 10 01:29:33 2010
@@ -50,6 +50,7 @@ Trunk (unreleased changes)
 
   IMPROVEMENTS
     
+    HAMA-321: Allows user to set the number of tasks (edwardyoon) 
     HAMA-282: Add superstep counter (Filipe Manana via edwardyoon)
     HAMA-327: Add overview for javadoc (edwardyoon)
     HAMA-325: Add package.html to each package and few comments (edwardyoon)

Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java?rev=1033317&r1=1033316&r2=1033317&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java Wed Nov 10 01:29:33 2010
@@ -94,14 +94,13 @@ public class PiEstimator {
       IOException {
     // BSP job configuration
     HamaConfiguration conf = new HamaConfiguration();
-    // Execute locally
-    // conf.set("bsp.master.address", "local");
 
     BSPJob bsp = new BSPJob(conf, PiEstimator.class);
     // Set the job name
     bsp.setJobName("pi estimation example");
     bsp.setBspClass(MyEstimator.class);
-
+    bsp.setNumBspTask(10);
+    
     BSPJobClient jobClient = new BSPJobClient(conf);
     ClusterStatus cluster = jobClient.getClusterStatus(true);
     // Choose one as a master

Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java?rev=1033317&r1=1033316&r2=1033317&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/SerializePrinting.java Wed Nov 10 01:29:33 2010
@@ -70,14 +70,13 @@ public class SerializePrinting {
       IOException {
     // BSP job configuration
     HamaConfiguration conf = new HamaConfiguration();
-    // Execute locally
-    // conf.set("bsp.master.address", "local");
 
     BSPJob bsp = new BSPJob(conf, SerializePrinting.class);
     // Set the job name
     bsp.setJobName("serialize printing");
     bsp.setBspClass(HelloBSP.class);
-
+    bsp.setNumBspTask(10);
+    
     BSPJobClient.runJob(bsp);
   }
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java?rev=1033317&r1=1033316&r2=1033317&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java Wed Nov 10 01:29:33 2010
@@ -38,20 +38,34 @@ public class BSPJob extends BSPJobContex
     this(new HamaConfiguration());
   }
 
-  public BSPJob(HamaConfiguration conf, String jobName) throws IOException {
-    this(conf);
-    setJobName(jobName);
-  }
-
   public BSPJob(HamaConfiguration conf) throws IOException {
     super(conf, null);
     jobClient = new BSPJobClient(conf);
   }
+  
+  public BSPJob(HamaConfiguration conf, String jobName) throws IOException {
+    this(conf);
+    setJobName(jobName);
+  }
 
   public BSPJob(BSPJobID jobID, String jobFile) throws IOException {
      super(new Path(jobFile), jobID);
   }
 
+  /**
+   * Only for unit test
+   * 
+   * TODO will be deleted when miniBSPCluster implemented
+   * 
+   * @param conf
+   * @param tasks
+   * @throws IOException
+   */
+  public BSPJob(HamaConfiguration conf, int tasks) throws IOException {
+    super(conf, null);
+    setNumBspTask(tasks);
+  }
+  
   @SuppressWarnings("unchecked")
   public BSPJob(HamaConfiguration conf, Class exampleClass) throws IOException {
     this(conf);
@@ -200,4 +214,7 @@ public class BSPJob extends BSPJobContex
     conf.setInt("bsp.peers.num", tasks);
   }
 
+  public int getNumBspTask() {
+    return conf.getInt("bsp.peers.num", 1);
+  }
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java?rev=1033317&r1=1033316&r2=1033317&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java Wed Nov 10 01:29:33 2010
@@ -134,7 +134,7 @@ public class BSPJobClient extends Config
       ensureFreshStatus();
       return status.getSuperstepCount();
     }
-    
+
     /**
      * Blocks until the job is finished
      */
@@ -389,6 +389,15 @@ public class BSPJobClient extends Config
   public static void runJob(BSPJob job) throws FileNotFoundException,
       IOException {
     BSPJobClient jc = new BSPJobClient(job.getConf());
+
+    // TODO this code must be removed 
+    // when GroomServer supports the multiple tasks.
+    if (job.getNumBspTask() > jc.getClusterStatus(false).getGroomServers()) {
+      // If the number of tasks is greater than the number of GroomServer,
+      // reset the number of tasks as number of GroomServer.
+      job.setNumBspTask(jc.getClusterStatus(false).getGroomServers());
+    }
+
     RunningJob running = jc.submitJobInternal(job);
     BSPJobID jobId = running.getID();
     LOG.info("Running job: " + jobId.toString());
@@ -408,7 +417,7 @@ public class BSPJobClient extends Config
 
     LOG.info("Job complete: " + jobId);
     LOG.info("The total number of supersteps: " + running.getSuperstepCount());
-    
+
     // TODO if error found, kill job
     // running.killJob();
     jc.close();
@@ -418,7 +427,7 @@ public class BSPJobClient extends Config
    * Get an RunningJob object to track an ongoing job. Returns null if the id
    * does not correspond to any known job.
    * 
-   * @throws IOException 
+   * @throws IOException
    */
   private RunningJob getJob(BSPJobID jobId) throws IOException {
     JobStatus status = jobSubmitClient.getJobStatus(jobId);

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java?rev=1033317&r1=1033316&r2=1033317&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java Wed Nov 10 01:29:33 2010
@@ -46,25 +46,25 @@ import org.apache.zookeeper.data.Stat;
 public class BSPPeer implements Watcher, BSPPeerInterface {
   public static final Log LOG = LogFactory.getLog(BSPPeer.class);
 
-  protected Configuration conf;
+  private Configuration conf;
+  private BSPJob jobConf;
 
-  protected InetSocketAddress masterAddr = null;
-  protected Server server = null;
-  protected ZooKeeper zk = null;
-  protected volatile Integer mutex = 0;
-
-  protected final String bspRoot;
-  protected final String zookeeperAddr;
-
-  protected final Map<InetSocketAddress, BSPPeerInterface> peers = new ConcurrentHashMap<InetSocketAddress, BSPPeerInterface>();
-  protected final Map<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> outgoingQueues = new ConcurrentHashMap<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>();
-  protected final ConcurrentLinkedQueue<BSPMessage> localQueue = new ConcurrentLinkedQueue<BSPMessage>();
-  protected Set<String> allPeerNames = new HashSet<String>();
-  protected InetSocketAddress peerAddress;
-  protected TaskStatus currentTaskStatus;
+  private Server server = null;
+  private ZooKeeper zk = null;
+  private volatile Integer mutex = 0;
+
+  private final String bspRoot;
+  private final String zookeeperAddr;
+
+  private final Map<InetSocketAddress, BSPPeerInterface> peers = new ConcurrentHashMap<InetSocketAddress, BSPPeerInterface>();
+  private final Map<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> outgoingQueues = new ConcurrentHashMap<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>();
+  private final ConcurrentLinkedQueue<BSPMessage> localQueue = new ConcurrentLinkedQueue<BSPMessage>();
+  private Set<String> allPeerNames = new HashSet<String>();
+  private InetSocketAddress peerAddress;
+  private TaskStatus currentTaskStatus;
 
   /**
-   * 
+   * Constructor
    */
   public BSPPeer(Configuration conf) throws IOException {
     this.conf = conf;
@@ -174,7 +174,7 @@ public class BSPPeer implements Watcher,
     }
 
     // Clear outgoing queues.
-    this.outgoingQueues.clear();
+    clearOutgoingQueues();
 
     enterBarrier();
     Thread.sleep(Constants.ATLEAST_WAIT_TIME); // TODO - This is temporary work
@@ -201,8 +201,7 @@ public class BSPPeer implements Watcher,
       synchronized (mutex) {
         List<String> list = zk.getChildren(bspRoot, true);
 
-        // TODO it must be same with the number of slave nodes, at this time.
-        if (list.size() < conf.getInt("bsp.peers.num", 0)) {
+        if (list.size() < jobConf.getNumBspTask()) {
           mutex.wait();
         } else {
           return true;
@@ -318,4 +317,41 @@ public class BSPPeer implements Watcher,
   public long getSuperstepCount() {
     return currentTaskStatus.getSuperstepCount();
   }
+
+  /**
+   * Sets the job configuration
+   * 
+   * @param jobConf
+   */
+  public void setJobConf(BSPJob jobConf) {
+    this.jobConf = jobConf;
+  }
+
+  /**
+   * @return the size of local queue
+   */
+  public int getLocalQueueSize() {
+    return localQueue.size();
+  }
+
+  /**
+   * @return the size of outgoing queue
+   */
+  public int getOutgoingQueueSize() {
+    return outgoingQueues.size();
+  }
+
+  /**
+   * Clears local queue
+   */
+  public void clearLocalQueue() {
+    this.localQueue.clear();
+  }
+  
+  /**
+   * Clears outgoing queues
+   */
+  public void clearOutgoingQueues() {
+    this.outgoingQueues.clear();
+  }
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java?rev=1033317&r1=1033316&r2=1033317&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java Wed Nov 10 01:29:33 2010
@@ -534,6 +534,7 @@ public class GroomServer implements Runn
 
     public void launchTask() throws IOException {
       taskStatus.setRunState(TaskStatus.State.RUNNING);
+      bspPeer.setJobConf(jobConf);
       bspPeer.setCurrentTaskStatus(taskStatus);
       this.runner = task.createRunner(bspPeer, this.jobConf);
       this.runner.start();
@@ -546,8 +547,8 @@ public class GroomServer implements Runn
           e.printStackTrace();
         }
 
-        if (bspPeer.localQueue.size() == 0
-            && bspPeer.outgoingQueues.size() == 0
+        if (bspPeer.getLocalQueueSize() == 0
+            && bspPeer.getOutgoingQueueSize() == 0
             && !runner.isAlive()) {
           taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
           acceptNewTasks = true;

Modified: incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java?rev=1033317&r1=1033316&r2=1033317&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java Wed Nov 10 01:29:33 2010
@@ -32,9 +32,9 @@ import net.sourceforge.groboutils.junit.
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hama.Constants;
 import org.apache.hama.HamaCluster;
+import org.apache.hama.HamaConfiguration;
 import org.apache.hama.util.Bytes;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -51,7 +51,7 @@ public class TestBSPPeer extends HamaClu
   private static final int ROUND = 3;
   private static final int PAYLOAD = 1024; // 1kb in default
   List<BSPPeerThread> list = new ArrayList<BSPPeerThread>(NUM_PEER);
-  Configuration conf;
+  HamaConfiguration conf;
   private Random r = new Random();
 
   public TestBSPPeer() {
@@ -88,7 +88,7 @@ public class TestBSPPeer extends HamaClu
     private int MAXIMUM_DURATION = 5;
     private int lastTwoDigitsOfPort;
 
-    public BSPPeerThread(Configuration conf) throws IOException {
+    public BSPPeerThread(HamaConfiguration conf) throws IOException {
       lastTwoDigitsOfPort = conf.getInt(Constants.PEER_PORT, 0) - 30000;
       this.peer = new BSPPeer(conf);
       Set<String> peerNames = new HashSet<String>(NUM_PEER);
@@ -99,6 +99,8 @@ public class TestBSPPeer extends HamaClu
       TaskStatus currentTaskStatus = new TaskStatus("localhost:"
           + lastTwoDigitsOfPort, 0, null, null, null, null);
       peer.setCurrentTaskStatus(currentTaskStatus);
+      BSPJob jobConf = new BSPJob(conf, NUM_PEER);
+      peer.setJobConf((BSPJob) jobConf);
     }
 
     @Override
@@ -165,7 +167,7 @@ public class TestBSPPeer extends HamaClu
         LOG.error(e);
       }
 
-      peer.localQueue.clear();
+      peer.clearLocalQueue();
     }
 
     public BSPPeer getBSPPeer() {
@@ -174,12 +176,12 @@ public class TestBSPPeer extends HamaClu
   }
 
   public void testSync() throws Throwable {
-
+    
     conf.setInt("bsp.peers.num", NUM_PEER);
     conf.set(Constants.ZOOKEEPER_QUORUM, "localhost");
     conf.set(Constants.PEER_HOST, "localhost");
     conf.set(Constants.ZOOKEEPER_SERVER_ADDRS, "localhost:21810");
-
+    
     TestRunnable[] threads = new TestRunnable[NUM_PEER];
 
     for (int i = 0; i < NUM_PEER; i++) {