You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/03/01 10:06:55 UTC

svn commit: r632573 - in /hadoop/core/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/JobInProgress.java src/java/org/apache/hadoop/mapred/TaskInProgress.java

Author: ddas
Date: Sat Mar  1 01:06:54 2008
New Revision: 632573

URL: http://svn.apache.org/viewvc?rev=632573&view=rev
Log:
HADOOP-2790. Reverted patch due to conflicts in 0.16

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=632573&r1=632572&r2=632573&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Sat Mar  1 01:06:54 2008
@@ -191,10 +191,6 @@
     HADOOP-2918.  Improve error logging so that dfs writes failure with
     "No lease on file" can be diagnosed. (dhruba)
 
-    HADOOP-2790.  Optimizes hasSpeculativeTask to do with getting the value of
-    time (System.getCurrentTimeMillis()). This is now done only once in the
-    beginning and is used for all TIPs. (Owen O'Malley via ddas).
-
 Release 0.16.0 - 2008-02-07
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=632573&r1=632572&r2=632573&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Sat Mar  1 01:06:54 2008
@@ -108,8 +108,6 @@
 
   private LocalFileSystem localFs;
   private String jobId;
-  private boolean hasSpeculativeMaps;
-  private boolean hasSpeculativeReduces;
 
   // Per-job counters
   public static enum Counter { 
@@ -181,8 +179,6 @@
     this.jobMetrics.setTag("sessionId", conf.getSessionId());
     this.jobMetrics.setTag("jobName", conf.getJobName());
     this.jobMetrics.setTag("jobId", jobid);
-    hasSpeculativeMaps = conf.getMapSpeculativeExecution();
-    hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
   }
 
   /**
@@ -636,7 +632,7 @@
     
     
     int target = findNewTask(tts, clusterSize, status.mapProgress(), 
-                             maps, nodesToMaps, hasSpeculativeMaps);
+                             maps, nodesToMaps);
     if (target == -1) {
       return null;
     }
@@ -672,7 +668,7 @@
     }
 
     int target = findNewTask(tts, clusterSize, status.reduceProgress() , 
-                             reduces, null, hasSpeculativeReduces);
+                             reduces, null);
     if (target == -1) {
       return null;
     }
@@ -758,11 +754,10 @@
     return trackerErrors;
   }
     
-  private boolean shouldRunSpeculativeTask(long currentTime,
-                                           TaskInProgress task, 
-                                           double avgProgress,
-                                           String taskTracker) {
-    return task.hasSpeculativeTask(currentTime, avgProgress) && 
+  private boolean shouldRunSpeculativeTask(TaskInProgress task, 
+                                          double avgProgress,
+                                          String taskTracker) {
+    return task.hasSpeculativeTask(avgProgress) && 
            !task.hasRunOnMachine(taskTracker);
   }
   
@@ -774,15 +769,13 @@
    * @param tasks The list of potential tasks to try
    * @param firstTaskToTry The first index in tasks to check
    * @param cachedTasks A list of tasks that would like to run on this node
-   * @param hasSpeculative Should it try to find speculative tasks
    * @return the index in tasks of the selected task (or -1 for no task)
    */
   private int findNewTask(TaskTrackerStatus tts, 
                           int clusterSize,
                           double avgProgress,
                           TaskInProgress[] tasks,
-                          Map<Node,List<TaskInProgress>> cachedTasks,
-                          boolean hasSpeculative) {
+                          Map<Node,List<TaskInProgress>> cachedTasks) {
     String taskTracker = tts.getTrackerName();
     int specTarget = -1;
 
@@ -806,7 +799,6 @@
       }
       return -1;
     }
-    long currentTime = System.currentTimeMillis();
         
     //
     // See if there is a split over a block that is stored on
@@ -853,9 +845,8 @@
               }
               return cacheTarget;
             }
-            if (hasSpeculative && specTarget == -1 &&
-                shouldRunSpeculativeTask(currentTime, tip, avgProgress, 
-                                         taskTracker)) {
+            if (specTarget == -1 &&
+                shouldRunSpeculativeTask(tip, avgProgress, taskTracker)) {
               specTarget = tip.getIdWithinJob();
             }
           }
@@ -890,9 +881,8 @@
           if (!isRunning) {
             LOG.info("Choosing normal task " + tasks[i].getTIPId());
             return i;
-          } else if (hasSpeculative && specTarget == -1 &&
-                     shouldRunSpeculativeTask(currentTime, task, avgProgress,
-                                              taskTracker)) {
+          } else if (specTarget == -1 &&
+                     shouldRunSpeculativeTask(task, avgProgress, taskTracker)) {
             specTarget = i;
           }
         }

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=632573&r1=632572&r2=632573&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Sat Mar  1 01:06:54 2008
@@ -96,6 +96,7 @@
   // currently runnings
   private TreeMap<String, String> activeTasks = new TreeMap<String, String>();
   private JobConf conf;
+  private boolean runSpeculative;
   private Map<String,List<String>> taskDiagnosticData =
     new TreeMap<String,List<String>>();
   /**
@@ -130,6 +131,7 @@
     this.conf = conf;
     this.partition = partition;
     setMaxTaskAttempts();
+    this.runSpeculative = conf.getMapSpeculativeExecution();
     init(JobTracker.getJobUniqueString(jobid));
   }
         
@@ -147,6 +149,7 @@
     this.job = job;
     this.conf = conf;
     setMaxTaskAttempts();
+    this.runSpeculative = conf.getReduceSpeculativeExecution();
     init(JobTracker.getJobUniqueString(jobid));
   }
   
@@ -201,6 +204,11 @@
    */
   void init(String jobUniqueString) {
     this.startTime = System.currentTimeMillis();
+    if ("true".equals(conf.get("mapred.speculative.execution"))) {
+      this.runSpeculative = true;
+    } else if ("false".equals(conf.get("mapred.speculative.execution"))) {
+      this.runSpeculative = false;
+    }
     this.taskIdPrefix = makeUniqueString(jobUniqueString);
     this.id = "tip_" + this.taskIdPrefix;
   }
@@ -683,15 +691,16 @@
    * far behind, and has been behind for a non-trivial amount of 
    * time.
    */
-  boolean hasSpeculativeTask(long currentTime, double averageProgress) {
+  boolean hasSpeculativeTask(double averageProgress) {
     //
     // REMIND - mjc - these constants should be examined
     // in more depth eventually...
     //
       
     if (activeTasks.size() <= MAX_TASK_EXECS &&
+        runSpeculative &&
         (averageProgress - progress >= SPECULATIVE_GAP) &&
-        (currentTime - startTime >= SPECULATIVE_LAG) 
+        (System.currentTimeMillis() - startTime >= SPECULATIVE_LAG) 
         && completes == 0 && !isOnlyCommitPending()) {
       return true;
     }