Posted to mapreduce-commits@hadoop.apache.org by yh...@apache.org on 2009/12/18 09:31:42 UTC

svn commit: r892178 - in /hadoop/mapreduce/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/JobInProgress.java src/java/org/apache/hadoop/mapred/TaskInProgress.java src/test/mapred/org/apache/hadoop/mapred/TestSpeculativeExecution.java

Author: yhemanth
Date: Fri Dec 18 08:31:41 2009
New Revision: 892178

URL: http://svn.apache.org/viewvc?rev=892178&view=rev
Log:
MAPREDUCE-1143. Fix running task counters to be updated correctly when speculative attempts are running for a TIP. Contributed by Rahul Kumar Singh.
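
For context: the running-task counters (runningMapTasks / runningReduceTasks)
are incremented once per scheduled task attempt, but before this change they
were only decremented when the whole TIP stopped running. With a speculative
attempt still alive, the TIP stays running, so a failed attempt left the
counter too high. Below is a minimal, hypothetical sketch of the per-attempt
accounting the patch moves to; the class and method names are illustrative
only, not the real JobInProgress API.

    // Hypothetical sketch of per-attempt counter accounting; not the actual
    // JobInProgress code.
    class RunningTaskCounter {
      private int runningMapTasks = 0;

      // Incremented once per scheduled map task attempt, including
      // speculative attempts.
      void mapAttemptStarted() {
        runningMapTasks += 1;
      }

      // Decremented once per attempt that was running when it failed,
      // mirroring the increment above. Previously the decrement happened
      // only when the whole TIP stopped running, which never occurs while
      // a speculative sibling attempt is still alive.
      void mapAttemptFailed(boolean wasAttemptRunning) {
        if (wasAttemptRunning) {
          runningMapTasks -= 1;
        }
      }

      int runningMaps() {
        return runningMapTasks;
      }
    }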

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSpeculativeExecution.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=892178&r1=892177&r2=892178&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Dec 18 08:31:41 2009
@@ -1031,3 +1031,7 @@
     MAPREDUCE-1284. Fix fts_open() call in task-controller that was failing
     LinuxTaskController unit tests. (Ravi Gummadi via yhemanth)
 
+    MAPREDUCE-1143. Fix running task counters to be updated correctly
+    when speculative attempts are running for a TIP.
+    (Rahul Kumar Singh via yhemanth)
+

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=892178&r1=892177&r2=892178&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Fri Dec 18 08:31:41 2009
@@ -1003,6 +1003,8 @@
     boolean wasComplete = tip.isComplete();
     boolean wasPending = tip.isOnlyCommitPending();
     TaskAttemptID taskid = status.getTaskID();
+    boolean wasAttemptRunning = tip.isAttemptRunning(taskid);
+
     
     // If the TIP is already completed and the task reports as SUCCEEDED then 
     // mark the task as KILLED.
@@ -1099,7 +1101,7 @@
         
         // Tell the job to fail the relevant task
         failedTask(tip, taskid, status, taskTracker,
-                   wasRunning, wasComplete);
+                   wasRunning, wasComplete, wasAttemptRunning);
 
         // Did the task failure lead to tip failure?
         TaskCompletionEvent.Status taskCompletionStatus = 
@@ -2950,10 +2952,10 @@
    * we need to schedule reexecution so that downstream reduce tasks can 
    * obtain the map task's output.
    */
-  private void failedTask(TaskInProgress tip, TaskAttemptID taskid, 
-                          TaskStatus status, 
-                          TaskTracker taskTracker,
-                          boolean wasRunning, boolean wasComplete) {
+  private void failedTask(TaskInProgress tip, TaskAttemptID taskid,
+                          TaskStatus status,
+                          TaskTracker taskTracker, boolean wasRunning,
+                          boolean wasComplete, boolean wasAttemptRunning) {
     // check if the TIP is already failed
     boolean wasFailed = tip.isFailed();
     boolean wasSpeculating = tip.isSpeculating();
@@ -2964,6 +2966,25 @@
    
     boolean isRunning = tip.isRunning();
     boolean isComplete = tip.isComplete();
+
+    if(wasAttemptRunning) {
+      // We decrement the counters without checking isRunning, because
+      // the counters are incremented whenever a new map or reduce task
+      // attempt is obtained; we never check whether the TIP itself is
+      // running at that point. Since runningMapTasks (or
+      // runningReduceTasks) was incremented for this attempt, we
+      // decrement it here.
+      if(!tip.isJobCleanupTask() && !tip.isJobSetupTask()) {
+        if(tip.isMapTask()) {
+          runningMapTasks -= 1;
+        } else {
+          runningReduceTasks -= 1;
+        }
+      }
+      
+      // Metering
+      meterTaskAttempt(tip, status);
+    }
         
     //update running  count on task failure.
     if (wasRunning && !isRunning) {
@@ -2972,7 +2993,6 @@
       } else if (tip.isJobSetupTask()) {
         launchedSetup = false;
       } else if (tip.isMapTask()) {
-        runningMapTasks -= 1;
         // remove from the running queue and put it in the non-running cache
         // if the tip is not complete i.e if the tip still needs to be run
         if (!isComplete) {
@@ -2980,7 +3000,6 @@
           failMap(tip);
         }
       } else {
-        runningReduceTasks -= 1;
         // remove from the running queue and put in the failed queue if the tip
         // is not complete
         if (!isComplete) {
@@ -2988,9 +3007,6 @@
           failReduce(tip);
         }
       }
-      
-      // Metering
-      meterTaskAttempt(tip, status);
     }
         
     // The case when the map was complete but the task tracker went down.

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=892178&r1=892177&r2=892178&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Fri Dec 18 08:31:41 2009
@@ -335,6 +335,15 @@
   public boolean isRunning() {
     return !activeTasks.isEmpty();
   }
+
+  /**
+   * Is the given task attempt currently running?
+   * @param taskId the task attempt ID to check
+   * @return true if the attempt with taskId is running.
+   */
+  boolean isAttemptRunning(TaskAttemptID taskId) {
+    return activeTasks.containsKey(taskId);
+  }
     
   TaskAttemptID getSuccessfulTaskid() {
     return successfulTaskId;

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSpeculativeExecution.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSpeculativeExecution.java?rev=892178&r1=892177&r2=892178&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSpeculativeExecution.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSpeculativeExecution.java Fri Dec 18 08:31:41 2009
@@ -23,6 +23,8 @@
 import junit.framework.Test;
 import junit.framework.TestCase;
 import junit.framework.TestSuite;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobInProgress;
 import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobTracker;
@@ -40,6 +42,8 @@
     }
   };
   static SpecFakeClock clock;
+  static final Log LOG = LogFactory.getLog(TestSpeculativeExecution.class);
+
   
   static String trackers[] = new String[] {"tracker_tracker1:1000", 
       "tracker_tracker2:1000", "tracker_tracker3:1000",
@@ -65,6 +69,68 @@
     return setup;
   }
 
+  public void testRunningTaskCountWithSpeculation() throws IOException {
+    TaskAttemptID[] taskAttemptID = new TaskAttemptID[8];
+    JobConf conf = new JobConf();
+    conf.setSpeculativeExecution(true);
+    conf.setNumMapTasks(3);
+    conf.setNumReduceTasks(3);
+    conf.setFloat(JobContext.SPECULATIVE_SLOWTASK_THRESHOLD, 0.5f);
+    FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
+    job.initTasks();
+
+    //Check for runningMap counts first
+    //schedule maps
+    taskAttemptID[0] = job.findMapTask(trackers[0]);
+    taskAttemptID[1] = job.findMapTask(trackers[1]);
+    taskAttemptID[2] = job.findMapTask(trackers[2]);
+
+    clock.advance(5000);
+    job.finishTask(taskAttemptID[0]);
+    clock.advance(1000);
+    job.finishTask(taskAttemptID[1]);
+    clock.advanceBySpeculativeLag();
+
+    //we should get a speculative task now
+    taskAttemptID[3] = job.findMapTask(trackers[3]);
+    int oldRunningMap = job.runningMaps();
+    LOG.info("No of running maps before fail was " + oldRunningMap);
+    job.failTask(taskAttemptID[2]);
+    assertEquals(
+      "Running maps count should be updated from " + oldRunningMap + " to " +
+        (oldRunningMap - 1), job.runningMaps(), oldRunningMap - 1);
+    LOG.info(" Job running maps after fail " + job.runningMaps());
+
+    clock.advance(5000);
+    job.finishTask(taskAttemptID[3]);
+
+    //check for runningReduce count.
+    taskAttemptID[4] = job.findReduceTask(trackers[0]);
+    taskAttemptID[5] = job.findReduceTask(trackers[1]);
+    taskAttemptID[6] = job.findReduceTask(trackers[2]);
+
+    clock.advance(5000);
+    job.finishTask(taskAttemptID[4]);
+    clock.advance(1000);
+    job.finishTask(taskAttemptID[5]);
+
+    clock.advanceBySpeculativeLag();
+    taskAttemptID[7] = job.findReduceTask(trackers[4]);
+
+    int oldRunningReduces = job.runningReduces();
+    job.failTask(taskAttemptID[6]);
+    LOG.info(
+      "No of running reduces before fail " + oldRunningReduces);
+    LOG.info(
+      "No of running reduces after fail " + job.runningReduces());
+    assertEquals(
+      "Running reduces count should be updated from " + oldRunningReduces +
+        " to " + (oldRunningReduces - 1), job.runningReduces(),
+      oldRunningReduces - 1);
+    
+    job.finishTask(taskAttemptID[7]);
+  }
+
   public void testIsSlowTracker() throws IOException {
     TaskAttemptID[] taskAttemptID = new TaskAttemptID[20];
     JobConf conf = new JobConf();
@@ -171,7 +237,7 @@
     taskAttemptID[5] = job.findMapTask(trackers[4]);
     assertEquals(taskAttemptID[5].getTaskID().getId(),4);
   }
-  
+
   /*
    * Tests the fact that we only launch a limited number of speculative tasks,
    * even though we have a lot of tasks in RUNNING state
@@ -219,7 +285,7 @@
       taskAttemptID[i] = job.findMapTask(trackers[1]);
       clock.advance(2000);
       if (taskAttemptID[i] != null) {
-        //add some good progress constantly for the different 
+        //add some good progress constantly for the different
         //task-attempts so that
         //the tasktracker doesn't get into the slow trackers category
         job.progressMade(taskAttemptID[i], 0.99f);