You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/03/01 10:03:39 UTC

svn commit: r632572 - in /hadoop/core/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/JobInProgress.java src/java/org/apache/hadoop/mapred/TaskInProgress.java

Author: ddas
Date: Sat Mar  1 01:03:38 2008
New Revision: 632572

URL: http://svn.apache.org/viewvc?rev=632572&view=rev
Log:
HADOOP-2790.  Optimizes hasSpeculativeTask to do with getting the value of time (System.getCurrentTimeMillis()). This is now done only once in the beginning and is used for all TIPs. Contributed by Owen O'Malley.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=632572&r1=632571&r2=632572&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Sat Mar  1 01:03:38 2008
@@ -191,6 +191,10 @@
     HADOOP-2918.  Improve error logging so that dfs writes failure with
     "No lease on file" can be diagnosed. (dhruba)
 
+    HADOOP-2790.  Optimizes hasSpeculativeTask to do with getting the value of
+    time (System.getCurrentTimeMillis()). This is now done only once in the
+    beginning and is used for all TIPs. (Owen O'Malley via ddas).
+
 Release 0.16.0 - 2008-02-07
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=632572&r1=632571&r2=632572&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Sat Mar  1 01:03:38 2008
@@ -108,6 +108,8 @@
 
   private LocalFileSystem localFs;
   private String jobId;
+  private boolean hasSpeculativeMaps;
+  private boolean hasSpeculativeReduces;
 
   // Per-job counters
   public static enum Counter { 
@@ -179,6 +181,8 @@
     this.jobMetrics.setTag("sessionId", conf.getSessionId());
     this.jobMetrics.setTag("jobName", conf.getJobName());
     this.jobMetrics.setTag("jobId", jobid);
+    hasSpeculativeMaps = conf.getMapSpeculativeExecution();
+    hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
   }
 
   /**
@@ -632,7 +636,7 @@
     
     
     int target = findNewTask(tts, clusterSize, status.mapProgress(), 
-                             maps, nodesToMaps);
+                             maps, nodesToMaps, hasSpeculativeMaps);
     if (target == -1) {
       return null;
     }
@@ -668,7 +672,7 @@
     }
 
     int target = findNewTask(tts, clusterSize, status.reduceProgress() , 
-                             reduces, null);
+                             reduces, null, hasSpeculativeReduces);
     if (target == -1) {
       return null;
     }
@@ -754,10 +758,11 @@
     return trackerErrors;
   }
     
-  private boolean shouldRunSpeculativeTask(TaskInProgress task, 
-                                          double avgProgress,
-                                          String taskTracker) {
-    return task.hasSpeculativeTask(avgProgress) && 
+  private boolean shouldRunSpeculativeTask(long currentTime,
+                                           TaskInProgress task, 
+                                           double avgProgress,
+                                           String taskTracker) {
+    return task.hasSpeculativeTask(currentTime, avgProgress) && 
            !task.hasRunOnMachine(taskTracker);
   }
   
@@ -769,13 +774,15 @@
    * @param tasks The list of potential tasks to try
    * @param firstTaskToTry The first index in tasks to check
    * @param cachedTasks A list of tasks that would like to run on this node
+   * @param hasSpeculative Should it try to find speculative tasks
    * @return the index in tasks of the selected task (or -1 for no task)
    */
   private int findNewTask(TaskTrackerStatus tts, 
                           int clusterSize,
                           double avgProgress,
                           TaskInProgress[] tasks,
-                          Map<Node,List<TaskInProgress>> cachedTasks) {
+                          Map<Node,List<TaskInProgress>> cachedTasks,
+                          boolean hasSpeculative) {
     String taskTracker = tts.getTrackerName();
     int specTarget = -1;
 
@@ -799,6 +806,7 @@
       }
       return -1;
     }
+    long currentTime = System.currentTimeMillis();
         
     //
     // See if there is a split over a block that is stored on
@@ -845,8 +853,9 @@
               }
               return cacheTarget;
             }
-            if (specTarget == -1 &&
-                shouldRunSpeculativeTask(tip, avgProgress, taskTracker)) {
+            if (hasSpeculative && specTarget == -1 &&
+                shouldRunSpeculativeTask(currentTime, tip, avgProgress, 
+                                         taskTracker)) {
               specTarget = tip.getIdWithinJob();
             }
           }
@@ -881,8 +890,9 @@
           if (!isRunning) {
             LOG.info("Choosing normal task " + tasks[i].getTIPId());
             return i;
-          } else if (specTarget == -1 &&
-                     shouldRunSpeculativeTask(task, avgProgress, taskTracker)) {
+          } else if (hasSpeculative && specTarget == -1 &&
+                     shouldRunSpeculativeTask(currentTime, task, avgProgress,
+                                              taskTracker)) {
             specTarget = i;
           }
         }

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=632572&r1=632571&r2=632572&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Sat Mar  1 01:03:38 2008
@@ -96,7 +96,6 @@
   // currently runnings
   private TreeMap<String, String> activeTasks = new TreeMap<String, String>();
   private JobConf conf;
-  private boolean runSpeculative;
   private Map<String,List<String>> taskDiagnosticData =
     new TreeMap<String,List<String>>();
   /**
@@ -131,7 +130,6 @@
     this.conf = conf;
     this.partition = partition;
     setMaxTaskAttempts();
-    this.runSpeculative = conf.getMapSpeculativeExecution();
     init(JobTracker.getJobUniqueString(jobid));
   }
         
@@ -149,7 +147,6 @@
     this.job = job;
     this.conf = conf;
     setMaxTaskAttempts();
-    this.runSpeculative = conf.getReduceSpeculativeExecution();
     init(JobTracker.getJobUniqueString(jobid));
   }
   
@@ -204,11 +201,6 @@
    */
   void init(String jobUniqueString) {
     this.startTime = System.currentTimeMillis();
-    if ("true".equals(conf.get("mapred.speculative.execution"))) {
-      this.runSpeculative = true;
-    } else if ("false".equals(conf.get("mapred.speculative.execution"))) {
-      this.runSpeculative = false;
-    }
     this.taskIdPrefix = makeUniqueString(jobUniqueString);
     this.id = "tip_" + this.taskIdPrefix;
   }
@@ -691,16 +683,15 @@
    * far behind, and has been behind for a non-trivial amount of 
    * time.
    */
-  boolean hasSpeculativeTask(double averageProgress) {
+  boolean hasSpeculativeTask(long currentTime, double averageProgress) {
     //
     // REMIND - mjc - these constants should be examined
     // in more depth eventually...
     //
       
     if (activeTasks.size() <= MAX_TASK_EXECS &&
-        runSpeculative &&
         (averageProgress - progress >= SPECULATIVE_GAP) &&
-        (System.currentTimeMillis() - startTime >= SPECULATIVE_LAG) 
+        (currentTime - startTime >= SPECULATIVE_LAG) 
         && completes == 0 && !isOnlyCommitPending()) {
       return true;
     }