You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by yh...@apache.org on 2009/12/18 09:31:42 UTC
svn commit: r892178 - in /hadoop/mapreduce/trunk: CHANGES.txt
src/java/org/apache/hadoop/mapred/JobInProgress.java
src/java/org/apache/hadoop/mapred/TaskInProgress.java
src/test/mapred/org/apache/hadoop/mapred/TestSpeculativeExecution.java
Author: yhemanth
Date: Fri Dec 18 08:31:41 2009
New Revision: 892178
URL: http://svn.apache.org/viewvc?rev=892178&view=rev
Log:
MAPREDUCE-1143. Fix running task counters to be updated correctly when speculative attempts are running for a TIP. Contributed by Rahul Kumar Singh.
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSpeculativeExecution.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=892178&r1=892177&r2=892178&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Dec 18 08:31:41 2009
@@ -1031,3 +1031,7 @@
MAPREDUCE-1284. Fix fts_open() call in task-controller that was failing
LinuxTaskController unit tests. (Ravi Gummadi via yhemanth)
+ MAPREDUCE-1143. Fix running task counters to be updated correctly
+ when speculative attempts are running for a TIP.
+ (Rahul Kumar Singh via yhemanth)
+
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=892178&r1=892177&r2=892178&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Fri Dec 18 08:31:41 2009
@@ -1003,6 +1003,8 @@
boolean wasComplete = tip.isComplete();
boolean wasPending = tip.isOnlyCommitPending();
TaskAttemptID taskid = status.getTaskID();
+ boolean wasAttemptRunning = tip.isAttemptRunning(taskid);
+
// If the TIP is already completed and the task reports as SUCCEEDED then
// mark the task as KILLED.
@@ -1099,7 +1101,7 @@
// Tell the job to fail the relevant task
failedTask(tip, taskid, status, taskTracker,
- wasRunning, wasComplete);
+ wasRunning, wasComplete, wasAttemptRunning);
// Did the task failure lead to tip failure?
TaskCompletionEvent.Status taskCompletionStatus =
@@ -2950,10 +2952,10 @@
* we need to schedule reexecution so that downstream reduce tasks can
* obtain the map task's output.
*/
- private void failedTask(TaskInProgress tip, TaskAttemptID taskid,
- TaskStatus status,
- TaskTracker taskTracker,
- boolean wasRunning, boolean wasComplete) {
+ private void failedTask(TaskInProgress tip, TaskAttemptID taskid,
+ TaskStatus status,
+ TaskTracker taskTracker, boolean wasRunning,
+ boolean wasComplete, boolean wasAttemptRunning) {
// check if the TIP is already failed
boolean wasFailed = tip.isFailed();
boolean wasSpeculating = tip.isSpeculating();
@@ -2964,6 +2966,25 @@
boolean isRunning = tip.isRunning();
boolean isComplete = tip.isComplete();
+
+ if(wasAttemptRunning) {
+ // Decrement the running counters without checking tip.isRunning(),
+ // because the counters are incremented whenever a new map or reduce
+ // task attempt is obtained, without checking whether the TIP is
+ // already running. Each attempt that was running therefore accounts
+ // for one increment of runningMapTasks/runningReduceTasks, and that
+ // increment is undone here.
+ if(!tip.isJobCleanupTask() && !tip.isJobSetupTask()) {
+ if(tip.isMapTask()) {
+ runningMapTasks -= 1;
+ } else {
+ runningReduceTasks -= 1;
+ }
+ }
+
+ // Metering
+ meterTaskAttempt(tip, status);
+ }
//update running count on task failure.
if (wasRunning && !isRunning) {
@@ -2972,7 +2993,6 @@
} else if (tip.isJobSetupTask()) {
launchedSetup = false;
} else if (tip.isMapTask()) {
- runningMapTasks -= 1;
// remove from the running queue and put it in the non-running cache
// if the tip is not complete i.e if the tip still needs to be run
if (!isComplete) {
@@ -2980,7 +3000,6 @@
failMap(tip);
}
} else {
- runningReduceTasks -= 1;
// remove from the running queue and put in the failed queue if the tip
// is not complete
if (!isComplete) {
@@ -2988,9 +3007,6 @@
failReduce(tip);
}
}
-
- // Metering
- meterTaskAttempt(tip, status);
}
// The case when the map was complete but the task tracker went down.
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=892178&r1=892177&r2=892178&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Fri Dec 18 08:31:41 2009
@@ -335,6 +335,15 @@
public boolean isRunning() {
return !activeTasks.isEmpty();
}
+
+ /**
+ * Checks whether the given task attempt is currently in activeTasks.
+ * @param taskId the task attempt ID to look up in activeTasks
+ * @return true if activeTasks contains taskId, false otherwise.
+ */
+ boolean isAttemptRunning(TaskAttemptID taskId) {
+ return activeTasks.containsKey(taskId);
+ }
TaskAttemptID getSuccessfulTaskid() {
return successfulTaskId;
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSpeculativeExecution.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSpeculativeExecution.java?rev=892178&r1=892177&r2=892178&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSpeculativeExecution.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSpeculativeExecution.java Fri Dec 18 08:31:41 2009
@@ -23,6 +23,8 @@
import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobInProgress;
import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobTracker;
@@ -40,6 +42,8 @@
}
};
static SpecFakeClock clock;
+ static final Log LOG = LogFactory.getLog(TestSpeculativeExecution.class);
+
static String trackers[] = new String[] {"tracker_tracker1:1000",
"tracker_tracker2:1000", "tracker_tracker3:1000",
@@ -65,6 +69,68 @@
return setup;
}
+ /**
+ * Verifies running map/reduce counts drop by one when a task attempt fails.
+ */
+ public void testRunningTaskCountWithSpeculation() throws IOException {
+ TaskAttemptID[] taskAttemptID = new TaskAttemptID[8];
+ JobConf conf = new JobConf();
+ conf.setSpeculativeExecution(true);
+ conf.setNumMapTasks(3);
+ conf.setNumReduceTasks(3);
+ conf.setFloat(JobContext.SPECULATIVE_SLOWTASK_THRESHOLD, 0.5f);
+ FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
+ job.initTasks();
+
+ // Check the running map count first: schedule the maps.
+ taskAttemptID[0] = job.findMapTask(trackers[0]);
+ taskAttemptID[1] = job.findMapTask(trackers[1]);
+ taskAttemptID[2] = job.findMapTask(trackers[2]);
+
+ clock.advance(5000);
+ job.finishTask(taskAttemptID[0]);
+ clock.advance(1000);
+ job.finishTask(taskAttemptID[1]);
+ clock.advanceBySpeculativeLag();
+
+ //we should get a speculative task now
+ taskAttemptID[3] = job.findMapTask(trackers[3]);
+ int oldRunningMap = job.runningMaps();
+ LOG.info("No of running maps before fail was " + oldRunningMap);
+ job.failTask(taskAttemptID[2]);
+ // JUnit's argument order is assertEquals(message, expected, actual).
+ assertEquals(
+ "Running maps count should be updated from " + oldRunningMap + " to " +
+ (oldRunningMap - 1), oldRunningMap - 1, job.runningMaps());
+ LOG.info(" Job running maps after fail " + job.runningMaps());
+
+ clock.advance(5000);
+ job.finishTask(taskAttemptID[3]);
+
+ //check for runningReduce count.
+ taskAttemptID[4] = job.findReduceTask(trackers[0]);
+ taskAttemptID[5] = job.findReduceTask(trackers[1]);
+ taskAttemptID[6] = job.findReduceTask(trackers[2]);
+
+ clock.advance(5000);
+ job.finishTask(taskAttemptID[4]);
+ clock.advance(1000);
+ job.finishTask(taskAttemptID[5]);
+
+ clock.advanceBySpeculativeLag();
+ taskAttemptID[7] = job.findReduceTask(trackers[4]);
+
+ int oldRunningReduces = job.runningReduces();
+ job.failTask(taskAttemptID[6]);
+ LOG.info(" No of running Reduces before fail " + oldRunningReduces);
+ LOG.info(" No of runing reduces after fail " + job.runningReduces());
+ assertEquals(
+ "Running reduces count should be updated from " + oldRunningReduces +
+ " to " + (oldRunningReduces - 1), oldRunningReduces - 1,
+ job.runningReduces());
+ job.finishTask(taskAttemptID[7]);
+ }
+
public void testIsSlowTracker() throws IOException {
TaskAttemptID[] taskAttemptID = new TaskAttemptID[20];
JobConf conf = new JobConf();
@@ -171,7 +237,7 @@
taskAttemptID[5] = job.findMapTask(trackers[4]);
assertEquals(taskAttemptID[5].getTaskID().getId(),4);
}
-
+
/*
* Tests the fact that we only launch a limited number of speculative tasks,
* even though we have a lot of tasks in RUNNING state
@@ -219,7 +285,7 @@
taskAttemptID[i] = job.findMapTask(trackers[1]);
clock.advance(2000);
if (taskAttemptID[i] != null) {
- //add some good progress constantly for the different
+ //add some good progress constantly for the different
//task-attempts so that
//the tasktracker doesn't get into the slow trackers category
job.progressMade(taskAttemptID[i], 0.99f);