You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/09/05 12:50:06 UTC
svn commit: r692408 [2/2] - in /hadoop/core/trunk: ./
src/docs/src/documentation/content/xdocs/
src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
src/webapps/job/
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java?rev=692408&r1=692407&r2=692408&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java Fri Sep 5 03:50:04 2008
@@ -21,7 +21,6 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.net.URI;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.Iterator;
@@ -34,8 +33,6 @@
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
@@ -48,7 +45,6 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.IFile.Writer;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.Progress;
@@ -110,8 +106,8 @@
private String jobFile; // job configuration file
private TaskAttemptID taskId; // unique, includes job id
private int partition; // id within job
- TaskStatus taskStatus; // current status of the task
- private Path taskOutputPath; // task-specific output dir
+ TaskStatus taskStatus; // current status of the task
+ protected boolean cleanupJob = false;
//failed ranges from previous attempts
private SortedRanges failedRanges = new SortedRanges();
@@ -125,6 +121,10 @@
protected JobConf conf;
protected MapOutputFile mapOutputFile = new MapOutputFile();
protected LocalDirAllocator lDirAlloc;
+ private final static int MAX_RETRIES = 10;
+ protected JobContext jobContext;
+ protected TaskAttemptContext taskContext;
+ private volatile boolean commitPending = false;
////////////////////////////////////////////
// Constructors
@@ -220,6 +220,13 @@
this.skipping = skipping;
}
+ /**
+ * Sets whether the task is cleanup task
+ */
+ public void setCleanupTask() {
+ cleanupJob = true;
+ }
+
////////////////////////////////////////////
// Writable methods
////////////////////////////////////////////
@@ -228,48 +235,27 @@
Text.writeString(out, jobFile);
taskId.write(out);
out.writeInt(partition);
- if (taskOutputPath != null) {
- Text.writeString(out, taskOutputPath.toString());
- } else {
- Text.writeString(out, "");
- }
taskStatus.write(out);
failedRanges.write(out);
out.writeBoolean(skipping);
+ out.writeBoolean(cleanupJob);
}
public void readFields(DataInput in) throws IOException {
jobFile = Text.readString(in);
taskId = TaskAttemptID.read(in);
partition = in.readInt();
- String outPath = Text.readString(in);
- if (outPath.length() != 0) {
- taskOutputPath = new Path(outPath);
- } else {
- taskOutputPath = null;
- }
taskStatus.readFields(in);
this.mapOutputFile.setJobId(taskId.getJobID());
failedRanges.readFields(in);
currentRecIndexIterator = failedRanges.skipRangeIterator();
currentRecStartIndex = currentRecIndexIterator.next();
skipping = in.readBoolean();
+ cleanupJob = in.readBoolean();
}
@Override
public String toString() { return taskId.toString(); }
- private Path getTaskOutputPath(JobConf conf) {
- Path p = new Path(FileOutputFormat.getOutputPath(conf),
- (MRConstants.TEMP_DIR_NAME + Path.SEPARATOR + "_" + taskId));
- try {
- FileSystem fs = p.getFileSystem(conf);
- return p.makeQualified(fs);
- } catch (IOException ie) {
- LOG.warn(StringUtils.stringifyException(ie));
- return p;
- }
- }
-
/**
* Localize the given JobConf to be specific for this task.
*/
@@ -279,12 +265,18 @@
conf.setBoolean("mapred.task.is.map", isMapTask());
conf.setInt("mapred.task.partition", partition);
conf.set("mapred.job.id", taskId.getJobID().toString());
-
- // The task-specific output path
- if (FileOutputFormat.getOutputPath(conf) != null) {
- taskOutputPath = getTaskOutputPath(conf);
- FileOutputFormat.setWorkOutputPath(conf, taskOutputPath);
+ Path outputPath = FileOutputFormat.getOutputPath(conf);
+ if (outputPath != null) {
+ OutputCommitter committer = conf.getOutputCommitter();
+ if ((committer instanceof FileOutputCommitter)) {
+ TaskAttemptContext context = new TaskAttemptContext(conf, taskId);
+ FileOutputFormat.setWorkOutputPath(conf,
+ ((FileOutputCommitter)committer).getTempTaskOutputPath(context));
+ } else {
+ FileOutputFormat.setWorkOutputPath(conf, outputPath);
+ }
}
+
}
/** Run this task as a part of the named job. This method is executed in the
@@ -359,8 +351,17 @@
if (sendProgress) {
// we need to send progress update
updateCounters();
- taskStatus.statusUpdate(taskProgress.get(), taskProgress.toString(),
- counters);
+ if (commitPending) {
+ taskStatus.statusUpdate(TaskStatus.State.COMMIT_PENDING,
+ taskProgress.get(),
+ taskProgress.toString(),
+ counters);
+ } else {
+ taskStatus.statusUpdate(TaskStatus.State.RUNNING,
+ taskProgress.get(),
+ taskProgress.toString(),
+ counters);
+ }
taskFound = umbilical.statusUpdate(taskId, taskStatus);
taskStatus.clearStatus();
}
@@ -396,6 +397,13 @@
LOG.debug(getTaskID() + " Progress/ping thread started");
}
+ public void initialize(JobConf job, Reporter reporter)
+ throws IOException {
+ jobContext = new JobContext(job, reporter);
+ taskContext = new TaskAttemptContext(job, taskId, reporter);
+ OutputCommitter committer = conf.getOutputCommitter();
+ committer.setupTask(taskContext);
+ }
protected Reporter getReporter(final TaskUmbilicalProtocol umbilical)
throws IOException
@@ -543,54 +551,86 @@
}
public void done(TaskUmbilicalProtocol umbilical) throws IOException {
- int retries = 10;
- boolean needProgress = true;
+ LOG.info("Task:" + taskId + " is done."
+ + " And is in the process of commiting");
updateCounters();
+
+ OutputCommitter outputCommitter = conf.getOutputCommitter();
+ // check whether the commit is required.
+ boolean commitRequired = outputCommitter.needsTaskCommit(taskContext);
+ if (commitRequired) {
+ int retries = MAX_RETRIES;
+ taskStatus.setRunState(TaskStatus.State.COMMIT_PENDING);
+ commitPending = true;
+ // say the task tracker that task is commit pending
+ while (true) {
+ try {
+ umbilical.commitPending(taskId, taskStatus);
+ break;
+ } catch (InterruptedException ie) {
+ // ignore
+ } catch (IOException ie) {
+ LOG.warn("Failure sending commit pending: " +
+ StringUtils.stringifyException(ie));
+ if (--retries == 0) {
+ System.exit(67);
+ }
+ }
+ }
+ //wait for commit approval and commit
+ commit(umbilical, outputCommitter);
+ }
taskDone.set(true);
+ sendLastUpdate(umbilical);
+ //signal the tasktracker that we are done
+ sendDone(umbilical);
+ }
+
+ private void sendLastUpdate(TaskUmbilicalProtocol umbilical)
+ throws IOException {
+ //first wait for the COMMIT approval from the tasktracker
+ int retries = MAX_RETRIES;
while (true) {
try {
- if (needProgress) {
- // send a final status report
- taskStatus.statusUpdate(taskProgress.get(), taskProgress.toString(),
+ // send a final status report
+ if (commitPending) {
+ taskStatus.statusUpdate(TaskStatus.State.COMMIT_PENDING,
+ taskProgress.get(),
+ taskProgress.toString(),
+ counters);
+ } else {
+ taskStatus.statusUpdate(TaskStatus.State.RUNNING,
+ taskProgress.get(),
+ taskProgress.toString(),
counters);
- try {
- if (!umbilical.statusUpdate(getTaskID(), taskStatus)) {
- LOG.warn("Parent died. Exiting "+taskId);
- System.exit(66);
- }
- taskStatus.clearStatus();
- needProgress = false;
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt(); // interrupt ourself
- }
}
- // Check whether there is any task output
- boolean shouldBePromoted = false;
+
try {
- if (taskOutputPath != null) {
- // Get the file-system for the task output directory
- FileSystem fs = taskOutputPath.getFileSystem(conf);
- if (fs.exists(taskOutputPath)) {
- // Get the summary for the folder
- ContentSummary summary = fs.getContentSummary(taskOutputPath);
- // Check if the directory contains data to be promoted
- // i.e total-files + total-folders - 1(itself)
- if (summary != null
- && (summary.getFileCount() + summary.getDirectoryCount() - 1)
- > 0) {
- shouldBePromoted = true;
- }
- } else {
- LOG.info(getTaskID() + ": No outputs to promote from " +
- taskOutputPath);
- }
+ if (!umbilical.statusUpdate(getTaskID(), taskStatus)) {
+ LOG.warn("Parent died. Exiting "+taskId);
+ System.exit(66);
}
- } catch (IOException ioe) {
- // To be safe in case of an exception
- shouldBePromoted = true;
+ taskStatus.clearStatus();
+ return;
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt(); // interrupt ourself
}
- umbilical.done(taskId, shouldBePromoted);
- LOG.info("Task '" + getTaskID() + "' done.");
+ } catch (IOException ie) {
+ LOG.warn("Failure sending last status update: " +
+ StringUtils.stringifyException(ie));
+ if (--retries == 0) {
+ throw ie;
+ }
+ }
+ }
+ }
+
+ private void sendDone(TaskUmbilicalProtocol umbilical) throws IOException {
+ int retries = MAX_RETRIES;
+ while (true) {
+ try {
+ umbilical.done(getTaskID());
+ LOG.info("Task '" + taskId + "' done.");
return;
} catch (IOException ie) {
LOG.warn("Failure signalling completion: " +
@@ -601,15 +641,66 @@
}
}
}
+
+ private void commit(TaskUmbilicalProtocol umbilical,
+ OutputCommitter committer) throws IOException {
+ int retries = MAX_RETRIES;
+ while (true) {
+ try {
+ while (!umbilical.canCommit(taskId)) {
+ try {
+ Thread.sleep(1000);
+ } catch(InterruptedException ie) {
+ //ignore
+ }
+ setProgressFlag();
+ }
+ // task can Commit now
+ try {
+ LOG.info("Task " + taskId + " is allowed to commit now");
+ committer.commitTask(taskContext);
+ return;
+ } catch (IOException iee) {
+ LOG.warn("Failure committing: " +
+ StringUtils.stringifyException(iee));
+ discardOutput(taskContext, committer);
+ throw iee;
+ }
+ } catch (IOException ie) {
+ LOG.warn("Failure asking whether task can commit: " +
+ StringUtils.stringifyException(ie));
+ if (--retries == 0) {
+ //if it couldn't commit a successfully then delete the output
+ discardOutput(taskContext, committer);
+ System.exit(68);
+ }
+ }
+ }
+ }
+
+ private void discardOutput(TaskAttemptContext taskContext,
+ OutputCommitter committer) {
+ try {
+ committer.abortTask(taskContext);
+ } catch (IOException ioe) {
+ LOG.warn("Failure cleaning up: " +
+ StringUtils.stringifyException(ioe));
+ }
+ }
+
+ protected void runCleanup(TaskUmbilicalProtocol umbilical)
+ throws IOException {
+ // set phase for this task
+ setPhase(TaskStatus.Phase.CLEANUP);
+ getProgress().setStatus("cleanup");
+ // do the cleanup
+ conf.getOutputCommitter().cleanupJob(jobContext);
+ done(umbilical);
+ }
public void setConf(Configuration conf) {
if (conf instanceof JobConf) {
this.conf = (JobConf) conf;
-
- if (taskId != null && taskOutputPath == null &&
- FileOutputFormat.getOutputPath(this.conf) != null) {
- taskOutputPath = getTaskOutputPath(this.conf);
- }
} else {
this.conf = new JobConf(conf);
}
@@ -633,68 +724,6 @@
}
/**
- * Save the task's output on successful completion.
- *
- * @throws IOException
- */
- void saveTaskOutput() throws IOException {
-
- if (taskOutputPath != null) {
- FileSystem fs = taskOutputPath.getFileSystem(conf);
- if (fs.exists(taskOutputPath)) {
- Path jobOutputPath = taskOutputPath.getParent().getParent();
-
- // Move the task outputs to their final place
- moveTaskOutputs(fs, jobOutputPath, taskOutputPath);
-
- // Delete the temporary task-specific output directory
- if (!fs.delete(taskOutputPath, true)) {
- LOG.info("Failed to delete the temporary output directory of task: " +
- getTaskID() + " - " + taskOutputPath);
- }
-
- LOG.info("Saved output of task '" + getTaskID() + "' to " + jobOutputPath);
- }
- }
- }
-
- private Path getFinalPath(Path jobOutputDir, Path taskOutput) {
- URI relativePath = taskOutputPath.toUri().relativize(taskOutput.toUri());
- if (relativePath.getPath().length() > 0) {
- return new Path(jobOutputDir, relativePath.getPath());
- } else {
- return jobOutputDir;
- }
- }
-
- private void moveTaskOutputs(FileSystem fs, Path jobOutputDir, Path taskOutput)
- throws IOException {
- if (fs.isFile(taskOutput)) {
- Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput);
- if (!fs.rename(taskOutput, finalOutputPath)) {
- if (!fs.delete(finalOutputPath, true)) {
- throw new IOException("Failed to delete earlier output of task: " +
- getTaskID());
- }
- if (!fs.rename(taskOutput, finalOutputPath)) {
- throw new IOException("Failed to save output of task: " +
- getTaskID());
- }
- }
- LOG.debug("Moved " + taskOutput + " to " + finalOutputPath);
- } else if(fs.isDirectory(taskOutput)) {
- FileStatus[] paths = fs.listStatus(taskOutput);
- Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput);
- fs.mkdirs(finalOutputPath);
- if (paths != null) {
- for (FileStatus path : paths) {
- moveTaskOutputs(fs, jobOutputDir, path.getPath());
- }
- }
- }
- }
-
- /**
* OutputCollector for the combiner.
*/
protected static class CombineOutputCollector<K extends Object, V extends Object>
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskAttemptContext.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskAttemptContext.java?rev=692408&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskAttemptContext.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskAttemptContext.java Fri Sep 5 03:50:04 2008
@@ -0,0 +1,56 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.util.Progressable;
+
+public class TaskAttemptContext extends JobContext {
+
+ private JobConf conf;
+ private TaskAttemptID taskid;
+
+ TaskAttemptContext(JobConf conf, TaskAttemptID taskid) {
+ this(conf, taskid, Reporter.NULL);
+ }
+
+ TaskAttemptContext(JobConf conf, TaskAttemptID taskid,
+ Progressable progress) {
+ super(conf, progress);
+ this.conf = conf;
+ this.taskid = taskid;
+ }
+
+ /**
+ * Get the taskAttemptID.
+ *
+ * @return TaskAttemptID
+ */
+ public TaskAttemptID getTaskAttemptID() {
+ return taskid;
+ }
+
+ /**
+ * Get the job Configuration.
+ *
+ * @return JobConf
+ */
+ public JobConf getJobConf() {
+ return conf;
+ }
+
+}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java?rev=692408&r1=692407&r2=692408&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java Fri Sep 5 03:50:04 2008
@@ -79,6 +79,7 @@
private boolean killed = false;
private volatile SortedRanges failedRanges = new SortedRanges();
private volatile boolean skipping = false;
+ private boolean cleanup = false;
// The 'next' usable taskid of this tip
int nextTaskId = 0;
@@ -107,6 +108,9 @@
//list of tasks to kill, <taskid> -> <shouldFail>
private TreeMap<TaskAttemptID, Boolean> tasksToKill = new TreeMap<TaskAttemptID, Boolean>();
+ //task to commit, <taskattemptid>
+ private TaskAttemptID taskToCommit;
+
private Counters counters = new Counters();
@@ -164,6 +168,14 @@
return partition;
}
+ public boolean isCleanupTask() {
+ return cleanup;
+ }
+
+ public void setCleanupTask() {
+ cleanup = true;
+ }
+
public boolean isOnlyCommitPending() {
for (TaskStatus t : taskStatuses.values()) {
if (t.getRunState() == TaskStatus.State.COMMIT_PENDING) {
@@ -172,6 +184,14 @@
}
return false;
}
+
+ public boolean isCommitPending(TaskAttemptID taskId) {
+ TaskStatus t = taskStatuses.get(taskId);
+ if (t == null) {
+ return false;
+ }
+ return t.getRunState() == TaskStatus.State.COMMIT_PENDING;
+ }
/**
* Initialization common to Map and Reduce
@@ -324,6 +344,10 @@
!tasksReportedClosed.contains(taskid)) {
tasksReportedClosed.add(taskid);
close = true;
+ } else if (isCommitPending(taskid) && !shouldCommit(taskid) &&
+ !tasksReportedClosed.contains(taskid)) {
+ tasksReportedClosed.add(taskid);
+ close = true;
} else {
close = tasksToKill.keySet().contains(taskid);
}
@@ -331,6 +355,21 @@
}
/**
+ * Commit this task attempt for the tip.
+ * @param taskid
+ */
+ public void doCommit(TaskAttemptID taskid) {
+ taskToCommit = taskid;
+ }
+
+ /**
+ * Returns whether the task attempt should be committed or not
+ */
+ public boolean shouldCommit(TaskAttemptID taskid) {
+ return taskToCommit.equals(taskid);
+ }
+
+ /**
* Creates a "status report" for this task. Includes the
* task ID and overall status, plus reports for all the
* component task-threads that have ever been started.
@@ -401,7 +440,8 @@
// status update for the same taskid! This is a safety check,
// and is addressed better at the TaskTracker to ensure this.
// @see {@link TaskTracker.transmitHeartbeat()}
- if ((newState != TaskStatus.State.RUNNING) &&
+ if ((newState != TaskStatus.State.RUNNING &&
+ newState != TaskStatus.State.COMMIT_PENDING ) &&
(oldState == newState)) {
LOG.warn("Recieved duplicate status update of '" + newState +
"' for '" + taskid + "' of TIP '" + getTIPId() + "'");
@@ -733,6 +773,9 @@
} else {
t = new ReduceTask(jobFile, taskid, partition, numMaps);
}
+ if (cleanup) {
+ t.setCleanupTask();
+ }
t.setConf(conf);
t.setFailedRanges(failedRanges);
t.setSkipping(skipping);
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=692408&r1=692407&r2=692408&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Fri Sep 5 03:50:04 2008
@@ -436,7 +436,7 @@
}catch(IOException ie){
LOG.warn("Error releasing caches : Cache files might not have been cleaned up");
}
- tracker.reportTaskFinished(t.getTaskID());
+ tracker.reportTaskFinished(t.getTaskID(), false);
}
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java?rev=692408&r1=692407&r2=692408&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java Fri Sep 5 03:50:04 2008
@@ -37,7 +37,7 @@
LogFactory.getLog(TaskStatus.class.getName());
//enumeration for reporting current phase of a task.
- public static enum Phase{STARTING, MAP, SHUFFLE, SORT, REDUCE}
+ public static enum Phase{STARTING, MAP, SHUFFLE, SORT, REDUCE, CLEANUP}
// what state is the task in?
public static enum State {RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED,
@@ -264,9 +264,11 @@
* @param phase
* @param counters
*/
- synchronized void statusUpdate(float progress, String state,
+ synchronized void statusUpdate(State runState,
+ float progress,
+ String state,
Counters counters) {
- setRunState(TaskStatus.State.RUNNING);
+ setRunState(runState);
setProgress(progress);
setStateString(state);
setCounters(counters);
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=692408&r1=692407&r2=692408&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Sep 5 03:50:04 2008
@@ -28,6 +28,7 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -191,6 +192,12 @@
*/
private int probe_sample_size = 500;
+ /*
+ * A list of commitTaskActions for whom commit response has been received
+ */
+ private List<TaskAttemptID> commitResponses =
+ Collections.synchronizedList(new ArrayList<TaskAttemptID>());
+
private ShuffleServerMetrics shuffleServerMetrics;
/** This class contains the methods that should be used for metrics-reporting
* the specific metrics for shuffle. The TaskTracker is actually a server for
@@ -407,8 +414,10 @@
// RPC initialization
int max = maxCurrentMapTasks > maxCurrentReduceTasks ?
maxCurrentMapTasks : maxCurrentReduceTasks;
+ //set the num handlers to max*2 since canCommit may wait for the duration
+ //of a heartbeat RPC
this.taskReportServer =
- RPC.getServer(this, bindAddress, tmpPort, max, false, this.fConf);
+ RPC.getServer(this, bindAddress, tmpPort, 2 * max, false, this.fConf);
this.taskReportServer.start();
// get the assigned address
@@ -957,6 +966,13 @@
for(TaskTrackerAction action: actions) {
if (action instanceof LaunchTaskAction) {
startNewTask((LaunchTaskAction) action);
+ } else if (action instanceof CommitTaskAction) {
+ CommitTaskAction commitAction = (CommitTaskAction)action;
+ if (!commitResponses.contains(commitAction.getTaskID())) {
+ LOG.info("Received commit task action for " +
+ commitAction.getTaskID());
+ commitResponses.add(commitAction.getTaskID());
+ }
} else {
tasksToCleanup.put(action);
}
@@ -1072,7 +1088,8 @@
synchronized (this) {
for (TaskStatus taskStatus : status.getTaskReports()) {
- if (taskStatus.getRunState() != TaskStatus.State.RUNNING) {
+ if (taskStatus.getRunState() != TaskStatus.State.RUNNING &&
+ taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING) {
if (taskStatus.getIsMap()) {
mapTotal--;
} else {
@@ -1188,7 +1205,8 @@
private synchronized void markUnresponsiveTasks() throws IOException {
long now = System.currentTimeMillis();
for (TaskInProgress tip: runningTasks.values()) {
- if (tip.getRunState() == TaskStatus.State.RUNNING) {
+ if (tip.getRunState() == TaskStatus.State.RUNNING ||
+ tip.getRunState() == TaskStatus.State.COMMIT_PENDING) {
// Check the per-job timeout interval for tasks;
// an interval of '0' implies it is never timed-out
long jobTaskTimeout = tip.getTaskTimeout();
@@ -1308,7 +1326,8 @@
TaskInProgress killMe = null;
for (Iterator it = runningTasks.values().iterator(); it.hasNext();) {
TaskInProgress tip = (TaskInProgress) it.next();
- if ((tip.getRunState() == TaskStatus.State.RUNNING) &&
+ if ((tip.getRunState() == TaskStatus.State.RUNNING ||
+ tip.getRunState() == TaskStatus.State.COMMIT_PENDING) &&
!tip.wasKilled) {
if (killMe == null) {
@@ -1517,7 +1536,6 @@
private TaskStatus taskStatus;
private long taskTimeout;
private String debugCommand;
- private boolean shouldPromoteOutput = false;
/**
*/
@@ -1667,7 +1685,8 @@
"% " + taskStatus.getStateString());
if (this.done ||
- this.taskStatus.getRunState() != TaskStatus.State.RUNNING) {
+ (this.taskStatus.getRunState() != TaskStatus.State.RUNNING &&
+ this.taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING)) {
//make sure we ignore progress messages after a task has
//invoked TaskUmbilicalProtocol.done() or if the task has been
//KILLED/FAILED
@@ -1717,15 +1736,8 @@
/**
* The task is reporting that it's done running
*/
- public synchronized void reportDone(boolean shouldPromote) {
- TaskStatus.State state = null;
- this.shouldPromoteOutput = shouldPromote;
- if (shouldPromote) {
- state = TaskStatus.State.COMMIT_PENDING;
- } else {
- state = TaskStatus.State.SUCCEEDED;
- }
- this.taskStatus.setRunState(state);
+ public synchronized void reportDone() {
+ this.taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
this.taskStatus.setProgress(1.0f);
this.taskStatus.setFinishTime(System.currentTimeMillis());
this.done = true;
@@ -1758,13 +1770,7 @@
//
boolean needCleanup = false;
synchronized (this) {
- if (done) {
- if (shouldPromoteOutput) {
- taskStatus.setRunState(TaskStatus.State.COMMIT_PENDING);
- } else {
- taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
- }
- } else {
+ if (!done) {
if (!wasKilled) {
failures += 1;
taskStatus.setRunState(TaskStatus.State.FAILED);
@@ -1960,7 +1966,8 @@
public void jobHasFinished(boolean wasFailure) throws IOException {
// Kill the task if it is still running
synchronized(this){
- if (getRunState() == TaskStatus.State.RUNNING) {
+ if (getRunState() == TaskStatus.State.RUNNING ||
+ getRunState() == TaskStatus.State.COMMIT_PENDING) {
kill(wasFailure);
}
}
@@ -1974,7 +1981,8 @@
* @param wasFailure was it a failure (versus a kill request)?
*/
public synchronized void kill(boolean wasFailure) throws IOException {
- if (taskStatus.getRunState() == TaskStatus.State.RUNNING) {
+ if (taskStatus.getRunState() == TaskStatus.State.RUNNING ||
+ taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING) {
wasKilled = true;
if (wasFailure) {
failures += 1;
@@ -2136,13 +2144,33 @@
}
/**
+ * Task is reporting that it is in commit_pending
+ * and it is waiting for the commit Response
+ */
+ public synchronized void commitPending(TaskAttemptID taskid,
+ TaskStatus taskStatus)
+ throws IOException {
+ LOG.info("Task " + taskid + " is in COMMIT_PENDING");
+ statusUpdate(taskid, taskStatus);
+ reportTaskFinished(taskid, true);
+ }
+
+ /**
+ * Child checking whether it can commit
+ */
+ public synchronized boolean canCommit(TaskAttemptID taskid) {
+ return commitResponses.contains(taskid); //don't remove it now
+ }
+
+ /**
* The task is done.
*/
- public synchronized void done(TaskAttemptID taskid, boolean shouldPromote)
+ public synchronized void done(TaskAttemptID taskid)
throws IOException {
TaskInProgress tip = tasks.get(taskid);
+ commitResponses.remove(taskid);
if (tip != null) {
- tip.reportDone(shouldPromote);
+ tip.reportDone();
} else {
LOG.warn("Unknown child task done: "+taskid+". Ignored.");
}
@@ -2196,13 +2224,15 @@
/**
* The task is no longer running. It may not have completed successfully
*/
- void reportTaskFinished(TaskAttemptID taskid) {
+ void reportTaskFinished(TaskAttemptID taskid, boolean commitPending) {
TaskInProgress tip;
synchronized (this) {
tip = tasks.get(taskid);
}
if (tip != null) {
- tip.taskFinished();
+ if (!commitPending) {
+ tip.taskFinished();
+ }
synchronized(finishedCount) {
finishedCount[0]++;
finishedCount.notify();
@@ -2331,7 +2361,7 @@
TaskStatus status = tip.getStatus();
status.setIncludeCounters(sendCounters);
status.setOutputSize(tryToGetOutputSize(status.getTaskID(), fConf));
- // send counters for finished or failed tasks.
+ // send counters for finished or failed tasks and commit pending tasks
if (status.getRunState() != TaskStatus.State.RUNNING) {
status.setIncludeCounters(true);
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java?rev=692408&r1=692407&r2=692408&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java Fri Sep 5 03:50:04 2008
@@ -48,7 +48,10 @@
KILL_JOB,
/** Reinitialize the tasktracker. */
- REINIT_TRACKER
+ REINIT_TRACKER,
+
+ /** Ask a task to save its output. */
+ COMMIT_TASK
};
/**
@@ -80,6 +83,11 @@
action = new ReinitTrackerAction();
}
break;
+ case COMMIT_TASK:
+ {
+ action = new CommitTaskAction();
+ }
+ break;
}
return action;
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java?rev=692408&r1=692407&r2=692408&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java Fri Sep 5 03:50:04 2008
@@ -207,7 +207,8 @@
TaskStatus.State state = ts.getRunState();
if (ts.getIsMap() &&
((state == TaskStatus.State.RUNNING) ||
- (state == TaskStatus.State.UNASSIGNED))) {
+ (state == TaskStatus.State.UNASSIGNED) ||
+ (state == TaskStatus.State.COMMIT_PENDING))) {
mapCount++;
}
}
@@ -224,7 +225,8 @@
TaskStatus.State state = ts.getRunState();
if ((!ts.getIsMap()) &&
((state == TaskStatus.State.RUNNING) ||
- (state == TaskStatus.State.UNASSIGNED))) {
+ (state == TaskStatus.State.UNASSIGNED) ||
+ (state == TaskStatus.State.COMMIT_PENDING))) {
reduceCount++;
}
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=692408&r1=692407&r2=692408&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Fri Sep 5 03:50:04 2008
@@ -45,9 +45,10 @@
* Version 9 changes the counter representation for HADOOP-1915
* Version 10 changed the TaskStatus format and added reportNextRecordRange
* for HADOOP-153
+ * Version 11 Adds RPCs for task commit as part of HADOOP-3150
* */
- public static final long versionID = 10L;
+ public static final long versionID = 11L;
/** Called when a child task process starts, to get its task.*/
Task getTask(TaskAttemptID taskid) throws IOException;
@@ -88,9 +89,26 @@
/** Report that the task is successfully completed. Failure is assumed if
* the task process exits without calling this.
* @param taskid task's id
- * @param shouldBePromoted whether to promote the task's output or not
*/
- void done(TaskAttemptID taskid, boolean shouldBePromoted) throws IOException;
+ void done(TaskAttemptID taskid) throws IOException;
+
+ /**
+ * Report that the task is complete, but its commit is pending.
+ *
+ * @param taskId task's id
+ * @param taskStatus status of the child
+ * @throws IOException
+ */
+ void commitPending(TaskAttemptID taskId, TaskStatus taskStatus)
+ throws IOException, InterruptedException;
+
+ /**
+ * Polling to know whether the task can go-ahead with commit
+ * @param taskid
+ * @return true/false
+ * @throws IOException
+ */
+ boolean canCommit(TaskAttemptID taskid) throws IOException;
/** Report that a reduce-task couldn't shuffle map-outputs.*/
void shuffleError(TaskAttemptID taskId, String message) throws IOException;
Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestFileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestFileOutputCommitter.java?rev=692408&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestFileOutputCommitter.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestFileOutputCommitter.java Fri Sep 5 03:50:04 2008
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.*;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+
+public class TestFileOutputCommitter extends TestCase {
+ private static Path outDir = new Path(
+ System.getProperty("test.build.data", "."), "output");
+
+ // A random task attempt id for testing.
+ private static String attempt = "attempt_200707121733_0001_m_000000_0";
+ private static TaskAttemptID taskID = TaskAttemptID.forName(attempt);
+
+ @SuppressWarnings("unchecked")
+ public void testCommitter() throws Exception {
+ JobConf job = new JobConf();
+ job.set("mapred.task.id", attempt);
+ job.setOutputCommitter(FileOutputCommitter.class);
+ JobContext jContext = new JobContext(job);
+ TaskAttemptContext tContext = new TaskAttemptContext(job, taskID);
+ FileOutputFormat.setOutputPath(job, outDir);
+ FileOutputCommitter committer = new FileOutputCommitter();
+ FileOutputFormat.setWorkOutputPath(job,
+ committer.getTempTaskOutputPath(tContext));
+
+ committer.setupJob(jContext);
+ committer.setupTask(tContext);
+ String file = "test.txt";
+
+ // A reporter that does nothing
+ Reporter reporter = Reporter.NULL;
+ FileSystem localFs = FileSystem.getLocal(job);
+ TextOutputFormat theOutputFormat = new TextOutputFormat();
+ RecordWriter theRecordWriter =
+ theOutputFormat.getRecordWriter(localFs, job, file, reporter);
+ Text key1 = new Text("key1");
+ Text key2 = new Text("key2");
+ Text val1 = new Text("val1");
+ Text val2 = new Text("val2");
+ NullWritable nullWritable = NullWritable.get();
+
+ try {
+ theRecordWriter.write(key1, val1);
+ theRecordWriter.write(null, nullWritable);
+ theRecordWriter.write(null, val1);
+ theRecordWriter.write(nullWritable, val2);
+ theRecordWriter.write(key2, nullWritable);
+ theRecordWriter.write(key1, null);
+ theRecordWriter.write(null, null);
+ theRecordWriter.write(key2, val2);
+ } finally {
+ theRecordWriter.close(reporter);
+ }
+ committer.commitTask(tContext);
+ committer.cleanupJob(jContext);
+
+ File expectedFile = new File(new Path(outDir, file).toString());
+ StringBuffer expectedOutput = new StringBuffer();
+ expectedOutput.append(key1).append('\t').append(val1).append("\n");
+ expectedOutput.append(val1).append("\n");
+ expectedOutput.append(val2).append("\n");
+ expectedOutput.append(key2).append("\n");
+ expectedOutput.append(key1).append("\n");
+ expectedOutput.append(key2).append('\t').append(val2).append("\n");
+ String output = UtilsForTests.slurp(expectedFile);
+ assertEquals(output, expectedOutput.toString());
+ }
+
+ public static void main(String[] args) throws Exception {
+ new TestFileOutputCommitter().testCommitter();
+ }
+}
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java?rev=692408&r1=692407&r2=692408&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java Fri Sep 5 03:50:04 2008
@@ -74,6 +74,8 @@
assertEquals("number of maps", 1, reports.length);
reports = client.getReduceTaskReports(jobid);
assertEquals("number of reduces", 1, reports.length);
+ reports = client.getCleanupTaskReports(jobid);
+ assertEquals("number of cleanups", 2, reports.length);
Counters counters = ret.job.getCounters();
assertEquals("number of map inputs", 3,
counters.getCounter(Task.Counter.MAP_INPUT_RECORDS));
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleTextOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleTextOutputFormat.java?rev=692408&r1=692407&r2=692408&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleTextOutputFormat.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleTextOutputFormat.java Fri Sep 5 03:50:04 2008
@@ -38,12 +38,14 @@
}
}
+ // A random task attempt id for testing.
+ private static String attempt = "attempt_200707121733_0001_m_000000_0";
+
private static Path workDir =
new Path(new Path(
new Path(System.getProperty("test.build.data", "."),
"data"),
- MRConstants.TEMP_DIR_NAME),
- "TestMultipleTextOutputFormat");
+ FileOutputCommitter.TEMP_DIR_NAME), "_" + attempt);
private static void writeData(RecordWriter<Text, Text> rw) throws IOException {
for (int i = 10; i < 40; i++) {
@@ -84,6 +86,7 @@
public void testFormat() throws Exception {
JobConf job = new JobConf();
+ job.set("mapred.task.id", attempt);
FileOutputFormat.setOutputPath(job, workDir.getParent().getParent());
FileOutputFormat.setWorkOutputPath(job, workDir);
FileSystem fs = workDir.getFileSystem(job);
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestQueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestQueueManager.java?rev=692408&r1=692407&r2=692408&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestQueueManager.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestQueueManager.java Fri Sep 5 03:50:04 2008
@@ -230,6 +230,13 @@
}
}
rjob.killJob();
+ while(rjob.cleanupProgress() == 0.0f) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException ie) {
+ break;
+ }
+ }
if (shouldSucceed) {
assertTrue(rjob.isComplete());
} else {
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryOutputFormat.java?rev=692408&r1=692407&r2=692408&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryOutputFormat.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryOutputFormat.java Fri Sep 5 03:50:04 2008
@@ -23,7 +23,6 @@
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
-import org.apache.hadoop.mapred.*;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import junit.framework.TestCase;
@@ -34,6 +33,8 @@
LogFactory.getLog(TestSequenceFileAsBinaryOutputFormat.class.getName());
private static final int RECORDS = 10000;
+ // A random task attempt id for testing.
+ private static final String attempt = "attempt_200707121733_0001_m_000000_0";
public void testBinary() throws IOException {
JobConf job = new JobConf();
@@ -41,8 +42,7 @@
Path dir =
new Path(new Path(new Path(System.getProperty("test.build.data",".")),
- MRConstants.TEMP_DIR_NAME),
- "mapred");
+ FileOutputCommitter.TEMP_DIR_NAME), "_" + attempt);
Path file = new Path(dir, "testbinary.seq");
Random r = new Random();
long seed = r.nextLong();
@@ -53,6 +53,7 @@
fail("Failed to create output directory");
}
+ job.set("mapred.task.id", attempt);
FileOutputFormat.setOutputPath(job, dir.getParent().getParent());
FileOutputFormat.setWorkOutputPath(job, dir);
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java?rev=692408&r1=692407&r2=692408&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java Fri Sep 5 03:50:04 2008
@@ -35,17 +35,19 @@
throw new RuntimeException("init failure", e);
}
}
+ // A random task attempt id for testing.
+ private static String attempt = "attempt_200707121733_0001_m_000000_0";
private static Path workDir =
new Path(new Path(
new Path(System.getProperty("test.build.data", "."),
"data"),
- MRConstants.TEMP_DIR_NAME),
- "TestTextOutputFormat");
+ FileOutputCommitter.TEMP_DIR_NAME), "_" + attempt);
@SuppressWarnings("unchecked")
public void testFormat() throws Exception {
JobConf job = new JobConf();
+ job.set("mapred.task.id", attempt);
FileOutputFormat.setOutputPath(job, workDir.getParent().getParent());
FileOutputFormat.setWorkOutputPath(job, workDir);
FileSystem fs = workDir.getFileSystem(job);
@@ -98,6 +100,7 @@
JobConf job = new JobConf();
String separator = "\u0001";
job.set("mapred.textoutputformat.separator", separator);
+ job.set("mapred.task.id", attempt);
FileOutputFormat.setOutputPath(job, workDir.getParent().getParent());
FileOutputFormat.setWorkOutputPath(job, workDir);
FileSystem fs = workDir.getFileSystem(job);
Modified: hadoop/core/trunk/src/webapps/job/jobdetails.jsp
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/webapps/job/jobdetails.jsp?rev=692408&r1=692407&r2=692408&view=diff
==============================================================================
--- hadoop/core/trunk/src/webapps/job/jobdetails.jsp (original)
+++ hadoop/core/trunk/src/webapps/job/jobdetails.jsp Fri Sep 5 03:50:04 2008
@@ -88,6 +88,41 @@
) +
"</td></tr>\n");
}
+
+ private void printCleanupTaskSummary(JspWriter out,
+ String jobId,
+ TaskInProgress[] tasks
+ ) throws IOException {
+ int totalTasks = tasks.length;
+ int runningTasks = 0;
+ int finishedTasks = 0;
+ int killedTasks = 0;
+ String kind = "cleanup";
+ for(int i=0; i < totalTasks; ++i) {
+ TaskInProgress task = tasks[i];
+ if (task.isComplete()) {
+ finishedTasks += 1;
+ } else if (task.isRunning()) {
+ runningTasks += 1;
+ } else if (task.isFailed()) {
+ killedTasks += 1;
+ }
+ }
+ int pendingTasks = totalTasks - runningTasks - killedTasks - finishedTasks;
+ out.print(((runningTasks > 0)
+ ? "<a href=\"jobtasks.jsp?jobid=" + jobId + "&type="+ kind +
+ "&pagenum=1" + "&state=running\">" + " Running" +
+ "</a>"
+ : ((pendingTasks > 0) ? " Pending" :
+ ((finishedTasks > 0)
+ ?"<a href=\"jobtasks.jsp?jobid=" + jobId + "&type="+ kind +
+ "&pagenum=1" + "&state=completed\">" + " Successful"
+ + "</a>"
+ : ((killedTasks > 0)
+ ?"<a href=\"jobtasks.jsp?jobid=" + jobId + "&type="+ kind +
+ "&pagenum=1" + "&state=killed\">" + " Failed"
+ + "</a>" : "None")))));
+ }
private void printConfirm(JspWriter out, String jobId) throws IOException{
String url = "jobdetails.jsp?jobid=" + jobId;
@@ -194,6 +229,9 @@
job.getFinishTime(), job.getStartTime()) + "<br>\n");
}
}
+ out.print("<b>Job Cleanup:</b>");
+ printCleanupTaskSummary(out, jobId, job.getCleanupTasks());
+ out.print("<br>\n");
if (flakyTaskTrackers > 0) {
out.print("<b>Black-listed TaskTrackers:</b> " +
"<a href=\"jobblacklistedtrackers.jsp?jobid=" + jobId + "\">" +
Modified: hadoop/core/trunk/src/webapps/job/jobtasks.jsp
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/webapps/job/jobtasks.jsp?rev=692408&r1=692407&r2=692408&view=diff
==============================================================================
--- hadoop/core/trunk/src/webapps/job/jobtasks.jsp (original)
+++ hadoop/core/trunk/src/webapps/job/jobtasks.jsp Fri Sep 5 03:50:04 2008
@@ -39,9 +39,12 @@
reports = (job != null) ? tracker.getMapTaskReports(jobidObj) : null;
tasks = (job != null) ? job.getMapTasks() : null;
}
- else{
+ else if ("reduce".equals(type)) {
reports = (job != null) ? tracker.getReduceTaskReports(jobidObj) : null;
tasks = (job != null) ? job.getReduceTasks() : null;
+ } else {
+ reports = (job != null) ? tracker.getCleanupTaskReports(jobidObj) : null;
+ tasks = (job != null) ? job.getCleanupTasks() : null;
}
%>