You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2016/09/19 20:01:45 UTC
[21/44] hadoop git commit: YARN-5637. Changes in NodeManager to
support Container rollback and commit. (asuresh)
YARN-5637. Changes in NodeManager to support Container rollback and commit. (asuresh)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3552c2b9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3552c2b9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3552c2b9
Branch: refs/heads/YARN-2915
Commit: 3552c2b99dff4f21489ff284f9dcba40e897a1e5
Parents: ea839bd
Author: Arun Suresh <as...@apache.org>
Authored: Fri Sep 16 16:53:18 2016 -0700
Committer: Arun Suresh <as...@apache.org>
Committed: Sun Sep 18 10:55:18 2016 -0700
----------------------------------------------------------------------
.../containermanager/ContainerManagerImpl.java | 68 ++++++-
.../containermanager/container/Container.java | 4 +
.../container/ContainerEventType.java | 1 +
.../container/ContainerImpl.java | 188 ++++++++++++++-----
.../container/ContainerReInitEvent.java | 20 +-
.../TestContainerManagerWithLCE.java | 42 ++++-
.../containermanager/TestContainerManager.java | 152 +++++++++++++--
.../nodemanager/webapp/MockContainer.java | 10 +
8 files changed, 401 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3552c2b9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index f909ca5..8a9ad99 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -165,8 +165,8 @@ import static org.apache.hadoop.service.Service.STATE.STARTED;
public class ContainerManagerImpl extends CompositeService implements
ContainerManager {
- private enum ReinitOp {
- UPGRADE, COMMIT, ROLLBACK, LOCALIZE;
+ private enum ReInitOp {
+ RE_INIT, COMMIT, ROLLBACK, LOCALIZE;
}
/**
* Extra duration to wait for applications to be killed on shutdown.
@@ -1535,7 +1535,7 @@ public class ContainerManagerImpl extends CompositeService implements
ContainerId containerId = request.getContainerId();
Container container = preUpgradeOrLocalizeCheck(containerId,
- ReinitOp.LOCALIZE);
+ ReInitOp.LOCALIZE);
try {
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
container.getResourceSet().addResources(request.getLocalResources());
@@ -1551,16 +1551,31 @@ public class ContainerManagerImpl extends CompositeService implements
return ResourceLocalizationResponse.newInstance();
}
- public void upgradeContainer(ContainerId containerId,
- ContainerLaunchContext upgradeLaunchContext) throws YarnException {
+ /**
+ * ReInitialize a container using a new Launch Context. If the new process
+ * fails, its retry policy does not allow a retry, and there is no rollback
+ * context (e.g. autoCommit is set), the container is terminated.
+ *
+ * NOTE: Auto-Commit is true by default. This also means that the rollback
+ * context is purged as soon as the command to start the new process
+ * is sent (the Container moves to RUNNING state).
+ *
+ * @param containerId Container Id.
+ * @param autoCommit Auto Commit flag.
+ * @param reInitLaunchContext Target Launch Context.
+ * @throws YarnException Yarn Exception.
+ */
+ public void reInitializeContainer(ContainerId containerId,
+ ContainerLaunchContext reInitLaunchContext, boolean autoCommit)
+ throws YarnException {
Container container = preUpgradeOrLocalizeCheck(containerId,
- ReinitOp.UPGRADE);
+ ReInitOp.RE_INIT);
ResourceSet resourceSet = new ResourceSet();
try {
- resourceSet.addResources(upgradeLaunchContext.getLocalResources());
+ resourceSet.addResources(reInitLaunchContext.getLocalResources());
dispatcher.getEventHandler().handle(
- new ContainerReInitEvent(containerId, upgradeLaunchContext,
- resourceSet));
+ new ContainerReInitEvent(containerId, reInitLaunchContext,
+ resourceSet, autoCommit));
container.setIsReInitializing(true);
} catch (URISyntaxException e) {
LOG.info("Error when parsing local resource URI for upgrade of" +
@@ -1569,8 +1584,41 @@ public class ContainerManagerImpl extends CompositeService implements
}
}
+ /**
+ * Rollback the last reInitialization, if possible.
+ * @param containerId Container ID.
+ * @throws YarnException Yarn Exception.
+ */
+ public void rollbackReInitialization(ContainerId containerId)
+ throws YarnException {
+ Container container = preUpgradeOrLocalizeCheck(containerId,
+ ReInitOp.ROLLBACK);
+ if (container.canRollback()) {
+ dispatcher.getEventHandler().handle(
+ new ContainerEvent(containerId, ContainerEventType.ROLLBACK_REINIT));
+ } else {
+ throw new YarnException("Nothing to rollback to !!");
+ }
+ }
+
+ /**
+ * Commit last reInitialization after which no rollback will be possible.
+ * @param containerId Container ID.
+ * @throws YarnException Yarn Exception.
+ */
+ public void commitReInitialization(ContainerId containerId)
+ throws YarnException {
+ Container container = preUpgradeOrLocalizeCheck(containerId,
+ ReInitOp.COMMIT);
+ if (container.canRollback()) {
+ container.commitUpgrade();
+ } else {
+ throw new YarnException("Nothing to Commit !!");
+ }
+ }
+
private Container preUpgradeOrLocalizeCheck(ContainerId containerId,
- ReinitOp op) throws YarnException {
+ ReInitOp op) throws YarnException {
Container container = context.getContainers().get(containerId);
if (container == null) {
throw new YarnException("Specified " + containerId + " does not exist!");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3552c2b9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
index f6c27ab..78c240a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
@@ -82,4 +82,8 @@ public interface Container extends EventHandler<ContainerEvent> {
void setIsReInitializing(boolean isReInitializing);
boolean isReInitializing();
+
+ boolean canRollback();
+
+ void commitUpgrade();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3552c2b9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
index 0b57505..afea0e6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
@@ -26,6 +26,7 @@ public enum ContainerEventType {
UPDATE_DIAGNOSTICS_MSG,
CONTAINER_DONE,
REINITIALIZE_CONTAINER,
+ ROLLBACK_REINIT,
// DownloadManager
CONTAINER_INITED,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3552c2b9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index 12bbea9..0707df0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -91,14 +91,42 @@ import org.apache.hadoop.yarn.util.resource.Resources;
public class ContainerImpl implements Container {
- private final static class ReInitializationContext {
- private final ResourceSet resourceSet;
+ private static final class ReInitializationContext {
private final ContainerLaunchContext newLaunchContext;
+ private final ResourceSet newResourceSet;
+
+ // Rollback state
+ private final ContainerLaunchContext oldLaunchContext;
+ private final ResourceSet oldResourceSet;
private ReInitializationContext(ContainerLaunchContext newLaunchContext,
- ResourceSet resourceSet) {
+ ResourceSet newResourceSet,
+ ContainerLaunchContext oldLaunchContext,
+ ResourceSet oldResourceSet) {
this.newLaunchContext = newLaunchContext;
- this.resourceSet = resourceSet;
+ this.newResourceSet = newResourceSet;
+ this.oldLaunchContext = oldLaunchContext;
+ this.oldResourceSet = oldResourceSet;
+ }
+
+ private boolean canRollback() {
+ return (oldLaunchContext != null);
+ }
+
+ private ResourceSet mergedResourceSet() {
+ if (oldLaunchContext == null) {
+ return newResourceSet;
+ }
+ return ResourceSet.merge(oldResourceSet, newResourceSet);
+ }
+
+ private ReInitializationContext createContextForRollback() {
+ if (oldLaunchContext == null) {
+ return null;
+ } else {
+ return new ReInitializationContext(
+ oldLaunchContext, oldResourceSet, null, null);
+ }
}
}
@@ -129,7 +157,7 @@ public class ContainerImpl implements Container {
private String logDir;
private String host;
private String ips;
- private ReInitializationContext reInitContext;
+ private volatile ReInitializationContext reInitContext;
private volatile boolean isReInitializing = false;
/** The NM-wide configuration - not specific to this container */
@@ -187,8 +215,8 @@ public class ContainerImpl implements Container {
}
// Configure the Retry Context
- this.containerRetryContext =
- configureRetryContext(conf, launchContext, this.containerId);
+ this.containerRetryContext = configureRetryContext(
+ conf, launchContext, this.containerId);
this.remainingRetryAttempts = this.containerRetryContext.getMaxRetries();
stateMachine = stateMachineFactory.make(this);
this.context = context;
@@ -320,12 +348,16 @@ public class ContainerImpl implements Container {
new ExitedWithSuccessTransition(true))
.addTransition(ContainerState.RUNNING,
EnumSet.of(ContainerState.RELAUNCHING,
+ ContainerState.LOCALIZED,
ContainerState.EXITED_WITH_FAILURE),
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
new RetryFailureTransition())
.addTransition(ContainerState.RUNNING, ContainerState.REINITIALIZING,
ContainerEventType.REINITIALIZE_CONTAINER,
new ReInitializeContainerTransition())
+ .addTransition(ContainerState.RUNNING, ContainerState.REINITIALIZING,
+ ContainerEventType.ROLLBACK_REINIT,
+ new RollbackContainerTransition())
.addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
ContainerEventType.RESOURCE_LOCALIZED,
new ResourceLocalizedWhileRunningTransition())
@@ -884,15 +916,15 @@ public class ContainerImpl implements Container {
@SuppressWarnings("unchecked")
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
- container.reInitContext = createReInitContext(event);
+ container.reInitContext = createReInitContext(container, event);
try {
Map<LocalResourceVisibility, Collection<LocalResourceRequest>>
- pendingResources =
- container.reInitContext.resourceSet.getAllResourcesByVisibility();
- if (!pendingResources.isEmpty()) {
+ resByVisibility = container.reInitContext.newResourceSet
+ .getAllResourcesByVisibility();
+ if (!resByVisibility.isEmpty()) {
container.dispatcher.getEventHandler().handle(
new ContainerLocalizationRequestEvent(
- container, pendingResources));
+ container, resByVisibility));
} else {
// We are not waiting on any resources, so...
// Kill the current container.
@@ -909,10 +941,30 @@ public class ContainerImpl implements Container {
}
protected ReInitializationContext createReInitContext(
- ContainerEvent event) {
- ContainerReInitEvent rEvent = (ContainerReInitEvent)event;
- return new ReInitializationContext(rEvent.getReInitLaunchContext(),
- rEvent.getResourceSet());
+ ContainerImpl container, ContainerEvent event) {
+ ContainerReInitEvent reInitEvent = (ContainerReInitEvent)event;
+ return new ReInitializationContext(
+ reInitEvent.getReInitLaunchContext(),
+ reInitEvent.getResourceSet(),
+ // If AutoCommit is turned on, then no rollback can happen...
+ // So don't need to store the previous context.
+ (reInitEvent.isAutoCommit() ? null : container.launchContext),
+ (reInitEvent.isAutoCommit() ? null : container.resourceSet));
+ }
+ }
+
+ /**
+ * Transition to start the Rollback process.
+ */
+ static class RollbackContainerTransition extends
+ ReInitializeContainerTransition {
+
+ @Override
+ protected ReInitializationContext createReInitContext(ContainerImpl
+ container, ContainerEvent event) {
+ LOG.warn("Container [" + container.getContainerId() + "]" +
+ " about to be explicitly Rolledback !!");
+ return container.reInitContext.createContextForRollback();
}
}
@@ -928,10 +980,10 @@ public class ContainerImpl implements Container {
public void transition(ContainerImpl container, ContainerEvent event) {
ContainerResourceLocalizedEvent rsrcEvent =
(ContainerResourceLocalizedEvent) event;
- container.reInitContext.resourceSet.resourceLocalized(
+ container.reInitContext.newResourceSet.resourceLocalized(
rsrcEvent.getResource(), rsrcEvent.getLocation());
// Check if all ResourceLocalization has completed
- if (container.reInitContext.resourceSet.getPendingResources()
+ if (container.reInitContext.newResourceSet.getPendingResources()
.isEmpty()) {
// Kill the current container.
container.dispatcher.getEventHandler().handle(
@@ -1028,10 +1080,13 @@ public class ContainerImpl implements Container {
container.metrics.runningContainer();
container.wasLaunched = true;
- if (container.reInitContext != null) {
+ container.setIsReInitializing(false);
+ // Check if this launch was due to a re-initialization.
+ // If autocommit == true, then wipe the re-init context. This ensures
+ // that any subsequent failures do not trigger a rollback.
+ if (container.reInitContext != null
+ && !container.reInitContext.canRollback()) {
container.reInitContext = null;
- // Set rollback context here..
- container.setIsReInitializing(false);
}
if (container.recoveredAsKilled) {
@@ -1148,36 +1203,50 @@ public class ContainerImpl implements Container {
+ container.getContainerId(), e);
}
}
- LOG.info("Relaunching Container " + container.getContainerId()
- + ". Remaining retry attempts(after relaunch) : "
- + container.remainingRetryAttempts
- + ". Interval between retries is "
- + container.containerRetryContext.getRetryInterval() + "ms");
- container.wasLaunched = false;
- container.metrics.endRunningContainer();
- if (container.containerRetryContext.getRetryInterval() == 0) {
- container.sendRelaunchEvent();
- } else {
- // wait for some time, then send launch event
- new Thread() {
- @Override
- public void run() {
- try {
- Thread.sleep(
- container.containerRetryContext.getRetryInterval());
- container.sendRelaunchEvent();
- } catch (InterruptedException e) {
- return;
- }
- }
- }.start();
- }
+ doRelaunch(container, container.remainingRetryAttempts,
+ container.containerRetryContext.getRetryInterval());
return ContainerState.RELAUNCHING;
+ } else if (container.canRollback()) {
+ // Rollback is possible only if the previous launch context is
+ // available.
+ container.addDiagnostics("Container Re-init Auto Rolled-Back.");
+ LOG.info("Rolling back Container reInitialization for [" +
+ container.getContainerId() + "] !!");
+ container.reInitContext =
+ container.reInitContext.createContextForRollback();
+ new KilledForReInitializationTransition().transition(container, event);
+ return ContainerState.LOCALIZED;
} else {
new ExitedWithFailureTransition(true).transition(container, event);
return ContainerState.EXITED_WITH_FAILURE;
}
}
+
+ private void doRelaunch(final ContainerImpl container,
+ int remainingRetryAttempts, final int retryInterval) {
+ LOG.info("Relaunching Container " + container.getContainerId()
+ + ". Remaining retry attempts(after relaunch) : "
+ + remainingRetryAttempts + ". Interval between retries is "
+ + retryInterval + "ms");
+ container.wasLaunched = false;
+ container.metrics.endRunningContainer();
+ if (retryInterval == 0) {
+ container.sendRelaunchEvent();
+ } else {
+ // wait for some time, then send launch event
+ new Thread() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(retryInterval);
+ container.sendRelaunchEvent();
+ } catch (InterruptedException e) {
+ return;
+ }
+ }
+ }.start();
+ }
+ }
}
@Override
@@ -1188,24 +1257,29 @@ public class ContainerImpl implements Container {
@Override
public boolean shouldRetry(int errorCode) {
+ return shouldRetry(errorCode, containerRetryContext,
+ remainingRetryAttempts);
+ }
+
+ public static boolean shouldRetry(int errorCode,
+ ContainerRetryContext retryContext, int remainingRetryAttempts) {
if (errorCode == ExitCode.SUCCESS.getExitCode()
|| errorCode == ExitCode.FORCE_KILLED.getExitCode()
|| errorCode == ExitCode.TERMINATED.getExitCode()) {
return false;
}
- ContainerRetryPolicy retryPolicy = containerRetryContext.getRetryPolicy();
+ ContainerRetryPolicy retryPolicy = retryContext.getRetryPolicy();
if (retryPolicy == ContainerRetryPolicy.RETRY_ON_ALL_ERRORS
|| (retryPolicy == ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODES
- && containerRetryContext.getErrorCodes() != null
- && containerRetryContext.getErrorCodes().contains(errorCode))) {
+ && retryContext.getErrorCodes() != null
+ && retryContext.getErrorCodes().contains(errorCode))) {
return remainingRetryAttempts > 0
|| remainingRetryAttempts == ContainerRetryContext.RETRY_FOREVER;
}
return false;
}
-
/**
* Transition to EXITED_WITH_FAILURE
*/
@@ -1240,13 +1314,12 @@ public class ContainerImpl implements Container {
// Re configure the Retry Context
container.containerRetryContext =
configureRetryContext(container.context.getConf(),
- container.launchContext, container.containerId);
+ container.launchContext, container.containerId);
// Reset the retry attempts since its a fresh start
container.remainingRetryAttempts =
container.containerRetryContext.getMaxRetries();
- container.resourceSet = ResourceSet.merge(
- container.resourceSet, container.reInitContext.resourceSet);
+ container.resourceSet = container.reInitContext.mergedResourceSet();
container.sendLaunchEvent();
}
@@ -1589,4 +1662,15 @@ public class ContainerImpl implements Container {
public boolean isReInitializing() {
return this.isReInitializing;
}
+
+ @Override
+ public boolean canRollback() {
+ ReInitializationContext ctx = this.reInitContext; // single volatile read
+ return (ctx != null) && ctx.canRollback();
+ }
+
+ @Override
+ public void commitUpgrade() {
+ this.reInitContext = null;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3552c2b9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerReInitEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerReInitEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerReInitEvent.java
index 2ccdbd7..46eba03 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerReInitEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerReInitEvent.java
@@ -30,18 +30,22 @@ public class ContainerReInitEvent extends ContainerEvent {
private final ContainerLaunchContext reInitLaunchContext;
private final ResourceSet resourceSet;
+ private final boolean autoCommit;
/**
* Container Re-Init Event.
- * @param cID Container Id
- * @param upgradeContext Upgrade context
- * @param resourceSet Resource Set
+ * @param cID Container Id.
+ * @param upgradeContext Upgrade Context.
+ * @param resourceSet Resource Set.
+ * @param autoCommit Auto Commit.
*/
public ContainerReInitEvent(ContainerId cID,
- ContainerLaunchContext upgradeContext, ResourceSet resourceSet){
+ ContainerLaunchContext upgradeContext,
+ ResourceSet resourceSet, boolean autoCommit){
super(cID, ContainerEventType.REINITIALIZE_CONTAINER);
this.reInitLaunchContext = upgradeContext;
this.resourceSet = resourceSet;
+ this.autoCommit = autoCommit;
}
/**
@@ -59,4 +63,12 @@ public class ContainerReInitEvent extends ContainerEvent {
public ResourceSet getResourceSet() {
return resourceSet;
}
+
+ /**
+ * Should this re-Initialization be auto-committed.
+ * @return AutoCommit.
+ */
+ public boolean isAutoCommit() {
+ return autoCommit;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3552c2b9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
index 8a27849..79182ce 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
@@ -270,15 +270,15 @@ public class TestContainerManagerWithLCE extends TestContainerManager {
}
@Override
- public void testContainerUpgradeSuccess() throws IOException,
+ public void testContainerUpgradeSuccessAutoCommit() throws IOException,
InterruptedException, YarnException {
// Don't run the test if the binary is not available.
if (!shouldRunTest()) {
LOG.info("LCE binary path is not passed. Not running the test");
return;
}
- LOG.info("Running testContainerUpgradeSuccess");
- super.testContainerUpgradeSuccess();
+ LOG.info("Running testContainerUpgradeSuccessAutoCommit");
+ super.testContainerUpgradeSuccessAutoCommit();
}
@Override
@@ -294,6 +294,42 @@ public class TestContainerManagerWithLCE extends TestContainerManager {
}
@Override
+ public void testContainerUpgradeSuccessExplicitCommit() throws IOException,
+ InterruptedException, YarnException {
+ // Don't run the test if the binary is not available.
+ if (!shouldRunTest()) {
+ LOG.info("LCE binary path is not passed. Not running the test");
+ return;
+ }
+ LOG.info("Running testContainerUpgradeSuccessExplicitCommit");
+ super.testContainerUpgradeSuccessExplicitCommit();
+ }
+
+ @Override
+ public void testContainerUpgradeSuccessExplicitRollback() throws IOException,
+ InterruptedException, YarnException {
+ // Don't run the test if the binary is not available.
+ if (!shouldRunTest()) {
+ LOG.info("LCE binary path is not passed. Not running the test");
+ return;
+ }
+ LOG.info("Running testContainerUpgradeSuccessExplicitRollback");
+ super.testContainerUpgradeSuccessExplicitRollback();
+ }
+
+ @Override
+ public void testContainerUpgradeRollbackDueToFailure() throws IOException,
+ InterruptedException, YarnException {
+ // Don't run the test if the binary is not available.
+ if (!shouldRunTest()) {
+ LOG.info("LCE binary path is not passed. Not running the test");
+ return;
+ }
+ LOG.info("Running testContainerUpgradeRollbackDueToFailure");
+ super.testContainerUpgradeRollbackDueToFailure();
+ }
+
+ @Override
public void testContainerUpgradeProcessFailure() throws IOException,
InterruptedException, YarnException {
// Don't run the test if the binary is not available.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3552c2b9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
index 843dc2a..72049e6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
@@ -369,9 +369,8 @@ public class TestContainerManager extends BaseContainerManagerTest {
DefaultContainerExecutor.containerIsAlive(pid));
}
- @Test
- public void testContainerUpgradeSuccess() throws IOException,
- InterruptedException, YarnException {
+ private String[] testContainerUpgradeSuccess(boolean autoCommit)
+ throws IOException, InterruptedException, YarnException {
containerManager.start();
// ////// Construct the Container-id
ContainerId cId = createContainerId(0);
@@ -381,7 +380,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
File newStartFile = new File(tmpDir, "start_file_n.txt").getAbsoluteFile();
- prepareContainerUpgrade(false, false, cId, newStartFile);
+ prepareContainerUpgrade(autoCommit, false, false, cId, newStartFile);
// Assert that the First process is not alive anymore
Assert.assertFalse("Process is still alive!",
@@ -407,6 +406,80 @@ public class TestContainerManager extends BaseContainerManagerTest {
// Assert that the New process is alive
Assert.assertTrue("New Process is not alive!",
DefaultContainerExecutor.containerIsAlive(newPid));
+ return new String[]{pid, newPid};
+ }
+
+ @Test
+ public void testContainerUpgradeSuccessAutoCommit() throws IOException,
+ InterruptedException, YarnException {
+ testContainerUpgradeSuccess(true);
+ // Should not be able to Commit (since already auto committed)
+ try {
+ containerManager.commitReInitialization(createContainerId(0));
+ Assert.fail();
+ } catch (Exception e) {
+ Assert.assertTrue(e.getMessage().contains("Nothing to Commit"));
+ }
+ }
+
+ @Test
+ public void testContainerUpgradeSuccessExplicitCommit() throws IOException,
+ InterruptedException, YarnException {
+ testContainerUpgradeSuccess(false);
+ ContainerId cId = createContainerId(0);
+ containerManager.commitReInitialization(cId);
+ // Should not be able to Rollback once committed
+ try {
+ containerManager.rollbackReInitialization(cId);
+ Assert.fail();
+ } catch (Exception e) {
+ Assert.assertTrue(e.getMessage().contains("Nothing to rollback to"));
+ }
+ }
+
+ @Test
+ public void testContainerUpgradeSuccessExplicitRollback() throws IOException,
+ InterruptedException, YarnException {
+ String[] pids = testContainerUpgradeSuccess(false);
+
+ // Delete the old start File, so that its re-appearance proves a relaunch.
+ File oldStartFile = new File(tmpDir, "start_file_o.txt").getAbsoluteFile();
+ oldStartFile.delete();
+
+ ContainerId cId = createContainerId(0);
+ // Explicit Rollback
+ containerManager.rollbackReInitialization(cId);
+
+ // Original should be dead anyway
+ Assert.assertFalse("Original Process is still alive!",
+ DefaultContainerExecutor.containerIsAlive(pids[0]));
+
+ // Wait (while it is still alive) for the upgraded process to die
+ int timeoutSecs = 0;
+ while (DefaultContainerExecutor.containerIsAlive(pids[1])
+ && timeoutSecs++ < 20) {
+ Thread.sleep(1000);
+ LOG.info("Waiting for Upgraded process to die..");
+ }
+
+ timeoutSecs = 0;
+ // Wait for the rolled-back process to re-create the old start file
+ while (!oldStartFile.exists() && timeoutSecs++ < 20) {
+ Thread.sleep(1000);
+ LOG.info("Waiting for Rolled-back process start-file to be created");
+ }
+
+ // Verify the file contents; try-with-resources closes the reader.
+ String rolledBackPid;
+ try (BufferedReader reader =
+ new BufferedReader(new FileReader(oldStartFile))) {
+ Assert.assertEquals("Hello World!", reader.readLine());
+ rolledBackPid = reader.readLine().trim(); // pid of the process
+ Assert.assertEquals(null, reader.readLine()); // no more lines
+ }
+
+ Assert.assertNotEquals("The Rolled-back process should be a different pid",
+ pids[0], rolledBackPid);
}
@Test
@@ -424,7 +497,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
File newStartFile = new File(tmpDir, "start_file_n.txt").getAbsoluteFile();
- prepareContainerUpgrade(true, true, cId, newStartFile);
+ prepareContainerUpgrade(false, true, true, cId, newStartFile);
// Assert that the First process is STILL alive
// since upgrade was terminated..
@@ -447,22 +520,69 @@ public class TestContainerManager extends BaseContainerManagerTest {
File newStartFile = new File(tmpDir, "start_file_n.txt").getAbsoluteFile();
- prepareContainerUpgrade(true, false, cId, newStartFile);
+ // Since Autocommit is true, there is also no rollback context...
+ // which implies that if the new process fails, since there is no
+ // rollback, it is terminated.
+ prepareContainerUpgrade(true, true, false, cId, newStartFile);
// Assert that the First process is not alive anymore
Assert.assertFalse("Process is still alive!",
DefaultContainerExecutor.containerIsAlive(pid));
}
+ @Test
+ public void testContainerUpgradeRollbackDueToFailure() throws IOException,
+ InterruptedException, YarnException {
+ if (Shell.WINDOWS) {
+ return;
+ }
+ containerManager.start();
+ // ////// Construct the Container-id
+ ContainerId cId = createContainerId(0);
+ File oldStartFile = new File(tmpDir, "start_file_o.txt").getAbsoluteFile();
+
+ String pid = prepareInitialContainer(cId, oldStartFile);
+ // Delete the original start-file so that the wait loop below only ends
+ // once the rolled-back process has re-created it.
+ oldStartFile.delete();
+
+ File newStartFile = new File(tmpDir, "start_file_n.txt").getAbsoluteFile();
+
+ prepareContainerUpgrade(false, true, false, cId, newStartFile);
+
+ // Assert that the First process is not alive anymore
+ Assert.assertFalse("Original Process is still alive!",
+ DefaultContainerExecutor.containerIsAlive(pid));
+
+ int timeoutSecs = 0;
+ // Wait for oldStartFile to be re-created by the rolled-back process
+ while (!oldStartFile.exists() && timeoutSecs++ < 20) {
+ Thread.sleep(1000);
+ LOG.info("Waiting for New process start-file to be created");
+ }
+
+ // Now verify the contents of the file; try-with-resources closes the reader
+ try (BufferedReader reader =
+ new BufferedReader(new FileReader(oldStartFile))) {
+ Assert.assertEquals("Hello World!", reader.readLine());
+ // Get the pid of the process
+ String rolledBackPid = reader.readLine().trim();
+ // No more lines
+ Assert.assertEquals(null, reader.readLine());
+ Assert.assertNotEquals("The Rolled-back process should be a different pid",
+ pid, rolledBackPid);
+ }
+ }
+
/**
* Prepare a launch Context for container upgrade and request the
* Container Manager to re-initialize a running container using the
* new launch context.
+ * @param autoCommit Enable autoCommit.
* @param failCmd injects a start script that intentionally fails.
* @param failLoc injects a bad file Location that will fail localization.
*/
- private void prepareContainerUpgrade(boolean failCmd, boolean failLoc,
- ContainerId cId, File startFile)
+ private void prepareContainerUpgrade(boolean autoCommit, boolean failCmd,
+ boolean failLoc, ContainerId cId, File startFile)
throws FileNotFoundException, YarnException, InterruptedException {
// Re-write scriptfile and processStartFile
File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile_new");
@@ -471,13 +591,15 @@ public class TestContainerManager extends BaseContainerManagerTest {
writeScriptFile(fileWriter, "Upgrade World!", startFile, cId, failCmd);
ContainerLaunchContext containerLaunchContext =
- prepareContainerLaunchContext(scriptFile, "dest_file_new", failLoc);
+ prepareContainerLaunchContext(scriptFile, "dest_file_new", failLoc, 0);
- containerManager.upgradeContainer(cId, containerLaunchContext);
+ containerManager.reInitializeContainer(cId, containerLaunchContext,
+ autoCommit);
try {
- containerManager.upgradeContainer(cId, containerLaunchContext);
+ containerManager.reInitializeContainer(cId, containerLaunchContext,
+ autoCommit);
} catch (Exception e) {
- Assert.assertTrue(e.getMessage().contains("Cannot perform UPGRADE"));
+ Assert.assertTrue(e.getMessage().contains("Cannot perform RE_INIT"));
}
int timeoutSecs = 0;
int maxTimeToWait = failLoc ? 10 : 20;
@@ -501,7 +623,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
writeScriptFile(fileWriterOld, "Hello World!", startFile, cId, false);
ContainerLaunchContext containerLaunchContext =
- prepareContainerLaunchContext(scriptFileOld, "dest_file", false);
+ prepareContainerLaunchContext(scriptFileOld, "dest_file", false, 4);
StartContainerRequest scRequest =
StartContainerRequest.newInstance(containerLaunchContext,
@@ -562,7 +684,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
}
private ContainerLaunchContext prepareContainerLaunchContext(File scriptFile,
- String destFName, boolean putBadFile) {
+ String destFName, boolean putBadFile, int numRetries) {
ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
URL resourceAlpha = null;
@@ -592,7 +714,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
ContainerRetryContext containerRetryContext = ContainerRetryContext
.newInstance(
ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODES,
- new HashSet<>(Arrays.asList(Integer.valueOf(111))), 4, 0);
+ new HashSet<>(Arrays.asList(Integer.valueOf(111))), numRetries, 0);
containerLaunchContext.setContainerRetryContext(containerRetryContext);
List<String> commands = Arrays.asList(
Shell.getRunScriptCommand(scriptFile));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3552c2b9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
index 8c8bec7..164488d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
@@ -205,4 +205,14 @@ public class MockContainer implements Container {
public boolean isReInitializing() {
return false;
}
+
+ @Override
+ public boolean canRollback() {
+ return false;
+ }
+
+ @Override
+ public void commitUpgrade() {
+ // Intentional no-op: this mock never has a pending re-initialization.
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org