You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2010/11/09 07:43:09 UTC

svn commit: r1032869 - in /incubator/hama/trunk: ./ src/java/org/apache/hama/bsp/ src/test/org/apache/hama/bsp/

Author: edwardyoon
Date: Tue Nov  9 06:43:08 2010
New Revision: 1032869

URL: http://svn.apache.org/viewvc?rev=1032869&view=rev
Log:
Add superstep counter

Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/RunningJob.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskStatus.java
    incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1032869&r1=1032868&r2=1032869&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Tue Nov  9 06:43:08 2010
@@ -50,6 +50,7 @@ Trunk (unreleased changes)
 
   IMPROVEMENTS
     
+    HAMA-282: Add superstep counter (Filipe Manana via edwardyoon)
     HAMA-327: Add overview for javadoc (edwardyoon)
     HAMA-325: Add package.html to each package and few comments (edwardyoon)
     HAMA-322: JUnit test threads with GroboTestingJUnit (Filipe Manana via edwardyoon)

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java?rev=1032869&r1=1032868&r2=1032869&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java Tue Nov  9 06:43:08 2010
@@ -130,6 +130,11 @@ public class BSPJobClient extends Config
       return status.getRunState() == JobStatus.SUCCEEDED;
     }
 
+    public synchronized long getSuperstepCount() throws IOException {
+      ensureFreshStatus();
+      return status.getSuperstepCount();
+    }
+    
     /**
      * Blocks until the job is finished
      */
@@ -385,22 +390,46 @@ public class BSPJobClient extends Config
       IOException {
     BSPJobClient jc = new BSPJobClient(job.getConf());
     RunningJob running = jc.submitJobInternal(job);
-    String jobId = running.getID().toString();
-    LOG.info("Running job: " + jobId);
+    BSPJobID jobId = running.getID();
+    LOG.info("Running job: " + jobId.toString());
 
     while (true) {
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+      }
+
       if (running.isComplete()) {
-        LOG.info("Job complete: " + jobId);
         break;
       }
+
+      running = jc.getJob(jobId);
     }
 
+    LOG.info("Job complete: " + jobId);
+    LOG.info("The total number of supersteps: " + running.getSuperstepCount());
+    
     // TODO if error found, kill job
     // running.killJob();
     jc.close();
   }
 
   /**
+   * Get a RunningJob object to track an ongoing job. Returns null if the id
+   * does not correspond to any known job.
+   * 
+   * @throws IOException 
+   */
+  private RunningJob getJob(BSPJobID jobId) throws IOException {
+    JobStatus status = jobSubmitClient.getJobStatus(jobId);
+    if (status != null) {
+      return new NetworkedJob(status);
+    } else {
+      return null;
+    }
+  }
+
+  /**
    * Get status information about the BSP cluster
    * 
    * @param detailed if true then get a detailed status including the

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java?rev=1032869&r1=1032868&r2=1032869&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java Tue Nov  9 06:43:08 2010
@@ -61,6 +61,7 @@ public class BSPPeer implements Watcher,
   protected final ConcurrentLinkedQueue<BSPMessage> localQueue = new ConcurrentLinkedQueue<BSPMessage>();
   protected Set<String> allPeerNames = new HashSet<String>();
   protected InetSocketAddress peerAddress;
+  protected TaskStatus currentTaskStatus;
 
   /**
    * 
@@ -68,8 +69,10 @@ public class BSPPeer implements Watcher,
   public BSPPeer(Configuration conf) throws IOException {
     this.conf = conf;
 
-    String bindAddress = conf.get(Constants.PEER_HOST, Constants.DEFAULT_PEER_HOST);
-    int bindPort = conf.getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
+    String bindAddress = conf.get(Constants.PEER_HOST,
+        Constants.DEFAULT_PEER_HOST);
+    int bindPort = conf
+        .getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
     bspRoot = conf.get(Constants.ZOOKEEPER_ROOT,
         Constants.DEFAULT_ZOOKEEPER_ROOT);
     zookeeperAddr = conf.get(Constants.ZOOKEEPER_QUORUM)
@@ -84,7 +87,8 @@ public class BSPPeer implements Watcher,
   public void reinitialize() {
     try {
       LOG.debug("reinitialize(): " + getPeerName());
-      server = RPC.getServer(this, peerAddress.getHostName(), peerAddress.getPort(), conf);
+      server = RPC.getServer(this, peerAddress.getHostName(), peerAddress
+          .getPort(), conf);
       server.start();
     } catch (IOException e) {
       e.printStackTrace();
@@ -129,8 +133,7 @@ public class BSPPeer implements Watcher,
    * org.apache.hadoop.io.Writable, org.apache.hadoop.io.Writable)
    */
   @Override
