You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2016/03/09 00:28:09 UTC

tez git commit: TEZ-2863. Container, node, and logs not available in UI for tasks that fail to launch (jeagles)

Repository: tez
Updated Branches:
  refs/heads/master eda9a47ea -> e8269c270


TEZ-2863. Container, node, and logs not available in UI for tasks that fail to launch (jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e8269c27
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e8269c27
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e8269c27

Branch: refs/heads/master
Commit: e8269c27077c1709cd614d97338b5ad8d035f507
Parents: eda9a47
Author: Jonathan Eagles <je...@yahoo-inc.com>
Authored: Tue Mar 8 17:26:23 2016 -0600
Committer: Jonathan Eagles <je...@yahoo-inc.com>
Committed: Tue Mar 8 17:27:40 2016 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../TaskAttemptEventContainerTerminated.java    | 11 ++-
 ...AttemptEventContainerTerminatedBySystem.java | 12 ++-
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   | 89 +++++++++++++++-----
 .../dag/app/rm/container/AMContainerImpl.java   |  4 +-
 .../events/TaskAttemptFinishedEvent.java        | 63 +++++++++++++-
 .../impl/HistoryEventJsonConversion.java        | 16 ++++
 tez-dag/src/main/proto/HistoryEvents.proto      |  3 +
 .../apache/tez/dag/app/TestRecoveryParser.java  |  2 +-
 .../tez/dag/app/dag/impl/TestDAGRecovery.java   | 14 +--
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   |  8 +-
 .../TestHistoryEventsProtoConversion.java       | 24 +++++-
 .../impl/TestHistoryEventJsonConversion.java    |  3 +-
 .../ats/HistoryEventTimelineConversion.java     | 16 ++++
 .../ats/TestHistoryEventTimelineConversion.java | 12 ++-
 15 files changed, 234 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/e8269c27/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 02232de..d01c732 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
   TEZ-3029. Add an onError method to service plugin contexts.
 
 ALL CHANGES:
+  TEZ-2863. Container, node, and logs not available in UI for tasks that fail to launch
   TEZ-3140. Reduce AM memory usage during serialization
   TEZ-2756. MergeManager close should not try merging files on close if invoked after a shuffle exception.
   TEZ-3156. Tez client keeps trying to talk to RM even if RM does not know about the application.
@@ -399,6 +400,7 @@ INCOMPATIBLE CHANGES
   TEZ-2949. Allow duplicate dag names within session for Tez.
 
 ALL CHANGES:
+  TEZ-2863. Container, node, and logs not available in UI for tasks that fail to launch
   TEZ-3140. Reduce AM memory usage during serialization
   TEZ-3156. Tez client keeps trying to talk to RM even if RM does not know about the application.
   TEZ-3115. Shuffle string handling adds significant memory overhead

http://git-wip-us.apache.org/repos/asf/tez/blob/e8269c27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminated.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminated.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminated.java
index 5dd0141..3db2ffc 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminated.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminated.java
@@ -17,22 +17,29 @@
 
 package org.apache.tez.dag.app.dag.event;
 
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 
 public class TaskAttemptEventContainerTerminated extends TaskAttemptEvent
     implements DiagnosableEvent, TaskAttemptEventTerminationCauseEvent {
 
+  private final ContainerId containerId;
   private final String message;
   private final TaskAttemptTerminationCause errorCause;
 
-  public TaskAttemptEventContainerTerminated(TezTaskAttemptID id, String message, 
-      TaskAttemptTerminationCause errCause) {
+  public TaskAttemptEventContainerTerminated(ContainerId containerId, TezTaskAttemptID id,
+      String message, TaskAttemptTerminationCause errCause) {
     super(id, TaskAttemptEventType.TA_CONTAINER_TERMINATED);
+    this.containerId = containerId;
     this.message = message;
     this.errorCause = errCause;
   }
 
+  public ContainerId getContainerId() {
+    return containerId;
+  }
+
   @Override
   public String getDiagnosticInfo() {
     return message;

http://git-wip-us.apache.org/repos/asf/tez/blob/e8269c27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminatedBySystem.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminatedBySystem.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminatedBySystem.java
index a3c57e4..4efbf88 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminatedBySystem.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminatedBySystem.java
@@ -18,21 +18,29 @@
 
 package org.apache.tez.dag.app.dag.event;
 
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 
 public class TaskAttemptEventContainerTerminatedBySystem extends TaskAttemptEvent 
   implements DiagnosableEvent, TaskAttemptEventTerminationCauseEvent {
 
+  private final ContainerId containerId;
   private final String diagnostics;
   private final TaskAttemptTerminationCause errorCause;
-  public TaskAttemptEventContainerTerminatedBySystem(TezTaskAttemptID id, String diagnostics,
-      TaskAttemptTerminationCause errorCause) {
+
+  public TaskAttemptEventContainerTerminatedBySystem(ContainerId containerId, TezTaskAttemptID id,
+      String diagnostics, TaskAttemptTerminationCause errorCause) {
     super(id, TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM);
+    this.containerId = containerId;
     this.diagnostics = diagnostics;
     this.errorCause = errorCause;
   }
 
+  public ContainerId getContainerId() {
+    return containerId;
+  }
+
   @Override
   public String getDiagnosticInfo() {
     return diagnostics;

http://git-wip-us.apache.org/repos/asf/tez/blob/e8269c27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 0affff2..1598f2d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -78,6 +78,7 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptKilled;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminatedBySystem;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventTezEventUpdate;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventTerminationCauseEvent;
@@ -1037,23 +1038,8 @@ public class TaskAttemptImpl implements TaskAttempt,
 
   protected void logJobHistoryAttemptStarted() {
     Preconditions.checkArgument(recoveryData == null);
-    final String containerIdStr = containerId.toString();
-    String inProgressLogsUrl = nodeHttpAddress
-       + "/" + "node/containerlogs"
-       + "/" + containerIdStr
-       + "/" + this.appContext.getUser();
-    String completedLogsUrl = "";
-    if (conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
-        YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)
-        && conf.get(YarnConfiguration.YARN_LOG_SERVER_URL) != null) {
-      String contextStr = "v_" + getVertex().getName()
-          + "_" + this.attemptId.toString();
-      completedLogsUrl = conf.get(YarnConfiguration.YARN_LOG_SERVER_URL)
-          + "/" + containerNodeId.toString()
-          + "/" + containerIdStr
-          + "/" + contextStr
-          + "/" + this.appContext.getUser();
-    }
+    String inProgressLogsUrl = getInProgressLogsUrl();
+    String completedLogsUrl = getCompletedLogsUrl();
     TaskAttemptStartedEvent startEvt = new TaskAttemptStartedEvent(
         attemptId, getVertex().getName(),
         launchTime, containerId, containerNodeId,
@@ -1072,7 +1058,8 @@ public class TaskAttemptImpl implements TaskAttempt,
         attemptId, getVertex().getName(), getLaunchTime(),
         getFinishTime(), TaskAttemptState.SUCCEEDED, null,
         "", getCounters(), lastDataEvents, taGeneratedEvents,
-        creationTime, creationCausalTA, allocationTime);
+        creationTime, creationCausalTA, allocationTime,
+        null, null, null, null, null);
     // FIXME how do we store information regd completion events
     this.appContext.getHistoryHandler().handle(
         new DAGHistoryEvent(getDAGID(), finishEvt));
@@ -1084,8 +1071,16 @@ public class TaskAttemptImpl implements TaskAttempt,
         || recoveryData.getTaskAttemptFinishedEvent() == null,
         "log TaskAttemptFinishedEvent again in recovery when there's already another TaskAtttemptFinishedEvent");
     long finishTime = getFinishTime();
+    ContainerId unsuccessfulContainerId = null;
+    NodeId unsuccessfulContainerNodeId = null;
+    String inProgressLogsUrl = null;
+    String completedLogsUrl = null;
     if (finishTime <= 0) {
       finishTime = clock.getTime(); // comes here in case it was terminated before launch
+      unsuccessfulContainerId = containerId;
+      unsuccessfulContainerNodeId = containerNodeId;
+      inProgressLogsUrl = getInProgressLogsUrl();
+      completedLogsUrl = getCompletedLogsUrl();
     }
     TaskAttemptFinishedEvent finishEvt = new TaskAttemptFinishedEvent(
         attemptId, getVertex().getName(), getLaunchTime(),
@@ -1093,12 +1088,44 @@ public class TaskAttemptImpl implements TaskAttempt,
         terminationCause,
         StringUtils.join(
             getDiagnostics(), LINE_SEPARATOR), getCounters(), lastDataEvents,
-        taGeneratedEvents, creationTime, creationCausalTA, allocationTime);
+        taGeneratedEvents, creationTime, creationCausalTA, allocationTime,
+        unsuccessfulContainerId, unsuccessfulContainerNodeId, inProgressLogsUrl, completedLogsUrl, nodeHttpAddress);
     // FIXME how do we store information regd completion events
     this.appContext.getHistoryHandler().handle(
         new DAGHistoryEvent(getDAGID(), finishEvt));
   }
 
+  private String getInProgressLogsUrl() {
+    String inProgressLogsUrl = null;
+    if (containerId != null && nodeHttpAddress != null) {
+      final String containerIdStr = containerId.toString();
+      inProgressLogsUrl = nodeHttpAddress
+          + "/" + "node/containerlogs"
+          + "/" + containerIdStr
+          + "/" + this.appContext.getUser();
+    }
+    return inProgressLogsUrl;
+  }
+
+  private String getCompletedLogsUrl() {
+    String completedLogsUrl = null;
+    if (containerId != null && containerNodeId != null && nodeHttpAddress != null) {
+      final String containerIdStr = containerId.toString();
+      if (conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
+          YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)
+          && conf.get(YarnConfiguration.YARN_LOG_SERVER_URL) != null) {
+        String contextStr = "v_" + getVertex().getName()
+            + "_" + this.attemptId.toString();
+        completedLogsUrl = conf.get(YarnConfiguration.YARN_LOG_SERVER_URL)
+            + "/" + containerNodeId.toString()
+            + "/" + containerIdStr
+            + "/" + contextStr
+            + "/" + this.appContext.getUser();
+      }
+    }
+    return completedLogsUrl;
+  }
+
   //////////////////////////////////////////////////////////////////////////////
   //                   Start of Transition Classes                            //
   //////////////////////////////////////////////////////////////////////////////
@@ -1268,6 +1295,30 @@ public class TaskAttemptImpl implements TaskAttempt,
                 + ", eventClass=" + event.getClass().getName());
       }
 
