You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/11/19 20:44:34 UTC
[2/2] tez git commit: TEZ-14. Support MR like speculation
capabilities based on latency deviation from the mean (bikas)
TEZ-14. Support MR like speculation capabilities based on latency deviation from the mean (bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/6be75661
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/6be75661
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/6be75661
Branch: refs/heads/master
Commit: 6be7566142a657a3dbcfc262af9c55546da45728
Parents: c56bb01
Author: Bikas Saha <bi...@apache.org>
Authored: Wed Nov 19 11:44:27 2014 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Wed Nov 19 11:44:27 2014 -0800
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../apache/tez/dag/api/TezConfiguration.java | 16 +-
.../org/apache/tez/dag/app/dag/TaskAttempt.java | 6 +-
.../java/org/apache/tez/dag/app/dag/Vertex.java | 1 +
.../dag/event/TaskAttemptEventKillRequest.java | 7 +-
.../dag/event/TaskAttemptEventStatusUpdate.java | 53 ---
.../VertexEventTaskAttemptStatusUpdate.java | 60 +++
.../tez/dag/app/dag/event/VertexEventType.java | 3 +
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 5 +
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 31 +-
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 23 +-
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 61 ++-
.../dag/speculation/legacy/DataStatistics.java | 86 ++++
.../speculation/legacy/LegacySpeculator.java | 396 +++++++++++++++++++
.../legacy/LegacyTaskRuntimeEstimator.java | 136 +++++++
.../speculation/legacy/StartEndTimesBase.java | 138 +++++++
.../legacy/TaskRuntimeEstimator.java | 91 +++++
.../java/org/apache/tez/dag/app/MockClock.java | 36 ++
.../apache/tez/dag/app/MockDAGAppMaster.java | 36 +-
.../org/apache/tez/dag/app/MockLocalClient.java | 6 +-
.../org/apache/tez/dag/app/MockTezClient.java | 5 +-
.../tez/dag/app/TestMockDAGAppMaster.java | 8 +-
.../org/apache/tez/dag/app/TestPreemption.java | 4 +-
.../org/apache/tez/dag/app/TestSpeculation.java | 161 ++++++++
.../tez/dag/app/dag/impl/TestTaskAttempt.java | 195 ++++++++-
.../speculation/legacy/TestDataStatistics.java | 73 ++++
.../org/apache/tez/test/TestDAGRecovery.java | 2 +-
.../apache/tez/test/dag/MultiAttemptDAG.java | 1 +
28 files changed, 1557 insertions(+), 85 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3c8c676..2eaf873 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,8 @@ Release 0.6.0: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-14. Support MR like speculation capabilities based on latency deviation
+ from the mean
TEZ-1733. TezMerger should sort FileChunks on size when merging
TEZ-1738. Tez tfile parser for log parsing
TEZ-1627. Remove OUTPUT_CONSUMABLE and related Event in TaskAttemptImpl
http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 6873863..84ee906 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -272,7 +272,21 @@ public class TezConfiguration extends Configuration {
TEZ_PREFIX + "counters.group-name.max-length";
public static final int TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH_DEFAULT = 128;
-
+ /**
+ * Boolean value. Enable speculative execution of slower tasks. This can help reduce job latency
+ * when some tasks are running slower due to bad/slow machines.
+ */
+ @Unstable
+ public static final String TEZ_AM_SPECULATION_ENABLED = TEZ_AM_PREFIX + "speculation.enabled";
+ public static final boolean TEZ_AM_SPECULATION_ENABLED_DEFAULT = false;
+
+ /**
+ * Float value. Specifies how many standard deviations away from the mean task execution time
+ * should be considered as an outlier/slow task.
+ */
+ @Unstable
+ public static final String TEZ_AM_LEGACY_SPECULATIVE_SLOWTASK_THRESHOLD =
+ TEZ_AM_PREFIX + "legacy.speculative.slowtask.threshold";
/**
* Int value. Upper limit on the number of threads user to launch containers in the app
http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
index f30fc5c..4aa220d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
@@ -40,10 +40,14 @@ import org.apache.tez.dag.records.TezVertexID;
public interface TaskAttempt {
public static class TaskAttemptStatus {
+ public TezTaskAttemptID id;
public TaskAttemptState state;
- public DAGCounter localityCounter;
public float progress;
public TezCounters counters;
+
+ public TaskAttemptStatus(TezTaskAttemptID id) {
+ this.id = id;
+ }
// insert these counters till they come natively from the task itself.
// HDFS-5098
http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index cfedc41..7487fd9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -124,6 +124,7 @@ public interface Vertex extends Comparable<Vertex> {
int getInputVerticesCount();
int getOutputVerticesCount();
void scheduleTasks(List<TaskWithLocationHint> tasks);
+ void scheduleSpeculativeTask(TezTaskID taskId);
Resource getTaskResource();
ProcessorDescriptor getProcessorDescriptor();
http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java
index 9bceb1d..0205fcf 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java
@@ -19,7 +19,7 @@ package org.apache.tez.dag.app.dag.event;
import org.apache.tez.dag.records.TezTaskAttemptID;
-public class TaskAttemptEventKillRequest extends TaskAttemptEvent {
+public class TaskAttemptEventKillRequest extends TaskAttemptEvent implements DiagnosableEvent {
private final String message;
@@ -28,8 +28,9 @@ public class TaskAttemptEventKillRequest extends TaskAttemptEvent {
this.message = message;
}
- public String getMessage() {
- return this.message;
+ @Override
+ public String getDiagnosticInfo() {
+ return message;
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java
index 13577c5..c5a6ea7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java
@@ -18,12 +18,6 @@
package org.apache.tez.dag.app.dag.event;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.tez.common.counters.DAGCounter;
-import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
@@ -40,51 +34,4 @@ public class TaskAttemptEventStatusUpdate extends TaskAttemptEvent {
public TaskStatusUpdateEvent getStatusEvent() {
return this.taskAttemptStatus;
}
-
- private TaskAttemptStatusOld reportedTaskAttemptStatus;
-
- public TaskAttemptEventStatusUpdate(TezTaskAttemptID id,
- TaskAttemptStatusOld taskAttemptStatus) {
- super(id, TaskAttemptEventType.TA_STATUS_UPDATE);
- this.reportedTaskAttemptStatus = taskAttemptStatus;
- }
-
- public TaskAttemptStatusOld getReportedTaskAttemptStatus() {
- return reportedTaskAttemptStatus;
- }
-
- /**
- * The internal TaskAttemptStatus object corresponding to remote Task status.
- *
- */
- public static class TaskAttemptStatusOld {
-
- private AtomicBoolean localitySet = new AtomicBoolean(false);
-
- public TezTaskAttemptID id;
- public float progress;
- public TezCounters counters;
- public String stateString;
- //public Phase phase;
- public long outputSize;
- public List<TezTaskAttemptID> fetchFailedMaps;
- public long mapFinishTime;
- public long shuffleFinishTime;
- public long sortFinishTime;
- public TaskAttemptState taskState;
-
- public void setLocalityCounter(DAGCounter localityCounter) {
- if (!localitySet.get()) {
- localitySet.set(true);
- if (counters == null) {
- counters = new TezCounters();
- }
- if (localityCounter != null) {
- counters.findCounter(localityCounter).increment(1);
- // TODO Maybe validate that the correct value is being set.
- }
- }
- }
-
- }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventTaskAttemptStatusUpdate.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventTaskAttemptStatusUpdate.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventTaskAttemptStatusUpdate.java
new file mode 100644
index 0000000..696680d
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventTaskAttemptStatusUpdate.java
@@ -0,0 +1,60 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
+public class VertexEventTaskAttemptStatusUpdate extends VertexEvent {
+ final TezTaskAttemptID id;
+ final TaskAttemptState state;
+ final long timestamp;
+ final boolean justStarted;
+
+ public VertexEventTaskAttemptStatusUpdate(TezTaskAttemptID taId, TaskAttemptState state,
+ long timestamp) {
+ this(taId, state, timestamp, false);
+ }
+
+ public VertexEventTaskAttemptStatusUpdate(TezTaskAttemptID taId, TaskAttemptState state,
+ long timestamp, boolean justStarted) {
+ super(taId.getTaskID().getVertexID(), VertexEventType.V_TASK_ATTEMPT_STATUS_UPDATE);
+ this.id = taId;
+ this.state = state;
+ this.timestamp = timestamp;
+ this.justStarted = justStarted;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public TezTaskAttemptID getAttemptId() {
+ return id;
+ }
+
+ public boolean hasJustStarted() {
+ return justStarted;
+ }
+
+ public TaskAttemptState getTaskAttemptState() {
+ return state;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
index b4f7e29..5565f93 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
@@ -40,6 +40,9 @@ public enum VertexEventType {
V_TASK_RESCHEDULED,
V_TASK_ATTEMPT_COMPLETED,
+ //Producer:TaskAttempt
+ V_TASK_ATTEMPT_STATUS_UPDATE,
+
//Producer:Any component
V_INTERNAL_ERROR,
V_MANAGER_USER_CODE_ERROR,
http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index de62752..6e7805e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -111,6 +111,7 @@ import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.utils.TaskSpecificLaunchCmdOption;
import org.apache.tez.dag.utils.RelocalizationUtils;
@@ -768,6 +769,10 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
.getAttempt(taId);
}
+ public TaskImpl getTask(TezTaskID tId) {
+ return (TaskImpl) getVertex(tId.getVertexID()).getTask(tId);
+ }
+
protected void initializeVerticesAndStart() {
for (Vertex v : vertices.values()) {
if (v.getInputVerticesCount() == 0) {
http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index deaba42..3056c1e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.util.Records;
import org.apache.tez.common.counters.DAGCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.dag.api.oldrecords.TaskAttemptReport;
@@ -83,6 +84,7 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
+import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptStatusUpdate;
import org.apache.tez.dag.app.rm.AMSchedulerEventTAEnded;
import org.apache.tez.dag.app.rm.AMSchedulerEventTALaunchRequest;
import org.apache.tez.dag.history.DAGHistoryEvent;
@@ -413,7 +415,7 @@ public class TaskAttemptImpl implements TaskAttempt,
this.clock = clock;
this.taskHeartbeatHandler = taskHeartbeatHandler;
this.appContext = appContext;
- this.reportedStatus = new TaskAttemptStatus();
+ this.reportedStatus = new TaskAttemptStatus(this.attemptId);
initTaskAttemptStatus(reportedStatus);
RackResolver.init(conf);
this.stateMachine = stateMachineFactory.make(this);
@@ -1151,10 +1153,20 @@ public class TaskAttemptImpl implements TaskAttempt,
// Inform the Task
ta.sendEvent(new TaskEventTAUpdate(ta.attemptId,
TaskEventType.T_ATTEMPT_LAUNCHED));
+
+ if (ta.isSpeculationEnabled()) {
+ ta.sendEvent(new VertexEventTaskAttemptStatusUpdate(ta.attemptId, TaskAttemptState.RUNNING,
+ ta.launchTime, true));
+ }
ta.taskHeartbeatHandler.register(ta.attemptId);
}
}
+
+ private boolean isSpeculationEnabled() {
+ return conf.getBoolean(TezConfiguration.TEZ_AM_SPECULATION_ENABLED,
+ TezConfiguration.TEZ_AM_SPECULATION_ENABLED_DEFAULT);
+ }
protected static class TerminatedBeforeRunningTransition extends
TerminateTransition {
@@ -1235,6 +1247,10 @@ public class TaskAttemptImpl implements TaskAttempt,
ta.updateProgressSplits();
+ if (ta.isSpeculationEnabled()) {
+ ta.sendEvent(new VertexEventTaskAttemptStatusUpdate(ta.attemptId, ta.getState(),
+ ta.clock.getTime()));
+ }
}
}
@@ -1259,6 +1275,14 @@ public class TaskAttemptImpl implements TaskAttempt,
// Unregister from the TaskHeartbeatHandler.
ta.taskHeartbeatHandler.unregister(ta.attemptId);
+
+ ta.reportedStatus.state = TaskAttemptState.SUCCEEDED;
+ ta.reportedStatus.progress = 1.0f;
+
+ if (ta.isSpeculationEnabled()) {
+ ta.sendEvent(new VertexEventTaskAttemptStatusUpdate(ta.attemptId, TaskAttemptState.SUCCEEDED,
+ ta.clock.getTime()));
+ }
// TODO maybe. For reuse ... Stacking pulls for a reduce task, even if the
// TA finishes independently. // Will likely be the Job's responsibility.
@@ -1278,6 +1302,11 @@ public class TaskAttemptImpl implements TaskAttempt,
public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
super.transition(ta, event);
ta.taskHeartbeatHandler.unregister(ta.attemptId);
+ ta.reportedStatus.state = helper.getTaskAttemptState(); // FAILED or KILLED
+ if (ta.isSpeculationEnabled()) {
+ ta.sendEvent(new VertexEventTaskAttemptStatusUpdate(ta.attemptId, helper.getTaskAttemptState(),
+ ta.clock.getTime()));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 4ded9be..c3ba11d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -46,7 +46,6 @@ import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
@@ -1016,6 +1015,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
if (task.historyTaskStartGenerated) {
task.logJobHistoryTaskFinishedEvent();
}
+ TaskAttempt successfulAttempt = task.attempts.get(successTaId);
// issue kill to all other attempts
for (TaskAttempt attempt : task.attempts.values()) {
@@ -1024,9 +1024,18 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
// TA_KILL message to an attempt that doesn't need one for
// other reasons.
!attempt.isFinished()) {
- LOG.info("Issuing kill to other attempt " + attempt.getID());
+ LOG.info("Issuing kill to other attempt " + attempt.getID() + " as attempt: " +
+ task.successfulAttempt + " has succeeded");
+ String diagnostics = null;
+ if (attempt.getLaunchTime() < successfulAttempt.getLaunchTime()) {
+ diagnostics = "Killed this attempt as other speculative attempt : " + successTaId
+ + " succeeded";
+ } else {
+ diagnostics = "Killed this speculative attempt as original attempt: " + successTaId
+ + " succeeded";
+ }
task.eventHandler.handle(new TaskAttemptEventKillRequest(attempt
- .getID(), "Alternate attempt succeeded"));
+ .getID(), diagnostics));
}
}
// send notification to DAG scheduler
@@ -1336,12 +1345,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
@Override
public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
- // verify that this occurs only for map task
- // TODO: consider moving it to MapTaskImpl
- if (task.leafVertex) {
- LOG.error("Unexpected event for task of leaf vertex " + event.getType());
- task.internalError(event.getType());
- }
TaskEventTAUpdate attemptEvent = (TaskEventTAUpdate) event;
TezTaskAttemptID attemptId = attemptEvent.getTaskAttemptID();
@@ -1365,6 +1368,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
return TaskStateInternal.SCHEDULED;
} else {
// nothing to do
+ LOG.info("Ignoring kill of attempt: " + attemptId + " because attempt: " +
+ task.successfulAttempt + " is already successful");
return TaskStateInternal.SUCCEEDED;
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index d19c4cc..54cd6c4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -68,6 +68,7 @@ import org.apache.tez.dag.api.OutputCommitterDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.RootInputLeafOutput;
+import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.TaskLocationHint;
@@ -121,11 +122,13 @@ import org.apache.tez.dag.app.dag.event.VertexEventSourceTaskAttemptCompleted;
import org.apache.tez.dag.app.dag.event.VertexEventSourceVertexRecovered;
import org.apache.tez.dag.app.dag.event.VertexEventSourceVertexStarted;
import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptCompleted;
+import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptStatusUpdate;
import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
import org.apache.tez.dag.app.dag.event.VertexEventTermination;
import org.apache.tez.dag.app.dag.event.VertexEventType;
import org.apache.tez.dag.app.dag.impl.DAGImpl.VertexGroupInfo;
+import org.apache.tez.dag.app.dag.speculation.legacy.LegacySpeculator;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
@@ -205,6 +208,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
private Resource taskResource;
private Configuration conf;
+
+ private final boolean isSpeculationEnabled;
//fields initialized in init
@@ -235,6 +240,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
private static final TaskAttemptCompletedEventTransition
TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION =
new TaskAttemptCompletedEventTransition();
+ private static final TaskAttempStatusUpdateEventTransition
+ TASK_ATTEMPT_STATUS_UPDATE_EVENT_TRANSITION = new TaskAttempStatusUpdateEventTransition();
private static final SourceTaskAttemptCompletedEventTransition
SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION =
new SourceTaskAttemptCompletedEventTransition();
@@ -248,6 +255,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
@VisibleForTesting
final List<TezEvent> pendingInitializerEvents = new LinkedList<TezEvent>();
+
+ LegacySpeculator speculator;
protected static final
StateMachineFactory<VertexImpl, VertexState, VertexEventType, VertexEvent>
@@ -460,6 +469,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
EnumSet.of(VertexState.RUNNING, VertexState.TERMINATING),
VertexEventType.V_ROUTE_EVENT,
ROUTE_EVENT_TRANSITION)
+ .addTransition(
+ VertexState.RUNNING,
+ VertexState.RUNNING, VertexEventType.V_TASK_ATTEMPT_STATUS_UPDATE,
+ TASK_ATTEMPT_STATUS_UPDATE_EVENT_TRANSITION)
// Transitions from TERMINATING state.
.addTransition
@@ -477,6 +490,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
VertexEventType.V_MANAGER_USER_CODE_ERROR,
VertexEventType.V_ROOT_INPUT_FAILED,
VertexEventType.V_SOURCE_VERTEX_STARTED,
+ VertexEventType.V_TASK_ATTEMPT_STATUS_UPDATE,
VertexEventType.V_ROOT_INPUT_INITIALIZED,
VertexEventType.V_NULL_EDGE_INITIALIZED,
VertexEventType.V_ROUTE_EVENT,
@@ -494,7 +508,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
VertexEventType.V_TASK_RESCHEDULED,
new TaskRescheduledAfterVertexSuccessTransition())
- // Ignore-able events
.addTransition(
VertexState.SUCCEEDED,
EnumSet.of(VertexState.SUCCEEDED, VertexState.FAILED),
@@ -506,10 +519,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
EnumSet.of(VertexState.FAILED, VertexState.ERROR),
VertexEventType.V_TASK_COMPLETED,
new TaskCompletedAfterVertexSuccessTransition())
+ // Ignore-able events
.addTransition(VertexState.SUCCEEDED, VertexState.SUCCEEDED,
EnumSet.of(VertexEventType.V_TERMINATE,
VertexEventType.V_ROOT_INPUT_FAILED,
VertexEventType.V_TASK_ATTEMPT_COMPLETED,
+ VertexEventType.V_TASK_ATTEMPT_STATUS_UPDATE,
// after we are done reruns of source tasks should not affect
// us. These reruns may be triggered by other consumer vertices.
// We should have been in RUNNING state if we had triggered the
@@ -519,6 +534,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
VertexEventType.V_TASK_ATTEMPT_COMPLETED,
new TaskAttemptCompletedEventTransition())
+
// Transitions from FAILED state
.addTransition(
VertexState.FAILED,
@@ -534,6 +550,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
VertexEventType.V_START,
VertexEventType.V_ROUTE_EVENT,
VertexEventType.V_TASK_ATTEMPT_COMPLETED,
+ VertexEventType.V_TASK_ATTEMPT_STATUS_UPDATE,
VertexEventType.V_TASK_COMPLETED,
VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
VertexEventType.V_ROOT_INPUT_INITIALIZED,
@@ -558,6 +575,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
VertexEventType.V_ROUTE_EVENT,
VertexEventType.V_TASK_RESCHEDULED,
VertexEventType.V_TASK_ATTEMPT_COMPLETED,
+ VertexEventType.V_TASK_ATTEMPT_STATUS_UPDATE,
VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
VertexEventType.V_TASK_COMPLETED,
@@ -577,6 +595,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
VertexEventType.V_ROUTE_EVENT,
VertexEventType.V_TERMINATE,
VertexEventType.V_MANAGER_USER_CODE_ERROR,
+ VertexEventType.V_TASK_ATTEMPT_STATUS_UPDATE,
VertexEventType.V_TASK_COMPLETED,
VertexEventType.V_TASK_ATTEMPT_COMPLETED,
VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
@@ -773,7 +792,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
// Not sending the notifier a parallelism update since this is the initial parallelism
this.dagVertexGroups = dagVertexGroups;
-
+
+ isSpeculationEnabled = conf.getBoolean(TezConfiguration.TEZ_AM_SPECULATION_ENABLED,
+ TezConfiguration.TEZ_AM_SPECULATION_ENABLED_DEFAULT);
+
+ if (isSpeculationEnabled()) {
+ speculator = new LegacySpeculator(conf, getAppContext(), this);
+ }
+
logIdentifier = this.getVertexId() + " [" + this.getName() + "]";
// This "this leak" is okay because the retained pointer is in an
// instance variable.
@@ -782,6 +808,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
stateMachineFactory.make(this), this);
augmentStateMachine();
}
+
+ private boolean isSpeculationEnabled() {
+ return isSpeculationEnabled;
+ }
protected StateMachine<VertexState, VertexEventType, VertexEvent> getStateMachine() {
return stateMachine;
@@ -1194,6 +1224,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
@Override
+ public void scheduleSpeculativeTask(TezTaskID taskId) {
+ Preconditions.checkState(taskId.getId() < numTasks);
+ eventHandler.handle(new TaskEvent(taskId, TaskEventType.T_ADD_SPEC_ATTEMPT));
+ }
+
+ @Override
public void scheduleTasks(List<TaskWithLocationHint> tasksToSchedule) {
writeLock.lock();
try {
@@ -3282,6 +3318,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
eventHandler.handle(new VertexEvent(
this.vertexId, VertexEventType.V_COMPLETED));
}
+
return VertexState.RUNNING;
}
@@ -3483,7 +3520,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
String msg = "Exception in " + e.getSource() + ", vertex:" + vertex.getLogIdentifier();
LOG.error(msg, e);
vertex.addDiagnostic(msg + "," + ExceptionUtils.getStackTrace(e.getCause()));
- vertex.tryEnactKill(VertexTerminationCause.AM_USERCODE_FAILURE, TaskTerminationCause.AM_USERCODE_FAILURE);
+ vertex.tryEnactKill(VertexTerminationCause.AM_USERCODE_FAILURE,
+ TaskTerminationCause.AM_USERCODE_FAILURE);
return VertexState.TERMINATING;
}
} else {
@@ -3515,6 +3553,23 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
}
+ private static class TaskAttempStatusUpdateEventTransition implements
+ SingleArcTransition<VertexImpl, VertexEvent> {
+ @Override
+ public void transition(VertexImpl vertex, VertexEvent event) {
+ VertexEventTaskAttemptStatusUpdate updateEvent =
+ ((VertexEventTaskAttemptStatusUpdate) event);
+ if (vertex.isSpeculationEnabled()) {
+ if (updateEvent.hasJustStarted()) {
+ vertex.speculator.notifyAttemptStarted(updateEvent.getAttemptId(),
+ updateEvent.getTimestamp());
+ } else {
+ vertex.speculator.notifyAttemptStatusUpdate(updateEvent.getAttemptId(),
+ updateEvent.getTaskAttemptState(), updateEvent.getTimestamp());
+ }
+ }
+ }
+ }
private static class TaskCompletedTransition implements
MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/DataStatistics.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/DataStatistics.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/DataStatistics.java
new file mode 100644
index 0000000..7e6f1c2
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/DataStatistics.java
@@ -0,0 +1,86 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.speculation.legacy;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Accumulates a count, a sum and a sum of squares over a series of samples and
+ * derives mean, variance and standard deviation from them. Every method
+ * synchronizes on the instance.
+ */
+public class DataStatistics {
+  private int count = 0;
+  private double sum = 0;
+  private double sumSquares = 0;
+
+  /** Creates statistics holding no samples. */
+  public DataStatistics() {
+  }
+
+  /**
+   * Creates statistics seeded with one sample.
+   *
+   * @param initNum the first sample
+   */
+  public DataStatistics(double initNum) {
+    this.count = 1;
+    this.sum = initNum;
+    this.sumSquares = initNum * initNum;
+  }
+
+  /**
+   * Adds one sample.
+   *
+   * @param newNum the sample to add
+   */
+  public synchronized void add(double newNum) {
+    this.count++;
+    this.sum += newNum;
+    this.sumSquares += newNum * newNum;
+  }
+
+  /**
+   * Replaces the contribution of a previously added sample with a new value.
+   * The sample count is left unchanged.
+   *
+   * @param old the value that was added earlier
+   * @param update the value that replaces it
+   */
+  @VisibleForTesting
+  synchronized void updateStatistics(double old, double update) {
+    this.sum += update - old;
+    this.sumSquares += (update * update) - (old * old);
+  }
+
+  /**
+   * @return the mean of the samples, or {@code Long.MAX_VALUE} when there are
+   *         no samples
+   */
+  public synchronized double mean() {
+    // when no data then mean estimate should be large
+    return count == 0 ? Long.MAX_VALUE : sum / count;
+  }
+
+  /**
+   * @return the variance of the samples, computed as E(X^2) - E(X)^2 and
+   *         clamped at zero; {@code 0.0} when there is at most one sample
+   */
+  public synchronized double var() {
+    if (count <= 1) {
+      return 0.0;
+    }
+    double mean = mean();
+    // Clamp: rounding can push the difference slightly below zero.
+    return Math.max((sumSquares / count) - mean * mean, 0.0d);
+  }
+
+  /**
+   * @return the standard deviation, i.e. the square root of {@link #var()}
+   */
+  public synchronized double std() {
+    return Math.sqrt(this.var());
+  }
+
+  /**
+   * @param sigma the number of standard deviations above the mean
+   * @return {@code mean() + std() * sigma}, or {@code Long.MAX_VALUE} when
+   *         there are no samples
+   */
+  public synchronized double outlier(float sigma) {
+    if (count == 0) {
+      // when no data available then outlier estimate should be large
+      return Long.MAX_VALUE;
+    }
+    return mean() + std() * sigma;
+  }
+
+  /**
+   * @return the number of samples
+   */
+  public synchronized double count() {
+    return count;
+  }
+
+  /**
+   * Synchronized like the other accessors so that the reported count, sum and
+   * sum of squares belong to the same state.
+   */
+  @Override
+  public synchronized String toString() {
+    return "DataStatistics: count is " + count + ", sum is " + sum +
+        ", sumSquares is " + sumSquares + " mean is " + mean() + " std() is " + std();
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java
new file mode 100644
index 0000000..8f76e05
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacySpeculator.java
@@ -0,0 +1,396 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.speculation.legacy;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.dag.Task;
+import org.apache.tez.dag.app.dag.TaskAttempt;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Maintains runtime estimation statistics. Periodically updates the
+ * estimates based on progress and decides when to trigger a
+ * speculative attempt. Speculative attempts are triggered when the
+ * estimated runtime is more than a threshold beyond the mean runtime
+ * and the original task still has enough estimated runtime left that
+ * the speculative version is expected to finish sooner than that. If
+ * the original is close to completion then we don't start a speculation
+ * because it would likely be a wasted attempt. There is a delay between
+ * successive speculations.
+ */
+public class LegacySpeculator {
+
+ private static final long ON_SCHEDULE = Long.MIN_VALUE;
+ private static final long ALREADY_SPECULATING = Long.MIN_VALUE + 1;
+ private static final long TOO_NEW = Long.MIN_VALUE + 2;
+ private static final long PROGRESS_IS_GOOD = Long.MIN_VALUE + 3;
+ private static final long NOT_RUNNING = Long.MIN_VALUE + 4;
+ private static final long TOO_LATE_TO_SPECULATE = Long.MIN_VALUE + 5;
+
+ private static final long SOONEST_RETRY_AFTER_NO_SPECULATE = 1000L * 1L;
+ private static final long SOONEST_RETRY_AFTER_SPECULATE = 1000L * 15L;
+
+ private static final double PROPORTION_RUNNING_TASKS_SPECULATABLE = 0.1;
+ private static final double PROPORTION_TOTAL_TASKS_SPECULATABLE = 0.01;
+ private static final int MINIMUM_ALLOWED_SPECULATIVE_TASKS = 10;
+
+ private static final Log LOG = LogFactory.getLog(LegacySpeculator.class);
+
+ private final ConcurrentMap<TezTaskID, Boolean> runningTasks
+ = new ConcurrentHashMap<TezTaskID, Boolean>();
+
+ // Used to track any TaskAttempts that aren't heart-beating for a while, so
+ // that we can aggressively speculate instead of waiting for task-timeout.
+ private final ConcurrentMap<TezTaskAttemptID, TaskAttemptHistoryStatistics>
+ runningTaskAttemptStatistics = new ConcurrentHashMap<TezTaskAttemptID,
+ TaskAttemptHistoryStatistics>();
+ // Regular heartbeat from tasks is every 3 secs. So if we don't get a
+ // heartbeat in 9 secs (3 heartbeats), we simulate a heartbeat with no change
+ // in progress.
+ private static final long MAX_WAITTING_TIME_FOR_HEARTBEAT = 9 * 1000;
+
+
+ private final Set<TezTaskID> mayHaveSpeculated = new HashSet<TezTaskID>();
+
+ private Vertex vertex;
+ private TaskRuntimeEstimator estimator;
+
+ private final Clock clock;
+ private long nextSpeculateTime = Long.MIN_VALUE;
+
+ /**
+ * Creates a speculator for the given vertex that takes its clock from the
+ * application context.
+ */
+ public LegacySpeculator(Configuration conf, AppContext context, Vertex vertex) {
+ this(conf, context.getClock(), vertex);
+ }
+
+ /**
+ * Creates a speculator for the given vertex that uses the given clock and a
+ * {@link LegacyTaskRuntimeEstimator} contextualized with conf and vertex.
+ */
+ public LegacySpeculator(Configuration conf, Clock clock, Vertex vertex) {
+ this(conf, getEstimator(conf, vertex), clock, vertex);
+ }
+
+ /**
+ * Builds the default estimator: a {@link LegacyTaskRuntimeEstimator} that has
+ * been contextualized with the given configuration and vertex.
+ */
+ static private TaskRuntimeEstimator getEstimator
+ (Configuration conf, Vertex vertex) {
+ TaskRuntimeEstimator estimator = new LegacyTaskRuntimeEstimator();
+ estimator.contextualize(conf, vertex);
+
+ return estimator;
+ }
+
+ // This constructor is designed to be called by other constructors.
+ // However, it's public because we do use it in the test cases.
+ // Normally we figure out our own estimator.
+ // Note that conf is not read here; the estimator is expected to arrive
+ // already contextualized.
+ public LegacySpeculator
+ (Configuration conf, TaskRuntimeEstimator estimator, Clock clock, Vertex vertex) {
+ this.vertex = vertex;
+ this.estimator = estimator;
+ this.clock = clock;
+ }
+
+/* ************************************************************* */
+
+ void maybeSpeculate() {
+ long now = clock.getTime();
+
+ if (now < nextSpeculateTime) {
+ return;
+ }
+
+ int speculations = maybeScheduleASpeculation();
+ long mininumRecomp
+ = speculations > 0 ? SOONEST_RETRY_AFTER_SPECULATE
+ : SOONEST_RETRY_AFTER_NO_SPECULATE;
+
+ long wait = Math.max(mininumRecomp,
+ clock.getTime() - now);
+ nextSpeculateTime = now + wait;
+
+ if (speculations > 0) {
+ LOG.info("We launched " + speculations
+ + " speculations. Waiting " + wait + " milliseconds.");
+ }
+ }
+
+/* ************************************************************* */
+
+ /**
+ * Enrolls a newly started attempt with the estimator.
+ *
+ * @param taId the attempt that started
+ * @param timestamp the time passed on to the estimator as the attempt's start
+ */
+ public void notifyAttemptStarted(TezTaskAttemptID taId, long timestamp) {
+ estimator.enrollAttempt(taId, timestamp);
+ }
+
+ /**
+ * Folds a status report for an attempt into the speculation data and then
+ * gives the speculator a chance to run a speculation pass.
+ *
+ * @param taId the attempt the report is about
+ * @param reportedState the state reported for the attempt
+ * @param timestamp the time the report corresponds to
+ */
+ public void notifyAttemptStatusUpdate(TezTaskAttemptID taId, TaskAttemptState reportedState,
+ long timestamp) {
+ statusUpdate(taId, reportedState, timestamp);
+ maybeSpeculate();
+ }
+
+  /**
+   * Absorbs one task attempt state report.
+   *
+   * @param attemptID the attempt the report is about
+   * @param reportedState the state reported for the attempt, which we want to
+   *        fold into the speculation data for this vertex
+   * @param timestamp the time this report corresponds to. This matters
+   *        because the estimator derives runtimes from it.
+   * @throws IllegalStateException if the vertex has no task for the attempt
+   */
+  private void statusUpdate(TezTaskAttemptID attemptID, TaskAttemptState reportedState,
+      long timestamp) {
+
+    TezTaskID taskID = attemptID.getTaskID();
+    Task task = vertex.getTask(taskID);
+
+    Preconditions.checkState(task != null, "Null task for attempt: " + attemptID);
+
+    estimator.updateAttempt(attemptID, reportedState, timestamp);
+
+    if (reportedState == TaskAttemptState.RUNNING) {
+      runningTasks.putIfAbsent(taskID, Boolean.TRUE);
+      return;
+    }
+
+    runningTasks.remove(taskID, Boolean.TRUE);
+    // Keep the heartbeat-tracking entry only while the attempt is STARTING.
+    // In any other non-running state the entry is dropped, so that entries of
+    // finished attempts do not accumulate.
+    if (reportedState != TaskAttemptState.STARTING) {
+      runningTaskAttemptStatistics.remove(attemptID);
+    }
+  }
+
+/* ************************************************************* */
+
+// This is the code section that runs periodically and adds speculations for
+// those jobs that need them.
+
+
+ // Returns how much sooner a replacement attempt started now is estimated to
+ // finish than the task's single running attempt.
+ //
+ // This can return a few magic values for tasks that shouldn't speculate:
+ // returns ON_SCHEDULE if thresholdRuntime(taskID) returns Long.MAX_VALUE,
+ // i.e. says that we should not consider speculating this task
+ // returns ALREADY_SPECULATING if more than one attempt is running or
+ // starting. For tasks that may already have been speculated this is
+ // checked before ON_SCHEDULE.
+ // returns TOO_NEW if the attempt's enrolled start time is later than now
+ // returns PROGRESS_IS_GOOD if the attempt's estimated end time has already
+ // passed
+ // returns TOO_LATE_TO_SPECULATE if a replacement started now is not
+ // estimated to finish before the running attempt
+ // returns NOT_RUNNING if the task has succeeded or has no attempt that is
+ // running or starting
+ //
+ // All of these values are negative. Any value that should be allowed to
+ // speculate is 0 or positive.
+ //
+ // NOTE(review): acceptableRuntime is only compared against Long.MAX_VALUE;
+ // it is never compared with the attempt's estimated runtime here - confirm
+ // that this is intended.
+ private long speculationValue(Task task, long now) {
+ Map<TezTaskAttemptID, TaskAttempt> attempts = task.getAttempts();
+ TezTaskID taskID = task.getTaskId();
+ long acceptableRuntime = Long.MIN_VALUE;
+ long result = Long.MIN_VALUE;
+
+ // short circuit completed tasks. no need to spend time on them
+ if (task.getState() == TaskState.SUCCEEDED) {
+ return NOT_RUNNING;
+ }
+
+ // For tasks never speculated before, bail out early when the estimator
+ // cannot yet produce a threshold.
+ if (!mayHaveSpeculated.contains(taskID)) {
+ acceptableRuntime = estimator.thresholdRuntime(taskID);
+ if (acceptableRuntime == Long.MAX_VALUE) {
+ return ON_SCHEDULE;
+ }
+ }
+
+ TezTaskAttemptID runningTaskAttemptID = null;
+
+ int numberRunningAttempts = 0;
+
+ for (TaskAttempt taskAttempt : attempts.values()) {
+ if (taskAttempt.getState() == TaskAttemptState.RUNNING
+ || taskAttempt.getState() == TaskAttemptState.STARTING) {
+ if (++numberRunningAttempts > 1) {
+ return ALREADY_SPECULATING;
+ }
+ runningTaskAttemptID = taskAttempt.getID();
+
+ long estimatedRunTime = estimator.estimatedRuntime(runningTaskAttemptID);
+
+ long taskAttemptStartTime
+ = estimator.attemptEnrolledTime(runningTaskAttemptID);
+ if (taskAttemptStartTime > now) {
+ // This background process ran before we could process the task
+ // attempt status change that chronicles the attempt start
+ return TOO_NEW;
+ }
+
+ long estimatedEndTime = estimatedRunTime + taskAttemptStartTime;
+
+ long estimatedReplacementEndTime
+ = now + estimator.newAttemptEstimatedRuntime();
+
+ float progress = taskAttempt.getProgress();
+ TaskAttemptHistoryStatistics data =
+ runningTaskAttemptStatistics.get(runningTaskAttemptID);
+ if (data == null) {
+ // First time we look at this attempt: start tracking its statistics
+ runningTaskAttemptStatistics.put(runningTaskAttemptID,
+ new TaskAttemptHistoryStatistics(estimatedRunTime, progress, now));
+ } else {
+ if (estimatedRunTime == data.getEstimatedRunTime()
+ && progress == data.getProgress()) {
+ // Previous stats are the same as the current stats
+ if (data.notHeartbeatedInAWhile(now)) {
+ // Stats have stagnated for a while, simulate heart-beat.
+ // Now simulate the heart-beat
+ statusUpdate(taskAttempt.getID(), taskAttempt.getState(), clock.getTime());
+ }
+ } else {
+ // Stats have changed - update our data structure
+ data.setEstimatedRunTime(estimatedRunTime);
+ data.setProgress(progress);
+ data.resetHeartBeatTime(now);
+ }
+ }
+
+ if (estimatedEndTime < now) {
+ return PROGRESS_IS_GOOD;
+ }
+
+ if (estimatedReplacementEndTime >= estimatedEndTime) {
+ return TOO_LATE_TO_SPECULATE;
+ }
+
+ result = estimatedEndTime - estimatedReplacementEndTime;
+ }
+ }
+
+ // If we are here, there's at most one running or starting task attempt.
+ if (numberRunningAttempts == 0) {
+ return NOT_RUNNING;
+ }
+
+
+
+ // Tasks that may already have been speculated skipped the threshold lookup
+ // above; do it now.
+ if (acceptableRuntime == Long.MIN_VALUE) {
+ acceptableRuntime = estimator.thresholdRuntime(taskID);
+ if (acceptableRuntime == Long.MAX_VALUE) {
+ return ON_SCHEDULE;
+ }
+ }
+
+ return result;
+ }
+
+  /**
+   * Asks the vertex to schedule a speculative attempt for the given task and
+   * remembers that the task may have been speculated.
+   *
+   * @param taskID the task to add a speculative attempt to
+   */
+  protected void addSpeculativeAttempt(TezTaskID taskID) {
+    // The log prefix names this class so the message can be traced to its source.
+    LOG.info("LegacySpeculator.addSpeculativeAttempt -- we are speculating " + taskID);
+    vertex.scheduleSpeculativeTask(taskID);
+    mayHaveSpeculated.add(taskID);
+  }
+
+ private int maybeScheduleASpeculation() {
+ int successes = 0;
+
+ long now = clock.getTime();
+
+ int numberSpeculationsAlready = 0;
+ int numberRunningTasks = 0;
+
+ Map<TezTaskID, Task> tasks = vertex.getTasks();
+
+ int numberAllowedSpeculativeTasks
+ = (int) Math.max(MINIMUM_ALLOWED_SPECULATIVE_TASKS,
+ PROPORTION_TOTAL_TASKS_SPECULATABLE * tasks.size());
+
+ TezTaskID bestTaskID = null;
+ long bestSpeculationValue = -1L;
+
+ // this loop is potentially pricey.
+ // TODO track the tasks that are potentially worth looking at
+ for (Map.Entry<TezTaskID, Task> taskEntry : tasks.entrySet()) {
+ long mySpeculationValue = speculationValue(taskEntry.getValue(), now);
+
+ if (mySpeculationValue == ALREADY_SPECULATING) {
+ ++numberSpeculationsAlready;
+ }
+
+ if (mySpeculationValue != NOT_RUNNING) {
+ ++numberRunningTasks;
+ }
+
+ if (mySpeculationValue > bestSpeculationValue) {
+ bestTaskID = taskEntry.getKey();
+ bestSpeculationValue = mySpeculationValue;
+ }
+ }
+ numberAllowedSpeculativeTasks
+ = (int) Math.max(numberAllowedSpeculativeTasks,
+ PROPORTION_RUNNING_TASKS_SPECULATABLE * numberRunningTasks);
+
+ // If we found a speculation target, fire it off
+ if (bestTaskID != null
+ && numberAllowedSpeculativeTasks > numberSpeculationsAlready) {
+ addSpeculativeAttempt(bestTaskID);
+ ++successes;
+ }
+
+ return successes;
+ }
+
+  /**
+   * Last observed runtime estimate and progress of an attempt, together with
+   * the time they were last refreshed. Used to detect attempts whose
+   * statistics have not changed for a while.
+   */
+  static class TaskAttemptHistoryStatistics {
+
+    private long estimatedRunTime;
+    private float progress;
+    private long lastHeartBeatTime;
+
+    public TaskAttemptHistoryStatistics(long estimatedRunTime, float progress,
+        long nonProgressStartTime) {
+      this.estimatedRunTime = estimatedRunTime;
+      this.progress = progress;
+      resetHeartBeatTime(nonProgressStartTime);
+    }
+
+    public long getEstimatedRunTime() {
+      return this.estimatedRunTime;
+    }
+
+    public void setEstimatedRunTime(long estimatedRunTime) {
+      this.estimatedRunTime = estimatedRunTime;
+    }
+
+    public float getProgress() {
+      return this.progress;
+    }
+
+    public void setProgress(float progress) {
+      this.progress = progress;
+    }
+
+    /**
+     * Reports whether more than the maximum heartbeat waiting time has passed
+     * since the last recorded heartbeat. When it has, the heartbeat time is
+     * reset to {@code now} as a side effect.
+     */
+    public boolean notHeartbeatedInAWhile(long now) {
+      final boolean overdue =
+          now - lastHeartBeatTime > MAX_WAITTING_TIME_FOR_HEARTBEAT;
+      if (overdue) {
+        resetHeartBeatTime(now);
+      }
+      return overdue;
+    }
+
+    public void resetHeartBeatTime(long lastHeartBeatTime) {
+      this.lastHeartBeatTime = lastHeartBeatTime;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacyTaskRuntimeEstimator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacyTaskRuntimeEstimator.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacyTaskRuntimeEstimator.java
new file mode 100644
index 0000000..14d269c
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/LegacyTaskRuntimeEstimator.java
@@ -0,0 +1,136 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.speculation.legacy;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.app.dag.Task;
+import org.apache.tez.dag.app.dag.TaskAttempt;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
+/**
+ * Runtime estimator that uses a simple scheme of estimating task attempt
+ * runtime based on current elapsed runtime and reported progress.
+ */
+public class LegacyTaskRuntimeEstimator extends StartEndTimesBase {
+
+  // Both registries are created through the same atomic get-or-create helper.
+  private final ConcurrentHashMap<TaskAttempt, AtomicLong> attemptRuntimeEstimates
+      = new ConcurrentHashMap<TaskAttempt, AtomicLong>();
+  private final ConcurrentHashMap<TaskAttempt, AtomicLong> attemptRuntimeEstimateVariances
+      = new ConcurrentHashMap<TaskAttempt, AtomicLong>();
+
+  /**
+   * Lets the superclass account for completions and, when the attempt is
+   * currently RUNNING, refreshes its runtime estimate and variance estimate.
+   * Both are set to {@code -1} when no start time is known for the attempt or
+   * the timestamp is not after the start time.
+   *
+   * @param attemptID the attempt being reported on
+   * @param state the reported state, passed on to the superclass
+   * @param timestamp the time the report corresponds to
+   */
+  @Override
+  public void updateAttempt(TezTaskAttemptID attemptID, TaskAttemptState state, long timestamp) {
+    super.updateAttempt(attemptID, state, timestamp);
+
+    Task task = vertex.getTask(attemptID.getTaskID());
+    if (task == null) {
+      return;
+    }
+
+    TaskAttempt taskAttempt = task.getAttempt(attemptID);
+    if (taskAttempt == null) {
+      return;
+    }
+
+    float progress = taskAttempt.getProgress();
+
+    Long boxedStart = startTimes.get(attemptID);
+    long start = boxedStart == null ? Long.MIN_VALUE : boxedStart;
+
+    // Completions are accumulated by the superclass. Here we only learn more
+    // about attempts that are still running.
+    if (taskAttempt.getState() != TaskAttemptState.RUNNING) {
+      return;
+    }
+
+    AtomicLong estimateContainer = containerFor(attemptRuntimeEstimates, taskAttempt);
+    AtomicLong estimateVarianceContainer
+        = containerFor(attemptRuntimeEstimateVariances, taskAttempt);
+
+    long estimate = -1;
+    long varianceEstimate = -1;
+
+    // This code assumes that we'll never consider starting a third
+    // speculative task attempt if two are already running for this task
+    if (start > 0 && timestamp > start) {
+      // Elapsed time divided by progress; the floor on progress avoids a
+      // division by zero for attempts that have not reported progress yet.
+      estimate = (long) ((timestamp - start) / Math.max(0.0001, progress));
+      varianceEstimate = (long) (estimate * progress / 10);
+    }
+
+    estimateContainer.set(estimate);
+    estimateVarianceContainer.set(varianceEstimate);
+  }
+
+  /**
+   * Returns the container registered for the attempt, registering a new one
+   * atomically if none exists yet.
+   */
+  private static AtomicLong containerFor(
+      ConcurrentHashMap<TaskAttempt, AtomicLong> registry, TaskAttempt taskAttempt) {
+    AtomicLong existing = registry.get(taskAttempt);
+    if (existing != null) {
+      return existing;
+    }
+    AtomicLong fresh = new AtomicLong();
+    AtomicLong raced = registry.putIfAbsent(taskAttempt, fresh);
+    return raced == null ? fresh : raced;
+  }
+
+  /**
+   * Looks up the value stored for an attempt in the given registry.
+   *
+   * @return the stored value, or {@code -1} if the task, the attempt or the
+   *         registry entry cannot be found
+   */
+  private long storedPerAttemptValue
+      (Map<TaskAttempt, AtomicLong> data, TezTaskAttemptID attemptID) {
+    Task task = vertex.getTask(attemptID.getTaskID());
+
+    if (task == null) {
+      return -1L;
+    }
+
+    TaskAttempt taskAttempt = task.getAttempt(attemptID);
+
+    if (taskAttempt == null) {
+      return -1L;
+    }
+
+    AtomicLong estimate = data.get(taskAttempt);
+
+    return estimate == null ? -1L : estimate.get();
+  }
+
+  @Override
+  public long estimatedRuntime(TezTaskAttemptID attemptID) {
+    return storedPerAttemptValue(attemptRuntimeEstimates, attemptID);
+  }
+
+  @Override
+  public long runtimeEstimateVariance(TezTaskAttemptID attemptID) {
+    return storedPerAttemptValue(attemptRuntimeEstimateVariances, attemptID);
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/StartEndTimesBase.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/StartEndTimesBase.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/StartEndTimesBase.java
new file mode 100644
index 0000000..d4d1a7f
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/StartEndTimesBase.java
@@ -0,0 +1,138 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.speculation.legacy;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.app.dag.Task;
+import org.apache.tez.dag.app.dag.TaskAttempt;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+
+/**
+ * Base class that uses the attempt runtime estimations from a derived class
+ * and uses it to determine outliers based on deviating beyond the mean
+ * estimated runtime by some threshold
+ */
+abstract class StartEndTimesBase implements TaskRuntimeEstimator {
+  static final float MINIMUM_COMPLETE_PROPORTION_TO_SPECULATE
+      = 0.05F;
+  static final int MINIMUM_COMPLETE_NUMBER_TO_SPECULATE
+      = 1;
+
+  protected Vertex vertex;
+
+  /** Start time recorded for each enrolled attempt. */
+  protected final Map<TezTaskAttemptID, Long> startTimes
+      = new ConcurrentHashMap<TezTaskAttemptID, Long>();
+
+  /** Durations of the first successful attempt of each completed task. */
+  protected final DataStatistics taskStatistics = new DataStatistics();
+
+  private float slowTaskRelativeTresholds;
+
+  /** Tasks whose completion has already been counted; guarded by itself. */
+  protected final Set<Task> doneTasks = new HashSet<Task>();
+
+  @Override
+  public void enrollAttempt(TezTaskAttemptID id, long timestamp) {
+    startTimes.put(id, timestamp);
+  }
+
+  /**
+   * @return the start time recorded for the attempt, or
+   *         {@code Long.MAX_VALUE} if the attempt was never enrolled
+   */
+  @Override
+  public long attemptEnrolledTime(TezTaskAttemptID attemptID) {
+    Long result = startTimes.get(attemptID);
+
+    return result == null ? Long.MAX_VALUE : result;
+  }
+
+  /**
+   * Reads the slow-task threshold from the configuration (default
+   * {@code 1.0}) and remembers the vertex.
+   */
+  @Override
+  public void contextualize(Configuration conf, Vertex vertex) {
+    slowTaskRelativeTresholds = conf.getFloat(
+        TezConfiguration.TEZ_AM_LEGACY_SPECULATIVE_SLOWTASK_THRESHOLD, 1.0f);
+    this.vertex = vertex;
+  }
+
+  protected DataStatistics dataStatisticsForTask(TezTaskID taskID) {
+    return taskStatistics;
+  }
+
+  /**
+   * @return the outlier runtime of the completed tasks at the configured
+   *         threshold, or {@code Long.MAX_VALUE} while too few tasks of the
+   *         vertex have completed
+   */
+  @Override
+  public long thresholdRuntime(TezTaskID taskID) {
+    int completedTasks = vertex.getCompletedTasks();
+
+    int totalTasks = vertex.getTotalTasks();
+
+    if (completedTasks < MINIMUM_COMPLETE_NUMBER_TO_SPECULATE
+        || (((float) completedTasks) / totalTasks)
+            < MINIMUM_COMPLETE_PROPORTION_TO_SPECULATE) {
+      return Long.MAX_VALUE;
+    }
+
+    return (long) taskStatistics.outlier(slowTaskRelativeTresholds);
+  }
+
+  /**
+   * @return the mean duration of the completed tasks, cast to {@code long}
+   */
+  @Override
+  public long newAttemptEstimatedRuntime() {
+    return (long) taskStatistics.mean();
+  }
+
+  /**
+   * Adds the attempt's duration to the task statistics the first time an
+   * attempt of its task is seen as SUCCEEDED. Reports for unknown tasks or
+   * unknown attempts are ignored.
+   *
+   * @param attemptID the attempt being reported on
+   * @param state the reported state; the attempt's own current state is what
+   *        is checked here
+   * @param timestamp the time the report corresponds to, used as the finish
+   *        time of a successful attempt
+   */
+  @Override
+  public void updateAttempt(TezTaskAttemptID attemptID, TaskAttemptState state, long timestamp) {
+
+    Task task = vertex.getTask(attemptID.getTaskID());
+
+    if (task == null) {
+      return;
+    }
+
+    TaskAttempt taskAttempt = task.getAttempt(attemptID);
+
+    // The task may not know this attempt; without this guard the state check
+    // below would throw a NullPointerException.
+    if (taskAttempt == null) {
+      return;
+    }
+
+    if (taskAttempt.getState() != TaskAttemptState.SUCCEEDED) {
+      return;
+    }
+
+    // Note that if a task completes twice [because of a previous speculation
+    // and a race, or a success followed by loss of the machine with the
+    // local data] we only count the first one.
+    boolean isNew;
+    synchronized (doneTasks) {
+      isNew = doneTasks.add(task);
+    }
+    if (!isNew) {
+      return;
+    }
+
+    Long boxedStart = startTimes.get(attemptID);
+    long start = boxedStart == null ? Long.MIN_VALUE : boxedStart;
+    long finish = timestamp;
+
+    // Only record durations derived from plausible start and finish times.
+    if (start > 1L && finish > 1L && start <= finish) {
+      long duration = finish - start;
+      taskStatistics.add(duration);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/TaskRuntimeEstimator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/TaskRuntimeEstimator.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/TaskRuntimeEstimator.java
new file mode 100644
index 0000000..c8edd1e
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/speculation/legacy/TaskRuntimeEstimator.java
@@ -0,0 +1,91 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.speculation.legacy;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+
+/**
+ * Estimate the runtime for tasks of a given vertex.
+ *
+ */
+public interface TaskRuntimeEstimator {
+ /**
+ * Records the start of a task attempt.
+ *
+ * @param id the {@link TezTaskAttemptID} of the attempt that started
+ * @param timestamp the start time to record for the attempt
+ */
+ public void enrollAttempt(TezTaskAttemptID id, long timestamp);
+
+ /**
+ * Returns the start time recorded for an attempt by
+ * {@link #enrollAttempt(TezTaskAttemptID, long)}.
+ *
+ * @param attemptID the {@link TezTaskAttemptID} of the attempt we are asking about
+ * @return the recorded start time. NOTE(review): StartEndTimesBase returns
+ * {@code Long.MAX_VALUE} for an attempt that was never enrolled.
+ */
+ public long attemptEnrolledTime(TezTaskAttemptID attemptID);
+
+ /**
+ * Folds a state report for a task attempt into the estimator's data.
+ *
+ * @param taId the {@link TezTaskAttemptID} of the attempt the report is about
+ * @param reportedState the state reported for the attempt
+ * @param timestamp the time the report corresponds to
+ */
+ public void updateAttempt(TezTaskAttemptID taId, TaskAttemptState reportedState, long timestamp);
+
+ /**
+ * Supplies the configuration and the vertex whose tasks are to be estimated.
+ *
+ * @param conf the configuration to read estimator settings from
+ * @param vertex the vertex this estimator works for
+ */
+ public void contextualize(Configuration conf, Vertex vertex);
+
+ /**
+ *
+ * Find a maximum reasonable execution wallclock time. Includes the time
+ * already elapsed. If the projected total execution time for this task
+ * ever exceeds its reasonable execution time, we may speculate it.
+ *
+ * @param id the {@link TezTaskID} of the task we are asking about
+ * @return the task's maximum reasonable runtime, or MAX_VALUE if
+ * we don't have enough information to rule out any runtime,
+ * however long.
+ *
+ */
+ public long thresholdRuntime(TezTaskID id);
+
+ /**
+ *
+ * Estimate a task attempt's total runtime. Includes the time already
+ * elapsed.
+ *
+ * @param id the {@link TezTaskAttemptID} of the attempt we are asking about
+ * @return our best estimate of the attempt's runtime, or {@code -1} if
+ * we don't have enough information yet to produce an estimate.
+ *
+ */
+ public long estimatedRuntime(TezTaskAttemptID id);
+
+ /**
+ *
+ * Estimates how long a new attempt on this task will take if we start
+ * one now
+ *
+ * @return our best estimate of a new attempt's runtime, or {@code -1} if
+ * we don't have enough information yet to produce an estimate.
+ * NOTE(review): StartEndTimesBase returns the mean of its task
+ * statistics, which is a very large value rather than {@code -1}
+ * when no task has completed - confirm which contract is intended.
+ *
+ */
+ public long newAttemptEstimatedRuntime();
+
+ /**
+ *
+ * Computes the width of the error band of our estimate of the task
+ * runtime as returned by {@link #estimatedRuntime(TezTaskAttemptID)}
+ *
+ * @param id the {@link TezTaskAttemptID} of the attempt we are asking about
+ * @return our best estimate of the width of the error band of the attempt's
+ * runtime estimate, or {@code -1} if we don't have enough
+ * information yet to produce an estimate.
+ *
+ */
+ public long runtimeEstimateVariance(TezTaskAttemptID id);
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/test/java/org/apache/tez/dag/app/MockClock.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockClock.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockClock.java
new file mode 100644
index 0000000..d015714
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockClock.java
@@ -0,0 +1,36 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app;
+
+import org.apache.hadoop.yarn.util.Clock;
+
+/**
+ * Test clock whose time starts at 1000 and only changes when
+ * {@link #incrementTime(long)} is called.
+ */
+public class MockClock implements Clock {
+
+ // Current time reported by this clock
+ long time = 1000;
+
+ @Override
+ public long getTime() {
+ return time;
+ }
+
+ /**
+ * Advances the reported time by the given amount.
+ *
+ * @param inc the amount to add to the current time
+ */
+ public void incrementTime(long inc) {
+ time += inc;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index b4109e7..a548e3c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -50,6 +50,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
+import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
@@ -75,9 +76,12 @@ public class MockDAGAppMaster extends DAGAppMaster {
AtomicBoolean startScheduling = new AtomicBoolean(true);
AtomicBoolean goFlag;
+ boolean updateProgress = true;
Map<TezTaskID, Integer> preemptedTasks = Maps.newConcurrentMap();
+ Map<TezTaskAttemptID, Integer> tasksWithStatusUpdates = Maps.newConcurrentMap();
+
public MockContainerLauncher(AtomicBoolean goFlag) {
super("MockContainerLauncher");
this.goFlag = goFlag;
@@ -88,6 +92,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
TezTaskAttemptID taId;
String vName;
ContainerLaunchContext launchContext;
+ int numUpdates = 0;
boolean completed;
public ContainerData(ContainerId cId, ContainerLaunchContext context) {
@@ -149,6 +154,10 @@ public class MockDAGAppMaster extends DAGAppMaster {
public void startScheduling(boolean value) {
startScheduling.set(value);
}
+
+ public void updateProgress(boolean value) {
+ this.updateProgress = value;
+ }
public Map<ContainerId, ContainerData> getContainers() {
return containers;
@@ -164,6 +173,10 @@ public class MockDAGAppMaster extends DAGAppMaster {
cData.clear();
}
+ public void setStatusUpdatesForTask(TezTaskAttemptID tId, int numUpdates) {
+ tasksWithStatusUpdates.put(tId, numUpdates);
+ }
+
void stop(NMCommunicatorStopRequestEvent event) {
// remove from simulated container list
containers.remove(event.getContainerId());
@@ -183,6 +196,13 @@ public class MockDAGAppMaster extends DAGAppMaster {
Thread.sleep(50);
}
}
+
+ void incrementTime(long inc) {
+ Clock clock = getContext().getClock();
+ if (clock instanceof MockClock) {
+ ((MockClock) clock).incrementTime(inc);
+ }
+ }
@Override
public void run() {
@@ -192,6 +212,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
if (!startScheduling.get()) { // schedule when asked to do so by the test code
continue;
}
+ incrementTime(1000);
for (Map.Entry<ContainerId, ContainerData> entry : containers.entrySet()) {
ContainerData cData = entry.getValue();
ContainerId cId = entry.getKey();
@@ -214,8 +235,19 @@ public class MockDAGAppMaster extends DAGAppMaster {
} else if (!cData.completed) {
// container is assigned a task and task is not completed
// complete the task or preempt the task
- Integer version = preemptedTasks.get(cData.taId.getTaskID());
- if (version != null && cData.taId.getId() <= version.intValue()) {
+ Integer version = preemptedTasks.get(cData.taId.getTaskID());
+ Integer updatesToMake = tasksWithStatusUpdates.get(cData.taId);
+ if (cData.numUpdates == 0 || // do at least one update
+ updatesToMake != null && cData.numUpdates < updatesToMake) {
+ cData.numUpdates++;
+ float maxUpdates = (updatesToMake != null) ? updatesToMake.intValue() : 1;
+ float progress = updateProgress ? cData.numUpdates/maxUpdates : 0f;
+ TezVertexID vertexId = cData.taId.getTaskID().getVertexID();
+ getContext().getEventHandler().handle(
+ new VertexEventRouteEvent(vertexId, Collections.singletonList(new TezEvent(
+ new TaskStatusUpdateEvent(null, progress), new EventMetaData(
+ EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId)))));
+ } else if (version != null && cData.taId.getId() <= version.intValue()) {
preemptContainer(cData);
} else {
// send a done notification
http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java
index 7e408e1..2631e3c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java
@@ -29,16 +29,18 @@ import org.apache.tez.client.LocalClient;
public class MockLocalClient extends LocalClient {
MockDAGAppMaster mockApp;
AtomicBoolean mockAppLauncherGoFlag;
+ Clock mockClock;
- public MockLocalClient(AtomicBoolean mockAppLauncherGoFlag) {
+ public MockLocalClient(AtomicBoolean mockAppLauncherGoFlag, Clock clock) {
this.mockAppLauncherGoFlag = mockAppLauncherGoFlag;
+ this.mockClock = clock;
}
protected DAGAppMaster createDAGAppMaster(ApplicationAttemptId applicationAttemptId,
ContainerId cId, String currentHost, int nmPort, int nmHttpPort,
Clock clock, long appSubmitTime, boolean isSession, String userDir) {
mockApp = new MockDAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort,
- new SystemClock(), appSubmitTime, isSession, userDir, mockAppLauncherGoFlag);
+ (mockClock!=null ? mockClock : clock), appSubmitTime, isSession, userDir, mockAppLauncherGoFlag);
return mockApp;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/test/java/org/apache/tez/dag/app/MockTezClient.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockTezClient.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockTezClient.java
index 617415e..0ff3340 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockTezClient.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockTezClient.java
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.util.Clock;
import org.apache.tez.client.FrameworkClient;
import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.TezConfiguration;
@@ -32,9 +33,9 @@ public class MockTezClient extends TezClient {
MockTezClient(String name, TezConfiguration tezConf, boolean isSession,
Map<String, LocalResource> localResources, Credentials credentials,
- AtomicBoolean mockAppLauncherGoFlag) {
+ Clock clock, AtomicBoolean mockAppLauncherGoFlag) {
super(name, tezConf, isSession, localResources, credentials);
- this.client = new MockLocalClient(mockAppLauncherGoFlag);
+ this.client = new MockLocalClient(mockAppLauncherGoFlag, clock);
}
protected FrameworkClient createFrameworkClient() {
http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
index 8650aea..682e6ed 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
@@ -67,7 +67,7 @@ public class TestMockDAGAppMaster {
public void testLocalResourceSetup() throws Exception {
TezConfiguration tezconf = new TezConfiguration(defaultConf);
- MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null);
+ MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null);
tezClient.start();
MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
@@ -119,7 +119,7 @@ public class TestMockDAGAppMaster {
TezConfiguration tezconf = new TezConfiguration(defaultConf);
- MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null);
+ MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null);
tezClient.start();
DAGClient dagClient = tezClient.submitDAG(dag);
dagClient.waitForCompletion();
@@ -127,7 +127,7 @@ public class TestMockDAGAppMaster {
tezClient.stop();
// submit the same DAG again to verify it can be done.
- tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null);
+ tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null);
tezClient.start();
dagClient = tezClient.submitDAG(dag);
dagClient.waitForCompletion();
@@ -139,7 +139,7 @@ public class TestMockDAGAppMaster {
public void testSchedulerErrorHandling() throws Exception {
TezConfiguration tezconf = new TezConfiguration(defaultConf);
- MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null);
+ MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null);
tezClient.start();
MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java
index 0958c48..bc15954 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java
@@ -92,7 +92,7 @@ public class TestPreemption {
tezconf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 0);
AtomicBoolean mockAppLauncherGoFlag = new AtomicBoolean(false);
MockTezClient tezClient = new MockTezClient("testPreemption", tezconf, false, null, null,
- mockAppLauncherGoFlag);
+ null, mockAppLauncherGoFlag);
tezClient.start();
DAGClient dagClient = tezClient.submitDAG(createDAG(DataMovementType.SCATTER_GATHER));
@@ -148,7 +148,7 @@ public class TestPreemption {
tezconf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 0);
AtomicBoolean mockAppLauncherGoFlag = new AtomicBoolean(false);
MockTezClient tezClient = new MockTezClient("testPreemption", tezconf, true, null, null,
- mockAppLauncherGoFlag);
+ null, mockAppLauncherGoFlag);
tezClient.start();
syncWithMockAppLauncher(false, mockAppLauncherGoFlag, tezClient);
return tezClient;
http://git-wip-us.apache.org/repos/asf/tez/blob/6be75661/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
new file mode 100644
index 0000000..114c44b
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
@@ -0,0 +1,161 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher;
+import org.apache.tez.dag.app.dag.Task;
+import org.apache.tez.dag.app.dag.TaskAttempt;
+import org.apache.tez.dag.app.dag.impl.DAGImpl;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.base.Joiner;
+
+
+/**
+ * Runs small single-vertex DAGs against the mock AM with speculation enabled
+ * and a {@link MockClock}, and checks which attempt of task 0 ends up being
+ * the successful one.
+ */
+@SuppressWarnings("deprecation")
+public class TestSpeculation {
+  static Configuration defaultConf;
+  static FileSystem localFs;
+  static Path workDir;
+
+  // Populated by syncWithMockAppLauncher() once the mock AM is available.
+  MockDAGAppMaster mockApp;
+  MockContainerLauncher mockLauncher;
+
+  static {
+    try {
+      defaultConf = new Configuration(false);
+      defaultConf.set("fs.defaultFS", "file:///");
+      defaultConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+      defaultConf.setBoolean(TezConfiguration.TEZ_AM_SPECULATION_ENABLED, true);
+      defaultConf.setInt(TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS, 2);
+      localFs = FileSystem.getLocal(defaultConf);
+      workDir = new Path(new Path(System.getProperty("test.build.data", "/tmp")),
+          "TestSpeculation").makeQualified(localFs);
+    } catch (IOException e) {
+      throw new RuntimeException("init failure", e);
+    }
+  }
+
+  /**
+   * Starts a session-mode {@link MockTezClient} that uses a fresh
+   * {@link MockClock}, then syncs with the mock launcher with scheduling
+   * disabled.
+   */
+  MockTezClient createTezSession() throws Exception {
+    TezConfiguration tezconf = new TezConfiguration(defaultConf);
+    AtomicBoolean mockAppLauncherGoFlag = new AtomicBoolean(false);
+    MockTezClient tezClient = new MockTezClient("testspeculation", tezconf, true, null, null,
+        new MockClock(), mockAppLauncherGoFlag);
+    tezClient.start();
+    syncWithMockAppLauncher(false, mockAppLauncherGoFlag, tezClient);
+    return tezClient;
+  }
+
+  /**
+   * Waits on the go-flag until it is set, records the mock app and its
+   * launcher in the fields of this test, tells the launcher whether it may
+   * schedule, and then notifies on the flag.
+   */
+  void syncWithMockAppLauncher(boolean allowScheduling, AtomicBoolean mockAppLauncherGoFlag,
+      MockTezClient tezClient) throws Exception {
+    synchronized (mockAppLauncherGoFlag) {
+      while (!mockAppLauncherGoFlag.get()) {
+        mockAppLauncherGoFlag.wait();
+      }
+      mockApp = tezClient.getLocalClient().getMockApp();
+      mockLauncher = mockApp.getContainerLauncher();
+      mockLauncher.startScheduling(allowScheduling);
+      mockAppLauncherGoFlag.notify();
+    }
+  }
+
+  /**
+   * Asks attempt 0 of task 0 to send 100 status updates and expects attempt 1
+   * of that task to be the successful one.
+   *
+   * @param withProgress value handed to the launcher's updateProgress(boolean)
+   */
+  public void testBasicSpeculation(boolean withProgress) throws Exception {
+    DAG dag = DAG.create("test");
+    Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 5);
+    dag.addVertex(vA);
+
+    MockTezClient tezClient = createTezSession();
+    try {
+      DAGClient dagClient = tezClient.submitDAG(dag);
+      DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG();
+      TezVertexID vertexId = TezVertexID.getInstance(dagImpl.getID(), 0);
+      // original attempt is killed and speculative one is successful
+      TezTaskAttemptID killedTaId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 0);
+      TezTaskAttemptID successTaId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 1);
+
+      mockLauncher.updateProgress(withProgress);
+      mockLauncher.setStatusUpdatesForTask(killedTaId, 100);
+
+      mockLauncher.startScheduling(true);
+      dagClient.waitForCompletion();
+      Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
+      Task task = dagImpl.getTask(killedTaId.getTaskID());
+      Assert.assertEquals(2, task.getAttempts().size());
+      Assert.assertEquals(successTaId, task.getSuccessfulAttempt().getID());
+      TaskAttempt killedAttempt = task.getAttempt(killedTaId);
+      // NOTE(review): the boolean returned by contains() is discarded, so this
+      // line verifies nothing. Confirm the diagnostics text emitted on the kill
+      // path before wrapping it in Assert.assertTrue.
+      Joiner.on(",").join(killedAttempt.getDiagnostics()).contains("Killed as speculative attempt");
+    } finally {
+      // always shut the session down, even when an assertion above fails
+      tezClient.stop();
+    }
+  }
+
+  @Test (timeout=10000)
+  public void testBasicSpeculationWithProgress() throws Exception {
+    testBasicSpeculation(true);
+  }
+
+  @Test (timeout=10000)
+  public void testBasicSpeculationWithoutProgress() throws Exception {
+    testBasicSpeculation(false);
+  }
+
+  /**
+   * Asks both attempt 0 and attempt 1 of task 0 to send 100 status updates and
+   * expects attempt 0 to be the successful one.
+   */
+  @Test (timeout=10000)
+  public void testBasicSpeculationNotUseful() throws Exception {
+    DAG dag = DAG.create("test");
+    Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 5);
+    dag.addVertex(vA);
+
+    MockTezClient tezClient = createTezSession();
+    try {
+      DAGClient dagClient = tezClient.submitDAG(dag);
+      DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG();
+      TezVertexID vertexId = TezVertexID.getInstance(dagImpl.getID(), 0);
+      // original attempt is successful and speculative one is killed
+      TezTaskAttemptID successTaId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 0);
+      TezTaskAttemptID killedTaId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 1);
+
+      mockLauncher.setStatusUpdatesForTask(successTaId, 100);
+      mockLauncher.setStatusUpdatesForTask(killedTaId, 100);
+
+      mockLauncher.startScheduling(true);
+      dagClient.waitForCompletion();
+      Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
+      Task task = dagImpl.getTask(killedTaId.getTaskID());
+      Assert.assertEquals(2, task.getAttempts().size());
+      Assert.assertEquals(successTaId, task.getSuccessfulAttempt().getID());
+      TaskAttempt killedAttempt = task.getAttempt(killedTaId);
+      // NOTE(review): the boolean returned by contains() is discarded, so this
+      // line verifies nothing. Confirm the diagnostics text emitted on the kill
+      // path before wrapping it in Assert.assertTrue.
+      Joiner.on(",").join(killedAttempt.getDiagnostics()).contains("Killed speculative attempt as");
+    } finally {
+      // always shut the session down, even when an assertion above fails
+      tezClient.stop();
+    }
+  }
+
+}