You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2017/07/14 20:59:35 UTC
hadoop git commit: YARN-5049. Extend NMStateStore to save queued container information. (Konstantinos Karanasos via asuresh)
Repository: hadoop
Updated Branches:
refs/heads/branch-2 a47820d72 -> 0101973db
YARN-5049. Extend NMStateStore to save queued container information. (Konstantinos Karanasos via asuresh)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0101973d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0101973d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0101973d
Branch: refs/heads/branch-2
Commit: 0101973dbf0ef9ba0f3e58be66aebf01b1586c92
Parents: a47820d
Author: Arun Suresh <as...@apache.org>
Authored: Wed May 11 19:10:17 2016 -0700
Committer: Arun Suresh <as...@apache.org>
Committed: Fri Jul 14 13:57:22 2017 -0700
----------------------------------------------------------------------
.../containermanager/ContainerManagerImpl.java | 18 +++++++++++-
.../scheduler/ContainerScheduler.java | 19 +++++++------
.../recovery/NMLeveldbStateStoreService.java | 29 ++++++++++++++++++--
.../recovery/NMNullStateStoreService.java | 4 +++
.../recovery/NMStateStoreService.java | 9 ++++++
.../recovery/NMMemoryStateStoreService.java | 6 ++++
.../TestNMLeveldbStateStoreService.java | 12 ++++++++
7 files changed, 85 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0101973d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 095ce6d..14f30f4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -376,7 +376,6 @@ public class ContainerManagerImpl extends CompositeService implements
app.handle(new ApplicationInitEvent(appId, acls, logAggregationContext));
}
- @SuppressWarnings("unchecked")
private void recoverContainer(RecoveredContainerState rcs)
throws IOException {
StartContainerRequest req = rcs.getStartRequest();
@@ -405,6 +404,7 @@ public class ContainerManagerImpl extends CompositeService implements
"Due to invalid StateStore info container was killed"
+ " during recovery"));
}
+ recoverActiveContainer(launchContext, token, rcs);
} else {
if (rcs.getStatus() != RecoveredContainerStatus.COMPLETED) {
LOG.warn(containerId + " has no corresponding application!");
@@ -414,6 +414,22 @@ public class ContainerManagerImpl extends CompositeService implements
}
}
+ /**
+ * Recover a running container.
+ */
+ @SuppressWarnings("unchecked")
+ protected void recoverActiveContainer(
+ ContainerLaunchContext launchContext, ContainerTokenIdentifier token,
+ RecoveredContainerState rcs) throws IOException {
+ Credentials credentials = YarnServerSecurityUtils.parseCredentials(
+ launchContext);
+ Container container = new ContainerImpl(getConfig(), dispatcher,
+ launchContext, credentials, metrics, token, context, rcs);
+ context.getContainers().put(token.getContainerID(), container);
+ dispatcher.getEventHandler().handle(new ApplicationContainerInitEvent(
+ container));
+ }
+
private void waitForRecoveredContainers() throws InterruptedException {
final int sleepMsec = 100;
int waitIterations = 100;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0101973d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
index 5c96d55..11a8f3f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -263,15 +264,15 @@ public class ContainerScheduler extends AbstractService implements
"Opportunistic container queue is full.");
}
}
-// if (isQueued) {
-// try {
-// this.context.getNMStateStore().storeContainerQueued(
-// container.getContainerId());
-// } catch (IOException e) {
-// LOG.warn("Could not store container [" + container.getContainerId()
-// + "] state. The Container has been queued.", e);
-// }
-// }
+ if (isQueued) {
+ try {
+ this.context.getNMStateStore().storeContainerQueued(
+ container.getContainerId());
+ } catch (IOException e) {
+ LOG.warn("Could not store container [" + container.getContainerId()
+ + "] state. The Container has been queued.", e);
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0101973d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
index 29b82a8..f1e47c7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
@@ -85,7 +85,10 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
private static final String DB_NAME = "yarn-nm-state";
private static final String DB_SCHEMA_VERSION_KEY = "nm-schema-version";
- private static final Version CURRENT_VERSION_INFO = Version.newInstance(3, 0);
+ // Set to 1.1 by YARN-5049
+ // Set to 1.2 by YARN-6127
+ private static final Version CURRENT_VERSION_INFO = Version
+ .newInstance(1, 2);
private static final String DELETION_TASK_KEY_PREFIX =
"DeletionService/deltask_";
@@ -112,6 +115,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
private static final String CONTAINER_VERSION_KEY_SUFFIX = "/version";
private static final String CONTAINER_DIAGS_KEY_SUFFIX = "/diagnostics";
private static final String CONTAINER_LAUNCHED_KEY_SUFFIX = "/launched";
+ private static final String CONTAINER_QUEUED_KEY_SUFFIX = "/queued";
private static final String CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX =
"/resourceChanged";
private static final String CONTAINER_KILLED_KEY_SUFFIX = "/killed";
@@ -256,8 +260,13 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
rcs.version = Integer.parseInt(asString(entry.getValue()));
} else if (suffix.equals(CONTAINER_DIAGS_KEY_SUFFIX)) {
rcs.diagnostics = asString(entry.getValue());
- } else if (suffix.equals(CONTAINER_LAUNCHED_KEY_SUFFIX)) {
+ } else if (suffix.equals(CONTAINER_QUEUED_KEY_SUFFIX)) {
if (rcs.status == RecoveredContainerStatus.REQUESTED) {
+ rcs.status = RecoveredContainerStatus.QUEUED;
+ }
+ } else if (suffix.equals(CONTAINER_LAUNCHED_KEY_SUFFIX)) {
+ if ((rcs.status == RecoveredContainerStatus.REQUESTED)
+ || (rcs.status == RecoveredContainerStatus.QUEUED)) {
rcs.status = RecoveredContainerStatus.LAUNCHED;
}
} else if (suffix.equals(CONTAINER_KILLED_KEY_SUFFIX)) {
@@ -322,6 +331,21 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
}
@Override
+ public void storeContainerQueued(ContainerId containerId) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("storeContainerQueued: containerId=" + containerId);
+ }
+
+ String key = CONTAINERS_KEY_PREFIX + containerId.toString()
+ + CONTAINER_QUEUED_KEY_SUFFIX;
+ try {
+ db.put(bytes(key), EMPTY_VALUE);
+ } catch (DBException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
public void storeContainerDiagnostics(ContainerId containerId,
StringBuilder diagnostics) throws IOException {
if (LOG.isDebugEnabled()) {
@@ -464,6 +488,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
batch.delete(bytes(keyPrefix + CONTAINER_REQUEST_KEY_SUFFIX));
batch.delete(bytes(keyPrefix + CONTAINER_DIAGS_KEY_SUFFIX));
batch.delete(bytes(keyPrefix + CONTAINER_LAUNCHED_KEY_SUFFIX));
+ batch.delete(bytes(keyPrefix + CONTAINER_QUEUED_KEY_SUFFIX));
batch.delete(bytes(keyPrefix + CONTAINER_KILLED_KEY_SUFFIX));
batch.delete(bytes(keyPrefix + CONTAINER_EXIT_CODE_KEY_SUFFIX));
List<String> unknownKeysForContainer =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0101973d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
index 545cb74..96c3f9e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
@@ -75,6 +75,10 @@ public class NMNullStateStoreService extends NMStateStoreService {
}
@Override
+ public void storeContainerQueued(ContainerId containerId) throws IOException {
+ }
+
+ @Override
public void storeContainerDiagnostics(ContainerId containerId,
StringBuilder diagnostics) throws IOException {
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0101973d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
index 02bf186..9f87279 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
@@ -69,6 +69,7 @@ public abstract class NMStateStoreService extends AbstractService {
public enum RecoveredContainerStatus {
REQUESTED,
+ QUEUED,
LAUNCHED,
COMPLETED
}
@@ -372,6 +373,14 @@ public abstract class NMStateStoreService extends AbstractService {
throws IOException;
/**
+ * Record that a container has been queued at the NM
+ * @param containerId the container ID
+ * @throws IOException
+ */
+ public abstract void storeContainerQueued(ContainerId containerId)
+ throws IOException;
+
+ /**
* Record that a container has been launched
* @param containerId the container ID
* @throws IOException
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0101973d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
index 5a48e2f..0e03994 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
@@ -134,6 +134,12 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
}
@Override
+ public void storeContainerQueued(ContainerId containerId) throws IOException {
+ RecoveredContainerState rcs = getRecoveredContainerState(containerId);
+ rcs.status = RecoveredContainerStatus.QUEUED;
+ }
+
+ @Override
public synchronized void storeContainerDiagnostics(ContainerId containerId,
StringBuilder diagnostics) throws IOException {
RecoveredContainerState rcs = getRecoveredContainerState(containerId);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0101973d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
index 2e7e8ef..0133156 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
@@ -259,6 +259,18 @@ public class TestNMLeveldbStateStoreService {
// check whether the new container record is discarded
assertEquals(1, recoveredContainers.size());
+ // queue the container, and verify recovered
+ stateStore.storeContainerQueued(containerId);
+ restartStateStore();
+ recoveredContainers = stateStore.loadContainersState();
+ assertEquals(1, recoveredContainers.size());
+ rcs = recoveredContainers.get(0);
+ assertEquals(RecoveredContainerStatus.QUEUED, rcs.getStatus());
+ assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
+ assertEquals(false, rcs.getKilled());
+ assertEquals(containerReq, rcs.getStartRequest());
+ assertTrue(rcs.getDiagnostics().isEmpty());
+
// launch the container, add some diagnostics, and verify recovered
StringBuilder diags = new StringBuilder();
stateStore.storeContainerLaunched(containerId);
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org