+      if (event instanceof TaskAttemptEventContainerTerminated) {
+        TaskAttemptEventContainerTerminated tEvent = (TaskAttemptEventContainerTerminated) event;
+        AMContainer amContainer = ta.appContext.getAllContainers().get(tEvent.getContainerId());
+        Container container = amContainer.getContainer();
+
+        ta.allocationTime = amContainer.getCurrentTaskAttemptAllocationTime();
+        ta.container = container;
+        ta.containerId = tEvent.getContainerId();
+        ta.containerNodeId = container.getNodeId();
+        ta.nodeHttpAddress = StringInterner.weakIntern(container.getNodeHttpAddress());
+      }
+
+      if (event instanceof TaskAttemptEventContainerTerminatedBySystem) {
+        TaskAttemptEventContainerTerminatedBySystem tEvent = (TaskAttemptEventContainerTerminatedBySystem) event;
+        AMContainer amContainer = ta.appContext.getAllContainers().get(tEvent.getContainerId());
+        Container container = amContainer.getContainer();
+
+        ta.allocationTime = amContainer.getCurrentTaskAttemptAllocationTime();
+        ta.container = container;
+        ta.containerId = tEvent.getContainerId();
+        ta.containerNodeId = container.getNodeId();
+        ta.nodeHttpAddress = StringInterner.weakIntern(container.getNodeHttpAddress());
+      }
+
       if (ta.recoveryData == null ||
           ta.recoveryData.getTaskAttemptFinishedEvent() == null) {
         ta.setFinishTime();

http://git-wip-us.apache.org/repos/asf/tez/blob/e8269c27/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index e4302aa..94c8fe0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -1100,12 +1100,12 @@ public class AMContainerImpl implements AMContainer {
 
   protected void sendTerminatedToTaskAttempt(
       TezTaskAttemptID taId, String message, TaskAttemptTerminationCause errCause) {
-    sendEvent(new TaskAttemptEventContainerTerminated(taId, message, errCause));
+    sendEvent(new TaskAttemptEventContainerTerminated(containerId, taId, message, errCause));
   }
   
   protected void sendContainerTerminatedBySystemToTaskAttempt(
     TezTaskAttemptID taId, String message, TaskAttemptTerminationCause errorCause) {
-      sendEvent(new TaskAttemptEventContainerTerminatedBySystem(taId, message, errorCause));
+      sendEvent(new TaskAttemptEventContainerTerminatedBySystem(containerId, taId, message, errorCause));
   }
 
   protected void sendTerminatingToTaskAttempt(TezTaskAttemptID taId,

http://git-wip-us.apache.org/repos/asf/tez/blob/e8269c27/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
index 21b8719..8e31a25 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
@@ -28,6 +28,9 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
 
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
@@ -59,7 +62,12 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
   private TaskAttemptTerminationCause error;
   private List<DataEventDependencyInfo> dataEvents;
   private List<TezEvent> taGeneratedEvents;
-  
+  private ContainerId containerId;
+  private NodeId nodeId;
+  private String inProgressLogsUrl;
+  private String completedLogsUrl;
+  private String nodeHttpAddress;
+
   public TaskAttemptFinishedEvent(TezTaskAttemptID taId,
       String vertexName,
       long startTime,
@@ -71,7 +79,12 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
       List<TezEvent> taGeneratedEvents,
       long creationTime, 
       TezTaskAttemptID creationCausalTA, 
-      long allocationTime) {
+      long allocationTime,
+      ContainerId containerId,
+      NodeId nodeId,
+      String inProgressLogsUrl,
+      String completedLogsUrl,
+      String nodeHttpAddress) {
     this.taskAttemptId = taId;
     this.vertexName = vertexName;
     this.creationCausalTA = creationCausalTA;
@@ -85,6 +98,11 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
     this.error = error;
     this.dataEvents = dataEvents;
     this.taGeneratedEvents = taGeneratedEvents;
+    this.containerId = containerId;
+    this.nodeId = nodeId;
+    this.inProgressLogsUrl = inProgressLogsUrl;
+    this.completedLogsUrl = completedLogsUrl;
+    this.nodeHttpAddress = nodeHttpAddress;
   }
 
   public TaskAttemptFinishedEvent() {
@@ -140,6 +158,15 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
         builder.addTaGeneratedEvents(TezEventUtils.toProto(event));
       }
     }
+    if (containerId != null) {
+      builder.setContainerId(containerId.toString());
+    }
+    if (nodeId != null) {
+      builder.setNodeId(nodeId.toString());
+    }
+    if (nodeHttpAddress != null) {
+      builder.setNodeHttpAddress(nodeHttpAddress);
+    }
     return builder.build();
   }
 
