You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@tez.apache.org by je...@apache.org on 2018/07/27 14:56:32 UTC
tez git commit: TEZ-3934. LegacySpeculator sometimes issues the wrong
number of speculative attempts (Nishant Dash via jeagles)
Repository: tez
Updated Branches:
refs/heads/master dd6a09dc4 -> fe22f3276
TEZ-3934. LegacySpeculator sometimes issues the wrong number of speculative attempts (Nishant Dash via jeagles)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/fe22f327
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/fe22f327
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/fe22f327
Branch: refs/heads/master
Commit: fe22f3276d6d97f6b5dfab24490ee2ca32bf73c3
Parents: dd6a09d
Author: Jonathan Eagles <je...@yahoo-inc.com>
Authored: Fri Jul 27 09:56:10 2018 -0500
Committer: Jonathan Eagles <je...@yahoo-inc.com>
Committed: Fri Jul 27 09:56:10 2018 -0500
----------------------------------------------------------------------
.../speculation/legacy/LegacySpeculator.java | 54 ++++++++++++--------
1 file changed, 33 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/fe22f327/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java
index 9fbea19..c132fb1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java
@@ -85,8 +85,7 @@ public class LegacySpeculator {
// in progress.
private static final long MAX_WAITTING_TIME_FOR_HEARTBEAT = 9 * 1000;
-
- private final Set<TezTaskID> mayHaveSpeculated = new HashSet<TezTaskID>();
+ private final Set<TezTaskID> waitingToSpeculate = new HashSet<TezTaskID>();
private Vertex vertex;
private TaskRuntimeEstimator estimator;
@@ -229,24 +228,44 @@ public class LegacySpeculator {
if (task.getState() == TaskState.SUCCEEDED) {
return NOT_RUNNING;
}
-
- if (!mayHaveSpeculated.contains(taskID) && !shouldUseTimeout) {
- acceptableRuntime = estimator.thresholdRuntime(taskID);
- if (acceptableRuntime == Long.MAX_VALUE) {
- return ON_SCHEDULE;
- }
- }
-
- TezTaskAttemptID runningTaskAttemptID = null;
int numberRunningAttempts = 0;
for (TaskAttempt taskAttempt : attempts.values()) {
- if (taskAttempt.getState() == TaskAttemptState.RUNNING
- || taskAttempt.getState() == TaskAttemptState.STARTING) {
+ TaskAttemptState taskAttemptState = taskAttempt.getState();
+ if (taskAttemptState == TaskAttemptState.RUNNING
+ || taskAttemptState == TaskAttemptState.STARTING) {
if (++numberRunningAttempts > 1) {
+ waitingToSpeculate.remove(taskID);
return ALREADY_SPECULATING;
}
+ }
+ }
+
+ // If we are here, there's at most one task attempt.
+ if (numberRunningAttempts == 0) {
+ return NOT_RUNNING;
+ }
+
+ if ((numberRunningAttempts == 1) && waitingToSpeculate.contains(taskID)) {
+ return ALREADY_SPECULATING;
+ }
+ else {
+ if (!shouldUseTimeout) {
+ acceptableRuntime = estimator.thresholdRuntime(taskID);
+ if (acceptableRuntime == Long.MAX_VALUE) {
+ return ON_SCHEDULE;
+ }
+ }
+ }
+
+ TezTaskAttemptID runningTaskAttemptID = null;
+
+ for (TaskAttempt taskAttempt : attempts.values()) {
+ TaskAttemptState taskAttemptState = taskAttempt.getState();
+ if (taskAttemptState == TaskAttemptState.RUNNING
+ || taskAttemptState == TaskAttemptState.STARTING) {
+
runningTaskAttemptID = taskAttempt.getID();
long taskAttemptStartTime
@@ -311,13 +330,6 @@ public class LegacySpeculator {
}
}
- // If we are here, there's at most one task attempt.
- if (numberRunningAttempts == 0) {
- return NOT_RUNNING;
- }
-
-
-
if ((acceptableRuntime == Long.MIN_VALUE) && !shouldUseTimeout) {
acceptableRuntime = estimator.thresholdRuntime(taskID);
if (acceptableRuntime == Long.MAX_VALUE) {
@@ -332,7 +344,7 @@ public class LegacySpeculator {
protected void addSpeculativeAttempt(TezTaskID taskID) {
LOG.info("DefaultSpeculator.addSpeculativeAttempt -- we are speculating " + taskID);
vertex.scheduleSpeculativeTask(taskID);
- mayHaveSpeculated.add(taskID);
+ waitingToSpeculate.add(taskID);
}
private int maybeScheduleASpeculation() {