You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/05/01 07:50:58 UTC
git commit: TEZ-1002. Generate Container Stop history events.
Contributed by Gopal V.
Repository: incubator-tez
Updated Branches:
refs/heads/master 9f04ae4d7 -> 2275c4ee1
TEZ-1002. Generate Container Stop history events. Contributed by Gopal
V.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/2275c4ee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/2275c4ee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/2275c4ee
Branch: refs/heads/master
Commit: 2275c4ee14c9830e868e6f6fa9773381a9a69760
Parents: 9f04ae4
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Apr 30 22:50:13 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Apr 30 22:50:13 2014 -0700
----------------------------------------------------------------------
.../org/apache/tez/dag/app/RecoveryParser.java | 4 +
.../dag/app/rm/container/AMContainerImpl.java | 25 +++
.../tez/dag/history/HistoryEventType.java | 1 +
.../history/events/ContainerStoppedEvent.java | 171 +++++++++++++++++++
.../tez/dag/history/utils/ATSConstants.java | 1 +
tez-dag/src/main/proto/HistoryEvents.proto | 8 +
.../dag/app/rm/container/TestAMContainer.java | 32 ++++
.../TestHistoryEventsProtoConversion.java | 21 +++
8 files changed, 263 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2275c4ee/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
index 093069c..bd3fa58 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
@@ -44,6 +44,7 @@ import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.events.AMLaunchedEvent;
import org.apache.tez.dag.history.events.AMStartedEvent;
import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+import org.apache.tez.dag.history.events.ContainerStoppedEvent;
import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
import org.apache.tez.dag.history.events.DAGFinishedEvent;
import org.apache.tez.dag.history.events.DAGInitializedEvent;
@@ -156,6 +157,9 @@ public class RecoveryParser {
case CONTAINER_LAUNCHED:
event = new ContainerLaunchedEvent();
break;
+ case CONTAINER_STOPPED:
+ event = new ContainerStoppedEvent();
+ break;
case VERTEX_INITIALIZED:
event = new VertexInitializedEvent();
break;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2275c4ee/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 8fb090f..6e6fa99 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -32,6 +32,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
@@ -42,6 +43,7 @@ import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.hadoop.yarn.util.Clock;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerHeartbeatHandler;
import org.apache.tez.dag.app.ContainerContext;
@@ -54,6 +56,9 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventNodeFailed;
import org.apache.tez.dag.app.rm.AMSchedulerEventDeallocateContainer;
import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
import org.apache.tez.dag.app.rm.NMCommunicatorStopRequestEvent;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.HistoryEventHandler;
+import org.apache.tez.dag.history.events.ContainerStoppedEvent;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
//import org.apache.tez.dag.app.dag.event.TaskAttemptEventDiagnosticsUpdate;
@@ -673,6 +678,9 @@ public class AMContainerImpl implements AMContainer {
LOG.info("Container " + container.getContainerId()
+ " exited with diagnostics set to " + diag);
}
+ container.logStopped(event.isPreempted() ?
+ ContainerExitStatus.PREEMPTED
+ : ContainerExitStatus.SUCCESS);
}
public String getMessage(AMContainerImpl container,
@@ -694,6 +702,9 @@ public class AMContainerImpl implements AMContainer {
getMessage(container, cEvent));
}
container.unregisterFromTAListener();
+ container.logStopped(container.pendingAttempt == null ?
+ ContainerExitStatus.SUCCESS
+ : ContainerExitStatus.INVALID);
container.sendStopRequestToNM();
}
@@ -736,6 +747,7 @@ public class AMContainerImpl implements AMContainer {
container.sendNodeFailureToTA(container.runningAttempt, errorMessage);
container.sendTerminatingToTaskAttempt(container.runningAttempt, "Node failure");
}
+ container.logStopped(ContainerExitStatus.ABORTED);
}
}
@@ -760,6 +772,7 @@ public class AMContainerImpl implements AMContainer {
" hit an invalid transition - " + cEvent.getType() + " at " +
container.getState());
}
+ container.logStopped(ContainerExitStatus.ABORTED);
container.sendStopRequestToNM();
container.unregisterFromTAListener();
}
@@ -1087,6 +1100,7 @@ public class AMContainerImpl implements AMContainer {
this.sendTerminatingToTaskAttempt(currentTaId, errorMessage);
this.registerFailedAttempt(event.getTaskAttemptId());
LOG.warn(errorMessage);
+ this.logStopped(ContainerExitStatus.INVALID);
this.sendStopRequestToNM();
this.unregisterFromTAListener();
this.unregisterFromContainerListener();
@@ -1096,6 +1110,17 @@ public class AMContainerImpl implements AMContainer {
failedAssignments.add(taId);
}
+  /**
+   * Publishes a {@link ContainerStoppedEvent} for this container to the
+   * history handler obtained from the application context.
+   *
+   * @param exitStatus exit status code to record in the event
+   */
+  private void logStopped(int exitStatus) {
+    final HistoryEventHandler handler = appContext.getHistoryHandler();
+    final Clock clock = appContext.getClock();
+    // Timestamp the event with the application clock at the moment of logging.
+    final long stoppedAt = clock.getTime();
+    final ContainerStoppedEvent stoppedEvent = new ContainerStoppedEvent(
+        containerId, stoppedAt, exitStatus,
+        appContext.getApplicationAttemptId());
+    final DAGHistoryEvent historyEvent =
+        new DAGHistoryEvent(appContext.getCurrentDAGID(), stoppedEvent);
+    handler.handle(historyEvent);
+  }
+
protected void deAllocate() {
sendEvent(new AMSchedulerEventDeallocateContainer(containerId));
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2275c4ee/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
index 219bfe3..fd747e0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
@@ -34,6 +34,7 @@ public enum HistoryEventType {
TASK_ATTEMPT_STARTED,
TASK_ATTEMPT_FINISHED,
CONTAINER_LAUNCHED,
+ CONTAINER_STOPPED,
VERTEX_DATA_MOVEMENT_EVENTS_GENERATED,
DAG_COMMIT_STARTED,
VERTEX_COMMIT_STARTED,
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2275c4ee/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerStoppedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerStoppedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerStoppedEvent.java
new file mode 100644
index 0000000..a544354
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerStoppedEvent.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.events;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.ats.EntityTypes;
+import org.apache.tez.dag.history.utils.ATSConstants;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.ContainerStoppedProto;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * History event recording that a container was stopped.
+ *
+ * <p>Holds the container id, the stop timestamp, the exit status and the
+ * application attempt id. The event can be rendered as ATS JSON via
+ * {@link #convertToATSJSON()} and round-tripped through
+ * {@link ContainerStoppedProto}.
+ */
+public class ContainerStoppedEvent implements HistoryEvent {
+
+  private ContainerId containerId;
+  private long stopTime;
+  private int exitStatus;
+  private ApplicationAttemptId applicationAttemptId;
+
+  /**
+   * Creates an empty event. The fields are left at their defaults until
+   * {@link #fromProto(ContainerStoppedProto)} or
+   * {@link #fromProtoStream(InputStream)} populates them.
+   */
+  public ContainerStoppedEvent() {
+  }
+
+  /**
+   * Creates a fully populated event.
+   *
+   * @param containerId id of the stopped container
+   * @param stopTime timestamp recorded for the stop
+   * @param exitStatus exit status code recorded for the container
+   * @param applicationAttemptId application attempt the event is reported under
+   */
+  public ContainerStoppedEvent(ContainerId containerId,
+      long stopTime,
+      int exitStatus,
+      ApplicationAttemptId applicationAttemptId) {
+    this.containerId = containerId;
+    this.stopTime = stopTime;
+    this.exitStatus = exitStatus;
+    this.applicationAttemptId = applicationAttemptId;
+  }
+
+  /** @return {@link HistoryEventType#CONTAINER_STOPPED} */
+  @Override
+  public HistoryEventType getEventType() {
+    return HistoryEventType.CONTAINER_STOPPED;
+  }
+
+  /**
+   * Builds the ATS JSON representation of this event.
+   *
+   * <p>The entity is {@code "tez_" + containerId} of type
+   * {@code TEZ_CONTAINER_ID}; the application attempt and the plain container
+   * id are attached as related entities; a single {@code CONTAINER_STOPPED}
+   * event carries the stop timestamp; the exit status is placed under
+   * "other info".
+   *
+   * @return the JSON object describing this event
+   * @throws JSONException if a value cannot be added to the JSON structure
+   */
+  @Override
+  public JSONObject convertToATSJSON() throws JSONException {
+    // structure is identical to ContainerLaunchedEvent
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY,
+        "tez_" + containerId.toString());
+    jsonObject.put(ATSConstants.ENTITY_TYPE,
+        EntityTypes.TEZ_CONTAINER_ID.name());
+
+    JSONArray relatedEntities = new JSONArray();
+    JSONObject appAttemptEntity = new JSONObject();
+    appAttemptEntity.put(ATSConstants.ENTITY,
+        applicationAttemptId.toString());
+    appAttemptEntity.put(ATSConstants.ENTITY_TYPE,
+        EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
+
+    JSONObject containerEntity = new JSONObject();
+    containerEntity.put(ATSConstants.ENTITY, containerId.toString());
+    containerEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.CONTAINER_ID);
+
+    relatedEntities.put(appAttemptEntity);
+    relatedEntities.put(containerEntity);
+    jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
+
+    // TODO decide whether this goes into different events,
+    // event info or other info.
+    JSONArray events = new JSONArray();
+    JSONObject stopEvent = new JSONObject();
+    stopEvent.put(ATSConstants.TIMESTAMP, stopTime);
+    stopEvent.put(ATSConstants.EVENT_TYPE,
+        HistoryEventType.CONTAINER_STOPPED.name());
+    events.put(stopEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    // TODO add other container info here? or assume AHS will have this?
+    // TODO container logs?
+
+    // Other info
+    JSONObject otherInfo = new JSONObject();
+    otherInfo.put(ATSConstants.EXIT_STATUS, exitStatus);
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+    return jsonObject;
+  }
+
+  /** @return {@code false}; this event is not flagged as a recovery event */
+  @Override
+  public boolean isRecoveryEvent() {
+    return false;
+  }
+
+  /** @return {@code true}; this event is flagged as a history event */
+  @Override
+  public boolean isHistoryEvent() {
+    return true;
+  }
+
+  /**
+   * Converts this event to its protobuf form. The container id and the
+   * application attempt id are stored as their {@code toString()} values.
+   *
+   * @return the populated proto message
+   */
+  public ContainerStoppedProto toProto() {
+    return ContainerStoppedProto.newBuilder()
+        .setApplicationAttemptId(applicationAttemptId.toString())
+        .setContainerId(containerId.toString())
+        .setStopTime(stopTime)
+        .setExitStatus(exitStatus)
+        .build();
+  }
+
+  /**
+   * Overwrites every field of this event with the values held by the proto.
+   *
+   * @param proto message to read from; must not be {@code null}
+   */
+  public void fromProto(ContainerStoppedProto proto) {
+    this.containerId = ConverterUtils.toContainerId(proto.getContainerId());
+    stopTime = proto.getStopTime();
+    exitStatus = proto.getExitStatus();
+    this.applicationAttemptId = ConverterUtils.toApplicationAttemptId(
+        proto.getApplicationAttemptId());
+  }
+
+  /**
+   * Writes this event to the stream as a length-delimited proto message.
+   *
+   * @param outputStream stream to write to
+   * @throws IOException if writing fails
+   */
+  @Override
+  public void toProtoStream(OutputStream outputStream) throws IOException {
+    toProto().writeDelimitedTo(outputStream);
+  }
+
+  /**
+   * Reads one length-delimited proto message from the stream and populates
+   * this event from it.
+   *
+   * @param inputStream stream to read from
+   * @throws IOException if reading fails or the stream holds no message
+   */
+  @Override
+  public void fromProtoStream(InputStream inputStream) throws IOException {
+    ContainerStoppedProto proto =
+        ContainerStoppedProto.parseDelimitedFrom(inputStream);
+    // parseDelimitedFrom yields null when the stream is already at EOF;
+    // report that as an I/O problem instead of failing later with an NPE.
+    if (proto == null) {
+      throw new IOException("No data found in stream");
+    }
+    fromProto(proto);
+  }
+
+  @Override
+  public String toString() {
+    return "containerId=" + containerId
+        + ", stoppedTime=" + stopTime
+        + ", exitStatus=" + exitStatus;
+  }
+
+  /** @return id of the stopped container */
+  public ContainerId getContainerId() {
+    return containerId;
+  }
+
+  /** @return timestamp recorded for the stop */
+  public long getStoppedTime() {
+    return stopTime;
+  }
+
+  /** @return exit status code recorded for the container */
+  public int getExitStatus() {
+    return exitStatus;
+  }
+
+  /** @return application attempt id carried by this event */
+  public ApplicationAttemptId getApplicationAttemptId() {
+    return applicationAttemptId;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2275c4ee/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java
index 4050df3..f53cc7d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java
@@ -62,6 +62,7 @@ public class ATSConstants {
public static final String PROCESSOR_CLASS_NAME = "processorClassName";
public static final String IN_PROGRESS_LOGS_URL = "inProgressLogsURL";
public static final String COMPLETED_LOGS_URL = "completedLogsURL";
+ public static final String EXIT_STATUS = "exitStatus";
/* Counters-related keys */
public static final String COUNTER_GROUPS = "counterGroups";
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2275c4ee/tez-dag/src/main/proto/HistoryEvents.proto
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto
index c73870a..654a2fa 100644
--- a/tez-dag/src/main/proto/HistoryEvents.proto
+++ b/tez-dag/src/main/proto/HistoryEvents.proto
@@ -39,6 +39,14 @@ message ContainerLaunchedProto {
optional string container_id = 2;
optional int64 launch_time = 3;
}
+
+// Serialized form of ContainerStoppedEvent (see its toProto/fromProto).
+message ContainerStoppedProto {
+ // String form of the application attempt id carried by the event.
+ optional string application_attempt_id = 1;
+ // String form of the id of the stopped container.
+ optional string container_id = 2;
+ // Stop timestamp carried by the event.
+ optional int64 stop_time = 3;
+ // Exit status code carried by the event.
+ optional int32 exit_status = 4;
+}
+
message DAGSubmittedProto {
optional string dag_id = 1;
optional DAGPlan dag_plan = 2;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2275c4ee/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
index 6bd8fcc..8084f0d 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@ -40,6 +40,7 @@ import java.util.List;
import java.util.Map;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
@@ -60,6 +61,8 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.TokenCache;
import org.apache.tez.dag.app.AppContext;
@@ -72,6 +75,8 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.rm.AMSchedulerEventType;
import org.apache.tez.dag.app.rm.NMCommunicatorEventType;
import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.HistoryEventHandler;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
@@ -135,6 +140,7 @@ public class TestAMContainer {
verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID);
wc.containerCompleted(false);
+ wc.verifyHistoryStopEvent();
wc.verifyState(AMContainerState.COMPLETED);
wc.verifyNoOutgoingEvents();
verify(wc.tal).unregisterRunningContainer(wc.containerID);
@@ -190,6 +196,7 @@ public class TestAMContainer {
verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID);
wc.containerCompleted(false);
+ wc.verifyHistoryStopEvent();
wc.verifyState(AMContainerState.COMPLETED);
wc.verifyNoOutgoingEvents();
verify(wc.tal).unregisterRunningContainer(wc.containerID);
@@ -223,6 +230,7 @@ public class TestAMContainer {
wc.verifyNoOutgoingEvents();
wc.containerCompleted(false);
+ wc.verifyHistoryStopEvent();
wc.verifyState(AMContainerState.COMPLETED);
wc.verifyNoOutgoingEvents();
verify(wc.tal).unregisterRunningContainer(wc.containerID);
@@ -261,6 +269,7 @@ public class TestAMContainer {
AMSchedulerEventType.S_CONTAINER_DEALLOCATE);
wc.containerCompleted(false);
+ wc.verifyHistoryStopEvent();
wc.verifyState(AMContainerState.COMPLETED);
wc.verifyNoOutgoingEvents();
verify(wc.tal).unregisterRunningContainer(wc.containerID);
@@ -299,6 +308,7 @@ public class TestAMContainer {
wc.nmStopSent();
wc.containerCompleted(false);
+ wc.verifyHistoryStopEvent();
// 1 Inform scheduler. 2 TERMINATED to TaskAttempt.
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
@@ -338,6 +348,7 @@ public class TestAMContainer {
wc.nmStopSent();
wc.containerCompleted(false);
+ wc.verifyHistoryStopEvent();
// 1 Inform scheduler. 2 TERMINATED to TaskAttempt.
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
@@ -376,6 +387,7 @@ public class TestAMContainer {
wc.nmStopSent();
wc.containerCompleted(false);
+ wc.verifyHistoryStopEvent();
// 1 Inform scheduler. 2 TERMINATED to TaskAttempt.
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
@@ -411,6 +423,7 @@ public class TestAMContainer {
// TODO Should this be an RM DE-ALLOCATE instead ?
wc.containerCompleted(false);
+ wc.verifyHistoryStopEvent();
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
TaskAttemptEventType.TA_CONTAINER_TERMINATED);
@@ -523,6 +536,7 @@ public class TestAMContainer {
// can cause it to be genreated)
wc.pullTaskToRun();
wc.verifyNoOutgoingEvents();
+ wc.verifyHistoryStopEvent();
assertFalse(wc.amContainer.isInErrorState());
}
@@ -558,6 +572,7 @@ public class TestAMContainer {
// can cause it to be genreated)
wc.taskAttemptSucceeded(wc.taskAttemptID);
wc.verifyNoOutgoingEvents();
+ wc.verifyHistoryStopEvent();
assertFalse(wc.amContainer.isInErrorState());
}
@@ -593,6 +608,7 @@ public class TestAMContainer {
// can cause it to be genreated)
wc.taskAttemptSucceeded(wc.taskAttemptID);
wc.verifyNoOutgoingEvents();
+ wc.verifyHistoryStopEvent();
assertFalse(wc.amContainer.isInErrorState());
}
@@ -622,6 +638,7 @@ public class TestAMContainer {
TaskAttemptEventContainerTerminated ctEvent =
(TaskAttemptEventContainerTerminated) outgoingEvents.get(0);
assertEquals(taID2, ctEvent.getTaskAttemptID());
+ wc.verifyHistoryStopEvent();
// Allocation to a completed Container is considered an error.
// TODO Is this valid ?
@@ -668,6 +685,7 @@ public class TestAMContainer {
}
wc.containerCompleted(false);
+ wc.verifyHistoryStopEvent();
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
TaskAttemptEventType.TA_CONTAINER_TERMINATED);
@@ -714,6 +732,7 @@ public class TestAMContainer {
wc.containerCompleted(false);
wc.verifyNoOutgoingEvents();
+ wc.verifyHistoryStopEvent();
assertNull(wc.amContainer.getRunningTaskAttempt());
assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size());
@@ -755,6 +774,7 @@ public class TestAMContainer {
}
wc.containerCompleted(false);
+ wc.verifyHistoryStopEvent();
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
TaskAttemptEventType.TA_CONTAINER_TERMINATED);
@@ -820,6 +840,7 @@ public class TestAMContainer {
wc.containerCompleted(false);
wc.verifyNoOutgoingEvents();
+ wc.verifyHistoryStopEvent();
}
@Test
@@ -995,6 +1016,8 @@ public class TestAMContainer {
EventHandler eventHandler;
AppContext appContext;
+
+ HistoryEventHandler historyEventHandler;
TezDAGID dagID;
TezVertexID vertexID;
@@ -1028,6 +1051,7 @@ public class TestAMContainer {
taskAttemptID = TezTaskAttemptID.getInstance(taskID, 1);
eventHandler = mock(EventHandler.class);
+ historyEventHandler = mock(HistoryEventHandler.class);
appContext = mock(AppContext.class);
doReturn(new HashMap<ApplicationAccessType, String>()).when(appContext)
@@ -1035,6 +1059,8 @@ public class TestAMContainer {
doReturn(eventHandler).when(appContext).getEventHandler();
doReturn(appAttemptID).when(appContext).getApplicationAttemptId();
doReturn(applicationID).when(appContext).getApplicationID();
+ doReturn(new SystemClock()).when(appContext).getClock();
+ doReturn(historyEventHandler).when(appContext).getHistoryHandler();
mockDAGID();
taskSpec = mock(TaskSpec.class);
@@ -1074,6 +1100,12 @@ public class TestAMContainer {
verify(eventHandler, times(invocations)).handle(args.capture());
return args.getAllValues();
}
+
+    /**
+     * Checks that the mocked history handler has been handed exactly one
+     * {@link DAGHistoryEvent}.
+     */
+    public void verifyHistoryStopEvent() {
+      ArgumentCaptor<DAGHistoryEvent> captor =
+          ArgumentCaptor.forClass(DAGHistoryEvent.class);
+      // verify(mock) without a mode is equivalent to times(1).
+      verify(historyEventHandler).handle(captor.capture());
+      List<DAGHistoryEvent> captured = captor.getAllValues();
+      assertEquals(1, captured.size());
+    }
public void launchContainer() {
launchContainer(new HashMap<String, LocalResource>(), new Credentials());
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2275c4ee/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index e533ffd..b22b162 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -22,6 +22,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.tez.common.RuntimeUtils;
@@ -480,6 +481,23 @@ public class TestHistoryEventsProtoConversion {
logEvents(event, deserializedEvent);
}
+  /**
+   * Round-trips a {@link ContainerStoppedEvent} through the proto conversion
+   * helper and checks that every field survives.
+   */
+  private void testContainerStoppedEvent() throws Exception {
+    // A non-zero exit status is used on purpose: SUCCESS is 0, which is also
+    // the protobuf default for int32, so a dropped field would go unnoticed.
+    ContainerStoppedEvent event = new ContainerStoppedEvent(
+        ContainerId.newInstance(ApplicationAttemptId.newInstance(
+            ApplicationId.newInstance(0, 1), 1), 1001), 100034566,
+        ContainerExitStatus.PREEMPTED, ApplicationAttemptId.newInstance(
+            ApplicationId.newInstance(0, 1), 1));
+    ContainerStoppedEvent deserializedEvent = (ContainerStoppedEvent)
+        testProtoConversion(event);
+    Assert.assertEquals(event.getContainerId(),
+        deserializedEvent.getContainerId());
+    Assert.assertEquals(event.getStoppedTime(),
+        deserializedEvent.getStoppedTime());
+    Assert.assertEquals(event.getExitStatus(),
+        deserializedEvent.getExitStatus());
+    Assert.assertEquals(event.getApplicationAttemptId(),
+        deserializedEvent.getApplicationAttemptId());
+    logEvents(event, deserializedEvent);
+  }
+
private void testVertexDataMovementEventsGeneratedEvent() throws Exception {
VertexDataMovementEventsGeneratedEvent event;
try {
@@ -617,6 +635,9 @@ public class TestHistoryEventsProtoConversion {
case CONTAINER_LAUNCHED:
testContainerLaunchedEvent();
break;
+ case CONTAINER_STOPPED:
+ testContainerStoppedEvent();
+ break;
case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
testVertexDataMovementEventsGeneratedEvent();
break;