You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original page) is not preserved in this copy.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2017/09/11 06:48:47 UTC
[22/50] [abbrv] hadoop git commit: YARN-7033. Add support for NM
Recovery of assigned resources (e.g. GPUs, NUMA,
FPGAs) to container. (Devaraj K and Wangda Tan)
YARN-7033. Add support for NM Recovery of assigned resources (e.g. GPUs, NUMA, FPGAs) to container. (Devaraj K and Wangda Tan)
Change-Id: Iffd18bb95debe1c8cc55e30abc1d8f663e9d0e30
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f155ab7c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f155ab7c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f155ab7c
Branch: refs/heads/YARN-5972
Commit: f155ab7cfa64e822171708040cb49889338961ce
Parents: a4cd101
Author: Wangda Tan <wa...@apache.org>
Authored: Thu Sep 7 14:13:37 2017 -0700
Committer: Wangda Tan <wa...@apache.org>
Committed: Thu Sep 7 14:13:37 2017 -0700
----------------------------------------------------------------------
.../containermanager/container/Container.java | 7 +
.../container/ContainerImpl.java | 13 ++
.../container/ResourceMappings.java | 124 ++++++++++++++
.../recovery/NMLeveldbStateStoreService.java | 42 +++++
.../recovery/NMNullStateStoreService.java | 7 +
.../recovery/NMStateStoreService.java | 23 +++
.../TestContainerManagerRecovery.java | 160 +++++++++++++------
.../recovery/NMMemoryStateStoreService.java | 14 ++
.../TestNMLeveldbStateStoreService.java | 121 +++++++++-----
.../nodemanager/webapp/MockContainer.java | 6 +
10 files changed, 431 insertions(+), 86 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f155ab7c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
index ac9fbb7..ef5d72c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
@@ -96,4 +96,11 @@ public interface Container extends EventHandler<ContainerEvent> {
void sendKillEvent(int exitStatus, String description);
boolean isRecovering();
+
+  /**
+   * Get the resources assigned to this container, grouped by resource type.
+   *
+   * @return resource mappings of the container
+   */
+  ResourceMappings getResourceMappings();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f155ab7c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index 772b6e7..a768d18 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -185,6 +185,7 @@ public class ContainerImpl implements Container {
private boolean recoveredAsKilled = false;
private Context context;
private ResourceSet resourceSet;
+ private ResourceMappings resourceMappings;
public ContainerImpl(Configuration conf, Dispatcher dispatcher,
ContainerLaunchContext launchContext, Credentials creds,
@@ -242,6 +243,7 @@ public class ContainerImpl implements Container {
stateMachine = stateMachineFactory.make(this);
this.context = context;
this.resourceSet = new ResourceSet();
+ this.resourceMappings = new ResourceMappings();
}
private static ContainerRetryContext configureRetryContext(
@@ -282,6 +284,7 @@ public class ContainerImpl implements Container {
this.remainingRetryAttempts = rcs.getRemainingRetryAttempts();
this.workDir = rcs.getWorkDir();
this.logDir = rcs.getLogDir();
+ this.resourceMappings = rcs.getResourceMappings();
}
private static final ContainerDiagnosticsUpdateTransition UPDATE_DIAGNOSTICS_TRANSITION =
@@ -1789,4 +1792,14 @@ public class ContainerImpl implements Container {
getContainerState() == ContainerState.NEW);
return isRecovering;
}
+
+  /**
+   * Get the resources assigned to this container, grouped by resource type.
+   * For a container built from recovered state, these are that state's mappings.
+   * @return resource mappings of the container
+   */
+  @Override
+  public ResourceMappings getResourceMappings() {
+    return resourceMappings;
+  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f155ab7c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ResourceMappings.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ResourceMappings.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ResourceMappings.java
new file mode 100644
index 0000000..d673341
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ResourceMappings.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Stores the resources assigned to a single container, grouped by resource
+ * type.
+ *
+ * The resources assigned for one type are a list of {@link Serializable}
+ * values (typically Strings).
+ *
+ * For example, a container could be assigned:
+ * "numa": ["numa0"]
+ * "gpu": ["0", "1", "2", "3"]
+ * "fpga": ["1", "3"]
+ *
+ * This is used to recover the resource assignments of containers after an
+ * NM restart.
+ */
+public class ResourceMappings {
+
+  private final Map<String, AssignedResources> assignedResourcesMap =
+      new HashMap<>();
+
+  /**
+   * Get the resources assigned to the container for one resource type.
+   *
+   * @param resourceType resource type, e.g. "gpu"
+   * @return unmodifiable list of assigned resources; an empty list if nothing
+   *         has been assigned for this type
+   */
+  public List<Serializable> getAssignedResources(String resourceType) {
+    AssignedResources ar = assignedResourcesMap.get(resourceType);
+    if (null == ar) {
+      return Collections.emptyList();
+    }
+    return ar.getAssignedResources();
+  }
+
+  /**
+   * Sets the resources for a given resource type, replacing any resources
+   * previously added for the same type.
+   *
+   * @param resourceType Resource Type
+   * @param assigned Assigned resources to add
+   */
+  public void addAssignedResources(String resourceType,
+      AssignedResources assigned) {
+    assignedResourcesMap.put(resourceType, assigned);
+  }
+
+  /**
+   * Stores resources assigned to a container for a given resource type.
+   */
+  public static class AssignedResources implements Serializable {
+    private static final long serialVersionUID = -1059491941955757926L;
+    private List<Serializable> resources = Collections.emptyList();
+
+    /**
+     * @return an unmodifiable view of the assigned resources
+     */
+    public List<Serializable> getAssignedResources() {
+      return Collections.unmodifiableList(resources);
+    }
+
+    /**
+     * Replaces the assigned resources with a copy of the given list.
+     *
+     * @param list new assigned resources
+     */
+    public void updateAssignedResources(List<Serializable> list) {
+      this.resources = new ArrayList<>(list);
+    }
+
+    /**
+     * Deserializes assigned resources produced by {@link #toBytes()}.
+     *
+     * NOTE: this uses Java native deserialization, so it must only be fed
+     * bytes written by the NM itself (e.g. its own state store), never
+     * untrusted input.
+     *
+     * @param bytes serialized form of the resource list
+     * @return the deserialized assigned resources
+     * @throws IOException if the bytes cannot be deserialized or do not
+     *         contain a list
+     */
+    public static AssignedResources fromBytes(byte[] bytes)
+        throws IOException {
+      Object obj;
+      try (ObjectInputStream ois =
+          new ObjectInputStream(new ByteArrayInputStream(bytes))) {
+        obj = ois.readObject();
+      } catch (ClassNotFoundException e) {
+        throw new IOException(e);
+      }
+      // Report unexpected data as an IOException, like the other failure
+      // modes of this method, instead of a ClassCastException.
+      if (!(obj instanceof List)) {
+        throw new IOException("Expected a serialized List but found "
+            + (obj == null ? "null" : obj.getClass().getName()));
+      }
+      @SuppressWarnings("unchecked")
+      List<Serializable> resources = (List<Serializable>) obj;
+      AssignedResources ar = new AssignedResources();
+      ar.updateAssignedResources(resources);
+      return ar;
+    }
+
+    /**
+     * Serializes the assigned resources so they can be persisted.
+     *
+     * @return serialized form of the resource list
+     * @throws IOException if serialization fails
+     */
+    public byte[] toBytes() throws IOException {
+      ByteArrayOutputStream bos = new ByteArrayOutputStream();
+      try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
+        oos.writeObject(resources);
+      }
+      // Read the buffer only after the ObjectOutputStream has been closed
+      // (and therefore flushed) so that no buffered bytes are lost.
+      return bos.toByteArray();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f155ab7c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
index a31756e..db931f8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
@@ -24,6 +24,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -39,6 +40,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainerRequestPBImpl;
@@ -60,6 +62,7 @@ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDelet
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerRequestProto;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
@@ -144,6 +147,9 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
private static final String AMRMPROXY_KEY_PREFIX = "AMRMProxy/";
+ private static final String CONTAINER_ASSIGNED_RESOURCES_KEY_SUFFIX =
+ "/assignedResources_";
+
private static final byte[] EMPTY_VALUE = new byte[0];
private DB db;
@@ -286,6 +292,13 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
rcs.setWorkDir(asString(entry.getValue()));
} else if (suffix.equals(CONTAINER_LOG_DIR_KEY_SUFFIX)) {
rcs.setLogDir(asString(entry.getValue()));
+ } else if (suffix.startsWith(CONTAINER_ASSIGNED_RESOURCES_KEY_SUFFIX)) {
+ String resourceType = suffix.substring(
+ CONTAINER_ASSIGNED_RESOURCES_KEY_SUFFIX.length());
+ ResourceMappings.AssignedResources assignedResources =
+ ResourceMappings.AssignedResources.fromBytes(entry.getValue());
+ rcs.getResourceMappings().addAssignedResources(resourceType,
+ assignedResources);
} else {
LOG.warn("the container " + containerId
+ " will be killed because of the unknown key " + key
@@ -1091,6 +1104,32 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
}
}
+  @Override
+  public void storeAssignedResources(ContainerId containerId,
+      String resourceType, List<Serializable> assignedResources)
+      throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("storeAssignedResources: containerId=" + containerId
+          + ", resourceType=" + resourceType
+          + ", assignedResources="
+          + StringUtils.join(",", assignedResources));
+    }
+
+    String key = CONTAINERS_KEY_PREFIX + containerId.toString()
+        + CONTAINER_ASSIGNED_RESOURCES_KEY_SUFFIX + resourceType;
+    try {
+      ResourceMappings.AssignedResources res =
+          new ResourceMappings.AssignedResources();
+      res.updateAssignedResources(assignedResources);
+
+      // A single put needs no WriteBatch; it overwrites any value
+      // previously stored under the same key.
+      db.put(bytes(key), res.toBytes());
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
@SuppressWarnings("deprecation")
private void cleanupDeprecatedFinishedApps() {
try {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f155ab7c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
index 86dc99f..dc1cece 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.nodemanager.recovery;
import java.io.IOException;
+import java.io.Serializable;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
@@ -258,6 +259,13 @@ public class NMNullStateStoreService extends NMStateStoreService {
}
@Override
+  public void storeAssignedResources(ContainerId containerId,
+      String resourceType, List<Serializable> assignedResources)
+      throws IOException {
+    // Intentionally a no-op: nothing is persisted by this method.
+  }
+
+  @Override
protected void initStorage(Configuration conf) throws IOException {
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f155ab7c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
index ec534bf..62a2b9f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.nodemanager.recovery;
import java.io.IOException;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -42,6 +43,7 @@ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.Deletion
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
@Private
@Unstable
@@ -88,6 +90,7 @@ public abstract class NMStateStoreService extends AbstractService {
private RecoveredContainerType recoveryType =
RecoveredContainerType.RECOVER;
private long startTime;
+ private ResourceMappings resMappings = new ResourceMappings();
public RecoveredContainerStatus getStatus() {
return status;
@@ -172,6 +175,23 @@ public abstract class NMStateStoreService extends AbstractService {
public void setRecoveryType(RecoveredContainerType recoveryType) {
this.recoveryType = recoveryType;
}
+
+    /**
+     * @return the resources recorded for this container, grouped by
+     *         resource type
+     */
+    public ResourceMappings getResourceMappings() {
+      return resMappings;
+    }
+
+    /**
+     * Replaces the resources recorded for this container.
+     *
+     * @param mappings resource mappings to associate with this container
+     */
+    public void setResourceMappings(ResourceMappings mappings) {
+      this.resMappings = mappings;
+    }
}
public static class LocalResourceTrackerState {
@@ -699,6 +719,20 @@ public abstract class NMStateStoreService extends AbstractService {
public abstract void removeAMRMProxyAppContext(ApplicationAttemptId attempt)
throws IOException;
+  /**
+   * Store the resources assigned to a container for one resource type.
+   * Storing again for the same container and resource type replaces the
+   * previously stored resources.
+   *
+   * @param containerId Container Id
+   * @param resourceType Resource Type
+   * @param assignedResources Assigned resources
+   * @throws IOException if the resources could not be stored
+   */
+  public abstract void storeAssignedResources(ContainerId containerId,
+      String resourceType, List<Serializable> assignedResources)
+      throws IOException;
+
protected abstract void initStorage(Configuration conf) throws IOException;
protected abstract void startStorage() throws IOException;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f155ab7c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
index 224e99c..5ec0ae6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
@@ -31,6 +31,7 @@ import static org.mockito.Mockito.verify;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
+import java.io.Serializable;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
@@ -90,6 +91,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
@@ -108,6 +110,7 @@ import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerIn
import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -400,9 +403,8 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
NMStateStoreService stateStore = new NMMemoryStateStoreService();
stateStore.init(conf);
stateStore.start();
- Context context = createContext(conf, stateStore);
+ context = createContext(conf, stateStore);
ContainerManagerImpl cm = createContainerManager(context, delSrvc);
- cm.dispatcher.disableExitOnDispatchException();
cm.init(conf);
cm.start();
// add an application by starting a container
@@ -410,55 +412,12 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
ApplicationAttemptId attemptId =
ApplicationAttemptId.newInstance(appId, 1);
ContainerId cid = ContainerId.newContainerId(attemptId, 1);
- Map<String, String> containerEnv = new HashMap<>();
- setFlowContext(containerEnv, "app_name1", appId);
- Map<String, ByteBuffer> serviceData = Collections.emptyMap();
- Credentials containerCreds = new Credentials();
- DataOutputBuffer dob = new DataOutputBuffer();
- containerCreds.writeTokenStorageToStream(dob);
- ByteBuffer containerTokens = ByteBuffer.wrap(dob.getData(), 0,
- dob.getLength());
- Map<ApplicationAccessType, String> acls = Collections.emptyMap();
- File tmpDir = new File("target",
- this.getClass().getSimpleName() + "-tmpDir");
- File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile");
- PrintWriter fileWriter = new PrintWriter(scriptFile);
- if (Shell.WINDOWS) {
- fileWriter.println("@ping -n 100 127.0.0.1 >nul");
- } else {
- fileWriter.write("\numask 0");
- fileWriter.write("\nexec sleep 100");
- }
- fileWriter.close();
- FileContext localFS = FileContext.getLocalFSFileContext();
- URL resource_alpha =
- URL.fromPath(localFS
- .makeQualified(new Path(scriptFile.getAbsolutePath())));
- LocalResource rsrc_alpha = RecordFactoryProvider
- .getRecordFactory(null).newRecordInstance(LocalResource.class);
- rsrc_alpha.setResource(resource_alpha);
- rsrc_alpha.setSize(-1);
- rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION);
- rsrc_alpha.setType(LocalResourceType.FILE);
- rsrc_alpha.setTimestamp(scriptFile.lastModified());
- String destinationFile = "dest_file";
- Map<String, LocalResource> localResources = new HashMap<>();
- localResources.put(destinationFile, rsrc_alpha);
- List<String> commands =
- Arrays.asList(Shell.getRunScriptCommand(scriptFile));
- ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
- localResources, containerEnv, commands, serviceData,
- containerTokens, acls);
- StartContainersResponse startResponse = startContainer(
- context, cm, cid, clc, null);
- assertTrue(startResponse.getFailedRequests().isEmpty());
- assertEquals(1, context.getApplications().size());
+
+ commonLaunchContainer(appId, cid, cm);
+
Application app = context.getApplications().get(appId);
assertNotNull(app);
- // make sure the container reaches RUNNING state
- waitForNMContainerState(cm, cid,
- org.apache.hadoop.yarn.server.nodemanager
- .containermanager.container.ContainerState.RUNNING);
+
Resource targetResource = Resource.newInstance(2048, 2);
ContainerUpdateResponse updateResponse =
updateContainers(context, cm, cid, targetResource);
@@ -481,6 +440,58 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
}
@Test
+  public void testResourceMappingRecoveryForContainer() throws Exception {
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true);
+    NMStateStoreService stateStore = new NMMemoryStateStoreService();
+    stateStore.init(conf);
+    stateStore.start();
+    context = createContext(conf, stateStore);
+    ContainerManagerImpl cm = createContainerManager(context, delSrvc);
+    cm.init(conf);
+    cm.start();
+
+    // add an application by starting a container
+    ApplicationId appId = ApplicationId.newInstance(0, 1);
+    ApplicationAttemptId attemptId =
+        ApplicationAttemptId.newInstance(appId, 1);
+    ContainerId cid = ContainerId.newContainerId(attemptId, 1);
+
+    commonLaunchContainer(appId, cid, cm);
+
+    Application app = context.getApplications().get(appId);
+    assertNotNull(app);
+
+    // store resource mapping of the container
+    List<Serializable> gpuResources = Arrays.asList("1", "2", "3");
+    stateStore.storeAssignedResources(cid, "gpu", gpuResources);
+    List<Serializable> numaResources = Arrays.asList("numa1");
+    stateStore.storeAssignedResources(cid, "numa", numaResources);
+    List<Serializable> fpgaResources = Arrays.asList("fpga1", "fpga2");
+    stateStore.storeAssignedResources(cid, "fpga", fpgaResources);
+
+    cm.stop();
+    context = createContext(conf, stateStore);
+    cm = createContainerManager(context);
+    cm.init(conf);
+    cm.start();
+    assertEquals(1, context.getApplications().size());
+    app = context.getApplications().get(appId);
+    assertNotNull(app);
+
+    Container nmContainer = context.getContainers().get(cid);
+    Assert.assertNotNull(nmContainer);
+    ResourceMappings resourceMappings = nmContainer.getResourceMappings();
+    Assert.assertEquals(gpuResources,
+        resourceMappings.getAssignedResources("gpu"));
+    Assert.assertEquals(numaResources,
+        resourceMappings.getAssignedResources("numa"));
+    Assert.assertEquals(fpgaResources,
+        resourceMappings.getAssignedResources("fpga"));
+    cm.stop();
+  }
+
+ @Test
public void testContainerCleanupOnShutdown() throws Exception {
ApplicationId appId = ApplicationId.newInstance(0, 1);
ApplicationAttemptId attemptId =
@@ -552,6 +563,62 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
verify(cm, never()).handle(isA(CMgrCompletedAppsEvent.class));
}
+  /**
+   * Starts a container for {@code appId} that runs a long-sleeping script
+   * and waits until it is RUNNING. Uses the {@code context} field, which
+   * callers must set first.
+   */
+  private void commonLaunchContainer(ApplicationId appId, ContainerId cid,
+      ContainerManagerImpl cm) throws Exception {
+    Map<String, String> containerEnv = new HashMap<>();
+    setFlowContext(containerEnv, "app_name1", appId);
+    Map<String, ByteBuffer> serviceData = Collections.emptyMap();
+    Credentials containerCreds = new Credentials();
+    DataOutputBuffer dob = new DataOutputBuffer();
+    containerCreds.writeTokenStorageToStream(dob);
+    ByteBuffer containerTokens = ByteBuffer.wrap(dob.getData(), 0,
+        dob.getLength());
+    Map<ApplicationAccessType, String> acls = Collections.emptyMap();
+    File tmpDir = new File("target",
+        this.getClass().getSimpleName() + "-tmpDir");
+    File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile");
+    try (PrintWriter fileWriter = new PrintWriter(scriptFile)) {
+      if (Shell.WINDOWS) {
+        fileWriter.println("@ping -n 100 127.0.0.1 >nul");
+      } else {
+        fileWriter.write("\numask 0");
+        fileWriter.write("\nexec sleep 100");
+      }
+    }
+    FileContext localFS = FileContext.getLocalFSFileContext();
+    URL resource_alpha =
+        URL.fromPath(localFS
+            .makeQualified(new Path(scriptFile.getAbsolutePath())));
+    LocalResource rsrc_alpha = RecordFactoryProvider
+        .getRecordFactory(null).newRecordInstance(LocalResource.class);
+    rsrc_alpha.setResource(resource_alpha);
+    rsrc_alpha.setSize(-1);
+    rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION);
+    rsrc_alpha.setType(LocalResourceType.FILE);
+    rsrc_alpha.setTimestamp(scriptFile.lastModified());
+    String destinationFile = "dest_file";
+    Map<String, LocalResource> localResources = new HashMap<>();
+    localResources.put(destinationFile, rsrc_alpha);
+    List<String> commands =
+        Arrays.asList(Shell.getRunScriptCommand(scriptFile));
+    ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
+        localResources, containerEnv, commands, serviceData,
+        containerTokens, acls);
+    StartContainersResponse startResponse = startContainer(
+        context, cm, cid, clc, null);
+    assertTrue(startResponse.getFailedRequests().isEmpty());
+    assertEquals(1, context.getApplications().size());
+    // make sure the container reaches RUNNING state
+    waitForNMContainerState(cm, cid,
+        org.apache.hadoop.yarn.server.nodemanager
+            .containermanager.container.ContainerState.RUNNING);
+  }
+
private ContainerManagerImpl createContainerManager(Context context,
DeletionService delSrvc) {
return new ContainerManagerImpl(context, exec, delSrvc,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f155ab7c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
index c1638df..6d6875d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.nodemanager.recovery;
import java.io.IOException;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -40,6 +41,7 @@ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.Localize
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
public class NMMemoryStateStoreService extends NMStateStoreService {
private Map<ApplicationId, ContainerManagerApplicationProto> apps;
@@ -119,6 +121,7 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
rcsCopy.setRemainingRetryAttempts(rcs.getRemainingRetryAttempts());
rcsCopy.setWorkDir(rcs.getWorkDir());
rcsCopy.setLogDir(rcs.getLogDir());
+ rcsCopy.setResourceMappings(rcs.getResourceMappings());
result.add(rcsCopy);
}
return result;
@@ -480,6 +483,17 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
amrmProxyState.getAppContexts().remove(attempt);
}
+ @Override
+ public void storeAssignedResources(ContainerId containerId,
+ String resourceType, List<Serializable> assignedResources)
+ throws IOException {
+ ResourceMappings.AssignedResources ar =
+ new ResourceMappings.AssignedResources();
+ ar.updateAssignedResources(assignedResources);
+ containerStates.get(containerId).getResourceMappings()
+ .addAssignedResources(resourceType, ar);
+ }
+
private static class TrackerState {
Map<Path, LocalResourceProto> inProgressMap =
new HashMap<Path, LocalResourceProto>();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f155ab7c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
index b0a9bc9..b76f1ff 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
@@ -32,6 +32,7 @@ import static org.mockito.Mockito.verify;
import java.io.File;
import java.io.IOException;
+import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
@@ -961,46 +962,12 @@ public class TestNMLeveldbStateStoreService {
.loadContainersState();
assertTrue(recoveredContainers.isEmpty());
- // create a container request
ApplicationId appId = ApplicationId.newInstance(1234, 3);
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
4);
ContainerId containerId = ContainerId.newContainerId(appAttemptId, 5);
- LocalResource lrsrc = LocalResource.newInstance(
- URL.newInstance("hdfs", "somehost", 12345, "/some/path/to/rsrc"),
- LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 123L,
- 1234567890L);
- Map<String, LocalResource> localResources =
- new HashMap<String, LocalResource>();
- localResources.put("rsrc", lrsrc);
- Map<String, String> env = new HashMap<String, String>();
- env.put("somevar", "someval");
- List<String> containerCmds = new ArrayList<String>();
- containerCmds.add("somecmd");
- containerCmds.add("somearg");
- Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
- serviceData.put("someservice",
- ByteBuffer.wrap(new byte[] { 0x1, 0x2, 0x3 }));
- ByteBuffer containerTokens = ByteBuffer
- .wrap(new byte[] { 0x7, 0x8, 0x9, 0xa });
- Map<ApplicationAccessType, String> acls =
- new HashMap<ApplicationAccessType, String>();
- acls.put(ApplicationAccessType.VIEW_APP, "viewuser");
- acls.put(ApplicationAccessType.MODIFY_APP, "moduser");
- ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
- localResources, env, containerCmds,
- serviceData, containerTokens, acls);
- Resource containerRsrc = Resource.newInstance(1357, 3);
- ContainerTokenIdentifier containerTokenId = new ContainerTokenIdentifier(
- containerId, "host", "user", containerRsrc, 9876543210L, 42, 2468,
- Priority.newInstance(7), 13579);
- Token containerToken = Token.newInstance(containerTokenId.getBytes(),
- ContainerTokenIdentifier.KIND.toString(), "password".getBytes(),
- "tokenservice");
- StartContainerRequest containerReq = StartContainerRequest.newInstance(clc,
- containerToken);
-
- stateStore.storeContainer(containerId, 0, 0, containerReq);
+ StartContainerRequest startContainerRequest = storeMockContainer(
+ containerId);
// add a invalid key
byte[] invalidKey = ("ContainerManager/containers/"
@@ -1013,7 +980,7 @@ public class TestNMLeveldbStateStoreService {
assertEquals(RecoveredContainerStatus.REQUESTED, rcs.getStatus());
assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
assertEquals(false, rcs.getKilled());
- assertEquals(containerReq, rcs.getStartRequest());
+ assertEquals(startContainerRequest, rcs.getStartRequest());
assertTrue(rcs.getDiagnostics().isEmpty());
assertEquals(RecoveredContainerType.KILL, rcs.getRecoveryType());
// assert unknown keys are cleaned up finally
@@ -1121,6 +1088,86 @@ public class TestNMLeveldbStateStoreService {
}
}
+ @Test
+ public void testStateStoreForResourceMapping() throws IOException {
+ // test empty when no state
+ List<RecoveredContainerState> recoveredContainers = stateStore
+ .loadContainersState();
+ assertTrue(recoveredContainers.isEmpty());
+
+ ApplicationId appId = ApplicationId.newInstance(1234, 3);
+ ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
+ 4);
+ ContainerId containerId = ContainerId.newContainerId(appAttemptId, 5);
+ storeMockContainer(containerId);
+
+ // Store ResourceMapping
+ stateStore.storeAssignedResources(containerId, "gpu",
+ Arrays.asList("1", "2", "3"));
+ // This will overwrite above
+ List<Serializable> gpuRes1 = Arrays.asList("1", "2", "4");
+ stateStore.storeAssignedResources(containerId, "gpu", gpuRes1);
+ List<Serializable> fpgaRes = Arrays.asList("3", "4", "5", "6");
+ stateStore.storeAssignedResources(containerId, "fpga", fpgaRes);
+ List<Serializable> numaRes = Arrays.asList("numa1");
+ stateStore.storeAssignedResources(containerId, "numa", numaRes);
+
+ // add a invalid key
+ restartStateStore();
+ recoveredContainers = stateStore.loadContainersState();
+ assertEquals(1, recoveredContainers.size());
+ RecoveredContainerState rcs = recoveredContainers.get(0);
+ List<Serializable> res = rcs.getResourceMappings()
+ .getAssignedResources("gpu");
+ Assert.assertTrue(res.equals(gpuRes1));
+
+ res = rcs.getResourceMappings().getAssignedResources("fpga");
+ Assert.assertTrue(res.equals(fpgaRes));
+
+ res = rcs.getResourceMappings().getAssignedResources("numa");
+ Assert.assertTrue(res.equals(numaRes));
+ }
+
+ private StartContainerRequest storeMockContainer(ContainerId containerId)
+ throws IOException {
+ // create a container request
+ LocalResource lrsrc = LocalResource.newInstance(
+ URL.newInstance("hdfs", "somehost", 12345, "/some/path/to/rsrc"),
+ LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 123L,
+ 1234567890L);
+ Map<String, LocalResource> localResources =
+ new HashMap<String, LocalResource>();
+ localResources.put("rsrc", lrsrc);
+ Map<String, String> env = new HashMap<String, String>();
+ env.put("somevar", "someval");
+ List<String> containerCmds = new ArrayList<String>();
+ containerCmds.add("somecmd");
+ containerCmds.add("somearg");
+ Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
+ serviceData.put("someservice",
+ ByteBuffer.wrap(new byte[] { 0x1, 0x2, 0x3 }));
+ ByteBuffer containerTokens = ByteBuffer
+ .wrap(new byte[] { 0x7, 0x8, 0x9, 0xa });
+ Map<ApplicationAccessType, String> acls =
+ new HashMap<ApplicationAccessType, String>();
+ acls.put(ApplicationAccessType.VIEW_APP, "viewuser");
+ acls.put(ApplicationAccessType.MODIFY_APP, "moduser");
+ ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
+ localResources, env, containerCmds,
+ serviceData, containerTokens, acls);
+ Resource containerRsrc = Resource.newInstance(1357, 3);
+ ContainerTokenIdentifier containerTokenId = new ContainerTokenIdentifier(
+ containerId, "host", "user", containerRsrc, 9876543210L, 42, 2468,
+ Priority.newInstance(7), 13579);
+ Token containerToken = Token.newInstance(containerTokenId.getBytes(),
+ ContainerTokenIdentifier.KIND.toString(), "password".getBytes(),
+ "tokenservice");
+ StartContainerRequest containerReq = StartContainerRequest.newInstance(clc,
+ containerToken);
+ stateStore.storeContainer(containerId, 0, 0, containerReq);
+ return containerReq;
+ }
+
private static class NMTokenSecretManagerForTest extends
BaseNMTokenSecretManager {
public MasterKey generateKey() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f155ab7c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
index 57bee8c..d435ba0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceSet;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -239,4 +240,9 @@ public class MockContainer implements Container {
public long getContainerStartTime() {
return 0;
}
+
+ @Override
+ public ResourceMappings getResourceMappings() {
+ return null;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org