You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2017/09/25 23:13:53 UTC
hadoop git commit: YARN-7240. Add more states and transitions to
stabilize the NM Container state machine. (Kartheek Muthyala via asuresh)
Repository: hadoop
Updated Branches:
refs/heads/trunk 47011d7dd -> df800f6cf
YARN-7240. Add more states and transitions to stabilize the NM Container state machine. (Kartheek Muthyala via asuresh)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/df800f6c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/df800f6c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/df800f6c
Branch: refs/heads/trunk
Commit: df800f6cf3ea663daf4081ebe784808b08d9366d
Parents: 47011d7
Author: Arun Suresh <as...@apache.org>
Authored: Mon Sep 25 14:11:55 2017 -0700
Committer: Arun Suresh <as...@apache.org>
Committed: Mon Sep 25 16:02:55 2017 -0700
----------------------------------------------------------------------
.../containermanager/ContainerManagerImpl.java | 41 +---
.../container/ContainerEventType.java | 6 +-
.../container/ContainerImpl.java | 174 ++++++++++++--
.../container/ContainerState.java | 3 +-
.../container/UpdateContainerTokenEvent.java | 86 +++++++
.../scheduler/ContainerScheduler.java | 114 ++++-----
.../UpdateContainerSchedulerEvent.java | 46 ++--
.../BaseContainerManagerTest.java | 2 +
.../containermanager/TestContainerManager.java | 229 ++++++++++++++++++-
.../TestContainerSchedulerQueuing.java | 101 ++++++++
10 files changed, 660 insertions(+), 142 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df800f6c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index e497f62..d12892e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.UpdateContainerTokenEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -144,7 +145,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.Contai
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerScheduler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEventType;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.UpdateContainerSchedulerEvent;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState;
@@ -1251,29 +1251,6 @@ public class ContainerManagerImpl extends CompositeService implements
+ " [" + containerTokenIdentifier.getVersion() + "]");
}
- // Check container state
- org.apache.hadoop.yarn.server.nodemanager.
- containermanager.container.ContainerState currentState =
- container.getContainerState();
- EnumSet<org.apache.hadoop.yarn.server.nodemanager.containermanager
- .container.ContainerState> allowedStates = EnumSet.of(
- org.apache.hadoop.yarn.server.nodemanager.containermanager.container
- .ContainerState.RUNNING,
- org.apache.hadoop.yarn.server.nodemanager.containermanager.container
- .ContainerState.SCHEDULED,
- org.apache.hadoop.yarn.server.nodemanager.containermanager.container
- .ContainerState.LOCALIZING,
- org.apache.hadoop.yarn.server.nodemanager.containermanager.container
- .ContainerState.REINITIALIZING,
- org.apache.hadoop.yarn.server.nodemanager.containermanager.container
- .ContainerState.RELAUNCHING);
- if (!allowedStates.contains(currentState)) {
- throw RPCUtil.getRemoteException("Container " + containerId.toString()
- + " is in " + currentState.name() + " state."
- + " Resource can only be changed when a container is in"
- + " RUNNING or SCHEDULED state");
- }
-
// Check validity of the target resource.
Resource currentResource = container.getResource();
ExecutionType currentExecType =
@@ -1313,11 +1290,11 @@ public class ContainerManagerImpl extends CompositeService implements
this.readLock.lock();
try {
if (!serviceStopped) {
- // Dispatch message to ContainerScheduler to actually
+ // Dispatch message to Container to actually
// make the change.
- dispatcher.getEventHandler().handle(new UpdateContainerSchedulerEvent(
- container, containerTokenIdentifier, isResourceChange,
- isExecTypeUpdate, isIncrease));
+ dispatcher.getEventHandler().handle(new UpdateContainerTokenEvent(
+ container.getContainerId(), containerTokenIdentifier,
+ isResourceChange, isExecTypeUpdate, isIncrease));
} else {
throw new YarnException(
"Unable to change container resource as the NodeManager is "
@@ -1816,10 +1793,14 @@ public class ContainerManagerImpl extends CompositeService implements
if (container == null) {
throw new YarnException("Specified " + containerId + " does not exist!");
}
- if (!container.isRunning() || container.isReInitializing()) {
+ if (!container.isRunning() || container.isReInitializing()
+ || container.getContainerTokenIdentifier().getExecutionType()
+ == ExecutionType.OPPORTUNISTIC) {
throw new YarnException("Cannot perform " + op + " on [" + containerId
+ "]. Current state is [" + container.getContainerState() + ", " +
- "isReInitializing=" + container.isReInitializing() + "].");
+ "isReInitializing=" + container.isReInitializing() + "]. Container"
+ + " Execution Type is [" + container.getContainerTokenIdentifier()
+ .getExecutionType() + "].");
}
return container;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df800f6c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
index 1475435..e28b37d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
@@ -29,6 +29,7 @@ public enum ContainerEventType {
ROLLBACK_REINIT,
PAUSE_CONTAINER,
RESUME_CONTAINER,
+ UPDATE_CONTAINER_TOKEN,
// DownloadManager
CONTAINER_INITED,
@@ -42,5 +43,8 @@ public enum ContainerEventType {
CONTAINER_EXITED_WITH_FAILURE,
CONTAINER_KILLED_ON_REQUEST,
CONTAINER_PAUSED,
- CONTAINER_RESUMED
+ CONTAINER_RESUMED,
+
+ // Producer: ContainerScheduler
+ CONTAINER_TOKEN_UPDATED
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df800f6c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index 836e70e..705087b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -33,6 +33,8 @@ import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.UpdateContainerSchedulerEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -308,8 +310,8 @@ public class ContainerImpl implements Container {
UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.NEW, ContainerState.DONE,
ContainerEventType.KILL_CONTAINER, new KillOnNewTransition())
- .addTransition(ContainerState.NEW, ContainerState.DONE,
- ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
+ .addTransition(ContainerState.NEW, ContainerState.NEW,
+ ContainerEventType.UPDATE_CONTAINER_TOKEN, new UpdateTransition())
// From LOCALIZING State
.addTransition(ContainerState.LOCALIZING,
@@ -325,8 +327,9 @@ public class ContainerImpl implements Container {
.addTransition(ContainerState.LOCALIZING, ContainerState.KILLING,
ContainerEventType.KILL_CONTAINER,
new KillBeforeRunningTransition())
- .addTransition(ContainerState.LOCALIZING, ContainerState.KILLING,
- ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
+ .addTransition(ContainerState.LOCALIZING, ContainerState.LOCALIZING,
+ ContainerEventType.UPDATE_CONTAINER_TOKEN, new UpdateTransition())
+
// From LOCALIZATION_FAILED State
.addTransition(ContainerState.LOCALIZATION_FAILED,
@@ -351,6 +354,9 @@ public class ContainerImpl implements Container {
.addTransition(ContainerState.LOCALIZATION_FAILED,
ContainerState.LOCALIZATION_FAILED,
ContainerEventType.RESOURCE_FAILED)
+ .addTransition(ContainerState.LOCALIZATION_FAILED,
+ ContainerState.LOCALIZATION_FAILED,
+ ContainerEventType.UPDATE_CONTAINER_TOKEN, new UpdateTransition())
// From SCHEDULED State
.addTransition(ContainerState.SCHEDULED, ContainerState.RUNNING,
@@ -364,6 +370,9 @@ public class ContainerImpl implements Container {
.addTransition(ContainerState.SCHEDULED, ContainerState.KILLING,
ContainerEventType.KILL_CONTAINER,
new KillBeforeRunningTransition())
+ .addTransition(ContainerState.SCHEDULED, ContainerState.SCHEDULED,
+ ContainerEventType.UPDATE_CONTAINER_TOKEN,
+ new NotifyContainerSchedulerOfUpdateTransition())
// From RUNNING State
.addTransition(ContainerState.RUNNING,
@@ -376,10 +385,16 @@ public class ContainerImpl implements Container {
ContainerState.EXITED_WITH_FAILURE),
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
new RetryFailureTransition())
- .addTransition(ContainerState.RUNNING, ContainerState.REINITIALIZING,
+ .addTransition(ContainerState.RUNNING,
+ EnumSet.of(ContainerState.RUNNING,
+ ContainerState.REINITIALIZING,
+ ContainerState.REINITIALIZING_AWAITING_KILL),
ContainerEventType.REINITIALIZE_CONTAINER,
new ReInitializeContainerTransition())
- .addTransition(ContainerState.RUNNING, ContainerState.REINITIALIZING,
+ .addTransition(ContainerState.RUNNING,
+ EnumSet.of(ContainerState.RUNNING,
+ ContainerState.REINITIALIZING,
+ ContainerState.REINITIALIZING_AWAITING_KILL),
ContainerEventType.ROLLBACK_REINIT,
new RollbackContainerTransition())
.addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
@@ -398,9 +413,16 @@ public class ContainerImpl implements Container {
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
new KilledExternallyTransition())
.addTransition(ContainerState.RUNNING, ContainerState.PAUSING,
- ContainerEventType.PAUSE_CONTAINER, new PauseContainerTransition())
+ ContainerEventType.PAUSE_CONTAINER, new PauseContainerTransition())
+ .addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
+ ContainerEventType.UPDATE_CONTAINER_TOKEN,
+ new NotifyContainerSchedulerOfUpdateTransition())
+
// From PAUSING State
+ .addTransition(ContainerState.PAUSING, ContainerState.PAUSING,
+ ContainerEventType.RESOURCE_LOCALIZED,
+ new ResourceLocalizedWhileRunningTransition())
.addTransition(ContainerState.PAUSING, ContainerState.KILLING,
ContainerEventType.KILL_CONTAINER, new KillTransition())
.addTransition(ContainerState.PAUSING, ContainerState.PAUSING,
@@ -420,6 +442,12 @@ public class ContainerImpl implements Container {
.addTransition(ContainerState.PAUSING, ContainerState.EXITED_WITH_FAILURE,
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
new KilledExternallyTransition())
+ .addTransition(ContainerState.PAUSING, ContainerState.PAUSING,
+ ContainerEventType.RESOURCE_LOCALIZED,
+ new ResourceLocalizedWhileRunningTransition())
+ .addTransition(ContainerState.PAUSING, ContainerState.PAUSING,
+ ContainerEventType.UPDATE_CONTAINER_TOKEN,
+ new NotifyContainerSchedulerOfUpdateTransition())
// From PAUSED State
.addTransition(ContainerState.PAUSED, ContainerState.KILLING,
@@ -429,6 +457,10 @@ public class ContainerImpl implements Container {
UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.PAUSED, ContainerState.PAUSED,
ContainerEventType.PAUSE_CONTAINER)
+ // This can happen during re-initialization.
+ .addTransition(ContainerState.PAUSED, ContainerState.PAUSED,
+ ContainerEventType.RESOURCE_LOCALIZED,
+ new ResourceLocalizedWhileRunningTransition())
.addTransition(ContainerState.PAUSED, ContainerState.RESUMING,
ContainerEventType.RESUME_CONTAINER, new ResumeContainerTransition())
// In case something goes wrong then container will exit from the
@@ -444,6 +476,9 @@ public class ContainerImpl implements Container {
ContainerState.EXITED_WITH_SUCCESS,
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
new ExitedWithSuccessTransition(true))
+ .addTransition(ContainerState.PAUSED, ContainerState.PAUSED,
+ ContainerEventType.UPDATE_CONTAINER_TOKEN,
+ new NotifyContainerSchedulerOfUpdateTransition())
// From RESUMING State
.addTransition(ContainerState.RESUMING, ContainerState.KILLING,
@@ -453,6 +488,10 @@ public class ContainerImpl implements Container {
.addTransition(ContainerState.RESUMING, ContainerState.RESUMING,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
UPDATE_DIAGNOSTICS_TRANSITION)
+ // This can happen during re-initialization
+ .addTransition(ContainerState.RESUMING, ContainerState.RESUMING,
+ ContainerEventType.RESOURCE_LOCALIZED,
+ new ResourceLocalizedWhileRunningTransition())
// In case something goes wrong then container will exit from the
// RESUMING state
.addTransition(ContainerState.RESUMING,
@@ -467,6 +506,10 @@ public class ContainerImpl implements Container {
ContainerState.EXITED_WITH_SUCCESS,
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
new ExitedWithSuccessTransition(true))
+ .addTransition(ContainerState.RESUMING, ContainerState.RESUMING,
+ ContainerEventType.UPDATE_CONTAINER_TOKEN,
+ new NotifyContainerSchedulerOfUpdateTransition())
+ // NOTE - We cannot get a PAUSE_CONTAINER while in RESUMING state.
// From REINITIALIZING State
.addTransition(ContainerState.REINITIALIZING,
@@ -478,7 +521,8 @@ public class ContainerImpl implements Container {
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
new ExitedWithFailureTransition(true))
.addTransition(ContainerState.REINITIALIZING,
- ContainerState.REINITIALIZING,
+ EnumSet.of(ContainerState.REINITIALIZING,
+ ContainerState.REINITIALIZING_AWAITING_KILL),
ContainerEventType.RESOURCE_LOCALIZED,
new ResourceLocalizedWhileReInitTransition())
.addTransition(ContainerState.REINITIALIZING, ContainerState.RUNNING,
@@ -490,12 +534,39 @@ public class ContainerImpl implements Container {
UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.REINITIALIZING, ContainerState.KILLING,
ContainerEventType.KILL_CONTAINER, new KillTransition())
- .addTransition(ContainerState.REINITIALIZING, ContainerState.KILLING,
- ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
+ .addTransition(ContainerState.REINITIALIZING, ContainerState.PAUSING,
+ ContainerEventType.PAUSE_CONTAINER, new PauseContainerTransition())
.addTransition(ContainerState.REINITIALIZING,
+ ContainerState.REINITIALIZING,
+ ContainerEventType.UPDATE_CONTAINER_TOKEN,
+ new NotifyContainerSchedulerOfUpdateTransition())
+
+ // From REINITIALIZING_AWAITING_KILL State
+ .addTransition(ContainerState.REINITIALIZING_AWAITING_KILL,
+ ContainerState.EXITED_WITH_SUCCESS,
+ ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
+ new ExitedWithSuccessTransition(true))
+ .addTransition(ContainerState.REINITIALIZING_AWAITING_KILL,
+ ContainerState.EXITED_WITH_FAILURE,
+ ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
+ new ExitedWithFailureTransition(true))
+ .addTransition(ContainerState.REINITIALIZING_AWAITING_KILL,
+ ContainerState.REINITIALIZING_AWAITING_KILL,
+ ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+ UPDATE_DIAGNOSTICS_TRANSITION)
+ .addTransition(ContainerState.REINITIALIZING_AWAITING_KILL,
+ ContainerState.KILLING,
+ ContainerEventType.KILL_CONTAINER, new KillTransition())
+ .addTransition(ContainerState.REINITIALIZING_AWAITING_KILL,
+ ContainerState.SCHEDULED, ContainerEventType.PAUSE_CONTAINER)
+ .addTransition(ContainerState.REINITIALIZING_AWAITING_KILL,
ContainerState.SCHEDULED,
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
new KilledForReInitializationTransition())
+ .addTransition(ContainerState.REINITIALIZING_AWAITING_KILL,
+ ContainerState.REINITIALIZING_AWAITING_KILL,
+ ContainerEventType.UPDATE_CONTAINER_TOKEN,
+ new NotifyContainerSchedulerOfUpdateTransition())
// From RELAUNCHING State
.addTransition(ContainerState.RELAUNCHING, ContainerState.RUNNING,
@@ -511,6 +582,10 @@ public class ContainerImpl implements Container {
ContainerEventType.KILL_CONTAINER, new KillTransition())
.addTransition(ContainerState.RELAUNCHING, ContainerState.KILLING,
ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
+ .addTransition(ContainerState.RELAUNCHING, ContainerState.RELAUNCHING,
+ ContainerEventType.UPDATE_CONTAINER_TOKEN,
+ new NotifyContainerSchedulerOfUpdateTransition())
+
// From CONTAINER_EXITED_WITH_SUCCESS State
.addTransition(ContainerState.EXITED_WITH_SUCCESS, ContainerState.DONE,
@@ -524,6 +599,10 @@ public class ContainerImpl implements Container {
ContainerState.EXITED_WITH_SUCCESS,
EnumSet.of(ContainerEventType.KILL_CONTAINER,
ContainerEventType.PAUSE_CONTAINER))
+ // No transition - assuming container is on its way to completion
+ .addTransition(ContainerState.EXITED_WITH_SUCCESS,
+ ContainerState.EXITED_WITH_SUCCESS,
+ ContainerEventType.UPDATE_CONTAINER_TOKEN)
// From EXITED_WITH_FAILURE State
.addTransition(ContainerState.EXITED_WITH_FAILURE, ContainerState.DONE,
@@ -537,6 +616,10 @@ public class ContainerImpl implements Container {
ContainerState.EXITED_WITH_FAILURE,
EnumSet.of(ContainerEventType.KILL_CONTAINER,
ContainerEventType.PAUSE_CONTAINER))
+ // No transition - assuming container is on its way to completion
+ .addTransition(ContainerState.EXITED_WITH_FAILURE,
+ ContainerState.EXITED_WITH_FAILURE,
+ ContainerEventType.UPDATE_CONTAINER_TOKEN)
// From KILLING State.
.addTransition(ContainerState.KILLING,
@@ -572,6 +655,9 @@ public class ContainerImpl implements Container {
ContainerState.KILLING,
EnumSet.of(ContainerEventType.CONTAINER_LAUNCHED,
ContainerEventType.PAUSE_CONTAINER))
+ // No transition - assuming container is on its way to completion
+ .addTransition(ContainerState.KILLING, ContainerState.KILLING,
+ ContainerEventType.UPDATE_CONTAINER_TOKEN)
// From CONTAINER_CLEANEDUP_AFTER_KILL State.
.addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
@@ -589,6 +675,10 @@ public class ContainerImpl implements Container {
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
ContainerEventType.PAUSE_CONTAINER))
+ // No transition - assuming container is on its way to completion
+ .addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
+ ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
+ ContainerEventType.UPDATE_CONTAINER_TOKEN)
// From DONE
.addTransition(ContainerState.DONE, ContainerState.DONE,
@@ -606,6 +696,9 @@ public class ContainerImpl implements Container {
EnumSet.of(ContainerEventType.RESOURCE_FAILED,
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE))
+ // No transition - assuming container is on its way to completion
+ .addTransition(ContainerState.DONE, ContainerState.DONE,
+ ContainerEventType.UPDATE_CONTAINER_TOKEN)
// create the topology tables
.installTopology();
@@ -626,6 +719,7 @@ public class ContainerImpl implements Container {
case RUNNING:
case RELAUNCHING:
case REINITIALIZING:
+ case REINITIALIZING_AWAITING_KILL:
case EXITED_WITH_SUCCESS:
case EXITED_WITH_FAILURE:
case KILLING:
@@ -929,6 +1023,45 @@ public class ContainerImpl implements Container {
}
+ static class UpdateTransition extends ContainerTransition {
+ @Override
+ public void transition(
+ ContainerImpl container, ContainerEvent event) {
+ UpdateContainerTokenEvent updateEvent = (UpdateContainerTokenEvent)event;
+ // Update the container token
+ container.setContainerTokenIdentifier(updateEvent.getUpdatedToken());
+ if (updateEvent.isResourceChange()) {
+ try {
+ // Persist change in the state store.
+ container.context.getNMStateStore().storeContainerResourceChanged(
+ container.containerId,
+ container.getContainerTokenIdentifier().getVersion(),
+ container.getResource());
+ } catch (IOException e) {
+ LOG.warn("Could not store container [" + container.containerId
+ + "] resource change..", e);
+ }
+ }
+ }
+ }
+
+ static class NotifyContainerSchedulerOfUpdateTransition extends
+ UpdateTransition {
+ @Override
+ public void transition(
+ ContainerImpl container, ContainerEvent event) {
+
+ UpdateContainerTokenEvent updateEvent = (UpdateContainerTokenEvent)event;
+ // Save original token
+ ContainerTokenIdentifier originalToken =
+ container.containerTokenIdentifier;
+ super.transition(container, updateEvent);
+ container.dispatcher.getEventHandler().handle(
+ new UpdateContainerSchedulerEvent(container,
+ originalToken, updateEvent));
+ }
+ }
+
/**
* State transition when a NEW container receives the INIT_CONTAINER
* message.
@@ -1074,12 +1207,15 @@ public class ContainerImpl implements Container {
/**
* Transition to start the Re-Initialization process.
*/
- static class ReInitializeContainerTransition extends ContainerTransition {
+ static class ReInitializeContainerTransition implements
+ MultipleArcTransition<ContainerImpl, ContainerEvent, ContainerState> {
@SuppressWarnings("unchecked")
@Override
- public void transition(ContainerImpl container, ContainerEvent event) {
+ public ContainerState transition(
+ ContainerImpl container, ContainerEvent event) {
container.reInitContext = createReInitContext(container, event);
+ boolean resourcesPresent = false;
try {
// 'reInitContext.newResourceSet' can be
// a) current container resourceSet (In case of Restart)
@@ -1101,6 +1237,7 @@ public class ContainerImpl implements Container {
container.dispatcher.getEventHandler().handle(
new ContainersLauncherEvent(container,
ContainersLauncherEventType.CLEANUP_CONTAINER_FOR_REINIT));
+ resourcesPresent = true;
}
container.metrics.reInitingContainer();
NMAuditLogger.logSuccess(container.user,
@@ -1112,7 +1249,11 @@ public class ContainerImpl implements Container {
" re-initialization failure..", e);
container.addDiagnostics("Error re-initializing due to" +
"[" + e.getMessage() + "]");
+ return ContainerState.RUNNING;
}
+ return resourcesPresent ?
+ ContainerState.REINITIALIZING_AWAITING_KILL :
+ ContainerState.REINITIALIZING;
}
protected ReInitializationContext createReInitContext(
@@ -1164,11 +1305,14 @@ public class ContainerImpl implements Container {
* If all dependencies are met, then restart Container with new bits.
*/
static class ResourceLocalizedWhileReInitTransition
- extends ContainerTransition {
+ implements MultipleArcTransition
+ <ContainerImpl, ContainerEvent, ContainerState> {
+
@SuppressWarnings("unchecked")
@Override
- public void transition(ContainerImpl container, ContainerEvent event) {
+ public ContainerState transition(
+ ContainerImpl container, ContainerEvent event) {
ContainerResourceLocalizedEvent rsrcEvent =
(ContainerResourceLocalizedEvent) event;
container.reInitContext.newResourceSet.resourceLocalized(
@@ -1180,7 +1324,9 @@ public class ContainerImpl implements Container {
container.dispatcher.getEventHandler().handle(
new ContainersLauncherEvent(container,
ContainersLauncherEventType.CLEANUP_CONTAINER_FOR_REINIT));
+ return ContainerState.REINITIALIZING_AWAITING_KILL;
}
+ return ContainerState.REINITIALIZING;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df800f6c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
index 7c3fea8..5644d03 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
@@ -20,7 +20,8 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
public enum ContainerState {
NEW, LOCALIZING, LOCALIZATION_FAILED, SCHEDULED, RUNNING, RELAUNCHING,
- REINITIALIZING, EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE, KILLING,
+ REINITIALIZING, REINITIALIZING_AWAITING_KILL,
+ EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE, KILLING,
CONTAINER_CLEANEDUP_AFTER_KILL, CONTAINER_RESOURCES_CLEANINGUP, DONE,
PAUSING, PAUSED, RESUMING
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df800f6c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/UpdateContainerTokenEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/UpdateContainerTokenEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/UpdateContainerTokenEvent.java
new file mode 100644
index 0000000..c9dc97e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/UpdateContainerTokenEvent.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+
+/**
+ * Update Event consumed by the Container.
+ */
+public class UpdateContainerTokenEvent extends ContainerEvent {
+ private final ContainerTokenIdentifier updatedToken;
+ private final boolean isResourceChange;
+ private final boolean isExecTypeUpdate;
+ private final boolean isIncrease;
+
+ /**
+ * Create Update event.
+ *
+ * @param cID Container Id.
+ * @param updatedToken Updated Container Token.
+ * @param isResourceChange Is Resource change.
+ * @param isExecTypeUpdate Is ExecutionType Update.
+ * @param isIncrease Is container increase.
+ */
+ public UpdateContainerTokenEvent(ContainerId cID,
+ ContainerTokenIdentifier updatedToken, boolean isResourceChange,
+ boolean isExecTypeUpdate, boolean isIncrease) {
+ super(cID, ContainerEventType.UPDATE_CONTAINER_TOKEN);
+ this.updatedToken = updatedToken;
+ this.isResourceChange = isResourceChange;
+ this.isExecTypeUpdate = isExecTypeUpdate;
+ this.isIncrease = isIncrease;
+ }
+
+ /**
+ * Get the updated Container Token.
+ *
+ * @return Container Token.
+ */
+ public ContainerTokenIdentifier getUpdatedToken() {
+ return updatedToken;
+ }
+
+ /**
+ * Is this update a ResourceChange.
+ *
+ * @return isResourceChange.
+ */
+ public boolean isResourceChange() {
+ return isResourceChange;
+ }
+
+ /**
+ * Is this update an ExecType Update.
+ *
+ * @return isExecTypeUpdate.
+ */
+ public boolean isExecTypeUpdate() {
+ return isExecTypeUpdate;
+ }
+
+ /**
+ * Is this a container Increase.
+ *
+ * @return isIncrease.
+ */
+ public boolean isIncrease() {
+ return isIncrease;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df800f6c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
index 830a06d..e436822 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor
.ChangeMonitoringContainerResourceEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
@@ -151,7 +152,9 @@ public class ContainerScheduler extends AbstractService implements
case SCHEDULE_CONTAINER:
scheduleContainer(event.getContainer());
break;
+ // NOTE: CONTAINER_PAUSED is sent only after container state has changed to PAUSED.
case CONTAINER_PAUSED:
+ // NOTE: CONTAINER_COMPLETED is sent only after container state has changed to DONE.
case CONTAINER_COMPLETED:
onResourcesReclaimed(event.getContainer());
break;
@@ -180,58 +183,38 @@ public class ContainerScheduler extends AbstractService implements
if (updateEvent.isResourceChange()) {
if (runningContainers.containsKey(containerId)) {
this.utilizationTracker.subtractContainerResource(
- updateEvent.getContainer());
- updateEvent.getContainer().setContainerTokenIdentifier(
- updateEvent.getUpdatedToken());
+ new ContainerImpl(getConfig(), null, null, null, null,
+ updateEvent.getOriginalToken(), context));
this.utilizationTracker.addContainerResources(
updateEvent.getContainer());
getContainersMonitor().handle(
new ChangeMonitoringContainerResourceEvent(containerId,
updateEvent.getUpdatedToken().getResource()));
- } else {
- // Is Queued or localizing..
- updateEvent.getContainer().setContainerTokenIdentifier(
- updateEvent.getUpdatedToken());
- }
- try {
- // Persist change in the state store.
- this.context.getNMStateStore().storeContainerResourceChanged(
- containerId,
- updateEvent.getUpdatedToken().getVersion(),
- updateEvent.getUpdatedToken().getResource());
- } catch (IOException e) {
- LOG.warn("Could not store container [" + containerId + "] resource " +
- "change..", e);
}
}
if (updateEvent.isExecTypeUpdate()) {
- updateEvent.getContainer().setContainerTokenIdentifier(
- updateEvent.getUpdatedToken());
- // If this is a running container.. just change the execution type
- // and be done with it.
- if (!runningContainers.containsKey(containerId)) {
- // Promotion or not (Increase signifies either a promotion
- // or container size increase)
- if (updateEvent.isIncrease()) {
- // Promotion of queued container..
- if (queuedOpportunisticContainers.remove(containerId) != null) {
- queuedGuaranteedContainers.put(containerId,
- updateEvent.getContainer());
- }
+ // Promotion or not (Increase signifies either a promotion
+ // or container size increase)
+ if (updateEvent.isIncrease()) {
+ // Promotion of queued container..
+ if (queuedOpportunisticContainers.remove(containerId) != null) {
+ queuedGuaranteedContainers.put(containerId,
+ updateEvent.getContainer());
//Kill/pause opportunistic containers if any to make room for
// promotion request
reclaimOpportunisticContainerResources(updateEvent.getContainer());
- } else {
- // Demotion of queued container.. Should not happen too often
- // since you should not find too many queued guaranteed
- // containers
- if (queuedGuaranteedContainers.remove(containerId) != null) {
- queuedOpportunisticContainers.put(containerId,
- updateEvent.getContainer());
- }
+ }
+ } else {
+ // Demotion of queued container.. Should not happen too often
+ // since you should not find too many queued guaranteed
+ // containers
+ if (queuedGuaranteedContainers.remove(containerId) != null) {
+ queuedOpportunisticContainers.put(containerId,
+ updateEvent.getContainer());
}
}
+ startPendingContainers(maxOppQueueLength <= 0);
}
}
@@ -290,6 +273,16 @@ public class ContainerScheduler extends AbstractService implements
queuedGuaranteedContainers.remove(container.getContainerId());
}
+ // Requeue PAUSED containers
+ if (container.getContainerState() == ContainerState.PAUSED) {
+ if (container.getContainerTokenIdentifier().getExecutionType() ==
+ ExecutionType.GUARANTEED) {
+ queuedGuaranteedContainers.put(container.getContainerId(), container);
+ } else {
+ queuedOpportunisticContainers.put(
+ container.getContainerId(), container);
+ }
+ }
// decrement only if it was a running container
Container completedContainer = runningContainers.remove(container
.getContainerId());
@@ -301,7 +294,8 @@ public class ContainerScheduler extends AbstractService implements
ExecutionType.OPPORTUNISTIC) {
this.metrics.completeOpportunisticContainer(container.getResource());
}
- startPendingContainers(false);
+ boolean forceStartGuaranteedContainers = (maxOppQueueLength <= 0);
+ startPendingContainers(forceStartGuaranteedContainers);
}
}
@@ -311,26 +305,9 @@ public class ContainerScheduler extends AbstractService implements
* container without looking at available resource
*/
private void startPendingContainers(boolean forceStartGuaranteedContaieners) {
- // Start pending guaranteed containers, if resources available.
+ // Start queued guaranteed containers (incl. requeued PAUSED ones), if resources available.
boolean resourcesAvailable = startContainers(
- queuedGuaranteedContainers.values(), forceStartGuaranteedContaieners);
- // Resume opportunistic containers, if resource available.
- if (resourcesAvailable) {
- List<Container> pausedContainers = new ArrayList<Container>();
- Map<ContainerId, Container> containers =
- context.getContainers();
- for (Map.Entry<ContainerId, Container>entry : containers.entrySet()) {
- ContainerId contId = entry.getKey();
- // Find containers that were not already started and are in paused state
- if(false == runningContainers.containsKey(contId)) {
- if(containers.get(contId).getContainerState()
- == ContainerState.PAUSED) {
- pausedContainers.add(containers.get(contId));
- }
- }
- }
- resourcesAvailable = startContainers(pausedContainers, false);
- }
+ queuedGuaranteedContainers.values(), forceStartGuaranteedContaieners);
// Start opportunistic containers, if resources available.
if (resourcesAvailable) {
startContainers(queuedOpportunisticContainers.values(), false);
@@ -590,16 +567,19 @@ public class ContainerScheduler extends AbstractService implements
queuedOpportunisticContainers.values().iterator();
while (containerIter.hasNext()) {
Container container = containerIter.next();
- if (numAllowed <= 0) {
- container.sendKillEvent(
- ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
- "Container De-queued to meet NM queuing limits.");
- containerIter.remove();
- LOG.info(
- "Opportunistic container {} will be killed to meet NM queuing" +
- " limits.", container.getContainerId());
+ // Do not shed PAUSED containers
+ if (container.getContainerState() != ContainerState.PAUSED) {
+ if (numAllowed <= 0) {
+ container.sendKillEvent(
+ ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
+ "Container De-queued to meet NM queuing limits.");
+ containerIter.remove();
+ LOG.info(
+ "Opportunistic container {} will be killed to meet NM queuing" +
+ " limits.", container.getContainerId());
+ }
+ numAllowed--;
}
- numAllowed--;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df800f6c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UpdateContainerSchedulerEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UpdateContainerSchedulerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UpdateContainerSchedulerEvent.java
index 5384b7e..2473982 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UpdateContainerSchedulerEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UpdateContainerSchedulerEvent.java
@@ -21,33 +21,37 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container
.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.UpdateContainerTokenEvent;
+
/**
* Update Event consumed by the {@link ContainerScheduler}.
*/
public class UpdateContainerSchedulerEvent extends ContainerSchedulerEvent {
- private ContainerTokenIdentifier updatedToken;
- private boolean isResourceChange;
- private boolean isExecTypeUpdate;
- private boolean isIncrease;
+ private final UpdateContainerTokenEvent containerEvent;
+ private final ContainerTokenIdentifier originalToken;
/**
* Create instance of Event.
*
- * @param originalContainer Original Container.
- * @param updatedToken Updated Container Token.
- * @param isResourceChange is this a Resource Change.
- * @param isExecTypeUpdate is this an ExecTypeUpdate.
- * @param isIncrease is this a Container Increase.
+ * @param container The Container this update applies to.
+ * @param origToken The original Container Token, from before the update.
+ * @param event The Container Event supplying the updated token and update flags.
+ */
+ public UpdateContainerSchedulerEvent(Container container,
+ ContainerTokenIdentifier origToken, UpdateContainerTokenEvent event) {
+ super(container, ContainerSchedulerEventType.UPDATE_CONTAINER);
+ this.containerEvent = event;
+ this.originalToken = origToken;
+ }
+
+ /**
+ * Get the original Container Token, as it was before the update.
+ *
+ * @return the original Container Token.
*/
- public UpdateContainerSchedulerEvent(Container originalContainer,
- ContainerTokenIdentifier updatedToken, boolean isResourceChange,
- boolean isExecTypeUpdate, boolean isIncrease) {
- super(originalContainer, ContainerSchedulerEventType.UPDATE_CONTAINER);
- this.updatedToken = updatedToken;
- this.isResourceChange = isResourceChange;
- this.isExecTypeUpdate = isExecTypeUpdate;
- this.isIncrease = isIncrease;
+ public ContainerTokenIdentifier getOriginalToken() {
+ return this.originalToken;
}
/**
@@ -56,7 +60,7 @@ public class UpdateContainerSchedulerEvent extends ContainerSchedulerEvent {
* @return Container Token.
*/
public ContainerTokenIdentifier getUpdatedToken() {
- return updatedToken;
+ return containerEvent.getUpdatedToken();
}
/**
@@ -64,7 +68,7 @@ public class UpdateContainerSchedulerEvent extends ContainerSchedulerEvent {
* @return isResourceChange.
*/
public boolean isResourceChange() {
- return isResourceChange;
+ return containerEvent.isResourceChange();
}
/**
@@ -72,7 +76,7 @@ public class UpdateContainerSchedulerEvent extends ContainerSchedulerEvent {
* @return isExecTypeUpdate.
*/
public boolean isExecTypeUpdate() {
- return isExecTypeUpdate;
+ return containerEvent.isExecTypeUpdate();
}
/**
@@ -80,6 +84,6 @@ public class UpdateContainerSchedulerEvent extends ContainerSchedulerEvent {
* @return isIncrease.
*/
public boolean isIncrease() {
- return isIncrease;
+ return containerEvent.isIncrease();
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df800f6c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
index 3cafcbd..fc9e6c4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
@@ -208,6 +208,8 @@ public abstract class BaseContainerManagerTest {
containerManager.init(conf);
nodeStatusUpdater.start();
((NMContext)context).setNodeStatusUpdater(nodeStatusUpdater);
+ ((NMContext)context).setContainerStateTransitionListener(
+ new NodeManager.DefaultContainerStateListener());
}
protected ContainerManagerImpl
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df800f6c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
index 38df208..6e8c005 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
@@ -90,12 +90,16 @@ import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
import org.apache.hadoop.yarn.server.nodemanager.CMgrSignalContainersEvent;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerStateTransitionListener;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestAuxServices.ServiceA;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
@@ -119,6 +123,41 @@ public class TestContainerManager extends BaseContainerManagerTest {
LOG = LoggerFactory.getLogger(TestContainerManager.class);
}
+ private static class Listener implements ContainerStateTransitionListener {
+
+ private final Map<ContainerId,
+ List<org.apache.hadoop.yarn.server.nodemanager.containermanager.
+ container.ContainerState>> states = new HashMap<>();
+ private final Map<ContainerId, List<ContainerEventType>> events =
+ new HashMap<>();
+
+ @Override
+ public void init(Context context) {}
+
+ @Override
+ public void preTransition(ContainerImpl op,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState beforeState,
+ ContainerEvent eventToBeProcessed) {
+ if (!states.containsKey(op.getContainerId())) {
+ states.put(op.getContainerId(), new ArrayList<>());
+ states.get(op.getContainerId()).add(beforeState);
+ events.put(op.getContainerId(), new ArrayList<>());
+ }
+ }
+
+ @Override
+ public void postTransition(ContainerImpl op,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState beforeState,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState afterState,
+ ContainerEvent processedEvent) {
+ states.get(op.getContainerId()).add(afterState);
+ events.get(op.getContainerId()).add(processedEvent.getType());
+ }
+ }
+
private boolean delayContainers = false;
@Override
@@ -144,7 +183,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
@Override
protected ContainerManagerImpl
createContainerManager(DeletionService delSrvc) {
- return new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater,
+ return new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater,
metrics, dirsHandler) {
@Override
@@ -496,6 +535,9 @@ public class TestContainerManager extends BaseContainerManagerTest {
@Test
public void testContainerUpgradeSuccessAutoCommit() throws IOException,
InterruptedException, YarnException {
+ Listener listener = new Listener();
+ ((NodeManager.DefaultContainerStateListener)containerManager.context.
+ getContainerStateTransitionListener()).addListener(listener);
testContainerReInitSuccess(true);
// Should not be able to Commit (since already auto committed)
try {
@@ -504,6 +546,41 @@ public class TestContainerManager extends BaseContainerManagerTest {
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains("Nothing to Commit"));
}
+
+ List<org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState> containerStates =
+ listener.states.get(createContainerId(0));
+ Assert.assertEquals(Arrays.asList(
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.NEW,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.LOCALIZING,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.SCHEDULED,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.RUNNING,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.REINITIALIZING,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.REINITIALIZING_AWAITING_KILL,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.REINITIALIZING_AWAITING_KILL,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.SCHEDULED,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.RUNNING), containerStates);
+
+ List<ContainerEventType> containerEventTypes =
+ listener.events.get(createContainerId(0));
+ Assert.assertEquals(Arrays.asList(
+ ContainerEventType.INIT_CONTAINER,
+ ContainerEventType.RESOURCE_LOCALIZED,
+ ContainerEventType.CONTAINER_LAUNCHED,
+ ContainerEventType.REINITIALIZE_CONTAINER,
+ ContainerEventType.RESOURCE_LOCALIZED,
+ ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+ ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
+ ContainerEventType.CONTAINER_LAUNCHED), containerEventTypes);
}
@Test
@@ -524,6 +601,9 @@ public class TestContainerManager extends BaseContainerManagerTest {
@Test
public void testContainerUpgradeSuccessExplicitRollback() throws IOException,
InterruptedException, YarnException {
+ Listener listener = new Listener();
+ ((NodeManager.DefaultContainerStateListener)containerManager.context.
+ getContainerStateTransitionListener()).addListener(listener);
String[] pids = testContainerReInitSuccess(false);
// Test that the container can be Restarted after the successful upgrrade.
@@ -575,6 +655,67 @@ public class TestContainerManager extends BaseContainerManagerTest {
Assert.assertNotEquals("The Rolled-back process should be a different pid",
pids[0], rolledBackPid);
+
+ List<org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState> containerStates =
+ listener.states.get(createContainerId(0));
+ Assert.assertEquals(Arrays.asList(
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.NEW,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.LOCALIZING,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.SCHEDULED,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.RUNNING,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.REINITIALIZING,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.REINITIALIZING_AWAITING_KILL,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.REINITIALIZING_AWAITING_KILL,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.SCHEDULED,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.RUNNING,
+ // This is the successful restart
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.REINITIALIZING_AWAITING_KILL,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.REINITIALIZING_AWAITING_KILL,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.SCHEDULED,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.RUNNING,
+ // This is the rollback
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.REINITIALIZING_AWAITING_KILL,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.REINITIALIZING_AWAITING_KILL,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.SCHEDULED,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.RUNNING), containerStates);
+
+ List<ContainerEventType> containerEventTypes =
+ listener.events.get(createContainerId(0));
+ Assert.assertEquals(Arrays.asList(
+ ContainerEventType.INIT_CONTAINER,
+ ContainerEventType.RESOURCE_LOCALIZED,
+ ContainerEventType.CONTAINER_LAUNCHED,
+ ContainerEventType.REINITIALIZE_CONTAINER,
+ ContainerEventType.RESOURCE_LOCALIZED,
+ ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+ ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
+ ContainerEventType.CONTAINER_LAUNCHED,
+ ContainerEventType.REINITIALIZE_CONTAINER,
+ ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+ ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
+ ContainerEventType.CONTAINER_LAUNCHED,
+ ContainerEventType.ROLLBACK_REINIT,
+ ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+ ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
+ ContainerEventType.CONTAINER_LAUNCHED), containerEventTypes);
}
@Test
@@ -584,6 +725,9 @@ public class TestContainerManager extends BaseContainerManagerTest {
return;
}
containerManager.start();
+ Listener listener = new Listener();
+ ((NodeManager.DefaultContainerStateListener)containerManager.context.
+ getContainerStateTransitionListener()).addListener(listener);
// ////// Construct the Container-id
ContainerId cId = createContainerId(0);
File oldStartFile = new File(tmpDir, "start_file_o.txt").getAbsoluteFile();
@@ -598,6 +742,32 @@ public class TestContainerManager extends BaseContainerManagerTest {
// since upgrade was terminated..
Assert.assertTrue("Process is NOT alive!",
DefaultContainerExecutor.containerIsAlive(pid));
+
+ List<org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState> containerStates =
+ listener.states.get(createContainerId(0));
+ Assert.assertEquals(Arrays.asList(
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.NEW,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.LOCALIZING,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.SCHEDULED,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.RUNNING,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.REINITIALIZING,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.RUNNING), containerStates);
+
+ List<ContainerEventType> containerEventTypes =
+ listener.events.get(createContainerId(0));
+ Assert.assertEquals(Arrays.asList(
+ ContainerEventType.INIT_CONTAINER,
+ ContainerEventType.RESOURCE_LOCALIZED,
+ ContainerEventType.CONTAINER_LAUNCHED,
+ ContainerEventType.REINITIALIZE_CONTAINER,
+ ContainerEventType.RESOURCE_FAILED), containerEventTypes);
}
@Test
@@ -632,6 +802,9 @@ public class TestContainerManager extends BaseContainerManagerTest {
return;
}
containerManager.start();
+ Listener listener = new Listener();
+ ((NodeManager.DefaultContainerStateListener)containerManager.context.
+ getContainerStateTransitionListener()).addListener(listener);
// ////// Construct the Container-id
ContainerId cId = createContainerId(0);
File oldStartFile = new File(tmpDir, "start_file_o.txt").getAbsoluteFile();
@@ -666,6 +839,50 @@ public class TestContainerManager extends BaseContainerManagerTest {
Assert.assertNotEquals("The Rolled-back process should be a different pid",
pid, rolledBackPid);
+
+ List<org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState> containerStates =
+ listener.states.get(createContainerId(0));
+ Assert.assertEquals(Arrays.asList(
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.NEW,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.LOCALIZING,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.SCHEDULED,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.RUNNING,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.REINITIALIZING,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.REINITIALIZING_AWAITING_KILL,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.REINITIALIZING_AWAITING_KILL,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.SCHEDULED,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.RUNNING,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.RUNNING,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.SCHEDULED,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.RUNNING), containerStates);
+
+ List<ContainerEventType> containerEventTypes =
+ listener.events.get(createContainerId(0));
+ Assert.assertEquals(Arrays.asList(
+ ContainerEventType.INIT_CONTAINER,
+ ContainerEventType.RESOURCE_LOCALIZED,
+ ContainerEventType.CONTAINER_LAUNCHED,
+ ContainerEventType.REINITIALIZE_CONTAINER,
+ ContainerEventType.RESOURCE_LOCALIZED,
+ ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+ ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
+ ContainerEventType.CONTAINER_LAUNCHED,
+ ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+ ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
+ ContainerEventType.CONTAINER_LAUNCHED), containerEventTypes);
}
/**
@@ -1582,16 +1799,12 @@ public class TestContainerManager extends BaseContainerManagerTest {
containerManager.updateContainer(updateRequest);
// Check response
Assert.assertEquals(
- 0, updateResponse.getSuccessfullyUpdatedContainers().size());
- Assert.assertEquals(2, updateResponse.getFailedRequests().size());
+ 1, updateResponse.getSuccessfullyUpdatedContainers().size());
+ Assert.assertEquals(1, updateResponse.getFailedRequests().size());
for (Map.Entry<ContainerId, SerializedException> entry : updateResponse
.getFailedRequests().entrySet()) {
Assert.assertNotNull("Failed message", entry.getValue().getMessage());
- if (cId0.equals(entry.getKey())) {
- Assert.assertTrue(entry.getValue().getMessage()
- .contains("Resource can only be changed when a "
- + "container is in RUNNING or SCHEDULED state"));
- } else if (cId7.equals(entry.getKey())) {
+ if (cId7.equals(entry.getKey())) {
Assert.assertTrue(entry.getValue().getMessage()
.contains("Container " + cId7.toString()
+ " is not handled by this NodeManager"));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df800f6c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java
index 7c74049..4b380ff 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -47,11 +48,17 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerStateTransitionListener;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
@@ -76,6 +83,40 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
LOG = LoggerFactory.getLogger(TestContainerSchedulerQueuing.class);
}
+ private static class Listener implements ContainerStateTransitionListener {
+
+ private final Map<ContainerId,
+ List<ContainerState>> states = new HashMap<>();
+ private final Map<ContainerId, List<ContainerEventType>> events =
+ new HashMap<>();
+
+ @Override
+ public void init(Context context) {}
+
+ @Override
+ public void preTransition(ContainerImpl op,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState beforeState,
+ ContainerEvent eventToBeProcessed) {
+ if (!states.containsKey(op.getContainerId())) {
+ states.put(op.getContainerId(), new ArrayList<>());
+ states.get(op.getContainerId()).add(beforeState);
+ events.put(op.getContainerId(), new ArrayList<>());
+ }
+ }
+
+ @Override
+ public void postTransition(ContainerImpl op,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState beforeState,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState afterState,
+ ContainerEvent processedEvent) {
+ states.get(op.getContainerId()).add(afterState);
+ events.get(op.getContainerId()).add(processedEvent.getType());
+ }
+ }
+
private boolean delayContainers = true;
@Override
@@ -542,6 +583,10 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
containerManager.start();
containerManager.getContainerScheduler().
setUsePauseEventForPreemption(true);
+
+ Listener listener = new Listener();
+ ((NodeManager.DefaultContainerStateListener)containerManager.getContext().
+ getContainerStateTransitionListener()).addListener(listener);
ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
@@ -606,6 +651,39 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
// starts running
BaseContainerManagerTest.waitForNMContainerState(containerManager,
createContainerId(0), ContainerState.DONE, 40);
+
+ List<org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState> containerStates =
+ listener.states.get(createContainerId(0));
+ Assert.assertEquals(Arrays.asList(
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.NEW,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.SCHEDULED,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.RUNNING,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.PAUSING,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.PAUSED,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.RESUMING,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.RUNNING,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.EXITED_WITH_SUCCESS,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.DONE), containerStates);
+ List<ContainerEventType> containerEventTypes =
+ listener.events.get(createContainerId(0));
+ Assert.assertEquals(Arrays.asList(ContainerEventType.INIT_CONTAINER,
+ ContainerEventType.CONTAINER_LAUNCHED,
+ ContainerEventType.PAUSE_CONTAINER,
+ ContainerEventType.CONTAINER_PAUSED,
+ ContainerEventType.RESUME_CONTAINER,
+ ContainerEventType.CONTAINER_RESUMED,
+ ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
+ ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP), containerEventTypes);
}
/**
@@ -1068,6 +1146,9 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
@Test
public void testPromotionOfOpportunisticContainers() throws Exception {
containerManager.start();
+ Listener listener = new Listener();
+ ((NodeManager.DefaultContainerStateListener)containerManager.getContext().
+ getContainerStateTransitionListener()).addListener(listener);
ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
@@ -1150,6 +1231,7 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
containerStatuses = containerManager
.getContainerStatuses(statRequest).getContainerStatuses();
Assert.assertEquals(1, containerStatuses.size());
+
for (ContainerStatus status : containerStatuses) {
if (org.apache.hadoop.yarn.api.records.ContainerState.RUNNING ==
status.getState()) {
@@ -1160,6 +1242,25 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
// Ensure no containers are queued.
Assert.assertEquals(0, containerScheduler.getNumQueuedContainers());
+
+ List<org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState> containerStates =
+ listener.states.get(createContainerId(1));
+ Assert.assertEquals(Arrays.asList(
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.NEW,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.SCHEDULED,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.SCHEDULED,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+ ContainerState.RUNNING), containerStates);
+ List<ContainerEventType> containerEventTypes =
+ listener.events.get(createContainerId(1));
+ Assert.assertEquals(Arrays.asList(
+ ContainerEventType.INIT_CONTAINER,
+ ContainerEventType.UPDATE_CONTAINER_TOKEN,
+ ContainerEventType.CONTAINER_LAUNCHED), containerEventTypes);
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org