You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/08/14 02:54:02 UTC
tez git commit: TEZ-2701. Add time at which container was allocated to attempt (bikas)
Repository: tez
Updated Branches:
refs/heads/master b8e8bcbd0 -> 6b67b0bc1
TEZ-2701. Add time at which container was allocated to attempt (bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/6b67b0bc
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/6b67b0bc
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/6b67b0bc
Branch: refs/heads/master
Commit: 6b67b0bc1eb010f6dc8af2936ae738909e1244ff
Parents: b8e8bcb
Author: Bikas Saha <bi...@apache.org>
Authored: Thu Aug 13 17:53:52 2015 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Thu Aug 13 17:53:52 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 5 +-
.../org/apache/tez/common/ATSConstants.java | 4 +-
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 52 ++++++++++++++++----
.../tez/dag/app/rm/container/AMContainer.java | 1 +
.../dag/app/rm/container/AMContainerImpl.java | 12 +++++
.../history/events/TaskAttemptStartedEvent.java | 41 +++++++++------
.../impl/HistoryEventJsonConversion.java | 7 +--
tez-dag/src/main/proto/HistoryEvents.proto | 5 +-
.../app/dag/impl/TestTaskAttemptRecovery.java | 12 +++--
.../tez/dag/app/dag/impl/TestTaskRecovery.java | 12 ++---
.../dag/app/rm/container/TestAMContainer.java | 5 +-
.../TestHistoryEventsProtoConversion.java | 12 +++--
.../impl/TestHistoryEventJsonConversion.java | 2 +-
.../parser/datamodel/TaskAttemptInfo.java | 37 +++++++++-----
.../apache/tez/history/TestHistoryParser.java | 17 ++++---
.../ats/HistoryEventTimelineConversion.java | 9 ++--
.../ats/TestHistoryEventTimelineConversion.java | 12 +++--
17 files changed, 167 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/6b67b0bc/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 432c82b..bbe9321 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,14 +8,15 @@ INCOMPATIBLE CHANGES
TEZ-2048. Remove VertexManagerPluginContext.getTaskContainer()
TEZ-2565. Consider scanning unfinished tasks in VertexImpl::constructStatistics to reduce merge overhead.
TEZ-2468. Change the minimum Java version to Java 7.
+
+ALL CHANGES:
TEZ-2646. Add scheduling casual dependency for attempts
TEZ-2647. Add input causality dependency for attempts
TEZ-2633. Allow VertexManagerPlugins to receive and report based on attempts
instead of tasks
TEZ-2650. Timing details on Vertex state changes
TEZ-2699. Internalize strings in ATF parser
-
-ALL CHANGES:
+ TEZ-2701. Add time at which container was allocated to attempt
TEZ-2683. TestHttpConnection::testAsyncHttpConnectionInterrupt fails in certain environments.
TEZ-2692. bugfixes & enhancements related to job parser and analyzer.
TEZ-2663. SessionNotRunning exceptions are wrapped in a ServiceException from a dying AM.
http://git-wip-us.apache.org/repos/asf/tez/blob/6b67b0bc/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
index 1568b96..4566a91 100644
--- a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
+++ b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
@@ -57,6 +57,8 @@ public class ATSConstants {
public static final String VERTEX_NAME = "vertexName";
public static final String VERTEX_NAME_ID_MAPPING = "vertexNameIdMapping";
public static final String SCHEDULED_TIME = "scheduledTime";
+ public static final String CREATION_TIME = "creationTime";
+ public static final String ALLOCATION_TIME = "allocationTime";
public static final String INIT_REQUESTED_TIME = "initRequestedTime";
public static final String INIT_TIME = "initTime";
public static final String START_REQUESTED_TIME = "startRequestedTime";
@@ -84,7 +86,7 @@ public class ATSConstants {
public static final String LAST_DATA_EVENT_TIME = "lastDataEventTime";
public static final String LAST_DATA_EVENT_SOURCE_TA = "lastDataEventSourceTA";
public static final String UPDATED_EDGE_MANAGERS = "updatedEdgeManagers";
- public static final String SCHEDULING_CAUSAL_ATTEMPT = "schedulingCausalAttempt";
+ public static final String CREATION_CAUSAL_ATTEMPT = "creationCausalAttempt";
/* Counters-related keys */
public static final String COUNTER_GROUPS = "counterGroups";
http://git-wip-us.apache.org/repos/asf/tez/blob/6b67b0bc/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index ebf7c58..e5a6f84 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -86,6 +86,7 @@ import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
import org.apache.tez.dag.app.dag.event.SpeculatorEventTaskAttemptStatusUpdate;
import org.apache.tez.dag.app.rm.AMSchedulerEventTAEnded;
import org.apache.tez.dag.app.rm.AMSchedulerEventTALaunchRequest;
+import org.apache.tez.dag.app.rm.container.AMContainer;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
@@ -138,6 +139,7 @@ public class TaskAttemptImpl implements TaskAttempt,
// TODO Can these be replaced by the container object TEZ-1037
private Container container;
+ private long allocationTime;
private ContainerId containerId;
private NodeId containerNodeId;
private String nodeHttpAddress;
@@ -170,8 +172,8 @@ public class TaskAttemptImpl implements TaskAttempt,
private final ContainerContext containerContext;
private final boolean leafVertex;
- private TezTaskAttemptID schedulingCausalTA;
- private long scheduledTime;
+ private TezTaskAttemptID creationCausalTA;
+ private long creationTime;
protected static final FailedTransitionHelper FAILED_HELPER =
new FailedTransitionHelper();
@@ -411,8 +413,8 @@ public class TaskAttemptImpl implements TaskAttempt,
this.appContext = appContext;
this.task = task;
this.vertex = this.task.getVertex();
- this.schedulingCausalTA = schedulingCausalTA;
- this.scheduledTime = clock.getTime();
+ this.creationCausalTA = schedulingCausalTA;
+ this.creationTime = clock.getTime();
this.reportedStatus = new TaskAttemptStatus(this.attemptId);
initTaskAttemptStatus(reportedStatus);
@@ -446,7 +448,7 @@ public class TaskAttemptImpl implements TaskAttempt,
}
public TezTaskAttemptID getSchedulingCausalTA() {
- return schedulingCausalTA;
+ return creationCausalTA;
}
TaskSpec createRemoteTaskSpec() throws AMUserCodeException {
@@ -646,6 +648,33 @@ public class TaskAttemptImpl implements TaskAttempt,
}
}
+ public long getCreationTime() {
+ readLock.lock();
+ try {
+ return creationTime;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ public TezTaskAttemptID getCreationCausalAttempt() {
+ readLock.lock();
+ try {
+ return creationCausalTA;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ public long getAllocationTime() {
+ readLock.lock();
+ try {
+ return allocationTime;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
@Override
public long getFinishTime() {
readLock.lock();
@@ -739,8 +768,9 @@ public class TaskAttemptImpl implements TaskAttempt,
{
TaskAttemptStartedEvent tEvent = (TaskAttemptStartedEvent) historyEvent;
this.launchTime = tEvent.getStartTime();
- this.scheduledTime = tEvent.getScheduledTime();
- this.schedulingCausalTA = tEvent.getSchedulingCausalTA();
+ this.creationTime = tEvent.getCreationTime();
+ this.allocationTime = tEvent.getAllocationTime();
+ this.creationCausalTA = tEvent.getCreationCausalTA();
recoveryStartEventSeen = true;
recoveredState = TaskAttemptState.RUNNING;
this.containerId = tEvent.getContainerId();
@@ -963,7 +993,8 @@ public class TaskAttemptImpl implements TaskAttempt,
TaskAttemptStartedEvent startEvt = new TaskAttemptStartedEvent(
attemptId, getVertex().getName(),
launchTime, containerId, containerNodeId,
- inProgressLogsUrl, completedLogsUrl, nodeHttpAddress, scheduledTime, schedulingCausalTA);
+ inProgressLogsUrl, completedLogsUrl, nodeHttpAddress, creationTime, creationCausalTA,
+ allocationTime);
this.appContext.getHistoryHandler().handle(
new DAGHistoryEvent(getDAGID(), startEvt));
}
@@ -1114,9 +1145,10 @@ public class TaskAttemptImpl implements TaskAttempt,
public void transition(TaskAttemptImpl ta, TaskAttemptEvent origEvent) {
TaskAttemptEventStartedRemotely event = (TaskAttemptEventStartedRemotely) origEvent;
- Container container = ta.appContext.getAllContainers()
- .get(event.getContainerId()).getContainer();
+ AMContainer amContainer = ta.appContext.getAllContainers().get(event.getContainerId());
+ Container container = amContainer.getContainer();
+ ta.allocationTime = amContainer.getCurrentTaskAttemptAllocationTime();
ta.container = container;
ta.containerId = event.getContainerId();
ta.containerNodeId = container.getNodeId();
http://git-wip-us.apache.org/repos/asf/tez/blob/6b67b0bc/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java
index a6b403d..7d6da8a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java
@@ -32,5 +32,6 @@ public interface AMContainer extends EventHandler<AMContainerEvent>{
public Container getContainer();
public List<TezTaskAttemptID> getAllTaskAttempts();
public TezTaskAttemptID getCurrentTaskAttempt();
+ public long getCurrentTaskAttemptAllocationTime();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/6b67b0bc/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 330f2b7..9b90752 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -95,6 +95,7 @@ public class AMContainerImpl implements AMContainer {
private boolean nodeFailed = false;
private TezTaskAttemptID currentAttempt;
+ private long currentAttemptAllocationTime;
private List<TezTaskAttemptID> failedAssignments;
private boolean inError = false;
@@ -362,6 +363,16 @@ public class AMContainerImpl implements AMContainer {
}
}
+ @Override
+ public long getCurrentTaskAttemptAllocationTime() {
+ readLock.lock();
+ try {
+ return this.currentAttemptAllocationTime;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
public boolean isInErrorState() {
return inError;
}
@@ -532,6 +543,7 @@ public class AMContainerImpl implements AMContainer {
// Register the additional resources back for this container.
container.containerLocalResources.putAll(container.additionalLocalResources);
container.currentAttempt = event.getTaskAttemptId();
+ container.currentAttemptAllocationTime = container.appContext.getClock().getTime();
if (LOG.isDebugEnabled()) {
LOG.debug("AssignTA: attempt: " + event.getRemoteTaskSpec());
LOG.debug("AdditionalLocalResources: " + container.additionalLocalResources);
http://git-wip-us.apache.org/repos/asf/tez/blob/6b67b0bc/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
index 8eb074d..4d15fb9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
@@ -40,14 +40,15 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
private ContainerId containerId;
private NodeId nodeId;
private String nodeHttpAddress;
- private TezTaskAttemptID schedulingCausalTA;
- private long scheduledTime;
+ private TezTaskAttemptID creationCausalTA;
+ private long creationTime;
+ private long allocationTime;
public TaskAttemptStartedEvent(TezTaskAttemptID taId,
String vertexName, long launchTime,
ContainerId containerId, NodeId nodeId,
String inProgressLogsUrl, String completedLogsUrl,
- String nodeHttpAddress, long scheduledTime, TezTaskAttemptID schedulingCausalTA) {
+ String nodeHttpAddress, long creationTime, TezTaskAttemptID creationCausalTA, long allocationTime) {
this.taskAttemptId = taId;
this.vertexName = vertexName;
this.launchTime = launchTime;
@@ -56,8 +57,9 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
this.inProgressLogsUrl = inProgressLogsUrl;
this.completedLogsUrl = completedLogsUrl;
this.nodeHttpAddress = nodeHttpAddress;
- this.scheduledTime = scheduledTime;
- this.schedulingCausalTA = schedulingCausalTA;
+ this.creationTime = creationTime;
+ this.creationCausalTA = creationCausalTA;
+ this.allocationTime = allocationTime;
}
public TaskAttemptStartedEvent() {
@@ -84,9 +86,10 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
.setStartTime(launchTime)
.setContainerId(containerId.toString())
.setNodeId(nodeId.toString())
- .setScheduledTime(scheduledTime);
- if (schedulingCausalTA != null) {
- builder.setSchedulingCausalTA(schedulingCausalTA.toString());
+ .setCreationTime(creationTime)
+ .setAllocationTime(allocationTime);
+ if (creationCausalTA != null) {
+ builder.setCreationCausalTA(creationCausalTA.toString());
}
return builder.build();
}
@@ -96,9 +99,10 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
this.launchTime = proto.getStartTime();
this.containerId = ConverterUtils.toContainerId(proto.getContainerId());
this.nodeId = ConverterUtils.toNodeId(proto.getNodeId());
- this.scheduledTime = proto.getScheduledTime();
- if (proto.hasSchedulingCausalTA()) {
- this.schedulingCausalTA = TezTaskAttemptID.fromString(proto.getSchedulingCausalTA());
+ this.creationTime = proto.getCreationTime();
+ this.allocationTime = proto.getAllocationTime();
+ if (proto.hasCreationCausalTA()) {
+ this.creationCausalTA = TezTaskAttemptID.fromString(proto.getCreationCausalTA());
}
}
@@ -120,7 +124,8 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
public String toString() {
return "vertexName=" + vertexName
+ ", taskAttemptId=" + taskAttemptId
- + ", scheduledTime=" + scheduledTime
+ + ", creationTime=" + creationTime
+ + ", allocationTime=" + allocationTime
+ ", startTime=" + launchTime
+ ", containerId=" + containerId
+ ", nodeId=" + nodeId
@@ -136,12 +141,16 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
return launchTime;
}
- public long getScheduledTime() {
- return scheduledTime;
+ public long getCreationTime() {
+ return creationTime;
}
- public TezTaskAttemptID getSchedulingCausalTA() {
- return schedulingCausalTA;
+ public long getAllocationTime() {
+ return allocationTime;
+ }
+
+ public TezTaskAttemptID getCreationCausalTA() {
+ return creationCausalTA;
}
public ContainerId getContainerId() {
http://git-wip-us.apache.org/repos/asf/tez/blob/6b67b0bc/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
index 528da10..b32b324 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
@@ -577,9 +577,10 @@ public class HistoryEventJsonConversion {
JSONObject otherInfo = new JSONObject();
otherInfo.put(ATSConstants.IN_PROGRESS_LOGS_URL, event.getInProgressLogsUrl());
otherInfo.put(ATSConstants.COMPLETED_LOGS_URL, event.getCompletedLogsUrl());
- otherInfo.put(ATSConstants.SCHEDULED_TIME, event.getScheduledTime());
- if (event.getSchedulingCausalTA() != null) {
- otherInfo.put(ATSConstants.SCHEDULING_CAUSAL_ATTEMPT, event.getSchedulingCausalTA().toString());
+ otherInfo.put(ATSConstants.CREATION_TIME, event.getCreationTime());
+ otherInfo.put(ATSConstants.ALLOCATION_TIME, event.getAllocationTime());
+ if (event.getCreationCausalTA() != null) {
+ otherInfo.put(ATSConstants.CREATION_CAUSAL_ATTEMPT, event.getCreationCausalTA().toString());
}
jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
http://git-wip-us.apache.org/repos/asf/tez/blob/6b67b0bc/tez-dag/src/main/proto/HistoryEvents.proto
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto
index ffb382e..e268e0d 100644
--- a/tez-dag/src/main/proto/HistoryEvents.proto
+++ b/tez-dag/src/main/proto/HistoryEvents.proto
@@ -164,8 +164,9 @@ message TaskAttemptStartedProto {
optional int64 start_time = 2;
optional string container_id = 3;
optional string node_id = 4;
- optional int64 scheduled_time = 5;
- optional string scheduling_causal_t_a = 6;
+ optional int64 creation_time = 5;
+ optional string creation_causal_t_a = 6;
+ optional int64 allocation_time = 7;
}
message TaskAttemptFinishedProto {
http://git-wip-us.apache.org/repos/asf/tez/blob/6b67b0bc/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
index 920109b..4a797e0 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
@@ -19,7 +19,6 @@
package org.apache.tez.dag.app.dag.impl;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -71,7 +70,9 @@ public class TestTaskAttemptRecovery {
private TaskAttemptImpl ta;
private EventHandler mockEventHandler;
- private long startTime = System.currentTimeMillis();
+ private long creationTime = System.currentTimeMillis();
+ private long allocationTime = creationTime + 5000;
+ private long startTime = allocationTime + 5000;
private long finishTime = startTime + 5000;
private TezTaskAttemptID taId;
@@ -153,9 +154,14 @@ public class TestTaskAttemptRecovery {
}
private void restoreFromTAStartEvent() {
+ TezTaskAttemptID causalId = TezTaskAttemptID.getInstance(taId.getTaskID(), taId.getId()+1);
TaskAttemptState recoveredState =
ta.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName,
- startTime, mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null));
+ startTime, mock(ContainerId.class), mock(NodeId.class), "", "", "", creationTime, causalId,
+ allocationTime));
+ assertEquals(causalId, ta.getCreationCausalAttempt());
+ assertEquals(creationTime, ta.getCreationTime());
+ assertEquals(allocationTime, ta.getAllocationTime());
assertEquals(startTime, ta.getLaunchTime());
assertEquals(TaskAttemptState.RUNNING, recoveredState);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/6b67b0bc/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
index 87e7498..1d22e06 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
@@ -215,7 +215,7 @@ public class TestTaskRecovery {
long taStartTime = taskStartTime + 100L;
TaskState recoveredState =
task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName,
- taStartTime, mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null));
+ taStartTime, mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null, 0));
assertEquals(TaskState.RUNNING, recoveredState);
assertEquals(0, task.getFinishedAttemptsCount());
assertEquals(taskScheduledTime, task.scheduledTime);
@@ -721,7 +721,7 @@ public class TestTaskRecovery {
TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
TaskState recoveredState =
task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName,
- taStartTime, mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null));
+ taStartTime, mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null, 0));
assertEquals(TaskState.RUNNING, recoveredState);
assertEquals(TaskAttemptStateInternal.NEW,
((TaskAttemptImpl) task.getAttempt(taId)).getInternalState());
@@ -774,7 +774,7 @@ public class TestTaskRecovery {
for (int i = 0; i < maxFailedAttempts; ++i) {
TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L,
- mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null));
+ mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null, 0));
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0,
0, TaskAttemptState.KILLED, null, "", null, 0, null));
}
@@ -804,7 +804,7 @@ public class TestTaskRecovery {
for (int i = 0; i < maxFailedAttempts; ++i) {
TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L,
- mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null));
+ mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null, 0));
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0,
0, TaskAttemptState.FAILED, null, "", null, 0, null));
}
@@ -834,7 +834,7 @@ public class TestTaskRecovery {
for (int i = 0; i < maxFailedAttempts - 1; ++i) {
TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L,
- mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null));
+ mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null, 0));
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0,
0, TaskAttemptState.FAILED, null, "", null, 0, null));
}
@@ -844,7 +844,7 @@ public class TestTaskRecovery {
TezTaskAttemptID newTaskAttemptId = getNewTaskAttemptID(task.getTaskId());
TaskState recoveredState =
task.restoreFromEvent(new TaskAttemptStartedEvent(newTaskAttemptId,
- vertexName, 0, mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null));
+ vertexName, 0, mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null, 0));
assertEquals(TaskState.RUNNING, recoveredState);
assertEquals(TaskAttemptStateInternal.NEW,
http://git-wip-us.apache.org/repos/asf/tez/blob/6b67b0bc/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
index fafbba6..f9a1c5e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@ -108,11 +108,14 @@ public class TestAMContainer {
assertNull(wc.amContainer.getCurrentTaskAttempt());
// Assign task.
+ long currTime = wc.appContext.getClock().getTime();
wc.assignTaskAttempt(wc.taskAttemptID);
wc.verifyState(AMContainerState.LAUNCHING);
wc.verifyNoOutgoingEvents();
assertEquals(wc.taskAttemptID, wc.amContainer.getCurrentTaskAttempt());
-
+ assertTrue(wc.amContainer.getCurrentTaskAttemptAllocationTime() > 0);
+ assertTrue(wc.amContainer.getCurrentTaskAttemptAllocationTime() >= currTime);
+
// Container Launched
wc.containerLaunched();
wc.verifyState(AMContainerState.RUNNING);
http://git-wip-us.apache.org/repos/asf/tez/blob/6b67b0bc/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index a32cc27..3507d99 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -480,7 +480,7 @@ public class TestHistoryEventsProtoConversion {
ApplicationId.newInstance(0, 1), 1), 1001), NodeId.newInstance(
"host1", 19999), "inProgress", "Completed", "nodeHttpAddress", 1024,
TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
- TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 0)
+ TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 0), 1024
);
TaskAttemptStartedEvent deserializedEvent = (TaskAttemptStartedEvent)
testProtoConversion(event);
@@ -492,10 +492,12 @@ public class TestHistoryEventsProtoConversion {
deserializedEvent.getNodeId());
Assert.assertEquals(event.getStartTime(),
deserializedEvent.getStartTime());
- Assert.assertEquals(event.getScheduledTime(),
- deserializedEvent.getScheduledTime());
- Assert.assertEquals(event.getSchedulingCausalTA(),
- deserializedEvent.getSchedulingCausalTA());
+ Assert.assertEquals(event.getCreationTime(),
+ deserializedEvent.getCreationTime());
+ Assert.assertEquals(event.getAllocationTime(),
+ deserializedEvent.getAllocationTime());
+ Assert.assertEquals(event.getCreationCausalTA(),
+ deserializedEvent.getCreationCausalTA());
logEvents(event, deserializedEvent);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/6b67b0bc/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
index ec1603e..9c11dc7 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
@@ -160,7 +160,7 @@ public class TestHistoryEventJsonConversion {
break;
case TASK_ATTEMPT_STARTED:
event = new TaskAttemptStartedEvent(tezTaskAttemptID, "v1", random.nextInt(), containerId,
- nodeId, null, null, "nodeHttpAddress", 0, null);
+ nodeId, null, null, "nodeHttpAddress", 0, null, 0);
break;
case TASK_ATTEMPT_FINISHED:
event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(),
http://git-wip-us.apache.org/repos/asf/tez/blob/6b67b0bc/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
index 916df95..ba676a2 100644
--- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
@@ -43,14 +43,16 @@ public class TaskAttemptInfo extends BaseInfo {
private final long endTime;
private final String diagnostics;
- private final long scheduledTime;
+ private final long creationTime;
+ private final long allocationTime;
private final String containerId;
private final String nodeId;
private final String status;
private final String logUrl;
- private final String schedulingCausalTA;
+ private final String creationCausalTA;
private final long lastDataEventTime;
private final String lastDataEventSourceTA;
+ private final String terminationCause;
private TaskInfo taskInfo;
@@ -70,10 +72,10 @@ public class TaskAttemptInfo extends BaseInfo {
startTime = otherInfoNode.optLong(Constants.START_TIME);
endTime = otherInfoNode.optLong(Constants.FINISH_TIME);
diagnostics = otherInfoNode.optString(Constants.DIAGNOSTICS);
- scheduledTime = otherInfoNode.optLong(Constants.SCHEDULED_TIME);
- schedulingCausalTA = StringInterner.weakIntern(
- otherInfoNode.optString(Constants.SCHEDULING_CAUSAL_ATTEMPT));
-
+ creationTime = otherInfoNode.optLong(Constants.CREATION_TIME);
+ creationCausalTA = StringInterner.weakIntern(
+ otherInfoNode.optString(Constants.CREATION_CAUSAL_ATTEMPT));
+ allocationTime = otherInfoNode.optLong(Constants.ALLOCATION_TIME);
containerId = StringInterner.weakIntern(otherInfoNode.optString(Constants.CONTAINER_ID));
String id = otherInfoNode.optString(Constants.NODE_ID);
nodeId = StringInterner.weakIntern((id != null) ? (id.split(":")[0]) : "");
@@ -84,6 +86,8 @@ public class TaskAttemptInfo extends BaseInfo {
lastDataEventTime = otherInfoNode.optLong(ATSConstants.LAST_DATA_EVENT_TIME);
lastDataEventSourceTA = StringInterner.weakIntern(
otherInfoNode.optString(ATSConstants.LAST_DATA_EVENT_SOURCE_TA));
+ terminationCause = StringInterner
+ .weakIntern(otherInfoNode.optString(ATSConstants.TASK_ATTEMPT_ERROR_ENUM));
}
void setTaskInfo(TaskInfo taskInfo) {
@@ -110,8 +114,8 @@ public class TaskAttemptInfo extends BaseInfo {
return endTime;
}
- public final long getScheduledTime() {
- return scheduledTime;
+ public final long getCreationTime() {
+ return creationTime;
}
public final long getLastDataEventTime() {
@@ -126,19 +130,26 @@ public class TaskAttemptInfo extends BaseInfo {
return getFinishTimeInterval() - getStartTimeInterval();
}
- public final long getScheduledTimeInterval() {
- return scheduledTime - (getTaskInfo().getVertexInfo().getDagInfo().getStartTime());
+ public final long getCreationTimeInterval() {
+ return creationTime - (getTaskInfo().getVertexInfo().getDagInfo().getStartTime());
}
- public final String getSchedulingCausalTA() {
- return schedulingCausalTA;
+ public final String getCreationCausalTA() {
+ return creationCausalTA;
}
+ public final long getAllocationTime() {
+ return allocationTime;
+ }
@Override
public final String getDiagnostics() {
return diagnostics;
}
+
+ public final String getTerminationCause() {
+ return terminationCause;
+ }
public static TaskAttemptInfo create(JSONObject taskInfoObject) throws JSONException {
return new TaskAttemptInfo(taskInfoObject);
@@ -254,7 +265,7 @@ public class TaskAttemptInfo extends BaseInfo {
StringBuilder sb = new StringBuilder();
sb.append("[");
sb.append("taskAttemptId=").append(getTaskAttemptId()).append(", ");
- sb.append("scheduledTime=").append(getScheduledTimeInterval()).append(", ");
+ sb.append("creationTime=").append(getCreationTimeInterval()).append(", ");
sb.append("startTime=").append(getStartTimeInterval()).append(", ");
sb.append("finishTime=").append(getFinishTimeInterval()).append(", ");
sb.append("timeTaken=").append(getTimeTaken()).append(", ");
http://git-wip-us.apache.org/repos/asf/tez/blob/6b67b0bc/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
index c89acb2..2b797a5 100644
--- a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
+++ b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
@@ -77,6 +77,7 @@ import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;
import org.apache.tez.runtime.library.partitioner.HashPartitioner;
import org.apache.tez.tests.MiniTezClusterWithTimeline;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -297,8 +298,9 @@ public class TestHistoryParser {
}
}
for (TaskAttemptInfo attemptInfo : taskInfo.getTaskAttempts()) {
- assertTrue(attemptInfo.getStartTimeInterval() > 0);
- assertTrue(attemptInfo.getScheduledTimeInterval() > 0);
+ assertTrue(attemptInfo.getCreationTime() > 0);
+ assertTrue(attemptInfo.getAllocationTime() > 0);
+ assertTrue(attemptInfo.getStartTime() > 0);
}
}
assertTrue(vertexInfo.getLastTaskToFinish() != null);
@@ -389,13 +391,14 @@ public class TestHistoryParser {
"TaskCounter_Tokenizer_OUTPUT_Summation", 20); //Same as above
for (TaskInfo taskInfo : summationVertex.getTasks()) {
- String lastAttemptId = null;
+ TaskAttemptInfo lastAttempt = null;
for (TaskAttemptInfo attemptInfo : taskInfo.getTaskAttempts()) {
- if (lastAttemptId != null) {
+ if (lastAttempt != null) {
// failed attempt should be causal TA of next attempt
- assertTrue(lastAttemptId.equals(attemptInfo.getSchedulingCausalTA()));
+ assertTrue(lastAttempt.getTaskAttemptId().equals(attemptInfo.getCreationCausalTA()));
+ assertTrue(lastAttempt.getTerminationCause() != null);
}
- lastAttemptId = attemptInfo.getTaskAttemptId();
+ lastAttempt = attemptInfo;
}
}
@@ -769,6 +772,8 @@ public class TestHistoryParser {
.equals(TaskAttemptState.SUCCEEDED)) {
assertTrue(attemptInfo.getStartTimeInterval() > 0);
assertTrue(attemptInfo.getFinishTimeInterval() > 0);
+ assertTrue(attemptInfo.getCreationTime() > 0);
+ assertTrue(attemptInfo.getAllocationTime() > 0);
assertTrue(attemptInfo.getStartTime() > 0);
assertTrue(attemptInfo.getFinishTime() > 0);
assertTrue(attemptInfo.getFinishTime() > attemptInfo.getStartTime());
http://git-wip-us.apache.org/repos/asf/tez/blob/6b67b0bc/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
index eaed115..b979402 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
@@ -471,10 +471,11 @@ public class HistoryEventTimelineConversion {
atsEntity.addOtherInfo(ATSConstants.NODE_HTTP_ADDRESS, event.getNodeHttpAddress());
atsEntity.addOtherInfo(ATSConstants.CONTAINER_ID, event.getContainerId().toString());
atsEntity.addOtherInfo(ATSConstants.STATUS, TaskAttemptState.RUNNING.name());
- atsEntity.addOtherInfo(ATSConstants.SCHEDULED_TIME, event.getScheduledTime());
- if (event.getSchedulingCausalTA() != null) {
- atsEntity.addOtherInfo(ATSConstants.SCHEDULING_CAUSAL_ATTEMPT,
- event.getSchedulingCausalTA().toString());
+ atsEntity.addOtherInfo(ATSConstants.CREATION_TIME, event.getCreationTime());
+ atsEntity.addOtherInfo(ATSConstants.ALLOCATION_TIME, event.getAllocationTime());
+ if (event.getCreationCausalTA() != null) {
+ atsEntity.addOtherInfo(ATSConstants.CREATION_CAUSAL_ATTEMPT,
+ event.getCreationCausalTA().toString());
}
return atsEntity;
http://git-wip-us.apache.org/repos/asf/tez/blob/6b67b0bc/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
index 838d9d6..75828c3 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
@@ -165,7 +165,7 @@ public class TestHistoryEventTimelineConversion {
break;
case TASK_ATTEMPT_STARTED:
event = new TaskAttemptStartedEvent(tezTaskAttemptID, "v1", random.nextInt(), containerId,
- nodeId, null, null, "nodeHttpAddress", 0, null);
+ nodeId, null, null, "nodeHttpAddress", 0, null, 0);
break;
case TASK_ATTEMPT_FINISHED:
event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(),
@@ -727,10 +727,11 @@ public class TestHistoryEventTimelineConversion {
@Test(timeout = 5000)
public void testConvertTaskAttemptStartedEvent() {
long startTime = random.nextLong();
- long scheduleTime = 1024;
+ long creationTime = 1024;
+ long allocationTime = 1024;
TaskAttemptStartedEvent event = new TaskAttemptStartedEvent(tezTaskAttemptID, "v1",
startTime, containerId, nodeId, "inProgressURL", "logsURL", "nodeHttpAddress",
- scheduleTime, tezTaskAttemptID);
+ creationTime, tezTaskAttemptID, allocationTime);
TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
Assert.assertEquals(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), timelineEntity.getEntityType());
@@ -776,8 +777,9 @@ public class TestHistoryEventTimelineConversion {
Assert.assertTrue(TaskAttemptState.RUNNING.name()
.equals(timelineEntity.getOtherInfo().get(ATSConstants.STATUS)));
Assert.assertEquals(tezTaskAttemptID.toString(),
- timelineEntity.getOtherInfo().get(ATSConstants.SCHEDULING_CAUSAL_ATTEMPT));
- Assert.assertEquals(scheduleTime, timelineEntity.getOtherInfo().get(ATSConstants.SCHEDULED_TIME));
+ timelineEntity.getOtherInfo().get(ATSConstants.CREATION_CAUSAL_ATTEMPT));
+ Assert.assertEquals(creationTime, timelineEntity.getOtherInfo().get(ATSConstants.CREATION_TIME));
+ Assert.assertEquals(allocationTime, timelineEntity.getOtherInfo().get(ATSConstants.ALLOCATION_TIME));
}
@Test(timeout = 5000)