You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2008/03/02 09:38:31 UTC

svn commit: r632722 - in /hadoop/core/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/JobInProgress.java src/java/org/apache/hadoop/mapred/TaskInProgress.java

Author: omalley
Date: Sun Mar  2 00:38:28 2008
New Revision: 632722

URL: http://svn.apache.org/viewvc?rev=632722&view=rev
Log:
HADOOP-2790. Fixed the inefficient method hasSpeculativeTask: the current
time is now read once and passed in instead of being fetched on every call,
and the check for whether speculation is enabled at all is now made first
rather than late.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=632722&r1=632721&r2=632722&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Sun Mar  2 00:38:28 2008
@@ -52,6 +52,10 @@
 
   OPTIMIZATIONS
 
+    HADOOP-2790.  Fixed inefficient method hasSpeculativeTask by removing
+    repetitive calls to get the current time and late checking to see if
+    we want speculation on at all. (omalley)
+
   BUG FIXES
 
     HADOOP-2195. '-mkdir' behaviour is now closer to Linux shell in case of

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=632722&r1=632721&r2=632722&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Sun Mar  2 00:38:28 2008
@@ -108,6 +108,8 @@
 
   private LocalFileSystem localFs;
   private String jobId;
+  private boolean hasSpeculativeMaps;
+  private boolean hasSpeculativeReduces;
 
   // Per-job counters
   public static enum Counter { 
@@ -179,6 +181,8 @@
     this.jobMetrics.setTag("sessionId", conf.getSessionId());
     this.jobMetrics.setTag("jobName", conf.getJobName());
     this.jobMetrics.setTag("jobId", jobid);
+    hasSpeculativeMaps = conf.getMapSpeculativeExecution();
+    hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
   }
 
   /**
@@ -632,7 +636,7 @@
     
     
     int target = findNewTask(tts, clusterSize, status.mapProgress(), 
-                             maps, nodesToMaps);
+                             maps, nodesToMaps, hasSpeculativeMaps);
     if (target == -1) {
       return null;
     }
@@ -668,7 +672,7 @@
     }
 
     int target = findNewTask(tts, clusterSize, status.reduceProgress() , 
-                             reduces, null);
+                             reduces, null, hasSpeculativeReduces);
     if (target == -1) {
       return null;
     }
@@ -754,10 +758,11 @@
     return trackerErrors;
   }
     
-  private boolean shouldRunSpeculativeTask(TaskInProgress task, 
-                                          double avgProgress,
-                                          String taskTracker) {
-    return task.hasSpeculativeTask(avgProgress) && 
+  private boolean shouldRunSpeculativeTask(long currentTime,
+                                           TaskInProgress task, 
+                                           double avgProgress,
+                                           String taskTracker) {
+    return task.hasSpeculativeTask(currentTime, avgProgress) && 
            !task.hasRunOnMachine(taskTracker);
   }
   
@@ -769,13 +774,15 @@
    * @param tasks The list of potential tasks to try
    * @param firstTaskToTry The first index in tasks to check
    * @param cachedTasks A list of tasks that would like to run on this node
+   * @param hasSpeculative Should it try to find speculative tasks
    * @return the index in tasks of the selected task (or -1 for no task)
    */
   private int findNewTask(TaskTrackerStatus tts, 
                           int clusterSize,
                           double avgProgress,
                           TaskInProgress[] tasks,
-                          Map<Node,List<TaskInProgress>> cachedTasks) {
+                          Map<Node,List<TaskInProgress>> cachedTasks,
+                          boolean hasSpeculative) {
     String taskTracker = tts.getTrackerName();
     int specTarget = -1;
 
@@ -799,6 +806,7 @@
       }
       return -1;
     }
+    long currentTime = System.currentTimeMillis();
         
     //
     // See if there is a split over a block that is stored on
@@ -845,8 +853,9 @@
               }
               return cacheTarget;
             }
-            if (specTarget == -1 &&
-                shouldRunSpeculativeTask(tip, avgProgress, taskTracker)) {
+            if (hasSpeculative && specTarget == -1 &&
+                shouldRunSpeculativeTask(currentTime, tip, avgProgress, 
+                                         taskTracker)) {
               specTarget = tip.getIdWithinJob();
             }
           }
@@ -881,8 +890,9 @@
           if (!isRunning) {
             LOG.info("Choosing normal task " + tasks[i].getTIPId());
             return i;
-          } else if (specTarget == -1 &&
-                     shouldRunSpeculativeTask(task, avgProgress, taskTracker)) {
+          } else if (hasSpeculative && specTarget == -1 &&
+                     shouldRunSpeculativeTask(currentTime, task, avgProgress,
+                                              taskTracker)) {
             specTarget = i;
           }
         }

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=632722&r1=632721&r2=632722&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Sun Mar  2 00:38:28 2008
@@ -96,7 +96,6 @@
   // currently runnings
   private TreeMap<String, String> activeTasks = new TreeMap<String, String>();
   private JobConf conf;
-  private boolean runSpeculative;
   private Map<String,List<String>> taskDiagnosticData =
     new TreeMap<String,List<String>>();
   /**
@@ -131,7 +130,6 @@
     this.conf = conf;
     this.partition = partition;
     setMaxTaskAttempts();
-    this.runSpeculative = conf.getMapSpeculativeExecution();
     init(JobTracker.getJobUniqueString(jobid));
   }
         
@@ -149,7 +147,6 @@
     this.job = job;
     this.conf = conf;
     setMaxTaskAttempts();
-    this.runSpeculative = conf.getReduceSpeculativeExecution();
     init(JobTracker.getJobUniqueString(jobid));
   }
   
@@ -204,11 +201,6 @@
    */
   void init(String jobUniqueString) {
     this.startTime = System.currentTimeMillis();
-    if ("true".equals(conf.get("mapred.speculative.execution"))) {
-      this.runSpeculative = true;
-    } else if ("false".equals(conf.get("mapred.speculative.execution"))) {
-      this.runSpeculative = false;
-    }
     this.taskIdPrefix = makeUniqueString(jobUniqueString);
     this.id = "tip_" + this.taskIdPrefix;
   }
@@ -691,16 +683,15 @@
    * far behind, and has been behind for a non-trivial amount of 
    * time.
    */
-  boolean hasSpeculativeTask(double averageProgress) {
+  boolean hasSpeculativeTask(long currentTime, double averageProgress) {
     //
     // REMIND - mjc - these constants should be examined
     // in more depth eventually...
     //
       
     if (activeTasks.size() <= MAX_TASK_EXECS &&
-        runSpeculative &&
         (averageProgress - progress >= SPECULATIVE_GAP) &&
-        (System.currentTimeMillis() - startTime >= SPECULATIVE_LAG) 
+        (currentTime - startTime >= SPECULATIVE_LAG) 
         && completes == 0 && !isOnlyCommitPending()) {
       return true;
     }