You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2018/07/27 14:56:32 UTC

tez git commit: TEZ-3934. LegacySpeculator sometime issues wrong number of speculative attempts (Nishant Dash via jeagles)

Repository: tez
Updated Branches:
  refs/heads/master dd6a09dc4 -> fe22f3276


TEZ-3934. LegacySpeculator sometime issues wrong number of speculative attempts (Nishant Dash via jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/fe22f327
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/fe22f327
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/fe22f327

Branch: refs/heads/master
Commit: fe22f3276d6d97f6b5dfab24490ee2ca32bf73c3
Parents: dd6a09d
Author: Jonathan Eagles <je...@yahoo-inc.com>
Authored: Fri Jul 27 09:56:10 2018 -0500
Committer: Jonathan Eagles <je...@yahoo-inc.com>
Committed: Fri Jul 27 09:56:10 2018 -0500

----------------------------------------------------------------------
 .../speculation/legacy/LegacySpeculator.java    | 54 ++++++++++++--------
 1 file changed, 33 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/fe22f327/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java
index 9fbea19..c132fb1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java
@@ -85,8 +85,7 @@ public class LegacySpeculator {
   // in progress.
   private static final long MAX_WAITTING_TIME_FOR_HEARTBEAT = 9 * 1000;
 
-
-  private final Set<TezTaskID> mayHaveSpeculated = new HashSet<TezTaskID>();
+  private final Set<TezTaskID> waitingToSpeculate = new HashSet<TezTaskID>();
 
   private Vertex vertex;
   private TaskRuntimeEstimator estimator;
@@ -229,24 +228,44 @@ public class LegacySpeculator {
     if (task.getState() == TaskState.SUCCEEDED) {
       return NOT_RUNNING;
     }
-    
-    if (!mayHaveSpeculated.contains(taskID) && !shouldUseTimeout) {
-      acceptableRuntime = estimator.thresholdRuntime(taskID);
-      if (acceptableRuntime == Long.MAX_VALUE) {
-        return ON_SCHEDULE;
-      }
-    }
-
-    TezTaskAttemptID runningTaskAttemptID = null;
 
     int numberRunningAttempts = 0;
 
     for (TaskAttempt taskAttempt : attempts.values()) {
-      if (taskAttempt.getState() == TaskAttemptState.RUNNING
-          || taskAttempt.getState() == TaskAttemptState.STARTING) {
+      TaskAttemptState taskAttemptState = taskAttempt.getState();
+      if (taskAttemptState == TaskAttemptState.RUNNING
+          || taskAttemptState == TaskAttemptState.STARTING) {
         if (++numberRunningAttempts > 1) {
+          waitingToSpeculate.remove(taskID);
           return ALREADY_SPECULATING;
         }
+      }
+    }
+
+    // If we are here, there's at most one task attempt.
+    if (numberRunningAttempts == 0) {
+      return NOT_RUNNING;
+    }
+
+    if ((numberRunningAttempts == 1) && waitingToSpeculate.contains(taskID)) {
+      return ALREADY_SPECULATING;
+    }
+    else {
+      if (!shouldUseTimeout) {
+        acceptableRuntime = estimator.thresholdRuntime(taskID);
+        if (acceptableRuntime == Long.MAX_VALUE) {
+          return ON_SCHEDULE;
+        }
+      }
+    }
+
+    TezTaskAttemptID runningTaskAttemptID = null;
+
+    for (TaskAttempt taskAttempt : attempts.values()) {
+      TaskAttemptState taskAttemptState = taskAttempt.getState();
+      if (taskAttemptState == TaskAttemptState.RUNNING
+          || taskAttemptState == TaskAttemptState.STARTING) {
+
         runningTaskAttemptID = taskAttempt.getID();
 
         long taskAttemptStartTime
@@ -311,13 +330,6 @@ public class LegacySpeculator {
       }
     }
 
-    // If we are here, there's at most one task attempt.
-    if (numberRunningAttempts == 0) {
-      return NOT_RUNNING;
-    }
-
-
-
     if ((acceptableRuntime == Long.MIN_VALUE) && !shouldUseTimeout) {
       acceptableRuntime = estimator.thresholdRuntime(taskID);
       if (acceptableRuntime == Long.MAX_VALUE) {
@@ -332,7 +344,7 @@ public class LegacySpeculator {
   protected void addSpeculativeAttempt(TezTaskID taskID) {
     LOG.info("DefaultSpeculator.addSpeculativeAttempt -- we are speculating " + taskID);
     vertex.scheduleSpeculativeTask(taskID);
-    mayHaveSpeculated.add(taskID);
+    waitingToSpeculate.add(taskID);
   }
 
   private int maybeScheduleASpeculation() {