You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2012/05/29 02:35:03 UTC
svn commit: r1343442 - in
/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp:
BSPJobClient.java JobInProgress.java TaskCompletionEvent.java
TaskInProgress.java TaskLog.java TaskLogServlet.java
Author: edwardyoon
Date: Tue May 29 00:35:02 2012
New Revision: 1343442
URL: http://svn.apache.org/viewvc?rev=1343442&view=rev
Log:
Fix minor bugs in HAMA-582
Modified:
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskCompletionEvent.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLogServlet.java
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1343442&r1=1343441&r2=1343442&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Tue May 29 00:35:02 2012
@@ -601,10 +601,10 @@ public class BSPJobClient extends Config
*/
public boolean monitorAndPrintJob(BSPJob job, RunningJob info)
throws IOException, InterruptedException {
-
String lastReport = null;
LOG.info("Running job: " + info.getID());
-
+ int eventCounter = 0;
+
while (!job.isComplete()) {
Thread.sleep(3000);
long step = job.progress();
@@ -617,7 +617,6 @@ public class BSPJobClient extends Config
lastReport = report;
}
- int eventCounter = 0;
TaskCompletionEvent[] events = info.getTaskCompletionEvents(eventCounter);
eventCounter += events.length;
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java?rev=1343442&r1=1343441&r2=1343442&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java Tue May 29 00:35:02 2012
@@ -105,8 +105,8 @@ class JobInProgress {
this.tasksInGroomMap = new HashMap<GroomServerStatus, Integer>();
- this.status = new JobStatus(jobId, null, 0L, 0L, JobStatus.State.PREP
- .value(), counters);
+ this.status = new JobStatus(jobId, null, 0L, 0L,
+ JobStatus.State.PREP.value(), counters);
this.startTime = System.currentTimeMillis();
this.superstepCounter = 0;
this.restartCount = 0;
@@ -126,8 +126,8 @@ class JobInProgress {
this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>(
numBSPTasks + 10);
- this.profile = new JobProfile(job.getUser(), jobId, jobFile.toString(), job
- .getJobName());
+ this.profile = new JobProfile(job.getUser(), jobId, jobFile.toString(),
+ job.getJobName());
this.setJobName(job.getJobName());
@@ -309,9 +309,9 @@ class JobInProgress {
}
if (allDone) {
- this.status = new JobStatus(this.status.getJobID(), this.profile
- .getUser(), superstepCounter, superstepCounter, superstepCounter,
- JobStatus.SUCCEEDED, superstepCounter, counters);
+ this.status = new JobStatus(this.status.getJobID(),
+ this.profile.getUser(), superstepCounter, superstepCounter,
+ superstepCounter, JobStatus.SUCCEEDED, superstepCounter, counters);
this.finishTime = System.currentTimeMillis();
this.status.setFinishTime(this.finishTime);
@@ -345,8 +345,9 @@ class JobInProgress {
// Kill job
this.kill();
// Send KillTaskAction to GroomServer
- this.status = new JobStatus(this.status.getJobID(), this.profile
- .getUser(), 0L, 0L, 0L, JobStatus.KILLED, superstepCounter, counters);
+ this.status = new JobStatus(this.status.getJobID(),
+ this.profile.getUser(), 0L, 0L, 0L, JobStatus.KILLED,
+ superstepCounter, counters);
this.finishTime = System.currentTimeMillis();
this.status.setFinishTime(this.finishTime);
@@ -359,43 +360,43 @@ class JobInProgress {
public synchronized void updateTaskStatus(TaskInProgress tip,
TaskStatus taskStatus) {
TaskAttemptID taskid = taskStatus.getTaskId();
+ boolean change = tip.updateStatus(taskStatus); // update tip
- tip.updateStatus(taskStatus); // update tip
+ if (change) {
+ TaskStatus.State state = taskStatus.getRunState();
+ TaskCompletionEvent taskEvent = null;
+ String httpTaskLogLocation = "http://"
+ + tip.getGroomServerStatus().getGroomHostName()
+ + ":"
+ + conf.getInt("bsp.http.groomserver.port",
+ Constants.DEFAULT_GROOM_INFO_SERVER);
+
+ if (state == TaskStatus.State.FAILED || state == TaskStatus.State.KILLED) {
+ int eventNumber;
+ if ((eventNumber = tip.getSuccessEventNumber()) != -1) {
+ TaskCompletionEvent t = this.taskCompletionEvents.get(eventNumber);
+ if (t.getTaskAttemptId().equals(taskid))
+ t.setTaskStatus(TaskCompletionEvent.Status.OBSOLETE);
+ }
+
+ // Did the task failure lead to tip failure?
+ TaskCompletionEvent.Status taskCompletionStatus = (state == TaskStatus.State.FAILED) ? TaskCompletionEvent.Status.FAILED
+ : TaskCompletionEvent.Status.KILLED;
+ if (tip.isFailed()) {
+ taskCompletionStatus = TaskCompletionEvent.Status.TIPFAILED;
+ }
+ taskEvent = new TaskCompletionEvent(taskCompletionEventTracker, taskid,
+ tip.idWithinJob(), taskCompletionStatus, httpTaskLogLocation);
- TaskStatus.State state = taskStatus.getRunState();
- TaskCompletionEvent taskEvent = null;
- // FIXME port number should be configurable
- String httpTaskLogLocation = "http://"
- + tip.getGroomServerStatus().getGroomHostName() + ":"
- + conf.getInt("bsp.http.groomserver.port", Constants.DEFAULT_GROOM_INFO_SERVER);
-
- if (state == TaskStatus.State.FAILED || state == TaskStatus.State.KILLED) {
- int eventNumber;
- if ((eventNumber = tip.getSuccessEventNumber()) != -1) {
- TaskCompletionEvent t = this.taskCompletionEvents.get(eventNumber);
- if (t.getTaskAttemptId().equals(taskid))
- t.setTaskStatus(TaskCompletionEvent.Status.OBSOLETE);
- }
-
- // Did the task failure lead to tip failure?
- TaskCompletionEvent.Status taskCompletionStatus = (state == TaskStatus.State.FAILED) ? TaskCompletionEvent.Status.FAILED
- : TaskCompletionEvent.Status.KILLED;
- if (tip.isFailed()) {
- taskCompletionStatus = TaskCompletionEvent.Status.TIPFAILED;
- }
- taskEvent = new TaskCompletionEvent(taskCompletionEventTracker, taskid,
- tip.idWithinJob(), taskCompletionStatus, httpTaskLogLocation);
-
- if (taskEvent != null) {
- this.taskCompletionEvents.add(taskEvent);
- taskCompletionEventTracker++;
+ if (taskEvent != null) {
+ this.taskCompletionEvents.add(taskEvent);
+ taskCompletionEventTracker++;
+ }
}
}
if (superstepCounter < taskStatus.getSuperstepCount()) {
superstepCounter = taskStatus.getSuperstepCount();
- // TODO Later, we have to update JobInProgress status here
-
}
}
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskCompletionEvent.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskCompletionEvent.java?rev=1343442&r1=1343441&r2=1343442&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskCompletionEvent.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskCompletionEvent.java Tue May 29 00:35:02 2012
@@ -78,17 +78,6 @@ public class TaskCompletionEvent impleme
* Returns task id.
*
* @return task id
- * @deprecated use {@link #getTaskAttemptId()} instead.
- */
- @Deprecated
- public String getTaskId() {
- return taskId.toString();
- }
-
- /**
- * Returns task id.
- *
- * @return task id
*/
public TaskAttemptID getTaskAttemptId() {
return taskId;
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java?rev=1343442&r1=1343441&r2=1343442&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java Tue May 29 00:35:02 2012
@@ -296,8 +296,19 @@ class TaskInProgress {
return successfulTaskId;
}
- public void updateStatus(TaskStatus status) {
- taskStatuses.put(status.getTaskId(), status);
+ public boolean updateStatus(TaskStatus status) {
+ TaskAttemptID taskid = status.getTaskId();
+ TaskStatus oldStatus = taskStatuses.get(taskid);
+ boolean changed = true;
+
+ if (oldStatus != null) {
+ TaskStatus.State oldState = oldStatus.getRunState();
+ TaskStatus.State newState = status.getRunState();
+ changed = oldState != newState;
+ }
+
+ taskStatuses.put(taskid, status);
+ return changed;
}
public TaskStatus getTaskStatus(TaskAttemptID taskId) {
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java?rev=1343442&r1=1343441&r2=1343442&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java Tue May 29 00:35:02 2012
@@ -46,6 +46,7 @@ public class TaskLog {
}
public static File getTaskLogFile(TaskAttemptID taskid, LogName filter) {
+ // TODO clean up the log path and type.
return new File(LOG_DIR, taskid.getJobID() + "/" + taskid.toString() + ".log");
}
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLogServlet.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLogServlet.java?rev=1343442&r1=1343441&r2=1343442&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLogServlet.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLogServlet.java Tue May 29 00:35:02 2012
@@ -29,8 +29,11 @@ import javax.servlet.http.HttpServletRes
import org.apache.hadoop.util.StringUtils;
+/**
+ * A servlet that is run by the Grooms to provide the task logs via http.
+ */
public class TaskLogServlet extends HttpServlet {
- private static final long serialVersionUID = -6615764817774487321L;
+ private static final long serialVersionUID = -8127091686380253950L;
private boolean haveTaskLog(TaskAttemptID taskId, TaskLog.LogName type) {
File f = TaskLog.getTaskLogFile(taskId, type);
@@ -165,8 +168,8 @@ public class TaskLogServlet extends Http
String logFilter = request.getParameter("filter");
if (logFilter != null) {
try {
- filter = TaskLog.LogName.valueOf(TaskLog.LogName.class, logFilter
- .toUpperCase());
+ filter = TaskLog.LogName.valueOf(TaskLog.LogName.class,
+ logFilter.toUpperCase());
} catch (IllegalArgumentException iae) {
response.sendError(HttpServletResponse.SC_BAD_REQUEST,
"Illegal value for filter: " + logFilter);