@@ -175,6 +202,15 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
         this.taGeneratedEvents.add(TezEventUtils.fromProto(eventProto));
       }
     }
+    if (proto.hasContainerId()) {
+      this.containerId = ConverterUtils.toContainerId(proto.getContainerId());
+    }
+    if (proto.hasNodeId()) {
+      this.nodeId = ConverterUtils.toNodeId(proto.getNodeId());
+    }
+    if (proto.hasNodeHttpAddress()) {
+      this.nodeHttpAddress = proto.getNodeHttpAddress();
+    }
   }
 
   @Override
@@ -210,6 +246,9 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
         + ", status=" + state.name()
         + ", errorEnum=" + (error != null ? error.name() : "")
         + ", diagnostics=" + diagnostics
+        + ", containerId=" + (containerId != null ? containerId.toString() : "")
+        + ", nodeId=" + (nodeId != null ? nodeId.toString() : "")
+        + ", nodeHttpAddress=" + (nodeHttpAddress != null ? nodeHttpAddress : "")
         + counterStr;
   }
 
@@ -256,4 +295,24 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
   public List<TezEvent> getTAGeneratedEvents() {
     return taGeneratedEvents;
   }
+
+  public ContainerId getContainerId() {
+    return containerId;
+  }
+
+  public NodeId getNodeId() {
+    return nodeId;
+  }
+
+  public String getInProgressLogsUrl() {
+    return inProgressLogsUrl;
+  }
+
+  public String getCompletedLogsUrl() {
+    return completedLogsUrl;
+  }
+
+  public String getNodeHttpAddress() {
+    return nodeHttpAddress;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/e8269c27/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
index c4e7e5b..9bca440 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
@@ -555,6 +555,22 @@ public class HistoryEventJsonConversion {
       otherInfo.put(ATSConstants.LAST_DATA_EVENTS, 
           DAGUtils.convertDataEventDependencyInfoToJSON(event.getDataEvents()));
     }
+    if (event.getNodeId() != null) {
+      otherInfo.put(ATSConstants.NODE_ID, event.getNodeId().toString());
+    }
+    if (event.getContainerId() != null) {
+      otherInfo.put(ATSConstants.CONTAINER_ID, event.getContainerId().toString());
+    }
+    if (event.getInProgressLogsUrl() != null) {
+      otherInfo.put(ATSConstants.IN_PROGRESS_LOGS_URL, event.getInProgressLogsUrl());
+    }
+    if (event.getCompletedLogsUrl() != null) {
+      otherInfo.put(ATSConstants.COMPLETED_LOGS_URL, event.getCompletedLogsUrl());
+    }
+    if (event.getNodeHttpAddress() != null) {
+      otherInfo.put(ATSConstants.NODE_HTTP_ADDRESS, event.getNodeHttpAddress());
+    }
+
     jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
 
     return jsonObject;

http://git-wip-us.apache.org/repos/asf/tez/blob/e8269c27/tez-dag/src/main/proto/HistoryEvents.proto
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto
index b9e4507..f3aeed4 100644
--- a/tez-dag/src/main/proto/HistoryEvents.proto
+++ b/tez-dag/src/main/proto/HistoryEvents.proto
@@ -190,6 +190,9 @@ message TaskAttemptFinishedProto {
   optional string error_enum = 10;
   repeated DataEventDependencyInfoProto data_events = 11;
   repeated TezEventProto ta_generated_events = 12;
+  optional string container_id = 13;
+  optional string node_id = 14;
+  optional string node_http_address = 15;
 }
 
 message EventMetaDataProto {

http://git-wip-us.apache.org/repos/asf/tez/blob/e8269c27/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
index d8b620a..12e75a7 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
@@ -703,7 +703,7 @@ public class TestRecoveryParser {
     TaskAttemptFinishedEvent ta0t2v2FinishedEvent = new TaskAttemptFinishedEvent(
         ta0t2v2Id, "v1", 500L, 600L, 
         TaskAttemptState.SUCCEEDED, null, "", null, 
-        null, null, 0L, null, 0L);
+        null, null, 0L, null, 0L, null, null, null, null, null);
     rService.handle(new DAGHistoryEvent(dagID, ta0t2v2FinishedEvent));
 
     rService.stop();

http://git-wip-us.apache.org/repos/asf/tez/blob/e8269c27/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
index 6be682d..0a2613c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
@@ -883,7 +883,7 @@ public class TestDAGRecovery {
     TaskAttemptFinishedEvent taFinishedEvent = new TaskAttemptFinishedEvent(
         ta1t1v1Id, "v1", 0L, 0L, 
         TaskAttemptState.SUCCEEDED, null, "", null, 
-        null, taGeneratedEvents, 0L, null, 0L);
+        null, taGeneratedEvents, 0L, null, 0L, null, null, null, null, null);
     TaskAttemptRecoveryData taRecoveryData = new TaskAttemptRecoveryData(taStartedEvent, taFinishedEvent);
     Map<TezTaskAttemptID, TaskAttemptRecoveryData> taRecoveryDataMap =
         new HashMap<TezTaskAttemptID, TaskAttemptRecoveryData>();
@@ -941,7 +941,7 @@ public class TestDAGRecovery {
     TaskAttemptFinishedEvent taFinishedEvent = new TaskAttemptFinishedEvent(
         ta1t1v1Id, "v1", ta1LaunchTime, ta1FinishedTime, 
         TaskAttemptState.FAILED, TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED, "", null, 
-        null, null, 0L, null, 0L);
+        null, null, 0L, null, 0L, null, null, null, null, null);
     TaskAttemptRecoveryData taRecoveryData = new TaskAttemptRecoveryData(null, taFinishedEvent);
     doReturn(taRecoveryData).when(dagRecoveryData).getTaskAttemptRecoveryData(ta1t1v1Id);
     
@@ -970,7 +970,7 @@ public class TestDAGRecovery {
     TaskAttemptFinishedEvent taFinishedEvent = new TaskAttemptFinishedEvent(
         ta1t1v1Id, "v1", ta1LaunchTime, ta1FinishedTime, 
         TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, "", null, 
-        null, null, 0L, null, 0L);
+        null, null, 0L, null, 0L, null, null, null, null, null);
     TaskAttemptRecoveryData taRecoveryData = new TaskAttemptRecoveryData(null, taFinishedEvent);
     doReturn(taRecoveryData).when(dagRecoveryData).getTaskAttemptRecoveryData(ta1t1v1Id);
     
@@ -1030,7 +1030,7 @@ public class TestDAGRecovery {
     TaskAttemptFinishedEvent taFinishedEvent = new TaskAttemptFinishedEvent(
         ta1t1v1Id, "v1", ta1LaunchTime, ta1FinishedTime, 
         TaskAttemptState.SUCCEEDED, null, "", null, 
-        null, taGeneratedEvents, 0L, null, 0L);
+        null, taGeneratedEvents, 0L, null, 0L, null, null, null, null, null);
     TaskAttemptRecoveryData taRecoveryData = new TaskAttemptRecoveryData(taStartedEvent, taFinishedEvent);
     doReturn(taRecoveryData).when(dagRecoveryData).getTaskAttemptRecoveryData(ta1t1v1Id);
 
@@ -1068,7 +1068,7 @@ public class TestDAGRecovery {
     TaskAttemptFinishedEvent taFinishedEvent = new TaskAttemptFinishedEvent(
         ta1t1v2Id, "vertex2", ta1LaunchTime, ta1FinishedTime, 
         TaskAttemptState.SUCCEEDED, null, "", null, 
-        null, taGeneratedEvents, 0L, null, 0L);
+        null, taGeneratedEvents, 0L, null, 0L, null, null, null, null, null);
     TaskAttemptRecoveryData taRecoveryData = new TaskAttemptRecoveryData(taStartedEvent, taFinishedEvent);
     doReturn(taRecoveryData).when(dagRecoveryData).getTaskAttemptRecoveryData(ta1t1v2Id);   
     Map<TezTaskAttemptID, TaskAttemptRecoveryData> taRecoveryDataMap =
@@ -1119,7 +1119,7 @@ public class TestDAGRecovery {
     TaskAttemptFinishedEvent taFinishedEvent = new TaskAttemptFinishedEvent(
         ta1t1v1Id, "v1", ta1LaunchTime, ta1FinishedTime, 
         TaskAttemptState.FAILED, TaskAttemptTerminationCause.INPUT_READ_ERROR, "", null, 
-        null, null, 0L, null, 0L);
+        null, null, 0L, null, 0L, null, null, null, null, null);
     TaskAttemptRecoveryData taRecoveryData = new TaskAttemptRecoveryData(taStartedEvent, taFinishedEvent);
     doReturn(taRecoveryData).when(dagRecoveryData).getTaskAttemptRecoveryData(ta1t1v1Id);
     
@@ -1150,7 +1150,7 @@ public class TestDAGRecovery {
     TaskAttemptFinishedEvent taFinishedEvent = new TaskAttemptFinishedEvent(
         ta1t1v1Id, "v1", ta1FinishedTime, ta1FinishedTime, 
         TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, "", null, 
-        null, null, 0L, null, 0L);
+        null, null, 0L, null, 0L, null, null, null, null, null);
     TaskAttemptRecoveryData taRecoveryData = new TaskAttemptRecoveryData(taStartedEvent, taFinishedEvent);
     doReturn(taRecoveryData).when(dagRecoveryData).getTaskAttemptRecoveryData(ta1t1v1Id);
     

http://git-wip-us.apache.org/repos/asf/tez/blob/e8269c27/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 3bb688e..c5dfbc1 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -420,7 +420,7 @@ public class TestTaskAttempt {
         arg.getAllValues().subList(expectedEventsAtRunning,
             expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1);
 
-    taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID,
+    taImpl.handle(new TaskAttemptEventContainerTerminated(contId, taskAttemptID,
         "Terminated", TaskAttemptTerminationCause.CONTAINER_EXITED));
     // verify unregister is not invoked again
     verify(mockHeartbeatHandler, times(1)).unregister(taskAttemptID);
@@ -487,7 +487,7 @@ public class TestTaskAttempt {
         TaskAttemptState.RUNNING);
     verify(mockHeartbeatHandler).register(taskAttemptID);
 
-    taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID, "Terminated",
+    taImpl.handle(new TaskAttemptEventContainerTerminated(contId, taskAttemptID, "Terminated",
         TaskAttemptTerminationCause.CONTAINER_EXITED));
     assertFalse(
         "InternalError occurred trying to handle TA_CONTAINER_TERMINATED",
@@ -576,7 +576,7 @@ public class TestTaskAttempt {
         arg.getAllValues().subList(expectedEventsAtRunning,
             expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1);
 
-    taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID,
+    taImpl.handle(new TaskAttemptEventContainerTerminated(contId, taskAttemptID,
         "Terminated", TaskAttemptTerminationCause.CONTAINER_EXITED));
     // verify unregister is not invoked again
     verify(mockHeartbeatHandler, times(1)).unregister(taskAttemptID);
@@ -747,7 +747,7 @@ public class TestTaskAttempt {
     assertEquals("0", taImpl.getDiagnostics().get(0));
     assertEquals(TaskAttemptTerminationCause.APPLICATION_ERROR, taImpl.getTerminationCause());
 
-    taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID, "1",
+    taImpl.handle(new TaskAttemptEventContainerTerminated(contId, taskAttemptID, "1",
         TaskAttemptTerminationCause.CONTAINER_EXITED));
     // verify unregister is not invoked again
     verify(mockHeartbeatHandler, times(1)).unregister(taskAttemptID);

http://git-wip-us.apache.org/repos/asf/tez/blob/e8269c27/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index d3bd7b8..38d9935 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -502,7 +502,11 @@ public class TestHistoryEventsProtoConversion {
           "vertex1", 10001l, 1000434444l, TaskAttemptState.FAILED,
           null, null, null, null, null, 2048,
           TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
-              TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 0), 1024);
+              TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 0), 1024,
+          ContainerId.newInstance(
+                  ApplicationAttemptId.newInstance(
+                      ApplicationId.newInstance(0, 1), 1), 1001), NodeId.newInstance(
+                  "host1", 19999), "inProgress", "Completed", "nodeHttpAddress");
       TaskAttemptFinishedEvent deserializedEvent = (TaskAttemptFinishedEvent)
           testProtoConversion(event);
       Assert.assertEquals(event.getTaskAttemptID(),
@@ -523,6 +527,12 @@ public class TestHistoryEventsProtoConversion {
           deserializedEvent.getState());
       Assert.assertEquals(event.getCounters(),
           deserializedEvent.getCounters());
+      Assert.assertEquals(event.getContainerId(),
+          deserializedEvent.getContainerId());
+      Assert.assertEquals(event.getNodeId(),
+          deserializedEvent.getNodeId());
+      Assert.assertEquals(event.getNodeHttpAddress(),
+          deserializedEvent.getNodeHttpAddress());
       logEvents(event, deserializedEvent);
     }
     {
@@ -537,7 +547,11 @@ public class TestHistoryEventsProtoConversion {
               TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 1),
           "vertex1", 10001l, 1000434444l, TaskAttemptState.FAILED,
           TaskAttemptTerminationCause.APPLICATION_ERROR, "diagnose", new TezCounters(), events,
-          null, 0, null, 0);
+          null, 0, null, 0,
+          ContainerId.newInstance(
+              ApplicationAttemptId.newInstance(
+                  ApplicationId.newInstance(0, 1), 1), 1001), NodeId.newInstance(
+              "host1", 19999), "inProgress", "Completed", "nodeHttpAddress");
       TaskAttemptFinishedEvent deserializedEvent = (TaskAttemptFinishedEvent)
           testProtoConversion(event);
       Assert.assertEquals(event.getTaskAttemptID(),
@@ -550,6 +564,12 @@ public class TestHistoryEventsProtoConversion {
           deserializedEvent.getState());
       Assert.assertEquals(event.getCounters(),
           deserializedEvent.getCounters());
+      Assert.assertEquals(event.getContainerId(),
+          deserializedEvent.getContainerId());
+      Assert.assertEquals(event.getNodeId(),
+          deserializedEvent.getNodeId());
+      Assert.assertEquals(event.getNodeHttpAddress(),
+          deserializedEvent.getNodeHttpAddress());
       Assert.assertEquals(event.getTaskAttemptError(),
           deserializedEvent.getTaskAttemptError());
       Assert.assertEquals(events.size(), event.getDataEvents().size());

http://git-wip-us.apache.org/repos/asf/tez/blob/e8269c27/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
index b285196..ea683f7 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
@@ -165,7 +165,8 @@ public class TestHistoryEventJsonConversion {
         case TASK_ATTEMPT_FINISHED:
           event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(),
               random.nextInt(), TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT,
-              null, null, null, null, 0, null, 0);
+              null, null, null, null, 0, null, 0,
+              containerId, nodeId, null, null, "nodeHttpAddress");
           break;
         case CONTAINER_LAUNCHED:
           event = new ContainerLaunchedEvent(containerId, random.nextInt(),

http://git-wip-us.apache.org/repos/asf/tez/blob/e8269c27/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
index d6b518b..26d4d98 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
@@ -468,6 +468,22 @@ public class HistoryEventTimelineConversion {
       atsEntity.addOtherInfo(ATSConstants.LAST_DATA_EVENTS, 
           DAGUtils.convertDataEventDependecyInfoToATS(event.getDataEvents()));
     }
+    if (event.getNodeId() != null) {
+      atsEntity.addOtherInfo(ATSConstants.NODE_ID, event.getNodeId().toString());
+    }
+    if (event.getContainerId() != null) {
+      atsEntity.addOtherInfo(ATSConstants.CONTAINER_ID, event.getContainerId().toString());
+    }
+    if (event.getInProgressLogsUrl() != null) {
+      atsEntity.addOtherInfo(ATSConstants.IN_PROGRESS_LOGS_URL, event.getInProgressLogsUrl());
+    }
+    if (event.getCompletedLogsUrl() != null) {
+      atsEntity.addOtherInfo(ATSConstants.COMPLETED_LOGS_URL, event.getCompletedLogsUrl());
+    }
+    if (event.getNodeHttpAddress() != null) {
+      atsEntity.addOtherInfo(ATSConstants.NODE_HTTP_ADDRESS, event.getNodeHttpAddress());
+    }
+
     return atsEntity;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/e8269c27/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
index 49b6f9f..c5badaa 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
@@ -184,7 +184,8 @@ public class TestHistoryEventTimelineConversion {
         case TASK_ATTEMPT_FINISHED:
           event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(),
               random.nextInt(), TaskAttemptState.FAILED, TaskAttemptTerminationCause.OUTPUT_LOST,
-              null, null, null, null, 0, null, 0);
+              null, null, null, null, 0, null, 0,
+              containerId, nodeId, null, null, "nodeHttpAddress");
           break;
         case CONTAINER_LAUNCHED:
           event = new ContainerLaunchedEvent(containerId, random.nextInt(),
@@ -519,7 +520,7 @@ public class TestHistoryEventTimelineConversion {
 
     TaskAttemptFinishedEvent event = new TaskAttemptFinishedEvent(tezTaskAttemptID, vertexName,
         startTime, finishTime, state, error, diagnostics, counters, events, null, creationTime,
-        tezTaskAttemptID, allocationTime);
+        tezTaskAttemptID, allocationTime, containerId, nodeId, "inProgressURL", "logsURL", "nodeHttpAddress");
     TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
     Assert.assertEquals(tezTaskAttemptID.toString(), timelineEntity.getEntityId());
     Assert.assertEquals(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), timelineEntity.getEntityType());
