You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2016/03/09 00:28:09 UTC
tez git commit: TEZ-2863. Container, node, and logs not available in UI for tasks that fail to launch (jeagles)
Repository: tez
Updated Branches:
refs/heads/master eda9a47ea -> e8269c270
TEZ-2863. Container, node, and logs not available in UI for tasks that fail to launch (jeagles)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e8269c27
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e8269c27
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e8269c27
Branch: refs/heads/master
Commit: e8269c27077c1709cd614d97338b5ad8d035f507
Parents: eda9a47
Author: Jonathan Eagles <je...@yahoo-inc.com>
Authored: Tue Mar 8 17:26:23 2016 -0600
Committer: Jonathan Eagles <je...@yahoo-inc.com>
Committed: Tue Mar 8 17:27:40 2016 -0600
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../TaskAttemptEventContainerTerminated.java | 11 ++-
...AttemptEventContainerTerminatedBySystem.java | 12 ++-
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 89 +++++++++++++++-----
.../dag/app/rm/container/AMContainerImpl.java | 4 +-
.../events/TaskAttemptFinishedEvent.java | 63 +++++++++++++-
.../impl/HistoryEventJsonConversion.java | 16 ++++
tez-dag/src/main/proto/HistoryEvents.proto | 3 +
.../apache/tez/dag/app/TestRecoveryParser.java | 2 +-
.../tez/dag/app/dag/impl/TestDAGRecovery.java | 14 +--
.../tez/dag/app/dag/impl/TestTaskAttempt.java | 8 +-
.../TestHistoryEventsProtoConversion.java | 24 +++++-
.../impl/TestHistoryEventJsonConversion.java | 3 +-
.../ats/HistoryEventTimelineConversion.java | 16 ++++
.../ats/TestHistoryEventTimelineConversion.java | 12 ++-
15 files changed, 234 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/e8269c27/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 02232de..d01c732 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
TEZ-3029. Add an onError method to service plugin contexts.
ALL CHANGES:
+ TEZ-2863. Container, node, and logs not available in UI for tasks that fail to launch
TEZ-3140. Reduce AM memory usage during serialization
TEZ-2756. MergeManager close should not try merging files on close if invoked after a shuffle exception.
TEZ-3156. Tez client keeps trying to talk to RM even if RM does not know about the application.
@@ -399,6 +400,7 @@ INCOMPATIBLE CHANGES
TEZ-2949. Allow duplicate dag names within session for Tez.
ALL CHANGES:
+ TEZ-2863. Container, node, and logs not available in UI for tasks that fail to launch
TEZ-3140. Reduce AM memory usage during serialization
TEZ-3156. Tez client keeps trying to talk to RM even if RM does not know about the application.
TEZ-3115. Shuffle string handling adds significant memory overhead
http://git-wip-us.apache.org/repos/asf/tez/blob/e8269c27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminated.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminated.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminated.java
index 5dd0141..3db2ffc 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminated.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminated.java
@@ -17,22 +17,29 @@
package org.apache.tez.dag.app.dag.event;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
public class TaskAttemptEventContainerTerminated extends TaskAttemptEvent
implements DiagnosableEvent, TaskAttemptEventTerminationCauseEvent {
+ private final ContainerId containerId;
private final String message;
private final TaskAttemptTerminationCause errorCause;
- public TaskAttemptEventContainerTerminated(TezTaskAttemptID id, String message,
- TaskAttemptTerminationCause errCause) {
+ public TaskAttemptEventContainerTerminated(ContainerId containerId, TezTaskAttemptID id,
+ String message, TaskAttemptTerminationCause errCause) {
super(id, TaskAttemptEventType.TA_CONTAINER_TERMINATED);
+ this.containerId = containerId;
this.message = message;
this.errorCause = errCause;
}
+ public ContainerId getContainerId() {
+ return containerId;
+ }
+
@Override
public String getDiagnosticInfo() {
return message;
http://git-wip-us.apache.org/repos/asf/tez/blob/e8269c27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminatedBySystem.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminatedBySystem.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminatedBySystem.java
index a3c57e4..4efbf88 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminatedBySystem.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminatedBySystem.java
@@ -18,21 +18,29 @@
package org.apache.tez.dag.app.dag.event;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
public class TaskAttemptEventContainerTerminatedBySystem extends TaskAttemptEvent
implements DiagnosableEvent, TaskAttemptEventTerminationCauseEvent {
+ private final ContainerId containerId;
private final String diagnostics;
private final TaskAttemptTerminationCause errorCause;
- public TaskAttemptEventContainerTerminatedBySystem(TezTaskAttemptID id, String diagnostics,
- TaskAttemptTerminationCause errorCause) {
+
+ public TaskAttemptEventContainerTerminatedBySystem(ContainerId containerId, TezTaskAttemptID id,
+ String diagnostics, TaskAttemptTerminationCause errorCause) {
super(id, TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM);
+ this.containerId = containerId;
this.diagnostics = diagnostics;
this.errorCause = errorCause;
}
+ public ContainerId getContainerId() {
+ return containerId;
+ }
+
@Override
public String getDiagnosticInfo() {
return diagnostics;
http://git-wip-us.apache.org/repos/asf/tez/blob/e8269c27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 0affff2..1598f2d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -78,6 +78,7 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptKilled;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminatedBySystem;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventTezEventUpdate;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventTerminationCauseEvent;
@@ -1037,23 +1038,8 @@ public class TaskAttemptImpl implements TaskAttempt,
protected void logJobHistoryAttemptStarted() {
Preconditions.checkArgument(recoveryData == null);
- final String containerIdStr = containerId.toString();
- String inProgressLogsUrl = nodeHttpAddress
- + "/" + "node/containerlogs"
- + "/" + containerIdStr
- + "/" + this.appContext.getUser();
- String completedLogsUrl = "";
- if (conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
- YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)
- && conf.get(YarnConfiguration.YARN_LOG_SERVER_URL) != null) {
- String contextStr = "v_" + getVertex().getName()
- + "_" + this.attemptId.toString();
- completedLogsUrl = conf.get(YarnConfiguration.YARN_LOG_SERVER_URL)
- + "/" + containerNodeId.toString()
- + "/" + containerIdStr
- + "/" + contextStr
- + "/" + this.appContext.getUser();
- }
+ String inProgressLogsUrl = getInProgressLogsUrl();
+ String completedLogsUrl = getCompletedLogsUrl();
TaskAttemptStartedEvent startEvt = new TaskAttemptStartedEvent(
attemptId, getVertex().getName(),
launchTime, containerId, containerNodeId,
@@ -1072,7 +1058,8 @@ public class TaskAttemptImpl implements TaskAttempt,
attemptId, getVertex().getName(), getLaunchTime(),
getFinishTime(), TaskAttemptState.SUCCEEDED, null,
"", getCounters(), lastDataEvents, taGeneratedEvents,
- creationTime, creationCausalTA, allocationTime);
+ creationTime, creationCausalTA, allocationTime,
+ null, null, null, null, null);
// FIXME how do we store information regd completion events
this.appContext.getHistoryHandler().handle(
new DAGHistoryEvent(getDAGID(), finishEvt));
@@ -1084,8 +1071,16 @@ public class TaskAttemptImpl implements TaskAttempt,
|| recoveryData.getTaskAttemptFinishedEvent() == null,
"log TaskAttemptFinishedEvent again in recovery when there's already another TaskAtttemptFinishedEvent");
long finishTime = getFinishTime();
+ ContainerId unsuccessfulContainerId = null;
+ NodeId unsuccessfulContainerNodeId = null;
+ String inProgressLogsUrl = null;
+ String completedLogsUrl = null;
if (finishTime <= 0) {
finishTime = clock.getTime(); // comes here in case it was terminated before launch
+ unsuccessfulContainerId = containerId;
+ unsuccessfulContainerNodeId = containerNodeId;
+ inProgressLogsUrl = getInProgressLogsUrl();
+ completedLogsUrl = getCompletedLogsUrl();
}
TaskAttemptFinishedEvent finishEvt = new TaskAttemptFinishedEvent(
attemptId, getVertex().getName(), getLaunchTime(),
@@ -1093,12 +1088,44 @@ public class TaskAttemptImpl implements TaskAttempt,
terminationCause,
StringUtils.join(
getDiagnostics(), LINE_SEPARATOR), getCounters(), lastDataEvents,
- taGeneratedEvents, creationTime, creationCausalTA, allocationTime);
+ taGeneratedEvents, creationTime, creationCausalTA, allocationTime,
+ unsuccessfulContainerId, unsuccessfulContainerNodeId, inProgressLogsUrl, completedLogsUrl, nodeHttpAddress);
// FIXME how do we store information regd completion events
this.appContext.getHistoryHandler().handle(
new DAGHistoryEvent(getDAGID(), finishEvt));
}
+ private String getInProgressLogsUrl() {
+ String inProgressLogsUrl = null;
+ if (containerId != null && nodeHttpAddress != null) {
+ final String containerIdStr = containerId.toString();
+ inProgressLogsUrl = nodeHttpAddress
+ + "/" + "node/containerlogs"
+ + "/" + containerIdStr
+ + "/" + this.appContext.getUser();
+ }
+ return inProgressLogsUrl;
+ }
+
+ private String getCompletedLogsUrl() {
+ String completedLogsUrl = null;
+ if (containerId != null && containerNodeId != null && nodeHttpAddress != null) {
+ final String containerIdStr = containerId.toString();
+ if (conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
+ YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)
+ && conf.get(YarnConfiguration.YARN_LOG_SERVER_URL) != null) {
+ String contextStr = "v_" + getVertex().getName()
+ + "_" + this.attemptId.toString();
+ completedLogsUrl = conf.get(YarnConfiguration.YARN_LOG_SERVER_URL)
+ + "/" + containerNodeId.toString()
+ + "/" + containerIdStr
+ + "/" + contextStr
+ + "/" + this.appContext.getUser();
+ }
+ }
+ return completedLogsUrl;
+ }
+
//////////////////////////////////////////////////////////////////////////////
// Start of Transition Classes //
//////////////////////////////////////////////////////////////////////////////
@@ -1268,6 +1295,30 @@ public class TaskAttemptImpl implements TaskAttempt,
+ ", eventClass=" + event.getClass().getName());
}
+ if (event instanceof TaskAttemptEventContainerTerminated) {
+ TaskAttemptEventContainerTerminated tEvent = (TaskAttemptEventContainerTerminated) event;
+ AMContainer amContainer = ta.appContext.getAllContainers().get(tEvent.getContainerId());
+ Container container = amContainer.getContainer();
+
+ ta.allocationTime = amContainer.getCurrentTaskAttemptAllocationTime();
+ ta.container = container;
+ ta.containerId = tEvent.getContainerId();
+ ta.containerNodeId = container.getNodeId();
+ ta.nodeHttpAddress = StringInterner.weakIntern(container.getNodeHttpAddress());
+ }
+
+ if (event instanceof TaskAttemptEventContainerTerminatedBySystem) {
+ TaskAttemptEventContainerTerminatedBySystem tEvent = (TaskAttemptEventContainerTerminatedBySystem) event;
+ AMContainer amContainer = ta.appContext.getAllContainers().get(tEvent.getContainerId());
+ Container container = amContainer.getContainer();
+
+ ta.allocationTime = amContainer.getCurrentTaskAttemptAllocationTime();
+ ta.container = container;
+ ta.containerId = tEvent.getContainerId();
+ ta.containerNodeId = container.getNodeId();
+ ta.nodeHttpAddress = StringInterner.weakIntern(container.getNodeHttpAddress());
+ }
+
if (ta.recoveryData == null ||
ta.recoveryData.getTaskAttemptFinishedEvent() == null) {
ta.setFinishTime();
http://git-wip-us.apache.org/repos/asf/tez/blob/e8269c27/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index e4302aa..94c8fe0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -1100,12 +1100,12 @@ public class AMContainerImpl implements AMContainer {
protected void sendTerminatedToTaskAttempt(
TezTaskAttemptID taId, String message, TaskAttemptTerminationCause errCause) {
- sendEvent(new TaskAttemptEventContainerTerminated(taId, message, errCause));
+ sendEvent(new TaskAttemptEventContainerTerminated(containerId, taId, message, errCause));
}
protected void sendContainerTerminatedBySystemToTaskAttempt(
TezTaskAttemptID taId, String message, TaskAttemptTerminationCause errorCause) {
- sendEvent(new TaskAttemptEventContainerTerminatedBySystem(taId, message, errorCause));
+ sendEvent(new TaskAttemptEventContainerTerminatedBySystem(containerId, taId, message, errorCause));
}
protected void sendTerminatingToTaskAttempt(TezTaskAttemptID taId,
http://git-wip-us.apache.org/repos/asf/tez/blob/e8269c27/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
index 21b8719..8e31a25 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
@@ -28,6 +28,9 @@ import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
@@ -59,7 +62,12 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
private TaskAttemptTerminationCause error;
private List<DataEventDependencyInfo> dataEvents;
private List<TezEvent> taGeneratedEvents;
-
+ private ContainerId containerId;
+ private NodeId nodeId;
+ private String inProgressLogsUrl;
+ private String completedLogsUrl;
+ private String nodeHttpAddress;
+
public TaskAttemptFinishedEvent(TezTaskAttemptID taId,
String vertexName,
long startTime,
@@ -71,7 +79,12 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
List<TezEvent> taGeneratedEvents,
long creationTime,
TezTaskAttemptID creationCausalTA,
- long allocationTime) {
+ long allocationTime,
+ ContainerId containerId,
+ NodeId nodeId,
+ String inProgressLogsUrl,
+ String completedLogsUrl,
+ String nodeHttpAddress) {
this.taskAttemptId = taId;
this.vertexName = vertexName;
this.creationCausalTA = creationCausalTA;
@@ -85,6 +98,11 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
this.error = error;
this.dataEvents = dataEvents;
this.taGeneratedEvents = taGeneratedEvents;
+ this.containerId = containerId;
+ this.nodeId = nodeId;
+ this.inProgressLogsUrl = inProgressLogsUrl;
+ this.completedLogsUrl = completedLogsUrl;
+ this.nodeHttpAddress = nodeHttpAddress;
}
public TaskAttemptFinishedEvent() {
@@ -140,6 +158,15 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
builder.addTaGeneratedEvents(TezEventUtils.toProto(event));
}
}
+ if (containerId != null) {
+ builder.setContainerId(containerId.toString());
+ }
+ if (nodeId != null) {
+ builder.setNodeId(nodeId.toString());
+ }
+ if (nodeHttpAddress != null) {
+ builder.setNodeHttpAddress(nodeHttpAddress);
+ }
return builder.build();
}
@@ -175,6 +202,15 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
this.taGeneratedEvents.add(TezEventUtils.fromProto(eventProto));
}
}
+ if (proto.hasContainerId()) {
+ this.containerId = ConverterUtils.toContainerId(proto.getContainerId());
+ }
+ if (proto.hasNodeId()) {
+ this.nodeId = ConverterUtils.toNodeId(proto.getNodeId());
+ }
+ if (proto.hasNodeHttpAddress()) {
+ this.nodeHttpAddress = proto.getNodeHttpAddress();
+ }
}
@Override
@@ -210,6 +246,9 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
+ ", status=" + state.name()
+ ", errorEnum=" + (error != null ? error.name() : "")
+ ", diagnostics=" + diagnostics
+ + ", containerId=" + (containerId != null ? containerId.toString() : "")
+ + ", nodeId=" + (nodeId != null ? nodeId.toString() : "")
+ + ", nodeHttpAddress=" + (nodeHttpAddress != null ? nodeHttpAddress : "")
+ counterStr;
}
@@ -256,4 +295,24 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
public List<TezEvent> getTAGeneratedEvents() {
return taGeneratedEvents;
}
+
+ public ContainerId getContainerId() {
+ return containerId;
+ }
+
+ public NodeId getNodeId() {
+ return nodeId;
+ }
+
+ public String getInProgressLogsUrl() {
+ return inProgressLogsUrl;
+ }
+
+ public String getCompletedLogsUrl() {
+ return completedLogsUrl;
+ }
+
+ public String getNodeHttpAddress() {
+ return nodeHttpAddress;
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/e8269c27/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
index c4e7e5b..9bca440 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
@@ -555,6 +555,22 @@ public class HistoryEventJsonConversion {
otherInfo.put(ATSConstants.LAST_DATA_EVENTS,
DAGUtils.convertDataEventDependencyInfoToJSON(event.getDataEvents()));
}
+ if (event.getNodeId() != null) {
+ otherInfo.put(ATSConstants.NODE_ID, event.getNodeId().toString());
+ }
+ if (event.getContainerId() != null) {
+ otherInfo.put(ATSConstants.CONTAINER_ID, event.getContainerId().toString());
+ }
+ if (event.getInProgressLogsUrl() != null) {
+ otherInfo.put(ATSConstants.IN_PROGRESS_LOGS_URL, event.getInProgressLogsUrl());
+ }
+ if (event.getCompletedLogsUrl() != null) {
+ otherInfo.put(ATSConstants.COMPLETED_LOGS_URL, event.getCompletedLogsUrl());
+ }
+ if (event.getNodeHttpAddress() != null) {
+ otherInfo.put(ATSConstants.NODE_HTTP_ADDRESS, event.getNodeHttpAddress());
+ }
+
jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
return jsonObject;
http://git-wip-us.apache.org/repos/asf/tez/blob/e8269c27/tez-dag/src/main/proto/HistoryEvents.proto
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto
index b9e4507..f3aeed4 100644
--- a/tez-dag/src/main/proto/HistoryEvents.proto
+++ b/tez-dag/src/main/proto/HistoryEvents.proto
@@ -190,6 +190,9 @@ message TaskAttemptFinishedProto {
optional string error_enum = 10;
repeated DataEventDependencyInfoProto data_events = 11;
repeated TezEventProto ta_generated_events = 12;
+ optional string container_id = 13;
+ optional string node_id = 14;
+ optional string node_http_address = 15;
}
message EventMetaDataProto {
http://git-wip-us.apache.org/repos/asf/tez/blob/e8269c27/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
index d8b620a..12e75a7 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
@@ -703,7 +703,7 @@ public class TestRecoveryParser {
TaskAttemptFinishedEvent ta0t2v2FinishedEvent = new TaskAttemptFinishedEvent(
ta0t2v2Id, "v1", 500L, 600L,
TaskAttemptState.SUCCEEDED, null, "", null,
- null, null, 0L, null, 0L);
+ null, null, 0L, null, 0L, null, null, null, null, null);
rService.handle(new DAGHistoryEvent(dagID, ta0t2v2FinishedEvent));
rService.stop();
http://git-wip-us.apache.org/repos/asf/tez/blob/e8269c27/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
index 6be682d..0a2613c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
@@ -883,7 +883,7 @@ public class TestDAGRecovery {
TaskAttemptFinishedEvent taFinishedEvent = new TaskAttemptFinishedEvent(
ta1t1v1Id, "v1", 0L, 0L,
TaskAttemptState.SUCCEEDED, null, "", null,
- null, taGeneratedEvents, 0L, null, 0L);
+ null, taGeneratedEvents, 0L, null, 0L, null, null, null, null, null);
TaskAttemptRecoveryData taRecoveryData = new TaskAttemptRecoveryData(taStartedEvent, taFinishedEvent);
Map<TezTaskAttemptID, TaskAttemptRecoveryData> taRecoveryDataMap =
new HashMap<TezTaskAttemptID, TaskAttemptRecoveryData>();
@@ -941,7 +941,7 @@ public class TestDAGRecovery {
TaskAttemptFinishedEvent taFinishedEvent = new TaskAttemptFinishedEvent(
ta1t1v1Id, "v1", ta1LaunchTime, ta1FinishedTime,
TaskAttemptState.FAILED, TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED, "", null,
- null, null, 0L, null, 0L);
+ null, null, 0L, null, 0L, null, null, null, null, null);
TaskAttemptRecoveryData taRecoveryData = new TaskAttemptRecoveryData(null, taFinishedEvent);
doReturn(taRecoveryData).when(dagRecoveryData).getTaskAttemptRecoveryData(ta1t1v1Id);
@@ -970,7 +970,7 @@ public class TestDAGRecovery {
TaskAttemptFinishedEvent taFinishedEvent = new TaskAttemptFinishedEvent(
ta1t1v1Id, "v1", ta1LaunchTime, ta1FinishedTime,
TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, "", null,
- null, null, 0L, null, 0L);
+ null, null, 0L, null, 0L, null, null, null, null, null);
TaskAttemptRecoveryData taRecoveryData = new TaskAttemptRecoveryData(null, taFinishedEvent);
doReturn(taRecoveryData).when(dagRecoveryData).getTaskAttemptRecoveryData(ta1t1v1Id);
@@ -1030,7 +1030,7 @@ public class TestDAGRecovery {
TaskAttemptFinishedEvent taFinishedEvent = new TaskAttemptFinishedEvent(
ta1t1v1Id, "v1", ta1LaunchTime, ta1FinishedTime,
TaskAttemptState.SUCCEEDED, null, "", null,
- null, taGeneratedEvents, 0L, null, 0L);
+ null, taGeneratedEvents, 0L, null, 0L, null, null, null, null, null);
TaskAttemptRecoveryData taRecoveryData = new TaskAttemptRecoveryData(taStartedEvent, taFinishedEvent);
doReturn(taRecoveryData).when(dagRecoveryData).getTaskAttemptRecoveryData(ta1t1v1Id);
@@ -1068,7 +1068,7 @@ public class TestDAGRecovery {
TaskAttemptFinishedEvent taFinishedEvent = new TaskAttemptFinishedEvent(
ta1t1v2Id, "vertex2", ta1LaunchTime, ta1FinishedTime,
TaskAttemptState.SUCCEEDED, null, "", null,
- null, taGeneratedEvents, 0L, null, 0L);
+ null, taGeneratedEvents, 0L, null, 0L, null, null, null, null, null);
TaskAttemptRecoveryData taRecoveryData = new TaskAttemptRecoveryData(taStartedEvent, taFinishedEvent);
doReturn(taRecoveryData).when(dagRecoveryData).getTaskAttemptRecoveryData(ta1t1v2Id);
Map<TezTaskAttemptID, TaskAttemptRecoveryData> taRecoveryDataMap =
@@ -1119,7 +1119,7 @@ public class TestDAGRecovery {
TaskAttemptFinishedEvent taFinishedEvent = new TaskAttemptFinishedEvent(
ta1t1v1Id, "v1", ta1LaunchTime, ta1FinishedTime,
TaskAttemptState.FAILED, TaskAttemptTerminationCause.INPUT_READ_ERROR, "", null,
- null, null, 0L, null, 0L);
+ null, null, 0L, null, 0L, null, null, null, null, null);
TaskAttemptRecoveryData taRecoveryData = new TaskAttemptRecoveryData(taStartedEvent, taFinishedEvent);
doReturn(taRecoveryData).when(dagRecoveryData).getTaskAttemptRecoveryData(ta1t1v1Id);
@@ -1150,7 +1150,7 @@ public class TestDAGRecovery {
TaskAttemptFinishedEvent taFinishedEvent = new TaskAttemptFinishedEvent(
ta1t1v1Id, "v1", ta1FinishedTime, ta1FinishedTime,
TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, "", null,
- null, null, 0L, null, 0L);
+ null, null, 0L, null, 0L, null, null, null, null, null);
TaskAttemptRecoveryData taRecoveryData = new TaskAttemptRecoveryData(taStartedEvent, taFinishedEvent);
doReturn(taRecoveryData).when(dagRecoveryData).getTaskAttemptRecoveryData(ta1t1v1Id);
http://git-wip-us.apache.org/repos/asf/tez/blob/e8269c27/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 3bb688e..c5dfbc1 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -420,7 +420,7 @@ public class TestTaskAttempt {
arg.getAllValues().subList(expectedEventsAtRunning,
expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1);
- taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID,
+ taImpl.handle(new TaskAttemptEventContainerTerminated(contId, taskAttemptID,
"Terminated", TaskAttemptTerminationCause.CONTAINER_EXITED));
// verify unregister is not invoked again
verify(mockHeartbeatHandler, times(1)).unregister(taskAttemptID);
@@ -487,7 +487,7 @@ public class TestTaskAttempt {
TaskAttemptState.RUNNING);
verify(mockHeartbeatHandler).register(taskAttemptID);
- taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID, "Terminated",
+ taImpl.handle(new TaskAttemptEventContainerTerminated(contId, taskAttemptID, "Terminated",
TaskAttemptTerminationCause.CONTAINER_EXITED));
assertFalse(
"InternalError occurred trying to handle TA_CONTAINER_TERMINATED",
@@ -576,7 +576,7 @@ public class TestTaskAttempt {
arg.getAllValues().subList(expectedEventsAtRunning,
expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1);
- taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID,
+ taImpl.handle(new TaskAttemptEventContainerTerminated(contId, taskAttemptID,
"Terminated", TaskAttemptTerminationCause.CONTAINER_EXITED));
// verify unregister is not invoked again
verify(mockHeartbeatHandler, times(1)).unregister(taskAttemptID);
@@ -747,7 +747,7 @@ public class TestTaskAttempt {
assertEquals("0", taImpl.getDiagnostics().get(0));
assertEquals(TaskAttemptTerminationCause.APPLICATION_ERROR, taImpl.getTerminationCause());
- taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID, "1",
+ taImpl.handle(new TaskAttemptEventContainerTerminated(contId, taskAttemptID, "1",
TaskAttemptTerminationCause.CONTAINER_EXITED));
// verify unregister is not invoked again
verify(mockHeartbeatHandler, times(1)).unregister(taskAttemptID);
http://git-wip-us.apache.org/repos/asf/tez/blob/e8269c27/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index d3bd7b8..38d9935 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -502,7 +502,11 @@ public class TestHistoryEventsProtoConversion {
"vertex1", 10001l, 1000434444l, TaskAttemptState.FAILED,
null, null, null, null, null, 2048,
TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
- TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 0), 1024);
+ TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 0), 1024,
+ ContainerId.newInstance(
+ ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(0, 1), 1), 1001), NodeId.newInstance(
+ "host1", 19999), "inProgress", "Completed", "nodeHttpAddress");
TaskAttemptFinishedEvent deserializedEvent = (TaskAttemptFinishedEvent)
testProtoConversion(event);
Assert.assertEquals(event.getTaskAttemptID(),
@@ -523,6 +527,12 @@ public class TestHistoryEventsProtoConversion {
deserializedEvent.getState());
Assert.assertEquals(event.getCounters(),
deserializedEvent.getCounters());
+ Assert.assertEquals(event.getContainerId(),
+ deserializedEvent.getContainerId());
+ Assert.assertEquals(event.getNodeId(),
+ deserializedEvent.getNodeId());
+ Assert.assertEquals(event.getNodeHttpAddress(),
+ deserializedEvent.getNodeHttpAddress());
logEvents(event, deserializedEvent);
}
{
@@ -537,7 +547,11 @@ public class TestHistoryEventsProtoConversion {
TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 1),
"vertex1", 10001l, 1000434444l, TaskAttemptState.FAILED,
TaskAttemptTerminationCause.APPLICATION_ERROR, "diagnose", new TezCounters(), events,
- null, 0, null, 0);
+ null, 0, null, 0,
+ ContainerId.newInstance(
+ ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(0, 1), 1), 1001), NodeId.newInstance(
+ "host1", 19999), "inProgress", "Completed", "nodeHttpAddress");
TaskAttemptFinishedEvent deserializedEvent = (TaskAttemptFinishedEvent)
testProtoConversion(event);
Assert.assertEquals(event.getTaskAttemptID(),
@@ -550,6 +564,12 @@ public class TestHistoryEventsProtoConversion {
deserializedEvent.getState());
Assert.assertEquals(event.getCounters(),
deserializedEvent.getCounters());
+ Assert.assertEquals(event.getContainerId(),
+ deserializedEvent.getContainerId());
+ Assert.assertEquals(event.getNodeId(),
+ deserializedEvent.getNodeId());
+ Assert.assertEquals(event.getNodeHttpAddress(),
+ deserializedEvent.getNodeHttpAddress());
Assert.assertEquals(event.getTaskAttemptError(),
deserializedEvent.getTaskAttemptError());
Assert.assertEquals(events.size(), event.getDataEvents().size());
http://git-wip-us.apache.org/repos/asf/tez/blob/e8269c27/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
index b285196..ea683f7 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
@@ -165,7 +165,8 @@ public class TestHistoryEventJsonConversion {
case TASK_ATTEMPT_FINISHED:
event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(),
random.nextInt(), TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT,
- null, null, null, null, 0, null, 0);
+ null, null, null, null, 0, null, 0,
+ containerId, nodeId, null, null, "nodeHttpAddress");
break;
case CONTAINER_LAUNCHED:
event = new ContainerLaunchedEvent(containerId, random.nextInt(),
http://git-wip-us.apache.org/repos/asf/tez/blob/e8269c27/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
index d6b518b..26d4d98 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
@@ -468,6 +468,22 @@ public class HistoryEventTimelineConversion {
atsEntity.addOtherInfo(ATSConstants.LAST_DATA_EVENTS,
DAGUtils.convertDataEventDependecyInfoToATS(event.getDataEvents()));
}
+ if (event.getNodeId() != null) {
+ atsEntity.addOtherInfo(ATSConstants.NODE_ID, event.getNodeId().toString());
+ }
+ if (event.getContainerId() != null) {
+ atsEntity.addOtherInfo(ATSConstants.CONTAINER_ID, event.getContainerId().toString());
+ }
+ if (event.getInProgressLogsUrl() != null) {
+ atsEntity.addOtherInfo(ATSConstants.IN_PROGRESS_LOGS_URL, event.getInProgressLogsUrl());
+ }
+ if (event.getCompletedLogsUrl() != null) {
+ atsEntity.addOtherInfo(ATSConstants.COMPLETED_LOGS_URL, event.getCompletedLogsUrl());
+ }
+ if (event.getNodeHttpAddress() != null) {
+ atsEntity.addOtherInfo(ATSConstants.NODE_HTTP_ADDRESS, event.getNodeHttpAddress());
+ }
+
return atsEntity;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/e8269c27/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
index 49b6f9f..c5badaa 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
@@ -184,7 +184,8 @@ public class TestHistoryEventTimelineConversion {
case TASK_ATTEMPT_FINISHED:
event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(),
random.nextInt(), TaskAttemptState.FAILED, TaskAttemptTerminationCause.OUTPUT_LOST,
- null, null, null, null, 0, null, 0);
+ null, null, null, null, 0, null, 0,
+ containerId, nodeId, null, null, "nodeHttpAddress");
break;
case CONTAINER_LAUNCHED:
event = new ContainerLaunchedEvent(containerId, random.nextInt(),
@@ -519,7 +520,7 @@ public class TestHistoryEventTimelineConversion {
TaskAttemptFinishedEvent event = new TaskAttemptFinishedEvent(tezTaskAttemptID, vertexName,
startTime, finishTime, state, error, diagnostics, counters, events, null, creationTime,
- tezTaskAttemptID, allocationTime);
+ tezTaskAttemptID, allocationTime, containerId, nodeId, "inProgressURL", "logsURL", "nodeHttpAddress");
TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
Assert.assertEquals(tezTaskAttemptID.toString(), timelineEntity.getEntityId());
Assert.assertEquals(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), timelineEntity.getEntityType());
@@ -542,7 +543,7 @@ public class TestHistoryEventTimelineConversion {
Assert.assertEquals(finishTime, evt.getTimestamp());
final Map<String, Object> otherInfo = timelineEntity.getOtherInfo();
- Assert.assertEquals(11, otherInfo.size());
+ Assert.assertEquals(16, otherInfo.size());
Assert.assertEquals(tezTaskAttemptID.toString(),
timelineEntity.getOtherInfo().get(ATSConstants.CREATION_CAUSAL_ATTEMPT));
Assert.assertEquals(creationTime, timelineEntity.getOtherInfo().get(ATSConstants.CREATION_TIME));
@@ -559,6 +560,11 @@ public class TestHistoryEventTimelineConversion {
Map<String, Object> obj3 = (Map<String, Object>) obj2.get(0);
Assert.assertEquals(events.get(0).getTimestamp(), obj3.get(ATSConstants.TIMESTAMP));
Assert.assertTrue(otherInfo.containsKey(ATSConstants.COUNTERS));
+ Assert.assertEquals("inProgressURL", otherInfo.get(ATSConstants.IN_PROGRESS_LOGS_URL));
+ Assert.assertEquals("logsURL", otherInfo.get(ATSConstants.COMPLETED_LOGS_URL));
+ Assert.assertEquals(nodeId.toString(), otherInfo.get(ATSConstants.NODE_ID));
+ Assert.assertEquals(containerId.toString(), otherInfo.get(ATSConstants.CONTAINER_ID));
+ Assert.assertEquals("nodeHttpAddress", otherInfo.get(ATSConstants.NODE_HTTP_ADDRESS));
}
@SuppressWarnings("unchecked")