You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2017/07/14 20:59:35 UTC

hadoop git commit: YARN-5049. Extend NMStateStore to save queued container information. (Konstantinos Karanasos via asuresh)

Repository: hadoop
Updated Branches:
  refs/heads/branch-2 a47820d72 -> 0101973db


YARN-5049. Extend NMStateStore to save queued container information. (Konstantinos Karanasos via asuresh)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0101973d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0101973d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0101973d

Branch: refs/heads/branch-2
Commit: 0101973dbf0ef9ba0f3e58be66aebf01b1586c92
Parents: a47820d
Author: Arun Suresh <as...@apache.org>
Authored: Wed May 11 19:10:17 2016 -0700
Committer: Arun Suresh <as...@apache.org>
Committed: Fri Jul 14 13:57:22 2017 -0700

----------------------------------------------------------------------
 .../containermanager/ContainerManagerImpl.java  | 18 +++++++++++-
 .../scheduler/ContainerScheduler.java           | 19 +++++++------
 .../recovery/NMLeveldbStateStoreService.java    | 29 ++++++++++++++++++--
 .../recovery/NMNullStateStoreService.java       |  4 +++
 .../recovery/NMStateStoreService.java           |  9 ++++++
 .../recovery/NMMemoryStateStoreService.java     |  6 ++++
 .../TestNMLeveldbStateStoreService.java         | 12 ++++++++
 7 files changed, 85 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0101973d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 095ce6d..14f30f4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -376,7 +376,6 @@ public class ContainerManagerImpl extends CompositeService implements
     app.handle(new ApplicationInitEvent(appId, acls, logAggregationContext));
   }
 
-  @SuppressWarnings("unchecked")
   private void recoverContainer(RecoveredContainerState rcs)
       throws IOException {
     StartContainerRequest req = rcs.getStartRequest();
@@ -405,6 +404,7 @@ public class ContainerManagerImpl extends CompositeService implements
                 "Due to invalid StateStore info container was killed"
                     + " during recovery"));
       }
+      recoverActiveContainer(launchContext, token, rcs);
     } else {
       if (rcs.getStatus() != RecoveredContainerStatus.COMPLETED) {
         LOG.warn(containerId + " has no corresponding application!");
@@ -414,6 +414,22 @@ public class ContainerManagerImpl extends CompositeService implements
     }
   }
 
+  /**
+   * Recover a running container.
+   */
+  @SuppressWarnings("unchecked")
+  protected void recoverActiveContainer(
+      ContainerLaunchContext launchContext, ContainerTokenIdentifier token,
+      RecoveredContainerState rcs) throws IOException {
+    Credentials credentials = YarnServerSecurityUtils.parseCredentials(
+        launchContext);
+    Container container = new ContainerImpl(getConfig(), dispatcher,
+        launchContext, credentials, metrics, token, context, rcs);
+    context.getContainers().put(token.getContainerID(), container);
+    dispatcher.getEventHandler().handle(new ApplicationContainerInitEvent(
+        container));
+  }
+
   private void waitForRecoveredContainers() throws InterruptedException {
     final int sleepMsec = 100;
     int waitIterations = 100;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0101973d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
index 5c96d55..11a8f3f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -263,15 +264,15 @@ public class ContainerScheduler extends AbstractService implements
               "Opportunistic container queue is full.");
         }
       }
-//      if (isQueued) {
-//        try {
-//          this.context.getNMStateStore().storeContainerQueued(
-//              container.getContainerId());
-//        } catch (IOException e) {
-//          LOG.warn("Could not store container [" + container.getContainerId()
-//              + "] state. The Container has been queued.", e);
-//        }
-//      }
+      if (isQueued) {
+        try {
+          this.context.getNMStateStore().storeContainerQueued(
+              container.getContainerId());
+        } catch (IOException e) {
+          LOG.warn("Could not store container [" + container.getContainerId()
+              + "] state. The Container has been queued.", e);
+        }
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0101973d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
index 29b82a8..f1e47c7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
@@ -85,7 +85,10 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
   private static final String DB_NAME = "yarn-nm-state";
   private static final String DB_SCHEMA_VERSION_KEY = "nm-schema-version";
 
-  private static final Version CURRENT_VERSION_INFO = Version.newInstance(3, 0);
+  // Set to 1.1 by YARN-5049
+  // Set to 1.2 by YARN-6127
+  private static final Version CURRENT_VERSION_INFO = Version
+      .newInstance(1, 2);
 
   private static final String DELETION_TASK_KEY_PREFIX =
       "DeletionService/deltask_";
@@ -112,6 +115,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
   private static final String CONTAINER_VERSION_KEY_SUFFIX = "/version";
   private static final String CONTAINER_DIAGS_KEY_SUFFIX = "/diagnostics";
   private static final String CONTAINER_LAUNCHED_KEY_SUFFIX = "/launched";
+  private static final String CONTAINER_QUEUED_KEY_SUFFIX = "/queued";
   private static final String CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX =
       "/resourceChanged";
   private static final String CONTAINER_KILLED_KEY_SUFFIX = "/killed";
@@ -256,8 +260,13 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
         rcs.version = Integer.parseInt(asString(entry.getValue()));
       } else if (suffix.equals(CONTAINER_DIAGS_KEY_SUFFIX)) {
         rcs.diagnostics = asString(entry.getValue());
-      } else if (suffix.equals(CONTAINER_LAUNCHED_KEY_SUFFIX)) {
+      } else if (suffix.equals(CONTAINER_QUEUED_KEY_SUFFIX)) {
         if (rcs.status == RecoveredContainerStatus.REQUESTED) {
+          rcs.status = RecoveredContainerStatus.QUEUED;
+        }
+      } else if (suffix.equals(CONTAINER_LAUNCHED_KEY_SUFFIX)) {
+        if ((rcs.status == RecoveredContainerStatus.REQUESTED)
+            || (rcs.status == RecoveredContainerStatus.QUEUED)) {
           rcs.status = RecoveredContainerStatus.LAUNCHED;
         }
       } else if (suffix.equals(CONTAINER_KILLED_KEY_SUFFIX)) {
@@ -322,6 +331,21 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
   }
 
   @Override
