You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zh...@apache.org on 2017/03/28 19:48:06 UTC
[25/50] [abbrv] tez git commit: TEZ-3644. Cleanup container list
stored in AMNode. (sseth)
TEZ-3644. Cleanup container list stored in AMNode. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4ce6ea6e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4ce6ea6e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4ce6ea6e
Branch: refs/heads/TEZ-1190
Commit: 4ce6ea6ed867a67600dbc36a2f56c37bbec3d708
Parents: 1f2a935
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Mar 2 16:02:16 2017 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Mar 2 16:02:16 2017 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../dag/app/rm/container/AMContainerImpl.java | 62 +++++++--
.../org/apache/tez/dag/app/rm/node/AMNode.java | 3 +
.../rm/node/AMNodeEventContainerCompleted.java | 37 ++++++
.../tez/dag/app/rm/node/AMNodeEventType.java | 5 +-
.../apache/tez/dag/app/rm/node/AMNodeImpl.java | 67 ++++++++--
.../tez/dag/app/rm/node/AMNodeTracker.java | 5 +-
.../dag/app/rm/node/PerSourceNodeTracker.java | 11 +-
.../dag/app/rm/container/TestAMContainer.java | 128 ++++++++++++-------
.../tez/dag/app/rm/node/TestAMNodeTracker.java | 73 +++++++++++
10 files changed, 322 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/4ce6ea6e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b8465de..07841bf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3644. Cleanup container list stored in AMNode.
TEZ-3646. IFile.Writer has an extra output stream flush call
TEZ-3643. Long running AMs can go out of memory due to retained AMContainer instances.
TEZ-3637. TezMerger logs too much at INFO level
http://git-wip-us.apache.org/repos/asf/tez/blob/4ce6ea6e/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index ac429c7..18e72a7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -32,9 +32,12 @@ import org.apache.tez.Utils;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError;
+import org.apache.tez.dag.app.rm.node.AMNodeEventContainerCompleted;
import org.apache.tez.serviceplugins.api.ContainerEndReason;
import org.apache.tez.common.ContainerSignatureMatcher;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
+import org.apache.tez.state.OnStateChangedCallback;
+import org.apache.tez.state.StateMachineTez;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.security.Credentials;
@@ -48,7 +51,6 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
-import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.tez.dag.app.AppContext;
@@ -118,6 +120,8 @@ public class AMContainerImpl implements AMContainer {
private Credentials credentials;
private boolean credentialsChanged = false;
+
+ private boolean completedMessageSent = false;
// TODO Consider registering with the TAL, instead of the TAL pulling.
// Possibly after splitting TAL and ContainerListener.
@@ -127,8 +131,11 @@ public class AMContainerImpl implements AMContainer {
// TODO Create a generic ERROR state. Container tries informing relevant components in this case.
+ private final NonRunningStateEnteredCallback NON_RUNNING_STATE_ENTERED_CALLBACK = new NonRunningStateEnteredCallback();
+
+ private final StateMachineTez<AMContainerState, AMContainerEventType, AMContainerEvent, AMContainerImpl>
+ stateMachine;
- private final StateMachine<AMContainerState, AMContainerEventType, AMContainerEvent> stateMachine;
private static final StateMachineFactory
<AMContainerImpl, AMContainerState, AMContainerEventType, AMContainerEvent>
stateMachineFactory =
@@ -328,7 +335,19 @@ public class AMContainerImpl implements AMContainer {
this.schedulerId = schedulerId;
this.launcherId = launcherId;
this.taskCommId = taskCommId;
- this.stateMachine = stateMachineFactory.make(this);
+ this.stateMachine = new StateMachineTez<>(stateMachineFactory.make(this), this);
+ augmentStateMachine();
+ }
+
+
+ private void augmentStateMachine() {
+ stateMachine
+ .registerStateEnteredCallback(AMContainerState.STOP_REQUESTED,
+ NON_RUNNING_STATE_ENTERED_CALLBACK)
+ .registerStateEnteredCallback(AMContainerState.STOPPING,
+ NON_RUNNING_STATE_ENTERED_CALLBACK)
+ .registerStateEnteredCallback(AMContainerState.COMPLETED,
+ NON_RUNNING_STATE_ENTERED_CALLBACK);
}
@Override
@@ -422,7 +441,7 @@ public class AMContainerImpl implements AMContainer {
LOG.error("Can't handle event " + event.getType()
+ " at current state " + oldState + " for ContainerId "
+ this.containerId, e);
- inError = true;
+ setError();
// TODO Can't set state to COMPLETED. Add a default error state.
}
if (oldState != getState()) {
@@ -482,7 +501,7 @@ public class AMContainerImpl implements AMContainer {
msg, e));
// We have not registered with any of the listeners etc yet. Send out a deallocateContainer
// message and return. The AM will shutdown shortly.
- container.inError = true;
+ container.setError();
container.deAllocate();
return;
}
@@ -515,7 +534,7 @@ public class AMContainerImpl implements AMContainer {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
AMContainerEventAssignTA event = (AMContainerEventAssignTA) cEvent;
- container.inError = true;
+ container.setError();
container.registerFailedAttempt(event.getTaskAttemptId());
container.maybeSendNodeFailureForFailedAssignment(event
.getTaskAttemptId());
@@ -961,7 +980,7 @@ public class AMContainerImpl implements AMContainer {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
AMContainerEventAssignTA event = (AMContainerEventAssignTA) cEvent;
- container.inError = true;
+ container.setError();
String errorMessage = "AttemptId: " + event.getTaskAttemptId() +
" cannot be allocated to container: " + container.getContainerId() +
" in " + container.getState() + " state";
@@ -1032,7 +1051,7 @@ public class AMContainerImpl implements AMContainer {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
- container.inError = true;
+ container.setError();
}
}
@@ -1046,7 +1065,7 @@ public class AMContainerImpl implements AMContainer {
// think the container is still around and assign a task to it. The task
// ends up getting a CONTAINER_KILLED message. Task could handle this by
// asking for a reschedule in this case. Will end up FAILING the task instead of KILLING it.
- container.inError = true;
+ container.setError();
AMContainerEventAssignTA event = (AMContainerEventAssignTA) cEvent;
String errorMessage = "AttemptId: " + event.getTaskAttemptId()
+ " cannot be allocated to container: " + container.getContainerId()
@@ -1058,9 +1077,19 @@ public class AMContainerImpl implements AMContainer {
}
}
+ private static class NonRunningStateEnteredCallback
+ implements OnStateChangedCallback<AMContainerState, AMContainerImpl> {
+
+ @Override
+ public void onStateChanged(AMContainerImpl amContainer,
+ AMContainerState amContainerState) {
+ amContainer.handleNonRunningStateEntered();
+ }
+ }
+
private void handleExtraTAAssign(
AMContainerEventAssignTA event, TezTaskAttemptID currentTaId) {
- this.inError = true;
+ setError();
String errorMessage = "AMScheduler Error: Multiple simultaneous " +
"taskAttempt allocations to: " + this.getContainerId() +
". Attempts: " + currentTaId + ", " + event.getTaskAttemptId() +
@@ -1078,6 +1107,19 @@ public class AMContainerImpl implements AMContainer {
this.unregisterFromContainerListener();
}
+ private void setError() {
+ this.inError = true;
+ handleNonRunningStateEntered();
+ }
+
+ private void handleNonRunningStateEntered() {
+ if (!completedMessageSent) {
+ completedMessageSent = true;
+ sendEvent(new AMNodeEventContainerCompleted(getContainer().getNodeId(),
+ schedulerId, containerId));
+ }
+ }
+
protected void registerFailedAttempt(TezTaskAttemptID taId) {
failedAssignments.add(taId);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4ce6ea6e/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNode.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNode.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNode.java
index 1c34816..bc01e04 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNode.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNode.java
@@ -23,6 +23,7 @@ import java.util.List;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.dag.app.dag.DAG;
public interface AMNode extends EventHandler<AMNodeEvent> {
@@ -33,4 +34,6 @@ public interface AMNode extends EventHandler<AMNodeEvent> {
public boolean isUnhealthy();
public boolean isBlacklisted();
public boolean isUsable();
+
+ void dagComplete(DAG dag);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4ce6ea6e/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventContainerCompleted.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventContainerCompleted.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventContainerCompleted.java
new file mode 100644
index 0000000..f999c3a
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventContainerCompleted.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.dag.app.rm.node;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+public class AMNodeEventContainerCompleted extends AMNodeEvent {
+
+ private final ContainerId containerId;
+
+ public AMNodeEventContainerCompleted(
+ NodeId nodeId,
+ int schedulerId, ContainerId containerId) {
+ super(nodeId, schedulerId, AMNodeEventType.N_CONTAINER_COMPLETED);
+ this.containerId = containerId;
+ }
+
+ public ContainerId getContainerId() {
+ return containerId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/4ce6ea6e/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventType.java
index 86087d0..a141124 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventType.java
@@ -21,7 +21,10 @@ package org.apache.tez.dag.app.rm.node;
public enum AMNodeEventType {
//Producer: Scheduler
N_CONTAINER_ALLOCATED,
-
+
+ //Producer: Container
+ N_CONTAINER_COMPLETED,
+
//Producer: TaskSchedulerEventHandler
N_TA_SUCCEEDED,
http://git-wip-us.apache.org/repos/asf/tez/blob/4ce6ea6e/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
index bcc38c6..f4ad032 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
@@ -19,6 +19,8 @@
package org.apache.tez.dag.app.rm.node;
import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
@@ -26,6 +28,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+import org.apache.tez.dag.app.dag.DAG;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -60,20 +63,19 @@ public class AMNodeImpl implements AMNode {
private boolean blacklistingEnabled;
private boolean ignoreBlacklisting = false;
private boolean nodeUpdatesRescheduleEnabled;
- private Set<TezTaskAttemptID> failedAttemptIds = Sets.newHashSet();
+ private final Set<TezTaskAttemptID> failedAttemptIds = Sets.newHashSet();
@SuppressWarnings("rawtypes")
protected EventHandler eventHandler;
@VisibleForTesting
- final List<ContainerId> containers = new LinkedList<ContainerId>();
+ final Set<ContainerId> containers = new LinkedHashSet<>();
+ final Set<ContainerId> completedContainers = new HashSet<>();
int numFailedTAs = 0;
int numSuccessfulTAs = 0;
-
- //Book-keeping only. In case of Health status change.
- private final List<ContainerId> pastContainers = new LinkedList<ContainerId>();
-
+ private static final ContainerCompletedTransition CONTAINER_COMPLETED_TRANSITION =
+ new ContainerCompletedTransition();
private final StateMachine<AMNodeState, AMNodeEventType, AMNodeEvent> stateMachine;
@@ -103,6 +105,8 @@ public class AMNodeImpl implements AMNode {
new IgnoreBlacklistingStateChangeTransition(true))
.addTransition(AMNodeState.ACTIVE, AMNodeState.ACTIVE,
AMNodeEventType.N_TURNED_HEALTHY)
+ .addTransition(AMNodeState.ACTIVE, AMNodeState.ACTIVE,
+ AMNodeEventType.N_CONTAINER_COMPLETED, CONTAINER_COMPLETED_TRANSITION)
// Transitions from BLACKLISTED state.
.addTransition(AMNodeState.BLACKLISTED, AMNodeState.BLACKLISTED,
@@ -120,6 +124,8 @@ public class AMNodeImpl implements AMNode {
.addTransition(AMNodeState.BLACKLISTED, AMNodeState.FORCED_ACTIVE,
AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED,
new IgnoreBlacklistingStateChangeTransition(true))
+ .addTransition(AMNodeState.BLACKLISTED, AMNodeState.BLACKLISTED,
+ AMNodeEventType.N_CONTAINER_COMPLETED, CONTAINER_COMPLETED_TRANSITION)
.addTransition(
AMNodeState.BLACKLISTED,
AMNodeState.BLACKLISTED,
@@ -142,6 +148,8 @@ public class AMNodeImpl implements AMNode {
EnumSet.of(AMNodeState.BLACKLISTED, AMNodeState.ACTIVE),
AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED,
new IgnoreBlacklistingDisabledTransition())
+ .addTransition(AMNodeState.FORCED_ACTIVE, AMNodeState.FORCED_ACTIVE,
+ AMNodeEventType.N_CONTAINER_COMPLETED, CONTAINER_COMPLETED_TRANSITION)
.addTransition(
AMNodeState.FORCED_ACTIVE,
AMNodeState.FORCED_ACTIVE,
@@ -168,6 +176,8 @@ public class AMNodeImpl implements AMNode {
EnumSet.of(AMNodeState.ACTIVE, AMNodeState.FORCED_ACTIVE),
AMNodeEventType.N_TURNED_HEALTHY, new NodeTurnedHealthyTransition())
.addTransition(AMNodeState.UNHEALTHY, AMNodeState.UNHEALTHY,
+ AMNodeEventType.N_CONTAINER_COMPLETED, CONTAINER_COMPLETED_TRANSITION)
+ .addTransition(AMNodeState.UNHEALTHY, AMNodeState.UNHEALTHY,
AMNodeEventType.N_TURNED_UNHEALTHY, new GenericErrorTransition())
.installTopology();
@@ -259,7 +269,6 @@ public class AMNodeImpl implements AMNode {
sendEvent(new AMContainerEventNodeFailed(c, "Node blacklisted"));
}
// these containers are not useful anymore
- pastContainers.addAll(containers);
containers.clear();
sendEvent(new AMSchedulerEventNodeBlacklistUpdate(getNodeId(), true, schedulerId));
}
@@ -295,9 +304,9 @@ public class AMNodeImpl implements AMNode {
@Override
public AMNodeState transition(AMNodeImpl node, AMNodeEvent nEvent) {
AMNodeEventTaskAttemptEnded event = (AMNodeEventTaskAttemptEnded) nEvent;
- LOG.info("Attempt failed on node: " + node.getNodeId() + " TA: "
- + event.getTaskAttemptId() + " failed: " + event.failed()
- + " container: " + event.getContainerId() + " numFailedTAs: "
+ LOG.info("Attempt " + (event.failed() ? "failed" : "killed") + " on node: " + node.getNodeId()
+ + " TA: " + event.getTaskAttemptId()
+ + ", container: " + event.getContainerId() + ", numFailedTAs: "
+ node.numFailedTAs);
if (event.failed()) {
// ignore duplicate attempt ids
@@ -381,8 +390,6 @@ public class AMNodeImpl implements AMNode {
AMNodeEventContainerAllocated event = (AMNodeEventContainerAllocated) nEvent;
node.sendEvent(new AMContainerEvent(event.getContainerId(),
AMContainerEventType.C_STOP_REQUEST));
- // ZZZ CReuse: Should the scheduler check node state before scheduling a
- // container on it ?
}
}
@@ -434,7 +441,6 @@ public class AMNodeImpl implements AMNode {
MultipleArcTransition<AMNodeImpl, AMNodeEvent, AMNodeState> {
@Override
public AMNodeState transition(AMNodeImpl node, AMNodeEvent nEvent) {
- node.pastContainers.addAll(node.containers);
node.containers.clear();
if (node.ignoreBlacklisting) {
return AMNodeState.FORCED_ACTIVE;
@@ -444,6 +450,17 @@ public class AMNodeImpl implements AMNode {
}
}
+ protected static class ContainerCompletedTransition
+ implements SingleArcTransition<AMNodeImpl, AMNodeEvent> {
+
+ @Override
+ public void transition(AMNodeImpl amNode, AMNodeEvent amNodeEvent) {
+ AMNodeEventContainerCompleted cc =
+ (AMNodeEventContainerCompleted) amNodeEvent;
+ amNode.completedContainers.add(cc.getContainerId());
+ }
+ }
+
@Override
public boolean isUnhealthy() {
this.readLock.lock();
@@ -468,4 +485,28 @@ public class AMNodeImpl implements AMNode {
public boolean isUsable() {
return !(isUnhealthy() || isBlacklisted());
}
+
+ @Override
+ public void dagComplete(DAG dag) {
+ this.writeLock.lock();
+ try {
+ int countBefore = containers.size();
+ int countCompleted = completedContainers.size();
+
+
+ // Actual functionality.
+ containers.removeAll(completedContainers);
+ completedContainers.clear();
+
+ int countAfter = containers.size();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Node {}, cleaning up knownContainers. current={}, completed={}, postCleanup={}",
+ getNodeId(), countBefore, countCompleted, countAfter);
+ }
+
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4ce6ea6e/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
index fdc8a4c..1536170 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
@@ -101,6 +101,7 @@ public class AMNodeTracker extends AbstractService implements
// No synchronization required until there's multiple dispatchers.
switch (rEvent.getType()) {
case N_CONTAINER_ALLOCATED:
+ case N_CONTAINER_COMPLETED:
case N_TA_SUCCEEDED:
case N_TA_ENDED:
case N_IGNORE_BLACKLISTING_ENABLED:
@@ -140,7 +141,9 @@ public class AMNodeTracker extends AbstractService implements
}
public void dagComplete(DAG dag) {
- // TODO TEZ-2337 Maybe reset failures from previous DAGs
+ for (PerSourceNodeTracker perSourceNodeTracker : perSourceNodeTrackers.values()) {
+ perSourceNodeTracker.dagComplete(dag);
+ }
}
private PerSourceNodeTracker getAndCreateIfNeededPerSourceTracker(int schedulerId) {
http://git-wip-us.apache.org/repos/asf/tez/blob/4ce6ea6e/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java
index 72c3230..74c6176 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.dag.DAG;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -106,7 +107,8 @@ public class PerSourceNodeTracker {
}
break;
default:
- nodeMap.get(nodeId).handle(rEvent);
+ amNode = nodeMap.get(nodeId);
+ amNode.handle(rEvent);
}
}
@@ -186,6 +188,13 @@ public class PerSourceNodeTracker {
}
}
+ public void dagComplete(DAG dag) {
+ for (AMNode amNode : nodeMap.values()) {
+ amNode.dagComplete(dag);
+ }
+ // TODO TEZ-2337 Maybe reset failures from previous DAGs
+ }
+
@SuppressWarnings("unchecked")
private void sendEvent(Event<?> event) {
this.eventHandler.handle(event);
http://git-wip-us.apache.org/repos/asf/tez/blob/4ce6ea6e/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
index 4d1bbae..1b9df99 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.TokenCache;
import org.apache.tez.dag.app.TaskCommunicatorWrapper;
+import org.apache.tez.dag.app.rm.node.AMNodeEventType;
import org.apache.tez.serviceplugins.api.ContainerEndReason;
import org.apache.tez.serviceplugins.api.ServicePluginException;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
@@ -146,7 +147,9 @@ public class TestAMContainer {
wc.containerCompleted();
wc.verifyHistoryStopEvent();
wc.verifyState(AMContainerState.COMPLETED);
- wc.verifyNoOutgoingEvents();
+ List<Event> outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
+ verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+ AMNodeEventType.N_CONTAINER_COMPLETED);
verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.COMPLETED, null);
verify(wc.chh).unregister(wc.containerID);
@@ -196,7 +199,9 @@ public class TestAMContainer {
wc.containerCompleted();
wc.verifyHistoryStopEvent();
wc.verifyState(AMContainerState.COMPLETED);
- wc.verifyNoOutgoingEvents();
+ List<Event> outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
+ verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+ AMNodeEventType.N_CONTAINER_COMPLETED);
verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.COMPLETED, null);
verify(wc.chh).unregister(wc.containerID);
@@ -266,7 +271,9 @@ public class TestAMContainer {
wc.containerCompleted();
wc.verifyHistoryStopEvent();
wc.verifyState(AMContainerState.COMPLETED);
- wc.verifyNoOutgoingEvents();
+ List<Event> outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
+ verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+ AMNodeEventType.N_CONTAINER_COMPLETED);
verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.COMPLETED, null);
verify(wc.chh).unregister(wc.containerID);
@@ -288,9 +295,11 @@ public class TestAMContainer {
wc.stopRequest();
wc.verifyState(AMContainerState.STOP_REQUESTED);
// Event to NM to stop the container.
- wc.verifyCountAndGetOutgoingEvents(1);
- assertTrue(wc.verifyCountAndGetOutgoingEvents(1).get(0).getType() ==
- ContainerLauncherEventType.CONTAINER_STOP_REQUEST);
+
+ List<Event> outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
+ verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+ ContainerLauncherEventType.CONTAINER_STOP_REQUEST,
+ AMNodeEventType.N_CONTAINER_COMPLETED);
wc.nmStopSent();
wc.verifyState(AMContainerState.STOPPING);
@@ -323,9 +332,10 @@ public class TestAMContainer {
wc.stopRequest();
wc.verifyState(AMContainerState.STOP_REQUESTED);
// Event to NM to stop the container.
- wc.verifyCountAndGetOutgoingEvents(1);
- assertTrue(wc.verifyCountAndGetOutgoingEvents(1).get(0).getType() ==
- ContainerLauncherEventType.CONTAINER_STOP_REQUEST);
+ List<Event> outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
+ verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+ ContainerLauncherEventType.CONTAINER_STOP_REQUEST,
+ AMNodeEventType.N_CONTAINER_COMPLETED);
wc.nmStopFailed();
wc.verifyState(AMContainerState.STOPPING);
@@ -366,11 +376,12 @@ public class TestAMContainer {
"Multiple simultaneous taskAttempt");
verify(wc.chh).unregister(wc.containerID);
// 1 for NM stop request. 2 TERMINATING to TaskAttempt. 1 N_CONTAINER_COMPLETED to the node.
- outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3);
+ outgoingEvents = wc.verifyCountAndGetOutgoingEvents(4);
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
ContainerLauncherEventType.CONTAINER_STOP_REQUEST,
TaskAttemptEventType.TA_CONTAINER_TERMINATING,
- TaskAttemptEventType.TA_CONTAINER_TERMINATING);
+ TaskAttemptEventType.TA_CONTAINER_TERMINATING,
+ AMNodeEventType.N_CONTAINER_COMPLETED);
assertTrue(wc.amContainer.isInErrorState());
wc.nmStopSent();
@@ -405,11 +416,12 @@ public class TestAMContainer {
"Multiple simultaneous taskAttempt");
verify(wc.chh).unregister(wc.containerID);
// 1 for NM stop request. 2 TERMINATING to TaskAttempt. 1 N_CONTAINER_COMPLETED to the node.
- outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3);
+ outgoingEvents = wc.verifyCountAndGetOutgoingEvents(4);
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
ContainerLauncherEventType.CONTAINER_STOP_REQUEST,
TaskAttemptEventType.TA_CONTAINER_TERMINATING,
- TaskAttemptEventType.TA_CONTAINER_TERMINATING);
+ TaskAttemptEventType.TA_CONTAINER_TERMINATING,
+ AMNodeEventType.N_CONTAINER_COMPLETED);
assertTrue(wc.amContainer.isInErrorState());
wc.nmStopSent();
@@ -442,10 +454,11 @@ public class TestAMContainer {
"timed out");
verify(wc.chh).unregister(wc.containerID);
// 1 to TA, 1 for RM de-allocate, 1 N_CONTAINER_COMPLETED to the node.
- outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
+ outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3);
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
TaskAttemptEventType.TA_CONTAINER_TERMINATING,
- ContainerLauncherEventType.CONTAINER_STOP_REQUEST);
+ ContainerLauncherEventType.CONTAINER_STOP_REQUEST,
+ AMNodeEventType.N_CONTAINER_COMPLETED);
// TODO Should this be an RM DE-ALLOCATE instead ?
wc.containerCompleted();
@@ -477,10 +490,11 @@ public class TestAMContainer {
"received a STOP_REQUEST");
verify(wc.chh).unregister(wc.containerID);
// 1 to TA, 1 for RM de-allocate, 1 N_CONTAINER_COMPLETED to the node.
- outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
+ outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3);
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
TaskAttemptEventType.TA_CONTAINER_TERMINATING,
- ContainerLauncherEventType.CONTAINER_STOP_REQUEST);
+ ContainerLauncherEventType.CONTAINER_STOP_REQUEST,
+ AMNodeEventType.N_CONTAINER_COMPLETED);
// TODO Should this be an RM DE-ALLOCATE instead ?
wc.containerCompleted();
@@ -511,10 +525,11 @@ public class TestAMContainer {
verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.LAUNCH_FAILED,
"launchFailed");
- outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
+ outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3);
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
TaskAttemptEventType.TA_CONTAINER_TERMINATING,
- AMSchedulerEventType.S_CONTAINER_DEALLOCATE);
+ AMSchedulerEventType.S_CONTAINER_DEALLOCATE,
+ AMNodeEventType.N_CONTAINER_COMPLETED);
for (Event e : outgoingEvents) {
if (e.getType() == TaskAttemptEventType.TA_CONTAINER_TERMINATING) {
Assert.assertEquals(TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED,
@@ -538,7 +553,9 @@ public class TestAMContainer {
wc.containerCompleted();
wc.verifyState(AMContainerState.COMPLETED);
- wc.verifyNoOutgoingEvents();
+ List<Event> outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
+ verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+ AMNodeEventType.N_CONTAINER_COMPLETED);
assertFalse(wc.amContainer.isInErrorState());
}
@@ -561,9 +578,10 @@ public class TestAMContainer {
verify(wc.tal).registerRunningContainer(wc.containerID, 0);
verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.COMPLETED, null);
- outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
+ outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
- TaskAttemptEventType.TA_CONTAINER_TERMINATED);
+ TaskAttemptEventType.TA_CONTAINER_TERMINATED,
+ AMNodeEventType.N_CONTAINER_COMPLETED);
Assert.assertEquals(TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED,
((TaskAttemptEventContainerTerminated)outgoingEvents.get(0)).getTerminationCause());
@@ -591,9 +609,10 @@ public class TestAMContainer {
verify(wc.tal).registerRunningContainer(wc.containerID, 0);
verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.OTHER, "DiskFailed");
- outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
+ outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
- TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM);
+ TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
+ AMNodeEventType.N_CONTAINER_COMPLETED);
Assert.assertEquals(TaskAttemptTerminationCause.NODE_DISK_ERROR,
((TaskAttemptEventContainerTerminatedBySystem)outgoingEvents.get(0)).getTerminationCause());
@@ -623,9 +642,10 @@ public class TestAMContainer {
verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.NODE_FAILED,
"NodeFailed");
- outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
+ outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
- TaskAttemptEventType.TA_CONTAINER_TERMINATED);
+ TaskAttemptEventType.TA_CONTAINER_TERMINATED,
+ AMNodeEventType.N_CONTAINER_COMPLETED);
Assert.assertEquals(TaskAttemptTerminationCause.NODE_FAILED,
((TaskAttemptEventContainerTerminated)outgoingEvents.get(0)).getTerminationCause());
@@ -656,11 +676,12 @@ public class TestAMContainer {
verify(wc.chh).register(wc.containerID);
verify(wc.chh).unregister(wc.containerID);
- wc.verifyCountAndGetOutgoingEvents(0);
+ outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
+ verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+ AMNodeEventType.N_CONTAINER_COMPLETED);
assertFalse(wc.amContainer.isInErrorState());
- wc.verifyNoOutgoingEvents();
wc.verifyHistoryStopEvent();
assertFalse(wc.amContainer.isInErrorState());
@@ -685,9 +706,10 @@ public class TestAMContainer {
verify(wc.chh).register(wc.containerID);
verify(wc.chh).unregister(wc.containerID);
- outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
+ outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
- TaskAttemptEventType.TA_CONTAINER_TERMINATED);
+ TaskAttemptEventType.TA_CONTAINER_TERMINATED,
+ AMNodeEventType.N_CONTAINER_COMPLETED);
assertFalse(wc.amContainer.isInErrorState());
@@ -722,11 +744,14 @@ public class TestAMContainer {
verify(wc.chh).register(wc.containerID);
verify(wc.chh).unregister(wc.containerID);
- outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
+ outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
+
+ Event event = findEventByType(outgoingEvents, TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM);
Assert.assertEquals(TaskAttemptTerminationCause.EXTERNAL_PREEMPTION,
- ((TaskAttemptEventContainerTerminatedBySystem)outgoingEvents.get(0)).getTerminationCause());
+ ((TaskAttemptEventContainerTerminatedBySystem)event).getTerminationCause());
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
- TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM);
+ TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
+ AMNodeEventType.N_CONTAINER_COMPLETED);
assertFalse(wc.amContainer.isInErrorState());
@@ -761,9 +786,10 @@ public class TestAMContainer {
verify(wc.chh).register(wc.containerID);
verify(wc.chh).unregister(wc.containerID);
- outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
+ outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
- TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM);
+ TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
+ AMNodeEventType.N_CONTAINER_COMPLETED);
Assert.assertEquals(TaskAttemptTerminationCause.INTERNAL_PREEMPTION,
((TaskAttemptEventContainerTerminatedBySystem)outgoingEvents.get(0)).getTerminationCause());
@@ -799,11 +825,13 @@ public class TestAMContainer {
verify(wc.chh).register(wc.containerID);
verify(wc.chh).unregister(wc.containerID);
- outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
+ outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
+ Event event = findEventByType(outgoingEvents, TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM);
Assert.assertEquals(TaskAttemptTerminationCause.NODE_DISK_ERROR,
- ((TaskAttemptEventContainerTerminatedBySystem)outgoingEvents.get(0)).getTerminationCause());
+ ((TaskAttemptEventContainerTerminatedBySystem)event).getTerminationCause());
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
- TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM);
+ TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
+ AMNodeEventType.N_CONTAINER_COMPLETED);
assertFalse(wc.amContainer.isInErrorState());
@@ -862,11 +890,12 @@ public class TestAMContainer {
wc.nodeFailed();
// Expecting a complete event from the RM
wc.verifyState(AMContainerState.STOPPING);
- outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3);
+ outgoingEvents = wc.verifyCountAndGetOutgoingEvents(4);
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
TaskAttemptEventType.TA_NODE_FAILED,
TaskAttemptEventType.TA_CONTAINER_TERMINATING,
- AMSchedulerEventType.S_CONTAINER_DEALLOCATE);
+ AMSchedulerEventType.S_CONTAINER_DEALLOCATE,
+ AMNodeEventType.N_CONTAINER_COMPLETED);
for (Event event : outgoingEvents) {
if (event.getType() == TaskAttemptEventType.TA_NODE_FAILED) {
@@ -904,11 +933,12 @@ public class TestAMContainer {
wc.nodeFailed();
// Expecting a complete event from the RM
wc.verifyState(AMContainerState.STOPPING);
- outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3);
+ outgoingEvents = wc.verifyCountAndGetOutgoingEvents(4);
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
TaskAttemptEventType.TA_NODE_FAILED,
TaskAttemptEventType.TA_NODE_FAILED,
- AMSchedulerEventType.S_CONTAINER_DEALLOCATE);
+ AMSchedulerEventType.S_CONTAINER_DEALLOCATE,
+ AMNodeEventType.N_CONTAINER_COMPLETED);
for (Event event : outgoingEvents) {
if (event.getType() == TaskAttemptEventType.TA_NODE_FAILED) {
@@ -945,12 +975,13 @@ public class TestAMContainer {
wc.nodeFailed();
// Expecting a complete event from the RM
wc.verifyState(AMContainerState.STOPPING);
- outgoingEvents = wc.verifyCountAndGetOutgoingEvents(4);
+ outgoingEvents = wc.verifyCountAndGetOutgoingEvents(5);
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
TaskAttemptEventType.TA_NODE_FAILED,
TaskAttemptEventType.TA_NODE_FAILED,
TaskAttemptEventType.TA_CONTAINER_TERMINATING,
- AMSchedulerEventType.S_CONTAINER_DEALLOCATE);
+ AMSchedulerEventType.S_CONTAINER_DEALLOCATE,
+ AMNodeEventType.N_CONTAINER_COMPLETED);
for (Event event : outgoingEvents) {
if (event.getType() == TaskAttemptEventType.TA_NODE_FAILED) {
@@ -1439,6 +1470,15 @@ public class TestAMContainer {
assertTrue("Found unexpected events: " + eventsCopy
+ " in outgoing event list", eventsCopy.isEmpty());
}
+
+ private Event findEventByType(List<Event> events, Enum<?> type) {
+ for (Event event : events) {
+ if (event.getType() == type) {
+ return event;
+ }
+ }
+ return null;
+ }
private LocalResource createLocalResource(String name) {
LocalResource lr = LocalResource.newInstance(URL.newInstance(null, "localhost", 2321, name),
http://git-wip-us.apache.org/repos/asf/tez/blob/4ce6ea6e/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
index e123dd1..11d3b7a 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
@@ -29,6 +29,7 @@ import static org.mockito.Mockito.mock;
import java.util.List;
+import org.apache.tez.dag.app.dag.DAG;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -326,6 +327,78 @@ public class TestAMNodeTracker {
}
}
+ @Test(timeout = 10000L)
+ public void testNodeCompletedAndCleanup() {
+ AppContext appContext = mock(AppContext.class);
+ Configuration conf = new Configuration(false);
+ conf.setInt(TezConfiguration.TEZ_AM_MAX_TASK_FAILURES_PER_NODE, 2);
+ TestEventHandler handler = new TestEventHandler();
+ AMNodeTracker amNodeTracker = new AMNodeTracker(handler, appContext);
+ doReturn(amNodeTracker).when(appContext).getNodeTracker();
+ AMContainerMap amContainerMap = mock(AMContainerMap.class);
+ TaskSchedulerManager taskSchedulerManager =
+ mock(TaskSchedulerManager.class);
+ dispatcher.register(AMNodeEventType.class, amNodeTracker);
+ dispatcher.register(AMContainerEventType.class, amContainerMap);
+ dispatcher.register(AMSchedulerEventType.class, taskSchedulerManager);
+ amNodeTracker.init(conf);
+ amNodeTracker.start();
+
+ try {
+
+ NodeId nodeId = NodeId.newInstance("fakenode", 3333);
+ amNodeTracker.nodeSeen(nodeId, 0);
+
+ AMNode amNode = amNodeTracker.get(nodeId, 0);
+ ContainerId[] containerIds = new ContainerId[7];
+
+ // Start 5 containers.
+ for (int i = 0; i < 5; i++) {
+ containerIds[i] = mock(ContainerId.class);
+ amNodeTracker
+ .handle(new AMNodeEventContainerAllocated(nodeId, 0, containerIds[i]));
+ }
+ assertEquals(5, amNode.getContainers().size());
+
+ // Finish 1st dag
+ amNodeTracker.dagComplete(mock(DAG.class));
+ assertEquals(5, amNode.getContainers().size());
+
+
+ // Mark 2 as complete. Finish 2nd dag.
+ for (int i = 0; i < 2; i++) {
+ amNodeTracker.handle(
+ new AMNodeEventContainerCompleted(nodeId, 0, containerIds[i]));
+ }
+ amNodeTracker.dagComplete(mock(DAG.class));
+ assertEquals(3, amNode.getContainers().size());
+
+ // Add 2 more containers. Mark all as complete. Finish 3rd dag.
+ for (int i = 5; i < 7; i++) {
+ containerIds[i] = mock(ContainerId.class);
+ amNodeTracker
+ .handle(new AMNodeEventContainerAllocated(nodeId, 0, containerIds[i]));
+ }
+ assertEquals(5, amNode.getContainers().size());
+ amNodeTracker.dagComplete(mock(DAG.class));
+ assertEquals(5, amNode.getContainers().size());
+ amNodeTracker.dagComplete(mock(DAG.class));
+ assertEquals(5, amNode.getContainers().size());
+
+ for (int i = 2; i < 7; i++) {
+ amNodeTracker.handle(
+ new AMNodeEventContainerCompleted(nodeId, 0, containerIds[i]));
+ }
+ assertEquals(5, amNode.getContainers().size());
+ amNodeTracker.dagComplete(mock(DAG.class));
+ assertEquals(0, amNode.getContainers().size());
+
+ } finally {
+ amNodeTracker.stop();
+ }
+
+ }
+
@Test(timeout=10000)
public void testNodeUnhealthyRescheduleTasksEnabled() throws Exception {
_testNodeUnhealthyRescheduleTasks(true);