You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2016/09/19 20:01:28 UTC
[04/44] hadoop git commit: YARN-5620. Core changes in NodeManager to
support re-initialization of Containers with new launchContext. (asuresh)
YARN-5620. Core changes in NodeManager to support re-initialization of Containers with new launchContext. (asuresh)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/40b5a59b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/40b5a59b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/40b5a59b
Branch: refs/heads/YARN-2915
Commit: 40b5a59b726733df456330a26f03d5174cc0bc1c
Parents: 2a8f55a
Author: Arun Suresh <as...@apache.org>
Authored: Thu Sep 15 07:15:11 2016 -0700
Committer: Arun Suresh <as...@apache.org>
Committed: Thu Sep 15 07:15:11 2016 -0700
----------------------------------------------------------------------
.../nodemanager/DefaultContainerExecutor.java | 2 +-
.../containermanager/ContainerManagerImpl.java | 51 +++-
.../containermanager/container/Container.java | 6 +
.../container/ContainerEventType.java | 3 +-
.../container/ContainerImpl.java | 274 ++++++++++++++++---
.../container/ContainerReInitEvent.java | 62 +++++
.../container/ContainerState.java | 2 +-
.../launcher/ContainersLauncher.java | 1 +
.../launcher/ContainersLauncherEventType.java | 1 +
.../localizer/ResourceLocalizationService.java | 3 +-
.../containermanager/localizer/ResourceSet.java | 43 ++-
.../ContainerLocalizationRequestEvent.java | 4 +-
.../TestContainerManagerWithLCE.java | 36 +++
.../BaseContainerManagerTest.java | 6 +-
.../containermanager/TestContainerManager.java | 238 +++++++++++++++-
.../nodemanager/webapp/MockContainer.java | 15 +
16 files changed, 682 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40b5a59b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
index 9a0549d..59b69ac 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
@@ -89,7 +89,7 @@ public class DefaultContainerExecutor extends ContainerExecutor {
}
protected void copyFile(Path src, Path dst, String owner) throws IOException {
- lfs.util().copy(src, dst);
+ lfs.util().copy(src, dst, false, true);
}
protected void setScriptExecutable(Path script, String owner) throws IOException {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40b5a59b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 52d8566..f909ca5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -110,11 +110,13 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerReInitEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.SignalContainersLauncherEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceSet;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
@@ -163,6 +165,9 @@ import static org.apache.hadoop.service.Service.STATE.STARTED;
public class ContainerManagerImpl extends CompositeService implements
ContainerManager {
+ private enum ReinitOp {
+ UPGRADE, COMMIT, ROLLBACK, LOCALIZE;
+ }
/**
* Extra duration to wait for applications to be killed on shutdown.
*/
@@ -1529,18 +1534,8 @@ public class ContainerManagerImpl extends CompositeService implements
ResourceLocalizationRequest request) throws YarnException, IOException {
ContainerId containerId = request.getContainerId();
- Container container = context.getContainers().get(containerId);
- if (container == null) {
- throw new YarnException("Specified " + containerId + " does not exist!");
- }
- if (!container.getContainerState()
- .equals(org.apache.hadoop.yarn.server.nodemanager.
- containermanager.container.ContainerState.RUNNING)) {
- throw new YarnException(
- containerId + " is at " + container.getContainerState()
- + " state. Not able to localize new resources.");
- }
-
+ Container container = preUpgradeOrLocalizeCheck(containerId,
+ ReinitOp.LOCALIZE);
try {
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
container.getResourceSet().addResources(request.getLocalResources());
@@ -1556,6 +1551,38 @@ public class ContainerManagerImpl extends CompositeService implements
return ResourceLocalizationResponse.newInstance();
}
+ public void upgradeContainer(ContainerId containerId,
+ ContainerLaunchContext upgradeLaunchContext) throws YarnException {
+ Container container = preUpgradeOrLocalizeCheck(containerId,
+ ReinitOp.UPGRADE);
+ ResourceSet resourceSet = new ResourceSet();
+ try {
+ resourceSet.addResources(upgradeLaunchContext.getLocalResources());
+ dispatcher.getEventHandler().handle(
+ new ContainerReInitEvent(containerId, upgradeLaunchContext,
+ resourceSet));
+ container.setIsReInitializing(true);
+ } catch (URISyntaxException e) {
+ LOG.info("Error when parsing local resource URI for upgrade of" +
+ "Container [" + containerId + "]", e);
+ throw new YarnException(e);
+ }
+ }
+
+ /**
+ * Look up a container and verify that the given operation may be applied.
+ * @param containerId Container to look up.
+ * @param op Operation being attempted; only used in the error message.
+ * @return the container, which is RUNNING and not re-initializing.
+ * @throws YarnException if the container is unknown, not running, or
+ * already re-initializing.
+ */
+ private Container preUpgradeOrLocalizeCheck(ContainerId containerId,
+ ReinitOp op) throws YarnException {
+ Container target = context.getContainers().get(containerId);
+ if (target == null) {
+ throw new YarnException("Specified " + containerId + " does not exist!");
+ }
+ boolean eligible = target.isRunning() && !target.isReInitializing();
+ if (eligible) {
+ return target;
+ }
+ throw new YarnException("Cannot perform " + op + " on [" + containerId
+ + "]. Current state is [" + target.getContainerState() + ", " +
+ "isReInitializing=" + target.isReInitializing() + "].");
+ }
+
@SuppressWarnings("unchecked")
private void internalSignalToContainer(SignalContainerRequest request,
String sentBy) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40b5a59b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
index c4cea18..f6c27ab 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
@@ -76,4 +76,10 @@ public interface Container extends EventHandler<ContainerEvent> {
Priority getPriority();
ResourceSet getResourceSet();
+
+ /**
+ * @return whether the container is currently in the RUNNING state.
+ */
+ boolean isRunning();
+
+ /**
+ * Record whether the container is in the middle of a re-initialization.
+ * @param isReInitializing new value of the flag.
+ */
+ void setIsReInitializing(boolean isReInitializing);
+
+ /**
+ * @return whether the container is in the middle of a re-initialization.
+ * ContainerManagerImpl rejects upgrade/localize requests while this is
+ * true.
+ */
+ boolean isReInitializing();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40b5a59b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
index 5622f8c..0b57505 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
@@ -25,6 +25,7 @@ public enum ContainerEventType {
KILL_CONTAINER,
UPDATE_DIAGNOSTICS_MSG,
CONTAINER_DONE,
+ REINITIALIZE_CONTAINER,
// DownloadManager
CONTAINER_INITED,
@@ -36,5 +37,5 @@ public enum ContainerEventType {
CONTAINER_LAUNCHED,
CONTAINER_EXITED_WITH_SUCCESS,
CONTAINER_EXITED_WITH_FAILURE,
- CONTAINER_KILLED_ON_REQUEST,
+ CONTAINER_KILLED_ON_REQUEST
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40b5a59b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index ce9e581..12bbea9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -27,6 +27,7 @@ import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -90,13 +91,24 @@ import org.apache.hadoop.yarn.util.resource.Resources;
public class ContainerImpl implements Container {
+ /**
+ * State of an in-flight re-initialization: the launch context to relaunch
+ * with and the set of resources that must be localized for it.
+ */
+ private static final class ReInitializationContext {
+ private final ContainerLaunchContext newLaunchContext;
+ private final ResourceSet resourceSet;
+
+ private ReInitializationContext(ContainerLaunchContext launchCtx,
+ ResourceSet resources) {
+ this.resourceSet = resources;
+ this.newLaunchContext = launchCtx;
+ }
+ }
+
private final Lock readLock;
private final Lock writeLock;
private final Dispatcher dispatcher;
private final NMStateStoreService stateStore;
private final Credentials credentials;
private final NodeManagerMetrics metrics;
- private final ContainerLaunchContext launchContext;
+ private volatile ContainerLaunchContext launchContext;
private final ContainerTokenIdentifier containerTokenIdentifier;
private final ContainerId containerId;
private volatile Resource resource;
@@ -110,13 +122,15 @@ public class ContainerImpl implements Container {
private long containerLaunchStartTime;
private ContainerMetrics containerMetrics;
private static Clock clock = SystemClock.getInstance();
- private final ContainerRetryContext containerRetryContext;
+ private ContainerRetryContext containerRetryContext;
// remaining retries to relaunch container if needed
private int remainingRetryAttempts;
private String workDir;
private String logDir;
private String host;
private String ips;
+ private ReInitializationContext reInitContext;
+ private volatile boolean isReInitializing = false;
/** The NM-wide configuration - not specific to this container */
private final Configuration daemonConf;
@@ -141,23 +155,7 @@ public class ContainerImpl implements Container {
this.stateStore = context.getNMStateStore();
this.version = containerTokenIdentifier.getVersion();
this.launchContext = launchContext;
- if (launchContext != null
- && launchContext.getContainerRetryContext() != null) {
- this.containerRetryContext = launchContext.getContainerRetryContext();
- } else {
- this.containerRetryContext = ContainerRetryContext.NEVER_RETRY_CONTEXT;
- }
- this.remainingRetryAttempts = containerRetryContext.getMaxRetries();
- int minimumRestartInterval = conf.getInt(
- YarnConfiguration.NM_CONTAINER_RETRY_MINIMUM_INTERVAL_MS,
- YarnConfiguration.DEFAULT_NM_CONTAINER_RETRY_MINIMUM_INTERVAL_MS);
- if (containerRetryContext.getRetryPolicy()
- != ContainerRetryPolicy.NEVER_RETRY
- && containerRetryContext.getRetryInterval() < minimumRestartInterval) {
- LOG.info("Set restart interval to minimum value " + minimumRestartInterval
- + "ms for container " + containerTokenIdentifier.getContainerID());
- this.containerRetryContext.setRetryInterval(minimumRestartInterval);
- }
+
this.diagnosticsMaxSize = conf.getInt(
YarnConfiguration.NM_CONTAINER_DIAGNOSTICS_MAXIMUM_SIZE,
YarnConfiguration.DEFAULT_NM_CONTAINER_DIAGNOSTICS_MAXIMUM_SIZE);
@@ -188,11 +186,37 @@ public class ContainerImpl implements Container {
containerMetrics.recordStartTime(clock.getTime());
}
+ // Configure the Retry Context
+ this.containerRetryContext =
+ configureRetryContext(conf, launchContext, this.containerId);
+ this.remainingRetryAttempts = this.containerRetryContext.getMaxRetries();
stateMachine = stateMachineFactory.make(this);
this.context = context;
this.resourceSet = new ResourceSet();
}
+ private static ContainerRetryContext configureRetryContext(
+ Configuration conf, ContainerLaunchContext launchContext,
+ ContainerId containerId) {
+ ContainerRetryContext context;
+ if (launchContext != null
+ && launchContext.getContainerRetryContext() != null) {
+ context = launchContext.getContainerRetryContext();
+ } else {
+ context = ContainerRetryContext.NEVER_RETRY_CONTEXT;
+ }
+ int minimumRestartInterval = conf.getInt(
+ YarnConfiguration.NM_CONTAINER_RETRY_MINIMUM_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_NM_CONTAINER_RETRY_MINIMUM_INTERVAL_MS);
+ if (context.getRetryPolicy() != ContainerRetryPolicy.NEVER_RETRY
+ && context.getRetryInterval() < minimumRestartInterval) {
+ LOG.info("Set restart interval to minimum value " + minimumRestartInterval
+ + "ms for container " + containerId);
+ context.setRetryInterval(minimumRestartInterval);
+ }
+ return context;
+ }
+
// constructor for a recovered container
public ContainerImpl(Configuration conf, Dispatcher dispatcher,
ContainerLaunchContext launchContext, Credentials creds,
@@ -299,6 +323,9 @@ public class ContainerImpl implements Container {
ContainerState.EXITED_WITH_FAILURE),
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
new RetryFailureTransition())
+ .addTransition(ContainerState.RUNNING, ContainerState.REINITIALIZING,
+ ContainerEventType.REINITIALIZE_CONTAINER,
+ new ReInitializeContainerTransition())
.addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
ContainerEventType.RESOURCE_LOCALIZED,
new ResourceLocalizedWhileRunningTransition())
@@ -310,10 +337,38 @@ public class ContainerImpl implements Container {
UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.RUNNING, ContainerState.KILLING,
ContainerEventType.KILL_CONTAINER, new KillTransition())
- .addTransition(ContainerState.RUNNING, ContainerState.EXITED_WITH_FAILURE,
+ .addTransition(ContainerState.RUNNING,
+ ContainerState.EXITED_WITH_FAILURE,
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
new KilledExternallyTransition())
+ // From REINITIALIZING State
+ .addTransition(ContainerState.REINITIALIZING,
+ ContainerState.EXITED_WITH_SUCCESS,
+ ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
+ new ExitedWithSuccessTransition(true))
+ .addTransition(ContainerState.REINITIALIZING,
+ ContainerState.EXITED_WITH_FAILURE,
+ ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
+ new ExitedWithFailureTransition(true))
+ .addTransition(ContainerState.REINITIALIZING,
+ ContainerState.REINITIALIZING,
+ ContainerEventType.RESOURCE_LOCALIZED,
+ new ResourceLocalizedWhileReInitTransition())
+ .addTransition(ContainerState.REINITIALIZING, ContainerState.RUNNING,
+ ContainerEventType.RESOURCE_FAILED,
+ new ResourceLocalizationFailedWhileReInitTransition())
+ .addTransition(ContainerState.REINITIALIZING,
+ ContainerState.REINITIALIZING,
+ ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+ UPDATE_DIAGNOSTICS_TRANSITION)
+ .addTransition(ContainerState.REINITIALIZING, ContainerState.KILLING,
+ ContainerEventType.KILL_CONTAINER, new KillTransition())
+ .addTransition(ContainerState.REINITIALIZING,
+ ContainerState.LOCALIZED,
+ ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
+ new KilledForReInitializationTransition())
+
// From RELAUNCHING State
.addTransition(ContainerState.RELAUNCHING, ContainerState.RUNNING,
ContainerEventType.CONTAINER_LAUNCHED, new LaunchTransition())
@@ -458,7 +513,7 @@ public class ContainerImpl implements Container {
}
@Override
- public Map<Path,List<String>> getLocalizedResources() {
+ public Map<Path, List<String>> getLocalizedResources() {
this.readLock.lock();
try {
if (ContainerState.LOCALIZED == getContainerState()
@@ -775,7 +830,7 @@ public class ContainerImpl implements Container {
ContainerResourceLocalizedEvent rsrcEvent = (ContainerResourceLocalizedEvent) event;
LocalResourceRequest resourceRequest = rsrcEvent.getResource();
Path location = rsrcEvent.getLocation();
- List<String> syms =
+ Set<String> syms =
container.resourceSet.resourceLocalized(resourceRequest, location);
if (null == syms) {
LOG.info("Localized resource " + resourceRequest +
@@ -822,17 +877,86 @@ public class ContainerImpl implements Container {
}
/**
- * Resource is localized while the container is running - create symlinks
+ * Transition to start the Re-Initialization process.
+ */
+ static class ReInitializeContainerTransition extends ContainerTransition {
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void transition(ContainerImpl container, ContainerEvent event) {
+ container.reInitContext = createReInitContext(event);
+ try {
+ Map<LocalResourceVisibility, Collection<LocalResourceRequest>>
+ pendingResources =
+ container.reInitContext.resourceSet.getAllResourcesByVisibility();
+ if (!pendingResources.isEmpty()) {
+ container.dispatcher.getEventHandler().handle(
+ new ContainerLocalizationRequestEvent(
+ container, pendingResources));
+ } else {
+ // We are not waiting on any resources, so...
+ // Kill the current container.
+ container.dispatcher.getEventHandler().handle(
+ new ContainersLauncherEvent(container,
+ ContainersLauncherEventType.CLEANUP_CONTAINER_FOR_REINIT));
+ }
+ } catch (Exception e) {
+ LOG.error("Container [" + container.getContainerId() + "]" +
+ " re-initialization failure..", e);
+ container.addDiagnostics("Error re-initializing due to" +
+ "[" + e.getMessage() + "]");
+ }
+ }
+
+ protected ReInitializationContext createReInitContext(
+ ContainerEvent event) {
+ ContainerReInitEvent rEvent = (ContainerReInitEvent)event;
+ return new ReInitializationContext(rEvent.getReInitLaunchContext(),
+ rEvent.getResourceSet());
+ }
+ }
+
+ /**
+ * Resource requested for Container Re-initialization has been localized.
+ * If all dependencies are met, then restart Container with new bits.
+ */
+ static class ResourceLocalizedWhileReInitTransition
+ extends ContainerTransition {
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void transition(ContainerImpl container, ContainerEvent event) {
+ ContainerResourceLocalizedEvent rsrcEvent =
+ (ContainerResourceLocalizedEvent) event;
+ container.reInitContext.resourceSet.resourceLocalized(
+ rsrcEvent.getResource(), rsrcEvent.getLocation());
+ // Check if all ResourceLocalization has completed
+ if (container.reInitContext.resourceSet.getPendingResources()
+ .isEmpty()) {
+ // Kill the current container.
+ container.dispatcher.getEventHandler().handle(
+ new ContainersLauncherEvent(container,
+ ContainersLauncherEventType.CLEANUP_CONTAINER_FOR_REINIT));
+ }
+ }
+ }
+
+ /**
+ * Resource is localized while the container is running - create symlinks.
*/
static class ResourceLocalizedWhileRunningTransition
extends ContainerTransition {
+ @SuppressWarnings("unchecked")
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
ContainerResourceLocalizedEvent rsrcEvent =
(ContainerResourceLocalizedEvent) event;
- List<String> links = container.resourceSet
- .resourceLocalized(rsrcEvent.getResource(), rsrcEvent.getLocation());
+ Set<String> links = container.resourceSet.resourceLocalized(
+ rsrcEvent.getResource(), rsrcEvent.getLocation());
+ if (links == null) {
+ return;
+ }
// creating symlinks.
for (String link : links) {
try {
@@ -872,8 +996,29 @@ public class ContainerImpl implements Container {
}
/**
+ * Resource localization failed while the container is reinitializing:
+ * abort the re-initialization and return the container to RUNNING.
+ */
+ static class ResourceLocalizationFailedWhileReInitTransition
+ extends ContainerTransition {
+
+ @Override
+ public void transition(ContainerImpl container, ContainerEvent event) {
+ ContainerResourceFailedEvent failedEvent =
+ (ContainerResourceFailedEvent) event;
+ // NOTE(review): the failure is recorded on the container's main
+ // resource set rather than on the re-init set; confirm this is intended.
+ container.resourceSet.resourceLocalizationFailed(
+ failedEvent.getResource());
+ container.addDiagnostics("Container aborting re-initialization.. "
+ + failedEvent.getDiagnosticMessage());
+ LOG.error("Container [" + container.getContainerId() + "] Re-init" +
+ " failed !! Resource [" + failedEvent.getResource() + "] could" +
+ " not be localized !!");
+ // The container goes back to RUNNING with its old launch context, so
+ // drop the re-init state AND clear the flag. Otherwise every later
+ // upgrade/localize request for this container would be rejected as
+ // "isReInitializing=true".
+ container.reInitContext = null;
+ container.setIsReInitializing(false);
+ }
+ }
+
+ /**
* Transition from LOCALIZED state to RUNNING state upon receiving
- * a CONTAINER_LAUNCHED event
+ * a CONTAINER_LAUNCHED event.
*/
static class LaunchTransition extends ContainerTransition {
@SuppressWarnings("unchecked")
@@ -883,6 +1028,12 @@ public class ContainerImpl implements Container {
container.metrics.runningContainer();
container.wasLaunched = true;
+ if (container.reInitContext != null) {
+ container.reInitContext = null;
+ // Set rollback context here..
+ container.setIsReInitializing(false);
+ }
+
if (container.recoveredAsKilled) {
LOG.info("Killing " + container.containerId
+ " due to recovered as killed");
@@ -895,8 +1046,8 @@ public class ContainerImpl implements Container {
}
/**
- * Transition from RUNNING or KILLING state to EXITED_WITH_SUCCESS state
- * upon EXITED_WITH_SUCCESS message.
+ * Transition from RUNNING, REINITIALIZING or KILLING state to
+ * EXITED_WITH_SUCCESS state upon EXITED_WITH_SUCCESS message.
*/
@SuppressWarnings("unchecked") // dispatcher not typed
static class ExitedWithSuccessTransition extends ContainerTransition {
@@ -909,6 +1060,8 @@ public class ContainerImpl implements Container {
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
+
+ container.setIsReInitializing(false);
// Set exit code to 0 on success
container.exitCode = 0;
@@ -939,6 +1092,7 @@ public class ContainerImpl implements Container {
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
+ container.setIsReInitializing(false);
ContainerExitEvent exitEvent = (ContainerExitEvent) event;
container.exitCode = exitEvent.getExitCode();
if (exitEvent.getDiagnosticInfo() != null) {
@@ -959,7 +1113,7 @@ public class ContainerImpl implements Container {
}
/**
- * Transition to EXITED_WITH_FAILURE or LOCALIZED state upon
+ * Transition to EXITED_WITH_FAILURE or RELAUNCHING state upon
* CONTAINER_EXITED_WITH_FAILURE state.
**/
@SuppressWarnings("unchecked") // dispatcher not typed
@@ -991,7 +1145,7 @@ public class ContainerImpl implements Container {
} catch (IOException e) {
LOG.warn(
"Unable to update remainingRetryAttempts in state store for "
- + container.getContainerId(), e);
+ + container.getContainerId(), e);
}
}
LOG.info("Relaunching Container " + container.getContainerId()
@@ -1053,7 +1207,7 @@ public class ContainerImpl implements Container {
}
/**
- * Transition to EXITED_WITH_FAILURE upon receiving KILLED_ON_REQUEST
+ * Transition to EXITED_WITH_FAILURE upon receiving CONTAINER_KILLED_ON_REQUEST.
*/
static class KilledExternallyTransition extends ExitedWithFailureTransition {
KilledExternallyTransition() {
@@ -1061,13 +1215,44 @@ public class ContainerImpl implements Container {
}
@Override
- public void transition(ContainerImpl container, ContainerEvent event) {
+ public void transition(ContainerImpl container,
+ ContainerEvent event) {
super.transition(container, event);
container.addDiagnostics("Killed by external signal\n");
}
}
/**
+ * Transition from REINITIALIZING to LOCALIZED once the old process has been
+ * killed, then relaunch the container with the new launch context.
+ */
+ static class KilledForReInitializationTransition extends ContainerTransition {
+
+ @Override
+ public void transition(ContainerImpl container,
+ ContainerEvent event) {
+ LOG.info("Relaunching Container [" + container.getContainerId()
+ + "] for upgrade !!");
+ container.wasLaunched = false;
+ container.metrics.endRunningContainer();
+
+ container.launchContext = container.reInitContext.newLaunchContext;
+
+ // Re configure the Retry Context
+ container.containerRetryContext =
+ configureRetryContext(container.context.getConf(),
+ container.launchContext, container.containerId);
+ // Reset the retry attempts since its a fresh start
+ container.remainingRetryAttempts =
+ container.containerRetryContext.getMaxRetries();
+
+ container.resourceSet = ResourceSet.merge(
+ container.resourceSet, container.reInitContext.resourceSet);
+
+ container.sendLaunchEvent();
+ }
+ }
+
+ /**
* Transition from LOCALIZING to LOCALIZATION_FAILED upon receiving
* RESOURCE_FAILED event.
*/
@@ -1122,16 +1307,20 @@ public class ContainerImpl implements Container {
}
/**
- * Transitions upon receiving KILL_CONTAINER:
- * - LOCALIZED -> KILLING
- * - RUNNING -> KILLING
+ * Transitions upon receiving KILL_CONTAINER.
+ * - LOCALIZED -> KILLING.
+ * - RUNNING -> KILLING.
+ * - REINITIALIZING -> KILLING.
*/
@SuppressWarnings("unchecked") // dispatcher not typed
static class KillTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> {
+
+ @SuppressWarnings("unchecked")
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
// Kill the process/process-grp
+ container.setIsReInitializing(false);
container.dispatcher.getEventHandler().handle(
new ContainersLauncherEvent(container,
ContainersLauncherEventType.CLEANUP_CONTAINER));
@@ -1385,4 +1574,19 @@ public class ContainerImpl implements Container {
public Priority getPriority() {
return containerTokenIdentifier.getPriority();
}
+
+ @Override
+ public boolean isRunning() {
+ return getContainerState() == ContainerState.RUNNING;
+ }
+
+ @Override
+ public void setIsReInitializing(boolean isReInitializing) {
+ this.isReInitializing = isReInitializing;
+ }
+
+ @Override
+ public boolean isReInitializing() {
+ return this.isReInitializing;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40b5a59b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerReInitEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerReInitEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerReInitEvent.java
new file mode 100644
index 0000000..2ccdbd7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerReInitEvent.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceSet;
+
+/**
+ * ContainerEvent sent by the ContainerManager to ContainerImpl to
+ * re-initialize a Container with a new launch context.
+ */
+public class ContainerReInitEvent extends ContainerEvent {
+
+ private final ContainerLaunchContext reInitLaunchContext;
+ private final ResourceSet resourceSet;
+
+ /**
+ * Create an event of type REINITIALIZE_CONTAINER.
+ * @param cID Container Id
+ * @param upgradeContext Launch context to re-initialize the container with
+ * @param resourceSet Local resources of the new launch context
+ */
+ public ContainerReInitEvent(ContainerId cID,
+ ContainerLaunchContext upgradeContext, ResourceSet resourceSet) {
+ super(cID, ContainerEventType.REINITIALIZE_CONTAINER);
+ reInitLaunchContext = upgradeContext;
+ this.resourceSet = resourceSet;
+ }
+
+ /**
+ * @return the launch context the container should be relaunched with.
+ */
+ public ContainerLaunchContext getReInitLaunchContext() {
+ return this.reInitLaunchContext;
+ }
+
+ /**
+ * @return the resources to localize for the new launch context.
+ */
+ public ResourceSet getResourceSet() {
+ return this.resourceSet;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40b5a59b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
index 6b96204..70de90c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
@@ -20,6 +20,6 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
public enum ContainerState {
NEW, LOCALIZING, LOCALIZATION_FAILED, LOCALIZED, RUNNING, RELAUNCHING,
- EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE, KILLING,
+ REINITIALIZING, EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE, KILLING,
CONTAINER_CLEANEDUP_AFTER_KILL, CONTAINER_RESOURCES_CLEANINGUP, DONE
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40b5a59b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
index e5fff00..d4a7bfd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
@@ -137,6 +137,7 @@ public class ContainersLauncher extends AbstractService
running.put(containerId, launch);
break;
case CLEANUP_CONTAINER:
+ case CLEANUP_CONTAINER_FOR_REINIT:
ContainerLaunch launcher = running.remove(containerId);
if (launcher == null) {
// Container not launched. So nothing needs to be done.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40b5a59b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
index 2d7bc74..380a032 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
@@ -23,5 +23,6 @@ public enum ContainersLauncherEventType {
RELAUNCH_CONTAINER,
RECOVER_CONTAINER,
CLEANUP_CONTAINER, // The process(grp) itself.
+ CLEANUP_CONTAINER_FOR_REINIT, // The process(grp) itself.
SIGNAL_CONTAINER,
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40b5a59b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
index b281ef5..2cf6ee9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
@@ -470,7 +470,8 @@ public class ResourceLocalizationService extends CompositeService
ContainerLocalizationRequestEvent rsrcReqs) {
Container c = rsrcReqs.getContainer();
EnumSet<ContainerState> set =
- EnumSet.of(ContainerState.LOCALIZING, ContainerState.RUNNING);
+ EnumSet.of(ContainerState.LOCALIZING,
+ ContainerState.RUNNING, ContainerState.REINITIALIZING);
if (!set.contains(c.getContainerState())) {
LOG.warn(c.getContainerId() + " is at " + c.getContainerState()
+ " state, do not localize resources.");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40b5a59b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceSet.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceSet.java
index a41ee20..5da3abc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceSet.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceSet.java
@@ -43,9 +43,9 @@ public class ResourceSet {
private static final Log LOG = LogFactory.getLog(ResourceSet.class);
// resources by localization state (localized, pending, failed)
- private Map<Path, List<String>> localizedResources =
+ private Map<String, Path> localizedResources =
new ConcurrentHashMap<>();
- private Map<LocalResourceRequest, List<String>> pendingResources =
+ private Map<LocalResourceRequest, Set<String>> pendingResources =
new ConcurrentHashMap<>();
private Set<LocalResourceRequest> resourcesFailedToBeLocalized =
new HashSet<>();
@@ -69,7 +69,7 @@ public class ResourceSet {
if (localResourceMap == null || localResourceMap.isEmpty()) {
return null;
}
- Map<LocalResourceRequest, List<String>> allResources = new HashMap<>();
+ Map<LocalResourceRequest, Set<String>> allResources = new HashMap<>();
List<LocalResourceRequest> publicList = new ArrayList<>();
List<LocalResourceRequest> privateList = new ArrayList<>();
List<LocalResourceRequest> appList = new ArrayList<>();
@@ -77,7 +77,7 @@ public class ResourceSet {
for (Map.Entry<String, LocalResource> rsrc : localResourceMap.entrySet()) {
LocalResource resource = rsrc.getValue();
LocalResourceRequest req = new LocalResourceRequest(rsrc.getValue());
- allResources.putIfAbsent(req, new ArrayList<>());
+ allResources.putIfAbsent(req, new HashSet<>());
allResources.get(req).add(rsrc.getKey());
storeSharedCacheUploadPolicy(req,
resource.getShouldBeUploadedToSharedCache());
@@ -121,13 +121,15 @@ public class ResourceSet {
* @param location The path where the resource is localized
* @return The list of symlinks for the localized resources.
*/
- public List<String> resourceLocalized(LocalResourceRequest request,
+ public Set<String> resourceLocalized(LocalResourceRequest request,
Path location) {
- List<String> symlinks = pendingResources.remove(request);
+ Set<String> symlinks = pendingResources.remove(request);
if (symlinks == null) {
return null;
} else {
- localizedResources.put(location, symlinks);
+ for (String symlink : symlinks) {
+ localizedResources.put(symlink, location);
+ }
return symlinks;
}
}
@@ -175,7 +177,12 @@ public class ResourceSet {
}
public Map<Path, List<String>> getLocalizedResources() {
- return localizedResources;
+ Map<Path, List<String>> map = new HashMap<>();
+ for (Map.Entry<String, Path> entry : localizedResources.entrySet()) {
+ map.putIfAbsent(entry.getValue(), new ArrayList<>());
+ map.get(entry.getValue()).add(entry.getKey());
+ }
+ return map;
}
public Map<LocalResourceRequest, Path> getResourcesToBeUploaded() {
@@ -186,7 +193,25 @@ public class ResourceSet {
return resourcesUploadPolicies;
}
- public Map<LocalResourceRequest, List<String>> getPendingResources() {
+ public Map<LocalResourceRequest, Set<String>> getPendingResources() {
return pendingResources;
}
+
+ public static ResourceSet merge(ResourceSet... resourceSets) {
+ ResourceSet merged = new ResourceSet();
+ for (ResourceSet rs : resourceSets) {
+ // This should overwrite existing symlinks
+ merged.localizedResources.putAll(rs.localizedResources);
+
+ merged.resourcesToBeUploaded.putAll(rs.resourcesToBeUploaded);
+ merged.resourcesUploadPolicies.putAll(rs.resourcesUploadPolicies);
+
+ // TODO : START : Should we de-dup here ?
+ merged.publicRsrcs.addAll(rs.publicRsrcs);
+ merged.privateRsrcs.addAll(rs.privateRsrcs);
+ merged.appRsrcs.addAll(rs.appRsrcs);
+ // TODO : END
+ }
+ return merged;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40b5a59b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java
index 43a2f33..0344275 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java
@@ -39,8 +39,8 @@ public class ContainerLocalizationRequestEvent extends
/**
* Event requesting the localization of the rsrc.
- * @param c
- * @param rsrc
+ * @param c Container
+ * @param rsrc LocalResourceRequests map
*/
public ContainerLocalizationRequestEvent(Container c,
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrc) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40b5a59b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
index aa0d975..8a27849 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
@@ -269,6 +269,42 @@ public class TestContainerManagerWithLCE extends TestContainerManager {
super.testForcefulShutdownSignal();
}
+  @Override
+  public void testContainerUpgradeSuccess() throws IOException,
+      InterruptedException, YarnException {
+    // Only run when the LinuxContainerExecutor binary path is configured.
+    if (shouldRunTest()) {
+      LOG.info("Running testContainerUpgradeSuccess");
+      super.testContainerUpgradeSuccess();
+    } else {
+      LOG.info("LCE binary path is not passed. Not running the test");
+    }
+  }
+
+  @Override
+  public void testContainerUpgradeLocalizationFailure() throws IOException,
+      InterruptedException, YarnException {
+    // Only run when the LinuxContainerExecutor binary path is configured.
+    if (shouldRunTest()) {
+      LOG.info("Running testContainerUpgradeLocalizationFailure");
+      super.testContainerUpgradeLocalizationFailure();
+    } else {
+      LOG.info("LCE binary path is not passed. Not running the test");
+    }
+  }
+
+  @Override
+  public void testContainerUpgradeProcessFailure() throws IOException,
+      InterruptedException, YarnException {
+    // Only run when the LinuxContainerExecutor binary path is configured.
+    if (shouldRunTest()) {
+      LOG.info("Running testContainerUpgradeProcessFailure");
+      super.testContainerUpgradeProcessFailure();
+    } else {
+      LOG.info("LCE binary path is not passed. Not running the test");
+    }
+  }
+
private boolean shouldRunTest() {
return System
.getProperty(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH) != null;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40b5a59b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
index ec38501..d359c3d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
@@ -123,7 +123,11 @@ public abstract class BaseContainerManagerTest {
conf) {
public int getHttpPort() {
return HTTP_PORT;
- };
+ }
+ @Override
+ public ContainerExecutor getContainerExecutor() {
+ return exec;
+ }
};
protected ContainerExecutor exec;
protected DeletionService delSrvc;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40b5a59b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
index 5785e1f..843dc2a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.verify;
import java.io.BufferedReader;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.PrintWriter;
@@ -33,6 +34,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -64,6 +66,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
+import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
@@ -94,7 +98,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.Conta
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
-import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -366,6 +369,237 @@ public class TestContainerManager extends BaseContainerManagerTest {
DefaultContainerExecutor.containerIsAlive(pid));
}
+  /**
+   * Upgrades a running container with a new launch context and verifies that
+   * the old process is gone, the new process reports a different pid and is
+   * alive, and the old start file is still readable.
+   */
+  @Test
+  public void testContainerUpgradeSuccess() throws IOException,
+      InterruptedException, YarnException {
+    containerManager.start();
+    // ////// Construct the Container-id
+    ContainerId cId = createContainerId(0);
+    File oldStartFile = new File(tmpDir, "start_file_o.txt").getAbsoluteFile();
+
+    String pid = prepareInitialContainer(cId, oldStartFile);
+
+    File newStartFile = new File(tmpDir, "start_file_n.txt").getAbsoluteFile();
+
+    prepareContainerUpgrade(false, false, cId, newStartFile);
+
+    // Assert that the First process is not alive anymore
+    Assert.assertFalse("Process is still alive!",
+        DefaultContainerExecutor.containerIsAlive(pid));
+
+    String newPid;
+    // try-with-resources: the reader is closed even when an assertion fails.
+    try (BufferedReader reader =
+        new BufferedReader(new FileReader(newStartFile))) {
+      Assert.assertEquals("Upgrade World!", reader.readLine());
+
+      // Get the pid of the process
+      newPid = reader.readLine().trim();
+      Assert.assertNotEquals("Old and New Pids must be different !", pid,
+          newPid);
+      // No more lines
+      Assert.assertEquals(null, reader.readLine());
+    }
+
+    // Verify old file still exists and is accessible by
+    // the new process... (this reader was previously never closed)
+    try (BufferedReader reader =
+        new BufferedReader(new FileReader(oldStartFile))) {
+      Assert.assertEquals("Hello World!", reader.readLine());
+    }
+
+    // Assert that the New process is alive
+    Assert.assertTrue("New Process is not alive!",
+        DefaultContainerExecutor.containerIsAlive(newPid));
+  }
+
+  /**
+   * Requests an upgrade whose launch context has a failing start script and a
+   * resource that cannot be localized, then checks that the original process
+   * keeps running.
+   */
+  @Test
+  public void testContainerUpgradeLocalizationFailure() throws IOException,
+      InterruptedException, YarnException {
+    // Scenario is skipped on Windows.
+    if (Shell.WINDOWS) {
+      return;
+    }
+    containerManager.start();
+    ContainerId containerId = createContainerId(0);
+    File originalStartFile =
+        new File(tmpDir, "start_file_o.txt").getAbsoluteFile();
+    String originalPid =
+        prepareInitialContainer(containerId, originalStartFile);
+
+    File upgradedStartFile =
+        new File(tmpDir, "start_file_n.txt").getAbsoluteFile();
+    prepareContainerUpgrade(true, true, containerId, upgradedStartFile);
+
+    // The upgrade was terminated, so the original process must remain alive.
+    Assert.assertTrue("Process is NOT alive!",
+        DefaultContainerExecutor.containerIsAlive(originalPid));
+  }
+
+  /**
+   * Requests an upgrade whose start script exits with an error (its resource
+   * localizes fine), then checks that the original process is gone.
+   */
+  @Test
+  public void testContainerUpgradeProcessFailure() throws IOException,
+      InterruptedException, YarnException {
+    // Scenario is skipped on Windows.
+    if (Shell.WINDOWS) {
+      return;
+    }
+    containerManager.start();
+    ContainerId containerId = createContainerId(0);
+    File originalStartFile =
+        new File(tmpDir, "start_file_o.txt").getAbsoluteFile();
+    String originalPid =
+        prepareInitialContainer(containerId, originalStartFile);
+
+    File upgradedStartFile =
+        new File(tmpDir, "start_file_n.txt").getAbsoluteFile();
+    prepareContainerUpgrade(true, false, containerId, upgradedStartFile);
+
+    // Even though the new command fails, the first process must be stopped.
+    Assert.assertFalse("Process is still alive!",
+        DefaultContainerExecutor.containerIsAlive(originalPid));
+  }
+
+  /**
+   * Prepare a launch Context for container upgrade and request the
+   * Container Manager to re-initialize a running container using the
+   * new launch context.
+   * <p>
+   * The upgrade is requested twice in a row; the second request is expected
+   * to be rejected with a message containing "Cannot perform UPGRADE".
+   * NOTE(review): if the second call does not throw, nothing here fails the
+   * test.
+   * <p>
+   * Afterwards it polls once per second (up to about 10 attempts when
+   * {@code failLoc} is set, 20 otherwise) for {@code startFile} to appear.
+   * It does not assert that the file was created.
+   * @param failCmd injects a start script that intentionally fails.
+   * @param failLoc injects a bad file Location that will fail localization.
+   * @param cId id of the running container to upgrade.
+   * @param startFile file the new start script writes its output to.
+   */
+  private void prepareContainerUpgrade(boolean failCmd, boolean failLoc,
+      ContainerId cId, File startFile)
+      throws FileNotFoundException, YarnException, InterruptedException {
+    // Re-write scriptfile and processStartFile
+    File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile_new");
+    PrintWriter fileWriter = new PrintWriter(scriptFile);
+
+    // writeScriptFile closes fileWriter after writing the script.
+    writeScriptFile(fileWriter, "Upgrade World!", startFile, cId, failCmd);
+
+    ContainerLaunchContext containerLaunchContext =
+        prepareContainerLaunchContext(scriptFile, "dest_file_new", failLoc);
+
+    containerManager.upgradeContainer(cId, containerLaunchContext);
+    // A duplicate upgrade request for the same container should be refused.
+    try {
+      containerManager.upgradeContainer(cId, containerLaunchContext);
+    } catch (Exception e) {
+      Assert.assertTrue(e.getMessage().contains("Cannot perform UPGRADE"));
+    }
+    int timeoutSecs = 0;
+    // Shorter wait when localization is expected to fail (presumably no new
+    // process is started in that case).
+    int maxTimeToWait = failLoc ? 10 : 20;
+    // Wait for new processStartfile to be created
+    while (!startFile.exists() && timeoutSecs++ < maxTimeToWait) {
+      Thread.sleep(1000);
+      LOG.info("Waiting for New process start-file to be created");
+    }
+  }
+
+  /**
+   * Prepare and start an initial container. This container will be subsequently
+   * re-initialized for upgrade. It also waits for the container to start and
+   * returns the Pid of the running container.
+   * @param cId id to give the started container.
+   * @param startFile file the container's start script writes
+   *     "Hello World!" and a pid line to.
+   * @return the second line of {@code startFile}, trimmed (the pid).
+   */
+  private String prepareInitialContainer(ContainerId cId, File startFile)
+      throws IOException, YarnException, InterruptedException {
+    File scriptFileOld = Shell.appendScriptExtension(tmpDir, "scriptFile");
+    PrintWriter fileWriterOld = new PrintWriter(scriptFileOld);
+
+    // writeScriptFile closes the writer after writing the script.
+    writeScriptFile(fileWriterOld, "Hello World!", startFile, cId, false);
+
+    ContainerLaunchContext containerLaunchContext =
+        prepareContainerLaunchContext(scriptFileOld, "dest_file", false);
+
+    StartContainerRequest scRequest =
+        StartContainerRequest.newInstance(containerLaunchContext,
+            createContainerToken(cId,
+                DUMMY_RM_IDENTIFIER, context.getNodeId(), user,
+                context.getContainerTokenSecretManager()));
+    List<StartContainerRequest> list = new ArrayList<>();
+    list.add(scRequest);
+    StartContainersRequest allRequests =
+        StartContainersRequest.newInstance(list);
+    containerManager.startContainers(allRequests);
+
+    int timeoutSecs = 0;
+    while (!startFile.exists() && timeoutSecs++ < 20) {
+      Thread.sleep(1000);
+      LOG.info("Waiting for process start-file to be created");
+    }
+    Assert.assertTrue("ProcessStartFile doesn't exist!",
+        startFile.exists());
+
+    // Now verify the contents of the file. try-with-resources closes the
+    // reader, which was previously leaked, even if an assertion fails.
+    String pid;
+    try (BufferedReader reader =
+        new BufferedReader(new FileReader(startFile))) {
+      Assert.assertEquals("Hello World!", reader.readLine());
+      // Get the pid of the process
+      pid = reader.readLine().trim();
+      // No more lines
+      Assert.assertEquals(null, reader.readLine());
+    }
+
+    // Assert that the process is alive
+    Assert.assertTrue("Process is not alive!",
+        DefaultContainerExecutor.containerIsAlive(pid));
+    // Once more
+    Assert.assertTrue("Process is not alive!",
+        DefaultContainerExecutor.containerIsAlive(pid));
+    return pid;
+  }
+
+  /**
+   * Writes a start script through {@code fileWriter} and closes the writer.
+   * On non-Windows platforms the script writes {@code startLine} and its own
+   * pid to {@code processStartFile} and then sleeps. When {@code isFailure}
+   * is set, it writes only its pid and exits with code 111 instead.
+   * The Windows variant ignores {@code isFailure}.
+   */
+  private void writeScriptFile(PrintWriter fileWriter, String startLine,
+      File processStartFile, ContainerId cId, boolean isFailure) {
+    if (Shell.WINDOWS) {
+      fileWriter.println("@echo " + startLine + "> " + processStartFile);
+      fileWriter.println("@echo " + cId + ">> " + processStartFile);
+      fileWriter.println("@ping -n 100 127.0.0.1 >nul");
+    } else {
+      // umask 0 so that the start file is readable by the test.
+      StringBuilder script = new StringBuilder("\numask 0");
+      if (!isFailure) {
+        script.append("\necho ").append(startLine)
+            .append(" > ").append(processStartFile);
+      }
+      // Both variants record the shell's pid.
+      script.append("\necho $$ >> ").append(processStartFile);
+      // Either fail with a distinctive exit code or keep the process running.
+      script.append(isFailure ? "\nexit 111" : "\nexec sleep 100");
+      fileWriter.write(script.toString());
+    }
+    fileWriter.close();
+  }
+
+  /**
+   * Builds a launch context that runs {@code scriptFile} and localizes a
+   * single APPLICATION-visibility FILE resource under {@code destFName}.
+   * When {@code putBadFile} is set, the resource points at a file under
+   * {@code tmpDir} that is deleted, so localization of it should fail.
+   * A RETRY_ON_SPECIFIC_ERROR_CODES retry context for exit code 111 is
+   * attached, with arguments 4 and 0 (presumably max retries and retry
+   * interval).
+   */
+  private ContainerLaunchContext prepareContainerLaunchContext(File scriptFile,
+      String destFName, boolean putBadFile) {
+    URL resourceUrl;
+    if (!putBadFile) {
+      resourceUrl = URL.fromPath(
+          localFS.makeQualified(new Path(scriptFile.getAbsolutePath())));
+    } else {
+      File missingFile = new File(tmpDir, "fileToDelete").getAbsoluteFile();
+      resourceUrl = URL.fromPath(
+          localFS.makeQualified(new Path(missingFile.getAbsolutePath())));
+      // Ensure the referenced file does not exist so localization fails.
+      missingFile.delete();
+    }
+
+    LocalResource resource =
+        recordFactory.newRecordInstance(LocalResource.class);
+    resource.setResource(resourceUrl);
+    resource.setSize(-1);
+    resource.setVisibility(LocalResourceVisibility.APPLICATION);
+    resource.setType(LocalResourceType.FILE);
+    resource.setTimestamp(scriptFile.lastModified());
+    Map<String, LocalResource> localResources = new HashMap<>();
+    localResources.put(destFName, resource);
+
+    ContainerLaunchContext launchContext =
+        recordFactory.newRecordInstance(ContainerLaunchContext.class);
+    launchContext.setLocalResources(localResources);
+    launchContext.setContainerRetryContext(ContainerRetryContext.newInstance(
+        ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODES,
+        new HashSet<>(Arrays.asList(Integer.valueOf(111))), 4, 0));
+    launchContext.setCommands(
+        Arrays.asList(Shell.getRunScriptCommand(scriptFile)));
+    return launchContext;
+  }
+
protected void testContainerLaunchAndExit(int exitCode) throws IOException,
InterruptedException, YarnException {
@@ -556,7 +790,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(
- e.getMessage().contains("Not able to localize new resources"));
+ e.getMessage().contains("Cannot perform LOCALIZE"));
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40b5a59b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
index c176556..8c8bec7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
@@ -190,4 +190,19 @@ public class MockContainer implements Container {
public void setIpAndHost(String[] ipAndHost) {
}
+
+  /**
+   * Mock implementation: always reports the container as not running.
+   * @return always {@code false}
+   */
+  @Override
+  public boolean isRunning() {
+    return false;
+  }
+
+  /**
+   * Mock implementation: the flag is ignored and not stored.
+   * @param isReInitializing ignored
+   */
+  @Override
+  public void setIsReInitializing(boolean isReInitializing) {
+    // Intentionally a no-op.
+  }
+
+  /**
+   * Mock implementation: never reports a re-initialization in progress.
+   * @return always {@code false}
+   */
+  @Override
+  public boolean isReInitializing() {
+    return false;
+  }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org