+  public void storeContainerQueued(ContainerId containerId) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("storeContainerQueued: containerId=" + containerId);
+    }
+
+    String key = CONTAINERS_KEY_PREFIX + containerId.toString()
+        + CONTAINER_QUEUED_KEY_SUFFIX;
+    try {
+      db.put(bytes(key), EMPTY_VALUE);
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
   public void storeContainerDiagnostics(ContainerId containerId,
       StringBuilder diagnostics) throws IOException {
     if (LOG.isDebugEnabled()) {
@@ -464,6 +488,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
         batch.delete(bytes(keyPrefix + CONTAINER_REQUEST_KEY_SUFFIX));
         batch.delete(bytes(keyPrefix + CONTAINER_DIAGS_KEY_SUFFIX));
         batch.delete(bytes(keyPrefix + CONTAINER_LAUNCHED_KEY_SUFFIX));
+        batch.delete(bytes(keyPrefix + CONTAINER_QUEUED_KEY_SUFFIX));
         batch.delete(bytes(keyPrefix + CONTAINER_KILLED_KEY_SUFFIX));
         batch.delete(bytes(keyPrefix + CONTAINER_EXIT_CODE_KEY_SUFFIX));
         List<String> unknownKeysForContainer =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0101973d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
index 545cb74..96c3f9e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
@@ -75,6 +75,10 @@ public class NMNullStateStoreService extends NMStateStoreService {
   }
 
   @Override
+  public void storeContainerQueued(ContainerId containerId) throws IOException {
+  }
+
+  @Override
   public void storeContainerDiagnostics(ContainerId containerId,
       StringBuilder diagnostics) throws IOException {
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0101973d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
index 02bf186..9f87279 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
@@ -69,6 +69,7 @@ public abstract class NMStateStoreService extends AbstractService {
 
   public enum RecoveredContainerStatus {
     REQUESTED,
+    QUEUED,
     LAUNCHED,
     COMPLETED
   }
@@ -372,6 +373,14 @@ public abstract class NMStateStoreService extends AbstractService {
       throws IOException;
 
   /**
+   * Record that a container has been queued at the NM
+   * @param containerId the container ID
+   * @throws IOException
+   */
+  public abstract void storeContainerQueued(ContainerId containerId)
+      throws IOException;
+
+  /**
    * Record that a container has been launched
    * @param containerId the container ID
    * @throws IOException

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0101973d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
index 5a48e2f..0e03994 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
@@ -134,6 +134,12 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
   }
 
   @Override
+  public void storeContainerQueued(ContainerId containerId) throws IOException {
+    RecoveredContainerState rcs = getRecoveredContainerState(containerId);
+    rcs.status = RecoveredContainerStatus.QUEUED;
+  }
+
+  @Override
   public synchronized void storeContainerDiagnostics(ContainerId containerId,
       StringBuilder diagnostics) throws IOException {
     RecoveredContainerState rcs = getRecoveredContainerState(containerId);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0101973d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
index 2e7e8ef..0133156 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
@@ -259,6 +259,18 @@ public class TestNMLeveldbStateStoreService {
     // check whether the new container record is discarded
     assertEquals(1, recoveredContainers.size());
 
+    // queue the container, and verify recovered
+    stateStore.storeContainerQueued(containerId);
+    restartStateStore();
+    recoveredContainers = stateStore.loadContainersState();
+    assertEquals(1, recoveredContainers.size());
+    rcs = recoveredContainers.get(0);
+    assertEquals(RecoveredContainerStatus.QUEUED, rcs.getStatus());
+    assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
+    assertEquals(false, rcs.getKilled());
+    assertEquals(containerReq, rcs.getStartRequest());
+    assertTrue(rcs.getDiagnostics().isEmpty());
+
     // launch the container, add some diagnostics, and verify recovered
     StringBuilder diags = new StringBuilder();
     stateStore.storeContainerLaunched(containerId);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org