-  public void send(String peerName, BSPMessage msg)
-      throws IOException {
+  public void send(String peerName, BSPMessage msg) throws IOException {
     LOG.debug("Send bytes (" + msg.getData().toString() + ") to " + peerName);
     ConcurrentLinkedQueue<BSPMessage> queue = outgoingQueues.get(peerName);
     if (queue == null) {
@@ -170,7 +173,7 @@ public class BSPPeer implements Watcher,
       }
     }
 
-    // Should we clearing outgoingQueues?
+    // Clear outgoing queues.
     this.outgoingQueues.clear();
 
     enterBarrier();
@@ -180,13 +183,14 @@ public class BSPPeer implements Watcher,
     // the number of peers, and the load of zookeeper.
     // It should fixed to some flawless way.
     leaveBarrier();
+    currentTaskStatus.incrementSuperstepCount();
   }
 
   protected boolean enterBarrier() throws KeeperException, InterruptedException {
     LOG.debug("[" + getPeerName() + "] enter the enterbarrier");
     try {
-      zk.create(bspRoot + "/" + getPeerName(), new byte[0], Ids.OPEN_ACL_UNSAFE,
-          CreateMode.EPHEMERAL);
+      zk.create(bspRoot + "/" + getPeerName(), new byte[0],
+          Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
     } catch (KeeperException e) {
       e.printStackTrace();
     } catch (InterruptedException e) {
@@ -265,7 +269,7 @@ public class BSPPeer implements Watcher,
   }
 
   /**
-   * @return the string as host:port of this Peer 
+   * @return the string as host:port of this Peer
    */
   public String getPeerName() {
     return peerAddress.getHostName() + ":" + peerAddress.getPort();
@@ -292,9 +296,26 @@ public class BSPPeer implements Watcher,
     this.allPeerNames = new HashSet<String>(allPeerNames);
   }
 
-  @Override
+  /**
+   * @return the number of messages
+   */
   public int getNumCurrentMessages() {
     return localQueue.size();
   }
 
+  /**
+   * Sets the current status
+   * 
+   * @param currentTaskStatus
+   */
+  public void setCurrentTaskStatus(TaskStatus currentTaskStatus) {
+    this.currentTaskStatus = currentTaskStatus;
+  }
+
+  /**
+   * @return the count of current super-step
+   */
+  public long getSuperstepCount() {
+    return currentTaskStatus.getSuperstepCount();
+  }
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java?rev=1032869&r1=1032868&r2=1032869&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java Tue Nov  9 06:43:08 2010
@@ -60,6 +60,11 @@ public interface BSPPeerInterface extend
   public void sync() throws IOException, KeeperException, InterruptedException;
 
   /**
+   * @return the count of current super-step
+   */
+  public long getSuperstepCount();
+  
+  /**
    * @return The name of this peer in the format "hostname:port".
    */
   public String getPeerName();

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java?rev=1032869&r1=1032868&r2=1032869&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java Tue Nov  9 06:43:08 2010
@@ -534,6 +534,7 @@ public class GroomServer implements Runn
 
     public void launchTask() throws IOException {
       taskStatus.setRunState(TaskStatus.State.RUNNING);
+      bspPeer.setCurrentTaskStatus(taskStatus);
       this.runner = task.createRunner(bspPeer, this.jobConf);
       this.runner.start();
 

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java?rev=1032869&r1=1032868&r2=1032869&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java Tue Nov  9 06:43:08 2010
@@ -28,11 +28,10 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 
-/*************************************************************
+/**
  * JobInProgress maintains all the info for keeping a Job on the straight and
  * narrow. It keeps its JobProfile and its latest JobStatus, plus a set of
- * tables for doing bookkeeping of its Tasks.
- * ***********************************************************
+ * tables for doing bookkeeping of its Tasks.
  */
 class JobInProgress {
   /**
@@ -64,6 +63,7 @@ class JobInProgress {
   private BSPJobID jobId;
   final BSPMaster master;
   List<TaskInProgress> tasks;
+  private long superstepCounter;
 
   public JobInProgress(BSPJobID jobId, BSPMaster master, Configuration conf)
       throws IOException {
@@ -77,6 +77,7 @@ class JobInProgress {
     this.status = new JobStatus(jobId, 0.0f, 0.0f, JobStatus.PREP);
     this.startTime = System.currentTimeMillis();
     status.setStartTime(startTime);
+    this.superstepCounter = 0;
 
     this.localJobFile = master.getLocalPath(BSPMaster.SUBDIR + "/" + jobId
         + ".xml");
@@ -96,7 +97,7 @@ class JobInProgress {
     if (jarFile != null) {
       fs.copyToLocalFile(new Path(jarFile), localJarFile);
     }
-    
+
   }
 
   // ///////////////////////////////////////////////////
@@ -141,7 +142,7 @@ class JobInProgress {
   public synchronized Task obtainNewTask(GroomServerStatus status,
       int clusterSize, int numUniqueHosts) {
     LOG.debug("clusterSize: " + clusterSize);
-    
+
     Task result = null;
     try {
       TaskInProgress tip = new TaskInProgress(getJobID(), this.jobFile
@@ -175,18 +176,24 @@ class JobInProgress {
 
     this.status = new JobStatus(this.status.getJobID(), 1.0f, 1.0f,
         JobStatus.RUNNING);
-    
-    if(allDone) {
+
+    if (allDone) {
       LOG.debug("Job successfully done.");
-      
-      this.status = new JobStatus(this.status.getJobID(), 1.0f, 1.0f,
-          JobStatus.SUCCEEDED);
-      garbageCollect();        
+
+      this.status = new JobStatus(this.status.getJobID(), 1.0f, 1.0f, 1.0f,
+          JobStatus.SUCCEEDED, superstepCounter);
+      garbageCollect();
     }
   }
 
-  public void updateTaskStatus(TaskInProgress tip, TaskStatus status) {
-    tip.updateStatus(status); // update tip
+  public synchronized void updateTaskStatus(TaskInProgress tip, TaskStatus taskStatus) {
+    tip.updateStatus(taskStatus); // update tip
+
+    if (superstepCounter < taskStatus.getSuperstepCount()) {
+      superstepCounter = taskStatus.getSuperstepCount();
+      // TODO Later, we have to update JobInProgress status here
+      
+    }
   }
 
   public void kill() {

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java?rev=1032869&r1=1032868&r2=1032869&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java Tue Nov  9 06:43:08 2010
@@ -51,7 +51,8 @@ public class JobStatus implements Writab
   private long startTime;
   private String schedulingInfo = "NA";
   private String user;
-
+  private long superstepCount;
+  
   public JobStatus() {
   }
 
@@ -66,13 +67,19 @@ public class JobStatus implements Writab
 
   public JobStatus(BSPJobID jobid, float setupProgress, float progress,
       float cleanupProgress, int runState) {
+    this(jobid, 0.0f, progress, cleanupProgress, runState, 0);
+  }
+
+  public JobStatus(BSPJobID jobid, float setupProgress, float progress,
+      float cleanupProgress, int runState, long superstepCount) {
     this.jobid = jobid;
     this.setupProgress = setupProgress;
     this.progress = progress;
     this.cleanupProgress = cleanupProgress;
     this.runState = runState;
+    this.superstepCount = superstepCount;
   }
-
+  
   public BSPJobID getJobID() {
     return jobid;
   }
@@ -109,6 +116,10 @@ public class JobStatus implements Writab
     this.runState = state;
   }
 
+  public synchronized long getSuperstepCount() {
+    return superstepCount;
+  }
+  
   synchronized void setStartTime(long startTime) {
     this.startTime = startTime;
   }
@@ -160,6 +171,7 @@ public class JobStatus implements Writab
     out.writeInt(runState);
     out.writeLong(startTime);
     Text.writeString(out, schedulingInfo);
+    out.writeLong(superstepCount);
   }
 
   public synchronized void readFields(DataInput in) throws IOException {
@@ -171,5 +183,7 @@ public class JobStatus implements Writab
     this.runState = in.readInt();
     this.startTime = in.readLong();
     this.schedulingInfo = Text.readString(in);
+    this.superstepCount = in.readLong();
   }
+
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/RunningJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/RunningJob.java?rev=1032869&r1=1032868&r2=1032869&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/RunningJob.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/RunningJob.java Tue Nov  9 06:43:08 2010
@@ -111,4 +111,6 @@ public interface RunningJob {
    */
   public void killTask(TaskAttemptID taskId, boolean shouldFail)
       throws IOException;
+
+  public long getSuperstepCount() throws IOException;
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java?rev=1032869&r1=1032868&r2=1032869&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java Tue Nov  9 06:43:08 2010
@@ -194,4 +194,8 @@ class TaskInProgress {
   public void updateStatus(TaskStatus status) {
     taskStatuses.put(status.getTaskId(), status);
   }
+  
+  public TaskStatus getTaskStatus(String taskId) {
+    return this.taskStatuses.get(taskId);
+  }
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskStatus.java?rev=1032869&r1=1032868&r2=1032869&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskStatus.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskStatus.java Tue Nov  9 06:43:08 2010
@@ -45,7 +45,8 @@ class TaskStatus implements Writable, Cl
   private volatile State runState;
   private String stateString;
   private String groomServer;
-
+  private long superstepCount;
+  
   private long startTime;
   private long finishTime;
 
@@ -56,6 +57,7 @@ class TaskStatus implements Writable, Cl
    */
   public TaskStatus() {
     taskId = new String();
+    this.superstepCount = 0;
   }
 
   public TaskStatus(String taskId, float progress, State runState,
@@ -66,6 +68,7 @@ class TaskStatus implements Writable, Cl
     this.stateString = stateString;
     this.groomServer = groomServer;
     this.phase = phase;
+    this.superstepCount = 0;
   }
 
   // //////////////////////////////////////////////////
@@ -208,6 +211,20 @@ class TaskStatus implements Writable, Cl
       this.finishTime = finishTime;
     }
   }
+  
+  /**
+   * @return The number of BSP super steps executed by the task.
+   */
+  public long getSuperstepCount() {
+    return superstepCount;
+  }
+  
+  /**
+   * Increments the number of BSP super steps executed by the task.
+   */
+  public void incrementSuperstepCount() {
+    superstepCount += 1;
+  }
 
   @Override
   public Object clone() {
@@ -232,6 +249,7 @@ class TaskStatus implements Writable, Cl
     this.phase = WritableUtils.readEnum(in, Phase.class);
     this.startTime = in.readLong();
     this.finishTime = in.readLong();
+    this.superstepCount = in.readLong();
   }
 
   @Override
@@ -243,5 +261,6 @@ class TaskStatus implements Writable, Cl
     WritableUtils.writeEnum(out, phase);
     out.writeLong(startTime);
     out.writeLong(finishTime);
+    out.writeLong(superstepCount);
   }
 }

Modified: incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java?rev=1032869&r1=1032868&r2=1032869&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java Tue Nov  9 06:43:08 2010
@@ -27,7 +27,6 @@ import java.util.Random;
 import java.util.Set;
 
 import junit.framework.AssertionFailedError;
-
 import net.sourceforge.groboutils.junit.v1.MultiThreadedTestRunner;
 import net.sourceforge.groboutils.junit.v1.TestRunnable;
 
@@ -97,6 +96,9 @@ public class TestBSPPeer extends HamaClu
         peerNames.add("localhost:" + (30000 + i));
       }
       peer.setAllPeerNames(peerNames);
+      TaskStatus currentTaskStatus = new TaskStatus("localhost:"
+          + lastTwoDigitsOfPort, 0, null, null, null, null);
+      peer.setCurrentTaskStatus(currentTaskStatus);
     }
 
     @Override
@@ -135,14 +137,16 @@ public class TestBSPPeer extends HamaClu
           e.printStackTrace();
         }
 
-        verifyPayload();
+        verifyPayload(i);
       }
     }
 
-    private void verifyPayload() {
+    private void verifyPayload(int round) {
       int numMessages = peer.getNumCurrentMessages();
+      assertEquals(round, ((int) peer.getSuperstepCount() -1 ));
+      
       LOG.info("[" + peer.getPeerName() + "] verifying " + numMessages
-          + " messages");
+          + " messages at " + round + " round");
 
       if (lastTwoDigitsOfPort < 10) {
         assertEquals(20, numMessages);
@@ -169,7 +173,7 @@ public class TestBSPPeer extends HamaClu
     }
   }
 
-  public void testSync() throws Throwable  {
+  public void testSync() throws Throwable {
 
     conf.setInt("bsp.peers.num", NUM_PEER);
     conf.set(Constants.ZOOKEEPER_QUORUM, "localhost");