You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2013/09/17 01:25:22 UTC
git commit: TEZ-460. Fix TaskAttempt status update via heartbeat events (bikas)
Updated Branches:
refs/heads/TEZ-398 e7b591d79 -> f9713195f
TEZ-460. Fix TaskAttempt status update via heartbeat events (bikas)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/f9713195
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/f9713195
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/f9713195
Branch: refs/heads/TEZ-398
Commit: f9713195f30043b04cb422214e1e6e43d7ac7a5a
Parents: e7b591d
Author: Bikas Saha <bi...@apache.org>
Authored: Mon Sep 16 16:23:12 2013 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Mon Sep 16 16:23:12 2013 -0700
----------------------------------------------------------------------
.../dag/app/TaskAttemptListenerImpTezDag.java | 4 +-
.../org/apache/tez/dag/app/dag/TaskAttempt.java | 38 ++++++++----
.../dag/event/TaskAttemptEventStatusUpdate.java | 20 ++++--
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 65 +++++++-------------
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 26 +++++---
.../dag/app/speculate/DefaultSpeculator.java | 6 +-
...ponentiallySmoothedTaskRuntimeEstimator.java | 4 +-
.../speculate/LegacyTaskRuntimeEstimator.java | 4 +-
.../app/speculate/NullTaskRuntimesEngine.java | 6 +-
.../tez/dag/app/speculate/Speculator.java | 4 +-
.../tez/dag/app/speculate/SpeculatorEvent.java | 10 +--
.../dag/app/speculate/StartEndTimesBase.java | 6 +-
.../dag/app/speculate/TaskRuntimeEstimator.java | 6 +-
13 files changed, 109 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f9713195/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index 3c32d36..655119f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -54,7 +54,7 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputConsumable;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatus;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatusOld;
import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
import org.apache.tez.dag.app.rm.container.AMContainerImpl;
import org.apache.tez.dag.app.rm.container.AMContainerTask;
@@ -269,7 +269,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
}
taskHeartbeatHandler.progressing(taskAttemptId);
pingContainerHeartbeatHandler(taskAttemptId);
- TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus();
+ TaskAttemptStatusOld taskAttemptStatus = new TaskAttemptStatusOld();
taskAttemptStatus.id = taskAttemptId;
// Task sends the updated progress to the TT.
taskAttemptStatus.progress = taskStatus.getProgress();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f9713195/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
index a27c45b..af6c0f5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
@@ -19,9 +19,11 @@
package org.apache.tez.dag.app.dag;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.tez.common.counters.DAGCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.oldrecords.TaskAttemptReport;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
@@ -34,6 +36,30 @@ import org.apache.tez.dag.records.TezVertexID;
* Read only view of TaskAttempt.
*/
public interface TaskAttempt {
+
+ public static class TaskAttemptStatus {
+ public TaskAttemptState state;
+ public DAGCounter localityCounter;
+ public float progress;
+ public TezCounters counters;
+
+ // insert these counters till they come natively from the task itself.
+ // HDFS-5098
+ private AtomicBoolean localitySet = new AtomicBoolean(false);
+ public void setLocalityCounter(DAGCounter localityCounter) {
+ if (!localitySet.get()) {
+ localitySet.set(true);
+ if (counters == null) {
+ counters = new TezCounters();
+ }
+ if (localityCounter != null) {
+ counters.findCounter(localityCounter).increment(1);
+ // TODO Maybe validate that the correct value is being set.
+ }
+ }
+ }
+ }
+
TezTaskAttemptID getID();
TezTaskID getTaskID();
TezVertexID getVertexID();
@@ -88,18 +114,6 @@ public interface TaskAttempt {
*/
long getFinishTime();
- /**
- * @return The attempt's input ready time. If
- * attempt's input is not ready yet, returns 0.
- */
- long getInputReadyTime();
-
- /**
- * @return The attempt's output ready time. If attempt's output is not
- * ready yet, returns 0.
- */
- long getOutputReadyTime();
-
// TODO TEZDAG - remove all references to ShufflePort
/**
* @return the port shuffle is on.
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f9713195/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java
index b968fc6..3a8c489 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java
@@ -25,18 +25,30 @@ import org.apache.tez.common.counters.DAGCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.engine.newapi.events.TaskStatusUpdateEvent;
public class TaskAttemptEventStatusUpdate extends TaskAttemptEvent {
+
+ private TaskStatusUpdateEvent taskAttemptStatus;
+
+ public TaskAttemptEventStatusUpdate(TezTaskAttemptID id, TaskStatusUpdateEvent statusEvent) {
+ super(id, TaskAttemptEventType.TA_STATUS_UPDATE);
+ this.taskAttemptStatus = statusEvent;
+ }
+
+ public TaskStatusUpdateEvent getStatusEvent() {
+ return this.taskAttemptStatus;
+ }
- private TaskAttemptStatus reportedTaskAttemptStatus;
+ private TaskAttemptStatusOld reportedTaskAttemptStatus;
public TaskAttemptEventStatusUpdate(TezTaskAttemptID id,
- TaskAttemptStatus taskAttemptStatus) {
+ TaskAttemptStatusOld taskAttemptStatus) {
super(id, TaskAttemptEventType.TA_STATUS_UPDATE);
this.reportedTaskAttemptStatus = taskAttemptStatus;
}
- public TaskAttemptStatus getReportedTaskAttemptStatus() {
+ public TaskAttemptStatusOld getReportedTaskAttemptStatus() {
return reportedTaskAttemptStatus;
}
@@ -44,7 +56,7 @@ public class TaskAttemptEventStatusUpdate extends TaskAttemptEvent {
* The internal TaskAttemptStatus object corresponding to remote Task status.
*
*/
- public static class TaskAttemptStatus {
+ public static class TaskAttemptStatusOld {
private AtomicBoolean localitySet = new AtomicBoolean(false);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f9713195/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 30bb1eb..ef51651 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -77,11 +77,9 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputConsumable;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatus;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
import org.apache.tez.dag.app.dag.event.TaskEventType;
-import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptFetchFailure;
import org.apache.tez.dag.app.rm.AMSchedulerEventTAEnded;
import org.apache.tez.dag.app.rm.AMSchedulerEventTALaunchRequest;
import org.apache.tez.dag.history.DAGHistoryEvent;
@@ -92,6 +90,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.utils.TezBuilderUtils;
+import org.apache.tez.engine.newapi.events.TaskStatusUpdateEvent;
import org.apache.tez.engine.newapi.impl.TaskSpec;
import com.google.common.annotations.VisibleForTesting;
@@ -132,7 +131,6 @@ public class TaskAttemptImpl implements TaskAttempt,
private String nodeRackName;
private TaskAttemptStatus reportedStatus;
- private DAGCounter localityCounter;
// Used to store locality information when
Set<String> taskHosts = new HashSet<String>();
@@ -316,10 +314,10 @@ public class TaskAttemptImpl implements TaskAttempt,
result.setProgress(reportedStatus.progress);
result.setStartTime(launchTime);
result.setFinishTime(finishTime);
- result.setShuffleFinishTime(this.reportedStatus.shuffleFinishTime);
+ //result.setShuffleFinishTime(this.reportedStatus.shuffleFinishTime);
result.setDiagnosticInfo(StringUtils.join(LINE_SEPARATOR, getDiagnostics()));
//result.setPhase(reportedStatus.phase);
- result.setStateString(reportedStatus.stateString);
+ //result.setStateString(reportedStatus.stateString);
result.setCounters(getCounters());
result.setContainerId(this.getAssignedContainerID());
result.setNodeManagerHost(trackerName);
@@ -349,7 +347,7 @@ public class TaskAttemptImpl implements TaskAttempt,
public TezCounters getCounters() {
readLock.lock();
try {
- reportedStatus.setLocalityCounter(localityCounter);
+ reportedStatus.setLocalityCounter(reportedStatus.localityCounter);
TezCounters counters = reportedStatus.counters;
if (counters == null) {
counters = EMPTY_COUNTERS;
@@ -471,26 +469,6 @@ public class TaskAttemptImpl implements TaskAttempt,
}
@Override
- public long getInputReadyTime() {
- readLock.lock();
- try {
- return this.reportedStatus.shuffleFinishTime;
- } finally {
- readLock.unlock();
- }
- }
-
- @Override
- public long getOutputReadyTime() {
- readLock.lock();
- try {
- return this.reportedStatus.sortFinishTime;
- } finally {
- readLock.unlock();
- }
- }
-
- @Override
public int getShufflePort() {
readLock.lock();
try {
@@ -980,15 +958,16 @@ public class TaskAttemptImpl implements TaskAttempt,
// JobHistoryEvent
ta.logJobHistoryAttemptStarted();
- // Compute LOLCAITY counter for this task.
+ // TODO Remove after HDFS-5098
+ // Compute LOCALITY counter for this task.
if (ta.taskHosts.contains(ta.containerNodeId.getHost())) {
- ta.localityCounter = DAGCounter.DATA_LOCAL_TASKS;
+ ta.reportedStatus.localityCounter = DAGCounter.DATA_LOCAL_TASKS;
} else if (ta.taskRacks.contains(ta.nodeRackName)) {
- ta.localityCounter = DAGCounter.RACK_LOCAL_TASKS;
+ ta.reportedStatus.localityCounter = DAGCounter.RACK_LOCAL_TASKS;
} else {
// Not computing this if the task does not have locality information.
if (ta.locationHint != null) {
- ta.localityCounter = DAGCounter.OTHER_LOCAL_TASKS;
+ ta.reportedStatus.localityCounter = DAGCounter.OTHER_LOCAL_TASKS;
}
}
@@ -1076,22 +1055,24 @@ public class TaskAttemptImpl implements TaskAttempt,
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
@Override
public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
- TaskAttemptStatus newReportedStatus = ((TaskAttemptEventStatusUpdate) event)
- .getReportedTaskAttemptStatus();
- ta.reportedStatus = newReportedStatus;
- ta.reportedStatus.taskState = ta.getState();
+ TaskStatusUpdateEvent statusEvent = ((TaskAttemptEventStatusUpdate) event)
+ .getStatusEvent();
+ ta.reportedStatus.state = ta.getState();
+ ta.reportedStatus.progress = statusEvent.getProgress();
+ ta.reportedStatus.counters = statusEvent.getCounters();
// Inform speculator of status.
//ta.sendEvent(new SpeculatorEvent(ta.reportedStatus, ta.clock.getTime()));
ta.updateProgressSplits();
- // Inform the job about fetch failures if they exist.
- if (ta.reportedStatus.fetchFailedMaps != null
- && ta.reportedStatus.fetchFailedMaps.size() > 0) {
- ta.sendEvent(new VertexEventTaskAttemptFetchFailure(ta.attemptId,
- ta.reportedStatus.fetchFailedMaps));
- }
+ // TODO TEZ-431
+// // Inform the job about fetch failures if they exist.
+// if (ta.reportedStatus.fetchFailedMaps != null
+// && ta.reportedStatus.fetchFailedMaps.size() > 0) {
+// ta.sendEvent(new VertexEventTaskAttemptFetchFailure(ta.attemptId,
+// ta.reportedStatus.fetchFailedMaps));
+// }
// TODO at some point. Nodes may be interested in FetchFailure info.
// Can be used to blacklist nodes.
}
@@ -1217,8 +1198,8 @@ public class TaskAttemptImpl implements TaskAttempt,
private void initTaskAttemptStatus(TaskAttemptStatus result) {
result.progress = 0.0f;
// result.phase = Phase.STARTING;
- result.stateString = "NEW";
- result.taskState = TaskAttemptState.NEW;
+ //result.stateString = "NEW";
+ result.state = TaskAttemptState.NEW;
//TezCounters counters = EMPTY_COUNTERS;
//result.counters = counters;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f9713195/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 6e6e109..845201c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -84,6 +84,7 @@ import org.apache.tez.dag.app.dag.event.DAGEventDiagnosticsUpdate;
import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEvent;
import org.apache.tez.dag.app.dag.event.TaskEventTermination;
@@ -106,6 +107,7 @@ import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.engine.newapi.events.DataMovementEvent;
import org.apache.tez.engine.newapi.events.InputFailedEvent;
+import org.apache.tez.engine.newapi.events.TaskStatusUpdateEvent;
import org.apache.tez.engine.newapi.impl.EventMetaData;
import org.apache.tez.engine.newapi.impl.InputSpec;
import org.apache.tez.engine.newapi.impl.OutputSpec;
@@ -1418,6 +1420,15 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
private void addDiagnostic(String diag) {
diagnostics.add(diag);
}
+
+ private static void checkEventSourceMetadata(Vertex vertex, EventMetaData sourceMeta) {
+ if (!sourceMeta.getTaskVertexName().equals(vertex.getName())) {
+ throw new TezUncheckedException(
+ "Bad routing of event. Event-vertex: "
+ + sourceMeta.getTaskVertexName() + " Expected: "
+ + vertex.getName());
+ }
+ }
private static class RouteEventTransition implements
SingleArcTransition<VertexImpl, VertexEvent> {
@@ -1426,14 +1437,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
VertexEventRouteEvent rEvent = (VertexEventRouteEvent) event;
List<TezEvent> tezEvents = rEvent.getEvents();
for(TezEvent tezEvent : tezEvents) {
+ EventMetaData sourceMeta = tezEvent.getSourceInfo();
+ checkEventSourceMetadata(vertex, sourceMeta);
switch(tezEvent.getEventType()) {
case DATA_MOVEMENT_EVENT:
{
- EventMetaData sourceMeta = tezEvent.getSourceInfo();
TezTaskAttemptID srcTaId = sourceMeta.getTaskAttemptID();
DataMovementEvent dmEvent = (DataMovementEvent) tezEvent.getEvent();
dmEvent.setVersion(srcTaId.getId());
- assert sourceMeta.getTaskVertexName().equals(vertex.getName());
Edge destEdge = vertex.targetVertices.get(vertex.getDAG().getVertex(
sourceMeta.getEdgeVertexName()));
destEdge.sendTezEventToDestinationTasks(tezEvent);
@@ -1441,11 +1452,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
break;
case INPUT_FAILED_EVENT:
{
- EventMetaData sourceMeta = tezEvent.getSourceInfo();
TezTaskAttemptID srcTaId = sourceMeta.getTaskAttemptID();
InputFailedEvent ifEvent = (InputFailedEvent) tezEvent.getEvent();
ifEvent.setVersion(srcTaId.getId());
- assert sourceMeta.getTaskVertexName().equals(vertex.getName());
Edge destEdge = vertex.targetVertices.get(vertex.getDAG().getVertex(
sourceMeta.getEdgeVertexName()));
destEdge.sendTezEventToDestinationTasks(tezEvent);
@@ -1453,15 +1462,18 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
break;
case INPUT_READ_ERROR_EVENT:
{
- EventMetaData sourceMeta = tezEvent.getSourceInfo();
- assert sourceMeta.getTaskVertexName().equals(vertex.getName());
Edge srcEdge = vertex.sourceVertices.get(vertex.getDAG().getVertex(
sourceMeta.getEdgeVertexName()));
srcEdge.sendTezEventToSourceTasks(tezEvent);
}
break;
case TASK_STATUS_UPDATE_EVENT:
- // TODO NEWTEZ FIXME: Handle this event
+ {
+ TaskStatusUpdateEvent sEvent = (TaskStatusUpdateEvent) tezEvent.getEvent();
+ vertex.getEventHandler().handle(
+ new TaskAttemptEventStatusUpdate(sourceMeta.getTaskAttemptID(),
+ sEvent));
+ }
break;
default:
throw new TezUncheckedException("Unhandled tez event type: "
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f9713195/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/DefaultSpeculator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/DefaultSpeculator.java b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/DefaultSpeculator.java
index 4dea478..24948c0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/DefaultSpeculator.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/DefaultSpeculator.java
@@ -43,7 +43,7 @@ import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.TaskAttempt;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatus;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatusOld;
import org.apache.tez.dag.app.dag.event.TaskEvent;
import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -209,7 +209,7 @@ public class DefaultSpeculator extends AbstractService implements
}
@Override
- public void handleAttempt(TaskAttemptStatus status) {
+ public void handleAttempt(TaskAttemptStatusOld status) {
long timestamp = clock.getTime();
statusUpdate(status, timestamp);
}
@@ -287,7 +287,7 @@ public class DefaultSpeculator extends AbstractService implements
* @param timestamp the time this status corresponds to. This matters
* because statuses contain progress.
*/
- protected void statusUpdate(TaskAttemptStatus reportedStatus, long timestamp) {
+ protected void statusUpdate(TaskAttemptStatusOld reportedStatus, long timestamp) {
String stateString = reportedStatus.taskState.toString();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f9713195/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java
index 558f78c..10f217d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java
@@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatus;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatusOld;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
@@ -181,7 +181,7 @@ public class ExponentiallySmoothedTaskRuntimeEstimator extends StartEndTimesBase
}
@Override
- public void updateAttempt(TaskAttemptStatus status, long timestamp) {
+ public void updateAttempt(TaskAttemptStatusOld status, long timestamp) {
super.updateAttempt(status, timestamp);
TezTaskAttemptID attemptID = status.id;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f9713195/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/LegacyTaskRuntimeEstimator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/LegacyTaskRuntimeEstimator.java b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/LegacyTaskRuntimeEstimator.java
index b7ebc68..ff7564c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/LegacyTaskRuntimeEstimator.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/LegacyTaskRuntimeEstimator.java
@@ -27,7 +27,7 @@ import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.TaskAttempt;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatus;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatusOld;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
@@ -39,7 +39,7 @@ public class LegacyTaskRuntimeEstimator extends StartEndTimesBase {
= new ConcurrentHashMap<TaskAttempt, AtomicLong>();
@Override
- public void updateAttempt(TaskAttemptStatus status, long timestamp) {
+ public void updateAttempt(TaskAttemptStatusOld status, long timestamp) {
super.updateAttempt(status, timestamp);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f9713195/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/NullTaskRuntimesEngine.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/NullTaskRuntimesEngine.java b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/NullTaskRuntimesEngine.java
index 9907bc6..9fa3b4b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/NullTaskRuntimesEngine.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/NullTaskRuntimesEngine.java
@@ -20,7 +20,7 @@ package org.apache.tez.dag.app.speculate;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatus;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatusOld;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
@@ -31,7 +31,7 @@ import org.apache.tez.dag.records.TezTaskID;
*/
public class NullTaskRuntimesEngine implements TaskRuntimeEstimator {
@Override
- public void enrollAttempt(TaskAttemptStatus status, long timestamp) {
+ public void enrollAttempt(TaskAttemptStatusOld status, long timestamp) {
// no code
}
@@ -41,7 +41,7 @@ public class NullTaskRuntimesEngine implements TaskRuntimeEstimator {
}
@Override
- public void updateAttempt(TaskAttemptStatus status, long timestamp) {
+ public void updateAttempt(TaskAttemptStatusOld status, long timestamp) {
// no code
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f9713195/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/Speculator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/Speculator.java b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/Speculator.java
index 61d9d48..d4d0b5a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/Speculator.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/Speculator.java
@@ -19,7 +19,7 @@
package org.apache.tez.dag.app.speculate;
import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatus;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatusOld;
/**
* Speculator component. Task Attempts' status updates are sent to this
@@ -41,5 +41,5 @@ public interface Speculator
// This will be implemented if we go to a model where the events are
// processed within the TaskAttempts' state transitions' code.
- public void handleAttempt(TaskAttemptStatus status);
+ public void handleAttempt(TaskAttemptStatusOld status);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f9713195/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/SpeculatorEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/SpeculatorEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/SpeculatorEvent.java
index c8f400b..917abb6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/SpeculatorEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/SpeculatorEvent.java
@@ -19,7 +19,7 @@
package org.apache.tez.dag.app.speculate;
import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatus;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatusOld;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
@@ -27,7 +27,7 @@ import org.apache.tez.dag.records.TezTaskID;
public class SpeculatorEvent extends AbstractEvent<Speculator.EventType> {
// valid for ATTEMPT_STATUS_UPDATE
- private TaskAttemptStatus reportedStatus;
+ private TaskAttemptStatusOld reportedStatus;
// valid for TASK_CONTAINER_NEED_UPDATE
private TezTaskID taskID;
@@ -41,14 +41,14 @@ public class SpeculatorEvent extends AbstractEvent<Speculator.EventType> {
this.dagId = dagId;
}
- public SpeculatorEvent(TaskAttemptStatus reportedStatus, long timestamp) {
+ public SpeculatorEvent(TaskAttemptStatusOld reportedStatus, long timestamp) {
super(Speculator.EventType.ATTEMPT_STATUS_UPDATE, timestamp);
this.reportedStatus = reportedStatus;
}
public SpeculatorEvent(TezTaskAttemptID attemptID, boolean flag, long timestamp) {
super(Speculator.EventType.ATTEMPT_START, timestamp);
- this.reportedStatus = new TaskAttemptStatus();
+ this.reportedStatus = new TaskAttemptStatusOld();
this.reportedStatus.id = attemptID;
this.taskID = attemptID.getTaskID();
}
@@ -68,7 +68,7 @@ public class SpeculatorEvent extends AbstractEvent<Speculator.EventType> {
this.containersNeededChange = containersNeededChange;
}
- public TaskAttemptStatus getReportedStatus() {
+ public TaskAttemptStatusOld getReportedStatus() {
return reportedStatus;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f9713195/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/StartEndTimesBase.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/StartEndTimesBase.java b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/StartEndTimesBase.java
index 8d36f28..68d1369 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/StartEndTimesBase.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/StartEndTimesBase.java
@@ -32,7 +32,7 @@ import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.Vertex;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatus;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatusOld;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
@@ -66,7 +66,7 @@ abstract class StartEndTimesBase<V> implements TaskRuntimeEstimator {
protected final Set<Task> doneTasks = new HashSet<Task>();
@Override
- public void enrollAttempt(TaskAttemptStatus status, long timestamp) {
+ public void enrollAttempt(TaskAttemptStatusOld status, long timestamp) {
startTimes.put(status.id,timestamp);
}
@@ -142,7 +142,7 @@ abstract class StartEndTimesBase<V> implements TaskRuntimeEstimator {
}
@Override
- public void updateAttempt(TaskAttemptStatus status, long timestamp) {
+ public void updateAttempt(TaskAttemptStatusOld status, long timestamp) {
TezTaskAttemptID attemptID = status.id;
TezTaskID taskID = attemptID.getTaskID();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f9713195/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/TaskRuntimeEstimator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/TaskRuntimeEstimator.java b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/TaskRuntimeEstimator.java
index 965d965..a68dc50 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/TaskRuntimeEstimator.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/TaskRuntimeEstimator.java
@@ -20,18 +20,18 @@ package org.apache.tez.dag.app.speculate;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatus;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatusOld;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
public interface TaskRuntimeEstimator {
- public void enrollAttempt(TaskAttemptStatus reportedStatus, long timestamp);
+ public void enrollAttempt(TaskAttemptStatusOld reportedStatus, long timestamp);
public long attemptEnrolledTime(TezTaskAttemptID attemptID);
- public void updateAttempt(TaskAttemptStatus reportedStatus, long timestamp);
+ public void updateAttempt(TaskAttemptStatusOld reportedStatus, long timestamp);
public void contextualize(Configuration conf, AppContext context);