@@ -542,7 +543,7 @@ public class TestHistoryEventTimelineConversion {
     Assert.assertEquals(finishTime, evt.getTimestamp());
 
     final Map<String, Object> otherInfo = timelineEntity.getOtherInfo();
-    Assert.assertEquals(11, otherInfo.size());
+    Assert.assertEquals(16, otherInfo.size());
     Assert.assertEquals(tezTaskAttemptID.toString(), 
         timelineEntity.getOtherInfo().get(ATSConstants.CREATION_CAUSAL_ATTEMPT));
     Assert.assertEquals(creationTime, timelineEntity.getOtherInfo().get(ATSConstants.CREATION_TIME));
@@ -559,6 +560,11 @@ public class TestHistoryEventTimelineConversion {
     Map<String, Object> obj3 = (Map<String, Object>) obj2.get(0);
     Assert.assertEquals(events.get(0).getTimestamp(), obj3.get(ATSConstants.TIMESTAMP));
     Assert.assertTrue(otherInfo.containsKey(ATSConstants.COUNTERS));
+    Assert.assertEquals("inProgressURL", otherInfo.get(ATSConstants.IN_PROGRESS_LOGS_URL));
+    Assert.assertEquals("logsURL", otherInfo.get(ATSConstants.COMPLETED_LOGS_URL));
+    Assert.assertEquals(nodeId.toString(), otherInfo.get(ATSConstants.NODE_ID));
+    Assert.assertEquals(containerId.toString(), otherInfo.get(ATSConstants.CONTAINER_ID));
+    Assert.assertEquals("nodeHttpAddress", otherInfo.get(ATSConstants.NODE_HTTP_ADDRESS));
   }
 
   @SuppressWarnings("unchecked")