You are viewing a plain text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to common-commits@hadoop.apache.org by tu...@apache.org on 2013/07/27 05:44:19 UTC

svn commit: r1507569 - in /hadoop/common/branches/branch-1: ./ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/

Author: tucu
Date: Sat Jul 27 03:44:18 2013
New Revision: 1507569

URL: http://svn.apache.org/r1507569
Log:
MAPREDUCE-4366. mapred metrics shows negative count of waiting maps and reduces. (sandyr via tucu)

Modified:
    hadoop/common/branches/branch-1/CHANGES.txt
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsSource.java
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/QueueMetrics.java
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestQueueMetrics.java

Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1507569&r1=1507568&r2=1507569&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Sat Jul 27 03:44:18 2013
@@ -105,6 +105,9 @@ Release 1.3.0 - unreleased
     HADOOP-9507. LocalFileSystem rename() is broken in some cases when
     destination exists. (cnauroth)
 
+    MAPREDUCE-4366. mapred metrics shows negative count of waiting maps and 
+    reduces. (sandyr via tucu)
+
 Release 1.2.1 - 2013.07.06
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1507569&r1=1507568&r2=1507569&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Sat Jul 27 03:44:18 2013
@@ -115,6 +115,9 @@ public class JobInProgress {
     
   // runningMapTasks include speculative tasks, so we need to capture 
   // speculative tasks separately 
+  // If a task is incomplete, only its running attempts beyond the first are
+  // counted in these variables. If a task is complete, all of its running
+  // attempts are counted.
   int speculativeMapTasks = 0;
   int speculativeReduceTasks = 0;
   
@@ -1738,6 +1741,7 @@ public class JobInProgress {
     String name;
     String splits = "";
     Enum<Counter> counter = null;
+    boolean speculative = tip.getActiveTasks().size() > 1;
     if (tip.isJobSetupTask()) {
       launchedSetup = true;
       name = Values.SETUP.name();
@@ -1749,18 +1753,18 @@ public class JobInProgress {
       name = Values.MAP.name();
       counter = Counter.TOTAL_LAUNCHED_MAPS;
       splits = tip.getSplitNodes();
-      if (tip.getActiveTasks().size() > 1)
+      if (speculative)
         speculativeMapTasks++;
-      metrics.launchMap(id);
-      this.queueMetrics.launchMap(id);
+      metrics.launchMap(id, speculative);
+      this.queueMetrics.launchMap(id, speculative);
     } else {
       ++runningReduceTasks;
       name = Values.REDUCE.name();
       counter = Counter.TOTAL_LAUNCHED_REDUCES;
-      if (tip.getActiveTasks().size() > 1)
+      if (speculative)
         speculativeReduceTasks++;
-      metrics.launchReduce(id);
-      this.queueMetrics.launchReduce(id);
+      metrics.launchReduce(id, speculative);
+      this.queueMetrics.launchReduce(id, speculative);
     }
     // Note that the logs are for the scheduled tasks only. Tasks that join on 
     // restart has already their logs in place.
@@ -2692,10 +2696,6 @@ public class JobInProgress {
       jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
     } else if (tip.isMapTask()) {
       runningMapTasks -= 1;
-      // check if this was a sepculative task
-      if (oldNumAttempts > 1) {
-        speculativeMapTasks -= (oldNumAttempts - newNumAttempts);
-      }
       finishedMapTasks += 1;
       metrics.completeMap(taskid);
       this.queueMetrics.completeMap(taskid);
@@ -2709,9 +2709,6 @@ public class JobInProgress {
       }
     } else {
       runningReduceTasks -= 1;
-      if (oldNumAttempts > 1) {
-        speculativeReduceTasks -= (oldNumAttempts - newNumAttempts);
-      }
       finishedReduceTasks += 1;
       metrics.completeReduce(taskid);
       this.queueMetrics.completeReduce(taskid);
@@ -3003,7 +3000,7 @@ public class JobInProgress {
     tip.incompleteSubTask(taskid, this.status);
    
     boolean isRunning = tip.isRunning();
-    boolean isComplete = tip.isComplete();
+    boolean tipIsComplete = tip.isComplete();
     boolean metricsDone = isComplete(); // job metrics garbage collected
     
     if (wasAttemptRunning) {
@@ -3018,14 +3015,24 @@ public class JobInProgress {
       // hence we are decrementing the same set.
       // Except after garbageCollect in a different thread.
       if (!tip.isJobCleanupTask() && !tip.isJobSetupTask()) {
+        boolean incWaiting = !tipIsComplete && !metricsDone &&
+            tip.getActiveTasks().isEmpty();
+        boolean wasSpeculative = wasComplete || !tip.getActiveTasks().isEmpty();
+
         if (tip.isMapTask() && !metricsDone) {
           runningMapTasks -= 1;
-          metrics.failedMap(taskid);
-          this.queueMetrics.failedMap(taskid);
+          metrics.failedMap(taskid, incWaiting);
+          this.queueMetrics.failedMap(taskid, incWaiting);
+          if (wasSpeculative) {
+            speculativeMapTasks--;
+          }
         } else if (!metricsDone) {
           runningReduceTasks -= 1;
-          metrics.failedReduce(taskid);
-          this.queueMetrics.failedReduce(taskid);
+          metrics.failedReduce(taskid, incWaiting);
+          this.queueMetrics.failedReduce(taskid, incWaiting);
+          if (wasSpeculative) {
+            speculativeReduceTasks--;
+          }
         }
       }
       
@@ -3042,14 +3049,14 @@ public class JobInProgress {
       } else if (tip.isMapTask()) {
         // remove from the running queue and put it in the non-running cache
         // if the tip is not complete i.e if the tip still needs to be run
-        if (!isComplete) {
+        if (!tipIsComplete) {
           retireMap(tip);
           failMap(tip);
         }
       } else {
         // remove from the running queue and put in the failed queue if the tip
         // is not complete
-        if (!isComplete) {
+        if (!tipIsComplete) {
           retireReduce(tip);
           failReduce(tip);
         }
@@ -3058,7 +3065,7 @@ public class JobInProgress {
         
     // The case when the map was complete but the task tracker went down.
     // However, we don't need to do any metering here...
-    if (wasComplete && !isComplete) {
+    if (wasComplete && !tipIsComplete) {
       if (tip.isMapTask()) {
         // Put the task back in the cache. This will help locality for cases
         // where we have a different TaskTracker from the same rack/switch

Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java?rev=1507569&r1=1507568&r2=1507569&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java Sat Jul 27 03:44:18 2013
@@ -32,22 +32,22 @@ class JobTrackerInstrumentation {
     tracker = jt;
   }
 
-  public void launchMap(TaskAttemptID taskAttemptID)
+  public void launchMap(TaskAttemptID taskAttemptID, boolean speculative)
   { }
 
   public void completeMap(TaskAttemptID taskAttemptID)
   { }
 
-  public void failedMap(TaskAttemptID taskAttemptID)
+  public void failedMap(TaskAttemptID taskAttemptID, boolean taskNowWaiting)
   { }
 
-  public void launchReduce(TaskAttemptID taskAttemptID)
+  public void launchReduce(TaskAttemptID taskAttemptID, boolean speculative)
   { }
 
   public void completeReduce(TaskAttemptID taskAttemptID)
   { }
   
-  public void failedReduce(TaskAttemptID taskAttemptID)
+  public void failedReduce(TaskAttemptID taskAttemptID, boolean taskNowWaiting)
   { }
 
   public void submitJob(JobConf conf, JobID id) 

Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsSource.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsSource.java?rev=1507569&r1=1507568&r2=1507569&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsSource.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsSource.java Sat Jul 27 03:44:18 2013
@@ -111,9 +111,11 @@ class JobTrackerMetricsSource extends Jo
   }
 
   @Override
-  public void launchMap(TaskAttemptID taskAttemptID) {
+  public void launchMap(TaskAttemptID taskAttemptID, boolean speculative) {
     mapsLaunched.incr();
-    decWaitingMaps(taskAttemptID.getJobID(), 1);
+    if (!speculative) {
+      decWaitingMaps(taskAttemptID.getJobID(), 1);
+    }
   }
 
   @Override
@@ -122,15 +124,19 @@ class JobTrackerMetricsSource extends Jo
   }
 
   @Override
-  public void failedMap(TaskAttemptID taskAttemptID) {
+  public void failedMap(TaskAttemptID taskAttemptID, boolean incWaiting) {
     mapsFailed.incr();
-    addWaitingMaps(taskAttemptID.getJobID(), 1);
+    if (incWaiting) {
+      addWaitingMaps(taskAttemptID.getJobID(), 1);
+    }
   }
 
   @Override
-  public void launchReduce(TaskAttemptID taskAttemptID) {
+  public void launchReduce(TaskAttemptID taskAttemptID, boolean speculative) {
     redsLaunched.incr();
-    decWaitingReduces(taskAttemptID.getJobID(), 1);
+    if (!speculative) {
+      decWaitingReduces(taskAttemptID.getJobID(), 1);
+    }
   }
 
   @Override
@@ -139,9 +145,11 @@ class JobTrackerMetricsSource extends Jo
   }
 
   @Override
-  public void failedReduce(TaskAttemptID taskAttemptID) {
+  public void failedReduce(TaskAttemptID taskAttemptID, boolean incWaiting) {
     redsFailed.incr();
-    addWaitingReduces(taskAttemptID.getJobID(), 1);
+    if (incWaiting) {
+      addWaitingReduces(taskAttemptID.getJobID(), 1);
+    }
   }
 
   @Override

Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1507569&r1=1507568&r2=1507569&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java Sat Jul 27 03:44:18 2013
@@ -219,7 +219,7 @@ public class LocalJobRunner implements J
           map.setConf(localConf);
           try {
             map_tasks.getAndIncrement();
-            myMetrics.launchMap(mapId);
+            myMetrics.launchMap(mapId, false);
             map.run(localConf, Job.this);
             myMetrics.completeMap(mapId);
           } finally {
@@ -393,8 +393,8 @@ public class LocalJobRunner implements J
               reduce.localizeConfiguration(localConf);
               reduce.setConf(localConf);
               reduce_tasks += 1;
-              myMetrics.launchReduce(reduce.getTaskID());
-              queueMetrics.launchReduce(reduce.getTaskID());
+              myMetrics.launchReduce(reduce.getTaskID(), false);
+              queueMetrics.launchReduce(reduce.getTaskID(), false);
               reduce.run(localConf, this);
               myMetrics.completeReduce(reduce.getTaskID());
               queueMetrics.completeReduce(reduce.getTaskID());

Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/QueueMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/QueueMetrics.java?rev=1507569&r1=1507568&r2=1507569&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/QueueMetrics.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/QueueMetrics.java Sat Jul 27 03:44:18 2013
@@ -136,32 +136,40 @@ class QueueMetrics implements MetricsSou
     registry.snapshot(builder.addRecord(registry.name()), all);
   }
 
-  public void launchMap(TaskAttemptID taskAttemptID) {
+  public void launchMap(TaskAttemptID taskAttemptID, boolean speculative) {
     mapsLaunched.incr();
-    decWaitingMaps(taskAttemptID.getJobID(), 1);
+    if (!speculative) {
+      decWaitingMaps(taskAttemptID.getJobID(), 1);
+    }
   }
 
   public void completeMap(TaskAttemptID taskAttemptID) {
     mapsCompleted.incr();
   }
 
-  public void failedMap(TaskAttemptID taskAttemptID) {
+  public void failedMap(TaskAttemptID taskAttemptID, boolean incWaiting) {
     mapsFailed.incr();
-    addWaitingMaps(taskAttemptID.getJobID(), 1);
+    if (incWaiting) {
+      addWaitingMaps(taskAttemptID.getJobID(), 1);
+    }
   }
 
-  public void launchReduce(TaskAttemptID taskAttemptID) {
+  public void launchReduce(TaskAttemptID taskAttemptID, boolean speculative) {
     redsLaunched.incr();
-    decWaitingReduces(taskAttemptID.getJobID(), 1);
+    if (!speculative) {
+      decWaitingReduces(taskAttemptID.getJobID(), 1);
+    }
   }
 
   public void completeReduce(TaskAttemptID taskAttemptID) {
     redsCompleted.incr();
   }
 
-  public void failedReduce(TaskAttemptID taskAttemptID) {
+  public void failedReduce(TaskAttemptID taskAttemptID, boolean incWaiting) {
     redsFailed.incr();
-    addWaitingReduces(taskAttemptID.getJobID(), 1);
+    if (incWaiting) {
+      addWaitingReduces(taskAttemptID.getJobID(), 1);
+    }
   }
 
   public void submitJob(JobConf conf, JobID id) {

Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestQueueMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestQueueMetrics.java?rev=1507569&r1=1507568&r2=1507569&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestQueueMetrics.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestQueueMetrics.java Sat Jul 27 03:44:18 2013
@@ -54,21 +54,21 @@ public class TestQueueMetrics extends Te
     QueueMetrics metrics = QueueMetrics.create(queueName, new Configuration());
 
     assertEquals(metrics.getQueueName(), "single");
-    metrics.launchMap(taskAttemptID);
+    metrics.launchMap(taskAttemptID, false);
     checkMaps(metrics, 1, 0, 0, 0, -1, 0);
     metrics.addWaitingMaps(taskAttemptID.getJobID(), 5);
-    metrics.launchMap(taskAttemptID);
+    metrics.launchMap(taskAttemptID, false);
     checkMaps(metrics, 2, 0, 0, 0, 3, 0);
     checkReduces(metrics, 0, 0, 0, 0, 0, 0);
 
     metrics.completeMap(taskAttemptID);
-    metrics.failedMap(taskAttemptID);
+    metrics.failedMap(taskAttemptID, true);
     checkMaps(metrics, 2, 1, 1, 0, 4, 0);
     checkReduces(metrics, 0, 0, 0, 0, 0, 0);
 
-    metrics.launchReduce(taskAttemptID);
+    metrics.launchReduce(taskAttemptID, false);
     metrics.completeReduce(taskAttemptID);
-    metrics.failedReduce(taskAttemptID);
+    metrics.failedReduce(taskAttemptID, true);
     checkMaps(metrics, 2, 1, 1, 0, 4, 0);
     checkReduces(metrics, 1, 1, 1, 0, 0, 0);