You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2015/09/14 05:12:01 UTC
[22/22] hadoop git commit: YARN-3868. Recovery support for container
resizing. Contributed by Meng Ding
YARN-3868. Recovery support for container resizing. Contributed by Meng Ding
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/78ad04db
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/78ad04db
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/78ad04db
Branch: refs/heads/YARN-1197
Commit: 78ad04db9bbd422faa29c2e60293f5a95e4016c2
Parents: 1496e40
Author: Jian He <ji...@apache.org>
Authored: Thu Aug 20 21:18:23 2015 -0700
Committer: Wangda Tan <wa...@apache.org>
Committed: Sun Sep 13 19:51:12 2015 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 2 +
.../containermanager/ContainerManagerImpl.java | 5 +-
.../container/ContainerImpl.java | 8 +-
.../recovery/NMLeveldbStateStoreService.java | 22 ++
.../recovery/NMNullStateStoreService.java | 6 +
.../recovery/NMStateStoreService.java | 15 ++
.../TestContainerManagerRecovery.java | 233 ++++++++++++++++++-
.../recovery/NMMemoryStateStoreService.java | 11 +-
.../TestNMLeveldbStateStoreService.java | 11 +
9 files changed, 301 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/78ad04db/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index bce9f5c..eb00781 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -208,6 +208,8 @@ Release 2.8.0 - UNRELEASED
YARN-1644. RM-NM protocol changes and NodeStatusUpdater implementation to
support container resizing. (Meng Ding via jianhe)
+ YARN-3868. Recovery support for container resizing. (Meng Ding via jianhe)
+
IMPROVEMENTS
YARN-644. Basic null check is not performed on passed in arguments before
http://git-wip-us.apache.org/repos/asf/hadoop/blob/78ad04db/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 868d8d3..39d2983 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -346,7 +346,7 @@ public class ContainerManagerImpl extends CompositeService implements
Container container = new ContainerImpl(getConfig(), dispatcher,
context.getNMStateStore(), req.getContainerLaunchContext(),
credentials, metrics, token, rcs.getStatus(), rcs.getExitCode(),
- rcs.getDiagnostics(), rcs.getKilled());
+ rcs.getDiagnostics(), rcs.getKilled(), rcs.getCapability());
context.getContainers().put(containerId, container);
dispatcher.getEventHandler().handle(
new ApplicationContainerInitEvent(container));
@@ -1101,6 +1101,9 @@ public class ContainerManagerImpl extends CompositeService implements
this.readLock.lock();
try {
if (!serviceStopped) {
+ // Persist container resource change for recovery
+ this.context.getNMStateStore().storeContainerResourceChanged(
+ containerId, targetResource);
getContainersMonitor().handle(
new ChangeMonitoringContainerResourceEvent(
containerId, targetResource));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/78ad04db/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index 5c61a92..eff2188 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -154,13 +154,19 @@ public class ContainerImpl implements Container {
Credentials creds, NodeManagerMetrics metrics,
ContainerTokenIdentifier containerTokenIdentifier,
RecoveredContainerStatus recoveredStatus, int exitCode,
- String diagnostics, boolean wasKilled) {
+ String diagnostics, boolean wasKilled, Resource recoveredCapability) {
this(conf, dispatcher, stateStore, launchContext, creds, metrics,
containerTokenIdentifier);
this.recoveredStatus = recoveredStatus;
this.exitCode = exitCode;
this.recoveredAsKilled = wasKilled;
this.diagnostics.append(diagnostics);
+ if (recoveredCapability != null
+ && !this.resource.equals(recoveredCapability)) {
+ // the resource capability was updated before the NM went down
+ this.resource = Resource.newInstance(recoveredCapability.getMemory(),
+ recoveredCapability.getVirtualCores());
+ }
}
private static final ContainerDiagnosticsUpdateTransition UPDATE_DIAGNOSTICS_TRANSITION =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/78ad04db/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
index df58182..89c71bb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
@@ -40,7 +40,10 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainerRequestP
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
@@ -99,6 +102,8 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
private static final String CONTAINER_REQUEST_KEY_SUFFIX = "/request";
private static final String CONTAINER_DIAGS_KEY_SUFFIX = "/diagnostics";
private static final String CONTAINER_LAUNCHED_KEY_SUFFIX = "/launched";
+ private static final String CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX =
+ "/resourceChanged";
private static final String CONTAINER_KILLED_KEY_SUFFIX = "/killed";
private static final String CONTAINER_EXIT_CODE_KEY_SUFFIX = "/exitcode";
@@ -230,6 +235,9 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
} else if (suffix.equals(CONTAINER_EXIT_CODE_KEY_SUFFIX)) {
rcs.status = RecoveredContainerStatus.COMPLETED;
rcs.exitCode = Integer.parseInt(asString(entry.getValue()));
+ } else if (suffix.equals(CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX)) {
+ rcs.capability = new ResourcePBImpl(
+ ResourceProto.parseFrom(entry.getValue()));
} else {
throw new IOException("Unexpected container state key: " + key);
}
@@ -275,6 +283,20 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
}
@Override
+ public void storeContainerResourceChanged(ContainerId containerId,
+ Resource capability) throws IOException {
+ String key = CONTAINERS_KEY_PREFIX + containerId.toString()
+ + CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX;
+ try {
+ // New value will overwrite old values for the same key
+ db.put(bytes(key),
+ ((ResourcePBImpl) capability).getProto().toByteArray());
+ } catch (DBException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
public void storeContainerKilled(ContainerId containerId)
throws IOException {
String key = CONTAINERS_KEY_PREFIX + containerId.toString()
http://git-wip-us.apache.org/repos/asf/hadoop/blob/78ad04db/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
index ab49543..d5dce9b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
@@ -88,6 +89,11 @@ public class NMNullStateStoreService extends NMStateStoreService {
}
@Override
+ public void storeContainerResourceChanged(ContainerId containerId,
+ Resource capability) throws IOException {
+ }
+
+ @Override
public void storeContainerKilled(ContainerId containerId)
throws IOException {
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/78ad04db/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
index fa66349..e8ccf54 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
@@ -74,6 +75,7 @@ public abstract class NMStateStoreService extends AbstractService {
boolean killed = false;
String diagnostics = "";
StartContainerRequest startRequest;
+ Resource capability;
public RecoveredContainerStatus getStatus() {
return status;
@@ -94,6 +96,10 @@ public abstract class NMStateStoreService extends AbstractService {
public StartContainerRequest getStartRequest() {
return startRequest;
}
+
+ public Resource getCapability() {
+ return capability;
+ }
}
public static class LocalResourceTrackerState {
@@ -284,6 +290,15 @@ public abstract class NMStateStoreService extends AbstractService {
throws IOException;
/**
+ * Record that a container's resource capability has been changed.
+ * @param containerId the container ID
+ * @param capability the new resource capability of the container
+ * @throws IOException if the change cannot be written to the state store
+ */
+ public abstract void storeContainerResourceChanged(ContainerId containerId,
+ Resource capability) throws IOException;
+
+ /**
* Record that a container has completed
* @param containerId the container ID
* @param exitCode the exit code from the container
http://git-wip-us.apache.org/repos/asf/hadoop/blob/78ad04db/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
index 4d0aacd..43f1b29 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
@@ -28,18 +28,30 @@ import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
@@ -48,9 +60,17 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
@@ -58,6 +78,9 @@ import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
+import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
@@ -65,6 +88,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
@@ -77,18 +101,50 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.junit.Before;
import org.junit.Test;
-public class TestContainerManagerRecovery {
+public class TestContainerManagerRecovery extends BaseContainerManagerTest {
- private NodeManagerMetrics metrics = NodeManagerMetrics.create();
+ public TestContainerManagerRecovery() throws UnsupportedFileSystemException {
+ super();
+ }
+
+ @Override
+ @Before
+ public void setup() throws IOException {
+ localFS.delete(new Path(localDir.getAbsolutePath()), true);
+ localFS.delete(new Path(tmpDir.getAbsolutePath()), true);
+ localFS.delete(new Path(localLogDir.getAbsolutePath()), true);
+ localFS.delete(new Path(remoteLogDir.getAbsolutePath()), true);
+ localDir.mkdir();
+ tmpDir.mkdir();
+ localLogDir.mkdir();
+ remoteLogDir.mkdir();
+ LOG.info("Created localDir in " + localDir.getAbsolutePath());
+ LOG.info("Created tmpDir in " + tmpDir.getAbsolutePath());
+
+ String bindAddress = "0.0.0.0:12345";
+ conf.set(YarnConfiguration.NM_ADDRESS, bindAddress);
+ conf.set(YarnConfiguration.NM_LOCAL_DIRS, localDir.getAbsolutePath());
+ conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
+ conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogDir.getAbsolutePath());
+ conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
+ // Create and initialize the default deletion service
+ delSrvc = createDeletionService();
+ delSrvc.init(conf);
+ exec = createContainerExecutor();
+ dirsHandler = new LocalDirsHandlerService();
+ nodeHealthChecker = new NodeHealthCheckerService(
+ NodeManager.getNodeHealthScriptRunner(conf), dirsHandler);
+ nodeHealthChecker.init(conf);
+ }
@Test
public void testApplicationRecovery() throws Exception {
- YarnConfiguration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true);
- conf.set(YarnConfiguration.NM_ADDRESS, "localhost:1234");
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
conf.set(YarnConfiguration.YARN_ADMIN_ACL, "yarn_admin_user");
NMStateStoreService stateStore = new NMMemoryStateStoreService();
@@ -234,6 +290,91 @@ public class TestContainerManagerRecovery {
}
@Test
+ public void testContainerResizeRecovery() throws Exception {
+ conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
+ conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true);
+ NMStateStoreService stateStore = new NMMemoryStateStoreService();
+ stateStore.init(conf);
+ stateStore.start();
+ Context context = createContext(conf, stateStore);
+ ContainerManagerImpl cm = createContainerManager(context, delSrvc);
+ cm.init(conf);
+ cm.start();
+ // add an application by starting a container
+ ApplicationId appId = ApplicationId.newInstance(0, 1);
+ ApplicationAttemptId attemptId =
+ ApplicationAttemptId.newInstance(appId, 1);
+ ContainerId cid = ContainerId.newContainerId(attemptId, 1);
+ Map<String, String> containerEnv = Collections.emptyMap();
+ Map<String, ByteBuffer> serviceData = Collections.emptyMap();
+ Credentials containerCreds = new Credentials();
+ DataOutputBuffer dob = new DataOutputBuffer();
+ containerCreds.writeTokenStorageToStream(dob);
+ ByteBuffer containerTokens = ByteBuffer.wrap(dob.getData(), 0,
+ dob.getLength());
+ Map<ApplicationAccessType, String> acls = Collections.emptyMap();
+ File tmpDir = new File("target",
+ this.getClass().getSimpleName() + "-tmpDir");
+ File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile");
+ PrintWriter fileWriter = new PrintWriter(scriptFile);
+ if (Shell.WINDOWS) {
+ fileWriter.println("@ping -n 100 127.0.0.1 >nul");
+ } else {
+ fileWriter.write("\numask 0");
+ fileWriter.write("\nexec sleep 100");
+ }
+ fileWriter.close();
+ FileContext localFS = FileContext.getLocalFSFileContext();
+ URL resource_alpha =
+ ConverterUtils.getYarnUrlFromPath(localFS
+ .makeQualified(new Path(scriptFile.getAbsolutePath())));
+ LocalResource rsrc_alpha = RecordFactoryProvider
+ .getRecordFactory(null).newRecordInstance(LocalResource.class);
+ rsrc_alpha.setResource(resource_alpha);
+ rsrc_alpha.setSize(-1);
+ rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION);
+ rsrc_alpha.setType(LocalResourceType.FILE);
+ rsrc_alpha.setTimestamp(scriptFile.lastModified());
+ String destinationFile = "dest_file";
+ Map<String, LocalResource> localResources = new HashMap<>();
+ localResources.put(destinationFile, rsrc_alpha);
+ List<String> commands =
+ Arrays.asList(Shell.getRunScriptCommand(scriptFile));
+ ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
+ localResources, containerEnv, commands, serviceData,
+ containerTokens, acls);
+ StartContainersResponse startResponse = startContainer(
+ context, cm, cid, clc, null);
+ assertTrue(startResponse.getFailedRequests().isEmpty());
+ assertEquals(1, context.getApplications().size());
+ Application app = context.getApplications().get(appId);
+ assertNotNull(app);
+ // make sure the container reaches RUNNING state
+ waitForNMContainerState(cm, cid,
+ org.apache.hadoop.yarn.server.nodemanager
+ .containermanager.container.ContainerState.RUNNING);
+ Resource targetResource = Resource.newInstance(2048, 2);
+ IncreaseContainersResourceResponse increaseResponse =
+ increaseContainersResource(context, cm, cid, targetResource);
+ assertTrue(increaseResponse.getFailedRequests().isEmpty());
+ // check status
+ ContainerStatus containerStatus = getContainerStatus(context, cm, cid);
+ assertEquals(targetResource, containerStatus.getCapability());
+ // restart and verify container is running and recovered
+ // to the correct size
+ cm.stop();
+ context = createContext(conf, stateStore);
+ cm = createContainerManager(context);
+ cm.init(conf);
+ cm.start();
+ assertEquals(1, context.getApplications().size());
+ app = context.getApplications().get(appId);
+ assertNotNull(app);
+ containerStatus = getContainerStatus(context, cm, cid);
+ assertEquals(targetResource, containerStatus.getCapability());
+ }
+
+ @Test
public void testContainerCleanupOnShutdown() throws Exception {
ApplicationId appId = ApplicationId.newInstance(0, 1);
ApplicationAttemptId attemptId =
@@ -257,10 +398,8 @@ public class TestContainerManagerRecovery {
LogAggregationContext.newInstance("includePattern", "excludePattern");
// verify containers are stopped on shutdown without recovery
- YarnConfiguration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, false);
conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, false);
- conf.set(YarnConfiguration.NM_ADDRESS, "localhost:1234");
Context context = createContext(conf, new NMNullStateStoreService());
ContainerManagerImpl cm = spy(createContainerManager(context));
cm.init(conf);
@@ -306,12 +445,36 @@ public class TestContainerManagerRecovery {
verify(cm, never()).handle(isA(CMgrCompletedAppsEvent.class));
}
- private NMContext createContext(YarnConfiguration conf,
+ private ContainerManagerImpl createContainerManager(Context context,
+ DeletionService delSrvc) {
+ return new ContainerManagerImpl(context, exec, delSrvc,
+ mock(NodeStatusUpdater.class), metrics, dirsHandler) {
+ @Override
+ public void
+ setBlockNewContainerRequests(boolean blockNewContainerRequests) {
+ // do nothing
+ }
+ @Override
+ protected void authorizeGetAndStopContainerRequest(
+ ContainerId containerId, Container container,
+ boolean stopRequest, NMTokenIdentifier identifier)
+ throws YarnException {
+ if(container == null || container.getUser().equals("Fail")){
+ throw new YarnException("Reject this container");
+ }
+ }
+ };
+ }
+
+ private NMContext createContext(Configuration conf,
NMStateStoreService stateStore) {
NMContext context = new NMContext(new NMContainerTokenSecretManager(
conf), new NMTokenSecretManagerInNM(), null,
- new ApplicationACLsManager(conf), stateStore);
-
+ new ApplicationACLsManager(conf), stateStore){
+ public int getHttpPort() {
+ return HTTP_PORT;
+ }
+ };
// simulate registration with RM
MasterKey masterKey = new MasterKeyPBImpl();
masterKey.setKeyId(123);
@@ -349,6 +512,58 @@ public class TestContainerManagerRecovery {
});
}
+ private IncreaseContainersResourceResponse increaseContainersResource(
+ Context context, final ContainerManagerImpl cm, ContainerId cid,
+ Resource capability) throws Exception {
+ UserGroupInformation user = UserGroupInformation.createRemoteUser(
+ cid.getApplicationAttemptId().toString());
+ // construct container resource increase request
+ final List<Token> increaseTokens = new ArrayList<Token>();
+ // add increase request
+ Token containerToken = TestContainerManager.createContainerToken(
+ cid, 0, context.getNodeId(), user.getShortUserName(),
+ capability, context.getContainerTokenSecretManager(), null);
+ increaseTokens.add(containerToken);
+ final IncreaseContainersResourceRequest increaseRequest =
+ IncreaseContainersResourceRequest.newInstance(increaseTokens);
+ NMTokenIdentifier nmToken = new NMTokenIdentifier(
+ cid.getApplicationAttemptId(), context.getNodeId(),
+ user.getShortUserName(),
+ context.getNMTokenSecretManager().getCurrentKey().getKeyId());
+ user.addTokenIdentifier(nmToken);
+ return user.doAs(
+ new PrivilegedExceptionAction<IncreaseContainersResourceResponse>() {
+ @Override
+ public IncreaseContainersResourceResponse run() throws Exception {
+ return cm.increaseContainersResource(increaseRequest);
+ }
+ });
+ }
+
+ private ContainerStatus getContainerStatus(
+ Context context, final ContainerManagerImpl cm, ContainerId cid)
+ throws Exception {
+ UserGroupInformation user = UserGroupInformation.createRemoteUser(
+ cid.getApplicationAttemptId().toString());
+ NMTokenIdentifier nmToken = new NMTokenIdentifier(
+ cid.getApplicationAttemptId(), context.getNodeId(),
+ user.getShortUserName(),
+ context.getNMTokenSecretManager().getCurrentKey().getKeyId());
+ user.addTokenIdentifier(nmToken);
+ List<ContainerId> containerIds = new ArrayList<>();
+ containerIds.add(cid);
+ final GetContainerStatusesRequest gcsRequest =
+ GetContainerStatusesRequest.newInstance(containerIds);
+ return user.doAs(
+ new PrivilegedExceptionAction<ContainerStatus>() {
+ @Override
+ public ContainerStatus run() throws Exception {
+ return cm.getContainerStatuses(gcsRequest)
+ .getContainerStatuses().get(0);
+ }
+ });
+ }
+
private void waitForAppState(Application app, ApplicationState state)
throws Exception {
final int msecPerSleep = 10;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/78ad04db/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
index e0487e7..a1c95ab 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
@@ -122,9 +123,10 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
rcsCopy.killed = rcs.killed;
rcsCopy.diagnostics = rcs.diagnostics;
rcsCopy.startRequest = rcs.startRequest;
+ rcsCopy.capability = rcs.capability;
result.add(rcsCopy);
}
- return new ArrayList<RecoveredContainerState>();
+ return result;
}
@Override
@@ -153,6 +155,13 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
}
@Override
+ public synchronized void storeContainerResourceChanged(
+ ContainerId containerId, Resource capability) throws IOException {
+ RecoveredContainerState rcs = getRecoveredContainerState(containerId);
+ rcs.capability = capability;
+ }
+
+ @Override
public synchronized void storeContainerKilled(ContainerId containerId)
throws IOException {
RecoveredContainerState rcs = getRecoveredContainerState(containerId);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/78ad04db/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
index 1804424..08b49e7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
@@ -298,6 +298,17 @@ public class TestNMLeveldbStateStoreService {
assertEquals(containerReq, rcs.getStartRequest());
assertEquals(diags.toString(), rcs.getDiagnostics());
+ // increase the container size, and verify recovered
+ stateStore.storeContainerResourceChanged(containerId, Resource.newInstance(2468, 4));
+ restartStateStore();
+ recoveredContainers = stateStore.loadContainersState();
+ assertEquals(1, recoveredContainers.size());
+ rcs = recoveredContainers.get(0);
+ assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus());
+ assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
+ assertEquals(false, rcs.getKilled());
+ assertEquals(Resource.newInstance(2468, 4), rcs.getCapability());
+
// mark the container killed, add some more diags, and verify recovered
diags.append("some more diags for container");
stateStore.storeContainerDiagnostics(containerId, diags);