You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/05/01 07:50:58 UTC

git commit: TEZ-1002. Generate Container Stop history events. Contributed by Gopal V.

Repository: incubator-tez
Updated Branches:
  refs/heads/master 9f04ae4d7 -> 2275c4ee1


TEZ-1002. Generate Container Stop history events. Contributed by Gopal
V.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/2275c4ee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/2275c4ee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/2275c4ee

Branch: refs/heads/master
Commit: 2275c4ee14c9830e868e6f6fa9773381a9a69760
Parents: 9f04ae4
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Apr 30 22:50:13 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Apr 30 22:50:13 2014 -0700

----------------------------------------------------------------------
 .../org/apache/tez/dag/app/RecoveryParser.java  |   4 +
 .../dag/app/rm/container/AMContainerImpl.java   |  25 +++
 .../tez/dag/history/HistoryEventType.java       |   1 +
 .../history/events/ContainerStoppedEvent.java   | 171 +++++++++++++++++++
 .../tez/dag/history/utils/ATSConstants.java     |   1 +
 tez-dag/src/main/proto/HistoryEvents.proto      |   8 +
 .../dag/app/rm/container/TestAMContainer.java   |  32 ++++
 .../TestHistoryEventsProtoConversion.java       |  21 +++
 8 files changed, 263 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2275c4ee/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
index 093069c..bd3fa58 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
@@ -44,6 +44,7 @@ import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.events.AMLaunchedEvent;
 import org.apache.tez.dag.history.events.AMStartedEvent;
 import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+import org.apache.tez.dag.history.events.ContainerStoppedEvent;
 import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
 import org.apache.tez.dag.history.events.DAGFinishedEvent;
 import org.apache.tez.dag.history.events.DAGInitializedEvent;
@@ -156,6 +157,9 @@ public class RecoveryParser {
       case CONTAINER_LAUNCHED:
         event = new ContainerLaunchedEvent();
         break;
+      case CONTAINER_STOPPED:
+        event = new ContainerStoppedEvent();
+        break;
       case VERTEX_INITIALIZED:
         event = new VertexInitializedEvent();
         break;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2275c4ee/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 8fb090f..6e6fa99 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -32,6 +32,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.LocalResource;
@@ -42,6 +43,7 @@ import org.apache.hadoop.yarn.state.MultipleArcTransition;
 import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.hadoop.yarn.util.Clock;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerHeartbeatHandler;
 import org.apache.tez.dag.app.ContainerContext;
@@ -54,6 +56,9 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventNodeFailed;
 import org.apache.tez.dag.app.rm.AMSchedulerEventDeallocateContainer;
 import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
 import org.apache.tez.dag.app.rm.NMCommunicatorStopRequestEvent;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.HistoryEventHandler;
+import org.apache.tez.dag.history.events.ContainerStoppedEvent;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 //import org.apache.tez.dag.app.dag.event.TaskAttemptEventDiagnosticsUpdate;
@@ -673,6 +678,9 @@ public class AMContainerImpl implements AMContainer {
         LOG.info("Container " + container.getContainerId()
             + " exited with diagnostics set to " + diag);
       }
+      container.logStopped(event.isPreempted() ?
+            ContainerExitStatus.PREEMPTED
+          : ContainerExitStatus.SUCCESS);
     }
 
     public String getMessage(AMContainerImpl container,
@@ -694,6 +702,9 @@ public class AMContainerImpl implements AMContainer {
             getMessage(container, cEvent));
       }
       container.unregisterFromTAListener();
+      container.logStopped(container.pendingAttempt == null ? 
+          ContainerExitStatus.SUCCESS 
+          : ContainerExitStatus.INVALID);
       container.sendStopRequestToNM();
     }
 
@@ -736,6 +747,7 @@ public class AMContainerImpl implements AMContainer {
         container.sendNodeFailureToTA(container.runningAttempt, errorMessage);
         container.sendTerminatingToTaskAttempt(container.runningAttempt, "Node failure");
       }
+      container.logStopped(ContainerExitStatus.ABORTED);
     }
   }
 
