You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2012/05/29 02:35:03 UTC

svn commit: r1343442 - in /incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp: BSPJobClient.java JobInProgress.java TaskCompletionEvent.java TaskInProgress.java TaskLog.java TaskLogServlet.java

Author: edwardyoon
Date: Tue May 29 00:35:02 2012
New Revision: 1343442

URL: http://svn.apache.org/viewvc?rev=1343442&view=rev
Log:
Fix minor bugs of HAMA-582

Modified:
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskCompletionEvent.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLogServlet.java

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1343442&r1=1343441&r2=1343442&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Tue May 29 00:35:02 2012
@@ -601,10 +601,10 @@ public class BSPJobClient extends Config
    */
   public boolean monitorAndPrintJob(BSPJob job, RunningJob info)
       throws IOException, InterruptedException {
-
     String lastReport = null;
     LOG.info("Running job: " + info.getID());
-
+    int eventCounter = 0;
+    
     while (!job.isComplete()) {
       Thread.sleep(3000);
       long step = job.progress();
@@ -617,7 +617,6 @@ public class BSPJobClient extends Config
         lastReport = report;
       }
 
-      int eventCounter = 0;
       TaskCompletionEvent[] events = info.getTaskCompletionEvents(eventCounter);
       eventCounter += events.length;
       

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java?rev=1343442&r1=1343441&r2=1343442&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java Tue May 29 00:35:02 2012
@@ -105,8 +105,8 @@ class JobInProgress {
 
     this.tasksInGroomMap = new HashMap<GroomServerStatus, Integer>();
 
-    this.status = new JobStatus(jobId, null, 0L, 0L, JobStatus.State.PREP
-        .value(), counters);
+    this.status = new JobStatus(jobId, null, 0L, 0L,
+        JobStatus.State.PREP.value(), counters);
     this.startTime = System.currentTimeMillis();
     this.superstepCounter = 0;
     this.restartCount = 0;
@@ -126,8 +126,8 @@ class JobInProgress {
     this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>(
         numBSPTasks + 10);
 
-    this.profile = new JobProfile(job.getUser(), jobId, jobFile.toString(), job
-        .getJobName());
+    this.profile = new JobProfile(job.getUser(), jobId, jobFile.toString(),
+        job.getJobName());
 
     this.setJobName(job.getJobName());
 
@@ -309,9 +309,9 @@ class JobInProgress {
     }
 
     if (allDone) {
-      this.status = new JobStatus(this.status.getJobID(), this.profile
-          .getUser(), superstepCounter, superstepCounter, superstepCounter,
-          JobStatus.SUCCEEDED, superstepCounter, counters);
+      this.status = new JobStatus(this.status.getJobID(),
+          this.profile.getUser(), superstepCounter, superstepCounter,
+          superstepCounter, JobStatus.SUCCEEDED, superstepCounter, counters);
       this.finishTime = System.currentTimeMillis();
       this.status.setFinishTime(this.finishTime);
 
@@ -345,8 +345,9 @@ class JobInProgress {
       // Kill job
       this.kill();
       // Send KillTaskAction to GroomServer
-      this.status = new JobStatus(this.status.getJobID(), this.profile
-          .getUser(), 0L, 0L, 0L, JobStatus.KILLED, superstepCounter, counters);
+      this.status = new JobStatus(this.status.getJobID(),
+          this.profile.getUser(), 0L, 0L, 0L, JobStatus.KILLED,
+          superstepCounter, counters);
       this.finishTime = System.currentTimeMillis();
       this.status.setFinishTime(this.finishTime);
 
@@ -359,43 +360,43 @@ class JobInProgress {
   public synchronized void updateTaskStatus(TaskInProgress tip,
       TaskStatus taskStatus) {
     TaskAttemptID taskid = taskStatus.getTaskId();
+    boolean change = tip.updateStatus(taskStatus); // update tip
 
-    tip.updateStatus(taskStatus); // update tip
+    if (change) {
+      TaskStatus.State state = taskStatus.getRunState();
+      TaskCompletionEvent taskEvent = null;
+      String httpTaskLogLocation = "http://"
+          + tip.getGroomServerStatus().getGroomHostName()
+          + ":"
+          + conf.getInt("bsp.http.groomserver.port",
+              Constants.DEFAULT_GROOM_INFO_SERVER);
+
+      if (state == TaskStatus.State.FAILED || state == TaskStatus.State.KILLED) {
+        int eventNumber;
+        if ((eventNumber = tip.getSuccessEventNumber()) != -1) {
+          TaskCompletionEvent t = this.taskCompletionEvents.get(eventNumber);
+          if (t.getTaskAttemptId().equals(taskid))
+            t.setTaskStatus(TaskCompletionEvent.Status.OBSOLETE);
+        }
+
+        // Did the task failure lead to tip failure?
+        TaskCompletionEvent.Status taskCompletionStatus = (state == TaskStatus.State.FAILED) ? TaskCompletionEvent.Status.FAILED
+            : TaskCompletionEvent.Status.KILLED;
+        if (tip.isFailed()) {
+          taskCompletionStatus = TaskCompletionEvent.Status.TIPFAILED;
+        }
+        taskEvent = new TaskCompletionEvent(taskCompletionEventTracker, taskid,
+            tip.idWithinJob(), taskCompletionStatus, httpTaskLogLocation);
 
-    TaskStatus.State state = taskStatus.getRunState();
-    TaskCompletionEvent taskEvent = null;
-    // FIXME port number should be configurable
-    String httpTaskLogLocation = "http://"
-        + tip.getGroomServerStatus().getGroomHostName() + ":"
-        + conf.getInt("bsp.http.groomserver.port", Constants.DEFAULT_GROOM_INFO_SERVER);
-
-    if (state == TaskStatus.State.FAILED || state == TaskStatus.State.KILLED) {
-      int eventNumber;
-      if ((eventNumber = tip.getSuccessEventNumber()) != -1) {
-        TaskCompletionEvent t = this.taskCompletionEvents.get(eventNumber);
-        if (t.getTaskAttemptId().equals(taskid))
-          t.setTaskStatus(TaskCompletionEvent.Status.OBSOLETE);
-      }
-
-      // Did the task failure lead to tip failure?
-      TaskCompletionEvent.Status taskCompletionStatus = (state == TaskStatus.State.FAILED) ? TaskCompletionEvent.Status.FAILED
-          : TaskCompletionEvent.Status.KILLED;
-      if (tip.isFailed()) {
-        taskCompletionStatus = TaskCompletionEvent.Status.TIPFAILED;
-      }
-      taskEvent = new TaskCompletionEvent(taskCompletionEventTracker, taskid,
-          tip.idWithinJob(), taskCompletionStatus, httpTaskLogLocation);
-
-      if (taskEvent != null) {
-        this.taskCompletionEvents.add(taskEvent);
-        taskCompletionEventTracker++;
+        if (taskEvent != null) {
+          this.taskCompletionEvents.add(taskEvent);
+          taskCompletionEventTracker++;
+        }
       }
     }
 
     if (superstepCounter < taskStatus.getSuperstepCount()) {
       superstepCounter = taskStatus.getSuperstepCount();
-      // TODO Later, we have to update JobInProgress status here
-
     }
   }
 

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskCompletionEvent.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskCompletionEvent.java?rev=1343442&r1=1343441&r2=1343442&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskCompletionEvent.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskCompletionEvent.java Tue May 29 00:35:02 2012
@@ -78,17 +78,6 @@ public class TaskCompletionEvent impleme
    * Returns task id.
    * 
    * @return task id
-   * @deprecated use {@link #getTaskAttemptId()} instead.
-   */
-  @Deprecated
-  public String getTaskId() {
-    return taskId.toString();
-  }
-
-  /**
-   * Returns task id.
-   * 
-   * @return task id
    */
   public TaskAttemptID getTaskAttemptId() {
     return taskId;

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java?rev=1343442&r1=1343441&r2=1343442&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java Tue May 29 00:35:02 2012
@@ -296,8 +296,19 @@ class TaskInProgress {
     return successfulTaskId;
   }
 
-  public void updateStatus(TaskStatus status) {
-    taskStatuses.put(status.getTaskId(), status);
+  public boolean updateStatus(TaskStatus status) {
+    TaskAttemptID taskid = status.getTaskId();
+    TaskStatus oldStatus = taskStatuses.get(taskid);
+    boolean changed = true;
+    
+    if (oldStatus != null) {
+      TaskStatus.State oldState = oldStatus.getRunState();
+      TaskStatus.State newState = status.getRunState();
+      changed = oldState != newState;
+    }
+    
+    taskStatuses.put(taskid, status);
+    return changed;
   }
 
   public TaskStatus getTaskStatus(TaskAttemptID taskId) {

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java?rev=1343442&r1=1343441&r2=1343442&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java Tue May 29 00:35:02 2012
@@ -46,6 +46,7 @@ public class TaskLog {
   }
 
   public static File getTaskLogFile(TaskAttemptID taskid, LogName filter) {
+    // TODO clean up the log path and type.
     return new File(LOG_DIR, taskid.getJobID() + "/" + taskid.toString() + ".log");
   }
 

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLogServlet.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLogServlet.java?rev=1343442&r1=1343441&r2=1343442&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLogServlet.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLogServlet.java Tue May 29 00:35:02 2012
@@ -29,8 +29,11 @@ import javax.servlet.http.HttpServletRes
 
 import org.apache.hadoop.util.StringUtils;
 
+/**
+ * A servlet that is run by the Grooms to provide the task logs via http.
+ */
 public class TaskLogServlet extends HttpServlet {
-  private static final long serialVersionUID = -6615764817774487321L;
+  private static final long serialVersionUID = -8127091686380253950L;
 
   private boolean haveTaskLog(TaskAttemptID taskId, TaskLog.LogName type) {
     File f = TaskLog.getTaskLogFile(taskId, type);
@@ -165,8 +168,8 @@ public class TaskLogServlet extends Http
     String logFilter = request.getParameter("filter");
     if (logFilter != null) {
       try {
-        filter = TaskLog.LogName.valueOf(TaskLog.LogName.class, logFilter
-            .toUpperCase());
+        filter = TaskLog.LogName.valueOf(TaskLog.LogName.class,
+            logFilter.toUpperCase());
       } catch (IllegalArgumentException iae) {
         response.sendError(HttpServletResponse.SC_BAD_REQUEST,
             "Illegal value for filter: " + logFilter);