You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2011/09/07 02:54:02 UTC

svn commit: r1165946 - in /incubator/hama/trunk: CHANGES.txt core/src/main/java/org/apache/hama/bsp/BSPMaster.java core/src/main/java/org/apache/hama/bsp/GroomServer.java

Author: edwardyoon
Date: Wed Sep  7 00:54:02 2011
New Revision: 1165946

URL: http://svn.apache.org/viewvc?rev=1165946&view=rev
Log:
Groom statuses should be reported periodically

Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1165946&r1=1165945&r2=1165946&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Wed Sep  7 00:54:02 2011
@@ -9,6 +9,7 @@ Release 0.4 - Unreleased
 
   BUG FIXES
 
+    HAMA-429: Groom statuses should be reported periodically (ChiaHung Lin via edwardyoon)
     HAMA-421: Maven build issues using proxy (Joe Crobak via edwardyoon)
 
   IMPROVEMENTS

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java?rev=1165946&r1=1165945&r2=1165946&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java Wed Sep  7 00:54:02 2011
@@ -343,7 +343,7 @@ public class BSPMaster implements JobSub
       LOG.error("Fail to register GroomServer " + status.getGroomName(), e);
       return false;
     }
-
+    LOG.info(status.getGroomName()+" is added.");
     return true;
   }
 

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java?rev=1165946&r1=1165945&r2=1165946&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java Wed Sep  7 00:54:02 2011
@@ -271,7 +271,7 @@ public class GroomServer implements Runn
     // Clear out state tables
     this.tasks.clear();
     this.runningJobs = new TreeMap<BSPJobID, RunningJob>();
-    this.runningTasks = new LinkedHashMap<TaskAttemptID, TaskInProgress>();
+    this.runningTasks = new ConcurrentHashMap<TaskAttemptID, TaskInProgress>();
     this.finishedTasks = new LinkedHashMap<TaskAttemptID, TaskInProgress>();
     this.conf.set(Constants.PEER_HOST, localHostname);
     this.conf.set(Constants.GROOM_RPC_HOST, localHostname);
@@ -418,6 +418,33 @@ public class GroomServer implements Runn
   public State offerService() throws Exception {
     while (running && !shuttingDown) {
       try {
+
+        // Reports to a BSPMaster
+        for (Map.Entry<TaskAttemptID, TaskInProgress> e : runningTasks
+            .entrySet()) {
+          Thread.sleep(REPORT_INTERVAL);
+          TaskInProgress tip = e.getValue();
+          TaskStatus taskStatus = tip.getStatus();
+
+          if (taskStatus.getRunState() == TaskStatus.State.RUNNING) {
+            taskStatus.setProgress(taskStatus.getSuperstepCount());
+
+            if (!tip.runner.isAlive()) {
+              if (taskStatus.getRunState() != TaskStatus.State.FAILED) {
+                taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
+              }
+              taskStatus.setPhase(TaskStatus.Phase.CLEANUP);
+            }
+          }
+
+          doReport(taskStatus);
+        }
+
+        Thread.sleep(REPORT_INTERVAL);
+      } catch (InterruptedException ie) {
+      }
+
+      try {
         if (justInited) {
           String dir = masterClient.getSystemDir();
           if (dir == null) {
@@ -493,7 +520,6 @@ public class GroomServer implements Runn
 
   public List<TaskStatus> updateTaskStatus(TaskStatus taskStatus) {
     List<TaskStatus> tlist = new ArrayList<TaskStatus>();
-    synchronized (runningTasks) {
 
       if (taskStatus.getRunState() == TaskStatus.State.SUCCEEDED
           || taskStatus.getRunState() == TaskStatus.State.FAILED) {
@@ -506,7 +532,6 @@ public class GroomServer implements Runn
         tlist.add((TaskStatus) taskStatus.clone());
       }
 
-    }
     return tlist;
   }
 
@@ -958,20 +983,6 @@ public class GroomServer implements Runn
   @Override
   public void done(TaskAttemptID taskid, boolean shouldBePromoted)
       throws IOException {
-    TaskInProgress tip = runningTasks.get(taskid);
-    TaskStatus taskStatus = tip.getStatus();
-
-    if (taskStatus.getRunState() == TaskStatus.State.RUNNING) {
-      taskStatus.setProgress(taskStatus.getSuperstepCount());
-
-      if (taskStatus.getRunState() != TaskStatus.State.FAILED) {
-        taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
-        taskStatus.setPhase(TaskStatus.Phase.CLEANUP);
-      }
-    }
-
-    // TODO reduce the reporting times.
-    doReport(taskStatus);
   }
 
   @Override