@@ -760,6 +772,7 @@ public class AMContainerImpl implements AMContainer {
                 " hit an invalid transition - " + cEvent.getType() + " at " +
                 container.getState());
       }
+      container.logStopped(ContainerExitStatus.ABORTED);
       container.sendStopRequestToNM();
       container.unregisterFromTAListener();
     }
@@ -1087,6 +1100,7 @@ public class AMContainerImpl implements AMContainer {
     this.sendTerminatingToTaskAttempt(currentTaId, errorMessage);
     this.registerFailedAttempt(event.getTaskAttemptId());
     LOG.warn(errorMessage);
+    this.logStopped(ContainerExitStatus.INVALID);
     this.sendStopRequestToNM();
     this.unregisterFromTAListener();
     this.unregisterFromContainerListener();
@@ -1096,6 +1110,17 @@ public class AMContainerImpl implements AMContainer {
     failedAssignments.add(taId);
   }
 
+  private void logStopped(int exitStatus) {
+    final Clock clock = appContext.getClock();
+    final HistoryEventHandler historyHandler = appContext.getHistoryHandler();
+    ContainerStoppedEvent lEvt = new ContainerStoppedEvent(containerId,
+        clock.getTime(), 
+        exitStatus, 
+        appContext.getApplicationAttemptId());
+    historyHandler.handle(
+        new DAGHistoryEvent(appContext.getCurrentDAGID(),lEvt));
+  }
+  
   protected void deAllocate() {
     sendEvent(new AMSchedulerEventDeallocateContainer(containerId));
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2275c4ee/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
index 219bfe3..fd747e0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
@@ -34,6 +34,7 @@ public enum HistoryEventType {
   TASK_ATTEMPT_STARTED,
   TASK_ATTEMPT_FINISHED,
   CONTAINER_LAUNCHED,
+  CONTAINER_STOPPED,
   VERTEX_DATA_MOVEMENT_EVENTS_GENERATED,
   DAG_COMMIT_STARTED,
   VERTEX_COMMIT_STARTED,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2275c4ee/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerStoppedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerStoppedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerStoppedEvent.java
new file mode 100644
index 0000000..a544354
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerStoppedEvent.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.events;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.ats.EntityTypes;
+import org.apache.tez.dag.history.utils.ATSConstants;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.ContainerStoppedProto;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+public class ContainerStoppedEvent implements HistoryEvent {
+
+  private ContainerId containerId;
+  private long stopTime;
+  private int exitStatus;
+  private ApplicationAttemptId applicationAttemptId;
+
+  public ContainerStoppedEvent() {
+  }
+  
+  public ContainerStoppedEvent(ContainerId containerId,
+      long stopTime,
+      int exitStatus,
+      ApplicationAttemptId applicationAttemptId) {
+    this.containerId = containerId;
+    this.stopTime = stopTime;
+    this.exitStatus = exitStatus;
+    this.applicationAttemptId = applicationAttemptId;
+  }
+  
+  @Override
+  public HistoryEventType getEventType() {
+    return HistoryEventType.CONTAINER_STOPPED;
+  }
+
+  @Override
+  public JSONObject convertToATSJSON() throws JSONException {
+    // structure is identical to ContainerLaunchedEvent
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY,
+        "tez_" + containerId.toString());
+    jsonObject.put(ATSConstants.ENTITY_TYPE,
+        EntityTypes.TEZ_CONTAINER_ID.name());
+
+    JSONArray relatedEntities = new JSONArray();
+    JSONObject appAttemptEntity = new JSONObject();
+    appAttemptEntity.put(ATSConstants.ENTITY,
+        applicationAttemptId.toString());
+    appAttemptEntity.put(ATSConstants.ENTITY_TYPE,
+        EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
+
+    JSONObject containerEntity = new JSONObject();
+    containerEntity.put(ATSConstants.ENTITY, containerId.toString());
+    containerEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.CONTAINER_ID);
+
+    relatedEntities.put(appAttemptEntity);
+    relatedEntities.put(containerEntity);
+    jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
+
+    // TODO decide whether this goes into different events,
+    // event info or other info.
+    JSONArray events = new JSONArray();
+    JSONObject stopEvent = new JSONObject();
+    stopEvent.put(ATSConstants.TIMESTAMP, stopTime);
+    stopEvent.put(ATSConstants.EVENT_TYPE,
+        HistoryEventType.CONTAINER_STOPPED.name());
+    events.put(stopEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+    
+    // TODO add other container info here? or assume AHS will have this?
+    // TODO container logs?
+
+    // Other info
+    JSONObject otherInfo = new JSONObject();
+    otherInfo.put(ATSConstants.EXIT_STATUS, exitStatus);
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+    
+    return jsonObject;
+  }
+
+  @Override
+  public boolean isRecoveryEvent() {
+    return false;
+  }
+
+  @Override
+  public boolean isHistoryEvent() {
+    return true;
+  }
+
+  public ContainerStoppedProto toProto() {
+    return ContainerStoppedProto.newBuilder()
+        .setApplicationAttemptId(applicationAttemptId.toString())
+        .setContainerId(containerId.toString())
+        .setStopTime(stopTime)
+        .setExitStatus(exitStatus)
+        .build();
+  }
+
+  public void fromProto(ContainerStoppedProto proto) {
+    this.containerId = ConverterUtils.toContainerId(proto.getContainerId());
+    stopTime = proto.getStopTime();
+    exitStatus = proto.getExitStatus();
+    this.applicationAttemptId = ConverterUtils.toApplicationAttemptId(
+        proto.getApplicationAttemptId());
+  }
+
+  @Override
+  public void toProtoStream(OutputStream outputStream) throws IOException {
+    toProto().writeDelimitedTo(outputStream);
+  }
+
+  @Override
+  public void fromProtoStream(InputStream inputStream) throws IOException {
+    ContainerStoppedProto proto =
+        ContainerStoppedProto.parseDelimitedFrom(inputStream);
+    fromProto(proto);
+  }
+
+  @Override
+  public String toString() {
+    return "containerId=" + containerId
+        + ", stoppedTime=" + stopTime 
+        + ", exitStatus=" + exitStatus;
+  }
+
+  public ContainerId getContainerId() {
+    return containerId;
+  }
+
+  public long getStoppedTime() {
+    return stopTime;
+  }
+  
+  public int getExitStatus() {
+    return exitStatus;
+  }
+
+  public ApplicationAttemptId getApplicationAttemptId() {
+    return applicationAttemptId;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2275c4ee/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java
index 4050df3..f53cc7d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java
@@ -62,6 +62,7 @@ public class ATSConstants {
   public static final String PROCESSOR_CLASS_NAME = "processorClassName";
   public static final String IN_PROGRESS_LOGS_URL = "inProgressLogsURL";
   public static final String COMPLETED_LOGS_URL = "completedLogsURL";
+  public static final String EXIT_STATUS = "exitStatus";
 
   /* Counters-related keys */
   public static final String COUNTER_GROUPS = "counterGroups";

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2275c4ee/tez-dag/src/main/proto/HistoryEvents.proto
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto
index c73870a..654a2fa 100644
--- a/tez-dag/src/main/proto/HistoryEvents.proto
+++ b/tez-dag/src/main/proto/HistoryEvents.proto
@@ -39,6 +39,14 @@ message ContainerLaunchedProto {
   optional string container_id = 2;
   optional int64 launch_time = 3;
 }
+
+message ContainerStoppedProto {
+  optional string application_attempt_id = 1;
+  optional string container_id = 2;
+  optional int64 stop_time = 3;
+  optional int32 exit_status  = 4;
+}
+
 message DAGSubmittedProto {
   optional string dag_id = 1;
   optional DAGPlan dag_plan = 2;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2275c4ee/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
index 6bd8fcc..8084f0d 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@ -40,6 +40,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -60,6 +61,8 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.TokenCache;
 import org.apache.tez.dag.app.AppContext;
@@ -72,6 +75,8 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.rm.AMSchedulerEventType;
 import org.apache.tez.dag.app.rm.NMCommunicatorEventType;
 import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.HistoryEventHandler;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
@@ -135,6 +140,7 @@ public class TestAMContainer {
     verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID);
 
     wc.containerCompleted(false);
+    wc.verifyHistoryStopEvent();
     wc.verifyState(AMContainerState.COMPLETED);
     wc.verifyNoOutgoingEvents();
     verify(wc.tal).unregisterRunningContainer(wc.containerID);
@@ -190,6 +196,7 @@ public class TestAMContainer {
     verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID);
 
     wc.containerCompleted(false);
+    wc.verifyHistoryStopEvent();
     wc.verifyState(AMContainerState.COMPLETED);
     wc.verifyNoOutgoingEvents();
     verify(wc.tal).unregisterRunningContainer(wc.containerID);
@@ -223,6 +230,7 @@ public class TestAMContainer {
     wc.verifyNoOutgoingEvents();
 
     wc.containerCompleted(false);
+    wc.verifyHistoryStopEvent();
     wc.verifyState(AMContainerState.COMPLETED);
     wc.verifyNoOutgoingEvents();
     verify(wc.tal).unregisterRunningContainer(wc.containerID);
@@ -261,6 +269,7 @@ public class TestAMContainer {
         AMSchedulerEventType.S_CONTAINER_DEALLOCATE);
 
     wc.containerCompleted(false);
+    wc.verifyHistoryStopEvent();
     wc.verifyState(AMContainerState.COMPLETED);
     wc.verifyNoOutgoingEvents();
     verify(wc.tal).unregisterRunningContainer(wc.containerID);
@@ -299,6 +308,7 @@ public class TestAMContainer {
 
     wc.nmStopSent();
     wc.containerCompleted(false);
+    wc.verifyHistoryStopEvent();
     // 1 Inform scheduler. 2 TERMINATED to TaskAttempt.
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
@@ -338,6 +348,7 @@ public class TestAMContainer {
 
     wc.nmStopSent();
     wc.containerCompleted(false);
+    wc.verifyHistoryStopEvent();
     // 1 Inform scheduler. 2 TERMINATED to TaskAttempt.
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
@@ -376,6 +387,7 @@ public class TestAMContainer {
 
     wc.nmStopSent();
     wc.containerCompleted(false);
+    wc.verifyHistoryStopEvent();
     // 1 Inform scheduler. 2 TERMINATED to TaskAttempt.
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
@@ -411,6 +423,7 @@ public class TestAMContainer {
     // TODO Should this be an RM DE-ALLOCATE instead ?
 
     wc.containerCompleted(false);
+    wc.verifyHistoryStopEvent();
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
         TaskAttemptEventType.TA_CONTAINER_TERMINATED);
@@ -523,6 +536,7 @@ public class TestAMContainer {
     // can cause it to be genreated)
     wc.pullTaskToRun();
     wc.verifyNoOutgoingEvents();
+    wc.verifyHistoryStopEvent();
 
     assertFalse(wc.amContainer.isInErrorState());
   }
@@ -558,6 +572,7 @@ public class TestAMContainer {
     // can cause it to be genreated)
     wc.taskAttemptSucceeded(wc.taskAttemptID);
     wc.verifyNoOutgoingEvents();
+    wc.verifyHistoryStopEvent();
 
     assertFalse(wc.amContainer.isInErrorState());
   }
@@ -593,6 +608,7 @@ public class TestAMContainer {
     // can cause it to be genreated)
     wc.taskAttemptSucceeded(wc.taskAttemptID);
     wc.verifyNoOutgoingEvents();
+    wc.verifyHistoryStopEvent();
 
     assertFalse(wc.amContainer.isInErrorState());
   }
@@ -622,6 +638,7 @@ public class TestAMContainer {
     TaskAttemptEventContainerTerminated ctEvent =
         (TaskAttemptEventContainerTerminated) outgoingEvents.get(0);
     assertEquals(taID2, ctEvent.getTaskAttemptID());
+    wc.verifyHistoryStopEvent();
 
     // Allocation to a completed Container is considered an error.
     // TODO Is this valid ?
@@ -668,6 +685,7 @@ public class TestAMContainer {
     }
 
     wc.containerCompleted(false);
+    wc.verifyHistoryStopEvent();
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
         TaskAttemptEventType.TA_CONTAINER_TERMINATED);
@@ -714,6 +732,7 @@ public class TestAMContainer {
 
     wc.containerCompleted(false);
     wc.verifyNoOutgoingEvents();
+    wc.verifyHistoryStopEvent();
 
     assertNull(wc.amContainer.getRunningTaskAttempt());
     assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size());
@@ -755,6 +774,7 @@ public class TestAMContainer {
     }
 
     wc.containerCompleted(false);
+    wc.verifyHistoryStopEvent();
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
         TaskAttemptEventType.TA_CONTAINER_TERMINATED);
@@ -820,6 +840,7 @@ public class TestAMContainer {
 
     wc.containerCompleted(false);
     wc.verifyNoOutgoingEvents();
+    wc.verifyHistoryStopEvent();
   }
   
   @Test
@@ -995,6 +1016,8 @@ public class TestAMContainer {
     EventHandler eventHandler;
 
     AppContext appContext;
+    
+    HistoryEventHandler historyEventHandler;
 
     TezDAGID dagID;
     TezVertexID vertexID;
@@ -1028,6 +1051,7 @@ public class TestAMContainer {
       taskAttemptID = TezTaskAttemptID.getInstance(taskID, 1);
       
       eventHandler = mock(EventHandler.class);
+      historyEventHandler = mock(HistoryEventHandler.class);
 
       appContext = mock(AppContext.class);
       doReturn(new HashMap<ApplicationAccessType, String>()).when(appContext)
@@ -1035,6 +1059,8 @@ public class TestAMContainer {
       doReturn(eventHandler).when(appContext).getEventHandler();
       doReturn(appAttemptID).when(appContext).getApplicationAttemptId();
       doReturn(applicationID).when(appContext).getApplicationID();
+      doReturn(new SystemClock()).when(appContext).getClock();
+      doReturn(historyEventHandler).when(appContext).getHistoryHandler();
       mockDAGID();
 
       taskSpec = mock(TaskSpec.class);
@@ -1074,6 +1100,12 @@ public class TestAMContainer {
       verify(eventHandler, times(invocations)).handle(args.capture());
       return args.getAllValues();
     }
+    
+    public void verifyHistoryStopEvent() {
+      ArgumentCaptor<DAGHistoryEvent> args = ArgumentCaptor.forClass(DAGHistoryEvent.class);
+      verify(historyEventHandler, times(1)).handle(args.capture());
+      assertEquals(1, args.getAllValues().size());
+    }
 
     public void launchContainer() {
       launchContainer(new HashMap<String, LocalResource>(), new Credentials());

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2275c4ee/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index e533ffd..b22b162 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -22,6 +22,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.tez.common.RuntimeUtils;
@@ -480,6 +481,23 @@ public class TestHistoryEventsProtoConversion {
     logEvents(event, deserializedEvent);
   }
 
+  private void testContainerStoppedEvent() throws Exception {
+    ContainerStoppedEvent event = new ContainerStoppedEvent(
+        ContainerId.newInstance(ApplicationAttemptId.newInstance(
+            ApplicationId.newInstance(0, 1), 1), 1001), 100034566,
+        ContainerExitStatus.SUCCESS, ApplicationAttemptId.newInstance(
+            ApplicationId.newInstance(0, 1), 1));
+    ContainerStoppedEvent deserializedEvent = (ContainerStoppedEvent)
+        testProtoConversion(event);
+    Assert.assertEquals(event.getContainerId(),
+        deserializedEvent.getContainerId());
+    Assert.assertEquals(event.getStoppedTime(),
+        deserializedEvent.getStoppedTime());
+    Assert.assertEquals(event.getApplicationAttemptId(),
+        deserializedEvent.getApplicationAttemptId());
+    logEvents(event, deserializedEvent);
+  }
+
   private void testVertexDataMovementEventsGeneratedEvent() throws Exception {
     VertexDataMovementEventsGeneratedEvent event;
     try {
@@ -617,6 +635,9 @@ public class TestHistoryEventsProtoConversion {
         case CONTAINER_LAUNCHED:
           testContainerLaunchedEvent();
           break;
+        case CONTAINER_STOPPED:
+          testContainerStoppedEvent();
+          break;
         case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
           testVertexDataMovementEventsGeneratedEvent();
           break;