Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/05/12 00:54:46 UTC

[12/50] [abbrv] hadoop git commit: YARN-2331. Distinguish shutdown during supervision vs. shutdown for rolling upgrade. Contributed by Jason Lowe

YARN-2331. Distinguish shutdown during supervision vs. shutdown for
rolling upgrade. Contributed by Jason Lowe


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/088156de
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/088156de
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/088156de

Branch: refs/heads/HDFS-7240
Commit: 088156de43abb07bec590a3fcd1a5af2feb02cd2
Parents: d0e75e6
Author: Xuan <xg...@apache.org>
Authored: Fri May 8 15:10:43 2015 -0700
Committer: Xuan <xg...@apache.org>
Committed: Fri May 8 15:10:43 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../hadoop/yarn/conf/YarnConfiguration.java     |   4 +
 .../src/main/resources/yarn-default.xml         |   9 ++
 .../containermanager/ContainerManagerImpl.java  |   7 +-
 .../logaggregation/LogAggregationService.java   |   5 +-
 .../TestContainerManagerRecovery.java           | 119 +++++++++++++++----
 6 files changed, 124 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
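
In short, the commit adds one boolean knob, yarn.nodemanager.recovery.supervised
(default false), and gates the NodeManager's shutdown-time container and log
cleanup on it. Below is a minimal sketch of how a supervised deployment would
opt in; the two property constants are the ones this commit touches, while the
wrapper class is illustrative only:

    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class SupervisedNmConf {
      public static void main(String[] args) {
        // Work-preserving recovery must already be on for the new flag to matter.
        YarnConfiguration conf = new YarnConfiguration();
        conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
        // New in this commit: tell the NM it runs under a supervisor that will
        // restart it immediately, so shutdown leaves containers running.
        conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true);
      }
    }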


http://git-wip-us.apache.org/repos/asf/hadoop/blob/088156de/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 47f689c..2183d3f 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -206,6 +206,9 @@ Release 2.8.0 - UNRELEASED
     yarn.scheduler.capacity.node-locality-delay in code and default xml file.
     (Nijel SF via vinodkv)
 
+    YARN-2331. Distinguish shutdown during supervision vs. shutdown for
+    rolling upgrade. (Jason Lowe via xgong)
+
   OPTIMIZATIONS
 
     YARN-3339. TestDockerContainerExecutor should pull a single image and not

http://git-wip-us.apache.org/repos/asf/hadoop/blob/088156de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 5291ff2..0851f3c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1158,6 +1158,10 @@ public class YarnConfiguration extends Configuration {
 
   public static final String NM_RECOVERY_DIR = NM_RECOVERY_PREFIX + "dir";
 
+  public static final String NM_RECOVERY_SUPERVISED =
+      NM_RECOVERY_PREFIX + "supervised";
+  public static final boolean DEFAULT_NM_RECOVERY_SUPERVISED = false;
+
   ////////////////////////////////
   // Web Proxy Configs
   ////////////////////////////////
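
A minimal sketch of how the new constant pair is consumed; it mirrors the call
sites later in this diff (ContainerManagerImpl and LogAggregationService) and
assumes a Configuration object named conf:

      boolean supervised = conf.getBoolean(
          YarnConfiguration.NM_RECOVERY_SUPERVISED,
          YarnConfiguration.DEFAULT_NM_RECOVERY_SUPERVISED); // false unless set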

http://git-wip-us.apache.org/repos/asf/hadoop/blob/088156de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index e1e0ebd..4d74f76 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1192,6 +1192,15 @@
     <value>${hadoop.tmp.dir}/yarn-nm-recovery</value>
   </property>
 
+  <property>
+    <description>Whether the nodemanager is running under supervision. A
+      nodemanager that supports recovery and is running under supervision
+      will not try to clean up containers as it exits, on the assumption
+      that it will be immediately restarted and recover containers.</description>
+    <name>yarn.nodemanager.recovery.supervised</name>
+    <value>false</value>
+  </property>
+
   <!--Docker configuration-->
 
   <property>
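
The XML default above is keyed by the same string the Java constant resolves
to, so an operator override in yarn-site.xml and a programmatic lookup agree.
A hedged sketch of the string-keyed equivalent (resource loading is standard
Configuration behavior, not something this commit changes):

      // YarnConfiguration loads yarn-default.xml and yarn-site.xml resources.
      YarnConfiguration conf = new YarnConfiguration();
      boolean supervised =
          conf.getBoolean("yarn.nodemanager.recovery.supervised", false);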

http://git-wip-us.apache.org/repos/asf/hadoop/blob/088156de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index c48df64..494fa8f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -530,8 +530,11 @@ public class ContainerManagerImpl extends CompositeService implements
 
     if (this.context.getNMStateStore().canRecover()
         && !this.context.getDecommissioned()) {
-      // do not cleanup apps as they can be recovered on restart
-      return;
+      if (getConfig().getBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED,
+          YarnConfiguration.DEFAULT_NM_RECOVERY_SUPERVISED)) {
+        // do not cleanup apps as they can be recovered on restart
+        return;
+      }
     }
 
     List<ApplicationId> appIds =
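
The effect of the hunk: app cleanup on stop is now skipped only when all three
conditions hold, not merely the first two. A sketch distilling the combined
predicate; the helper name is hypothetical, not part of the commit:

      // Hypothetical helper: true when shutdown should preserve containers.
      private boolean keepContainersOnShutdown() {
        return context.getNMStateStore().canRecover()   // recovery supported
            && !context.getDecommissioned()             // node not being retired
            && getConfig().getBoolean(                  // supervised restart expected
                YarnConfiguration.NM_RECOVERY_SUPERVISED,
                YarnConfiguration.DEFAULT_NM_RECOVERY_SUPERVISED);
      }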

http://git-wip-us.apache.org/repos/asf/hadoop/blob/088156de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
index 0018d56..dbbfcd5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
@@ -145,10 +145,13 @@ public class LogAggregationService extends AbstractService implements
    
   private void stopAggregators() {
     threadPool.shutdown();
+    boolean supervised = getConfig().getBoolean(
+        YarnConfiguration.NM_RECOVERY_SUPERVISED,
+        YarnConfiguration.DEFAULT_NM_RECOVERY_SUPERVISED);
     // if recovery on restart is supported then leave outstanding aggregations
     // to the next restart
     boolean shouldAbort = context.getNMStateStore().canRecover()
-        && !context.getDecommissioned();
+        && !context.getDecommissioned() && supervised;
     // politely ask to finish
     for (AppLogAggregator aggregator : appLogAggregators.values()) {
       if (shouldAbort) {
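
The hunk's context window ends just inside the branch; a sketch of the rest of
the loop, assuming the abort/finish methods on AppLogAggregator used elsewhere
in this class:

      for (AppLogAggregator aggregator : appLogAggregators.values()) {
        if (shouldAbort) {
          // Supervised restart expected: leave pending aggregation to the
          // recovered NM rather than flushing it now.
          aggregator.abortLogAggregation();
        } else {
          // Unsupervised or decommissioning: finish uploads before exit.
          aggregator.finishLogAggregation();
        }
      }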

http://git-wip-us.apache.org/repos/asf/hadoop/blob/088156de/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
index c45ffbb..781950e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
@@ -22,7 +22,11 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.isA;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 
 import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
@@ -68,6 +72,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
@@ -82,27 +87,18 @@ public class TestContainerManagerRecovery {
   public void testApplicationRecovery() throws Exception {
     YarnConfiguration conf = new YarnConfiguration();
     conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true);
     conf.set(YarnConfiguration.NM_ADDRESS, "localhost:1234");
     conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
     conf.set(YarnConfiguration.YARN_ADMIN_ACL, "yarn_admin_user");
     NMStateStoreService stateStore = new NMMemoryStateStoreService();
     stateStore.init(conf);
     stateStore.start();
-    Context context = new NMContext(new NMContainerTokenSecretManager(
-        conf), new NMTokenSecretManagerInNM(), null,
-        new ApplicationACLsManager(conf), stateStore);
+    Context context = createContext(conf, stateStore);
     ContainerManagerImpl cm = createContainerManager(context);
     cm.init(conf);
     cm.start();
 
-    // simulate registration with RM
-    MasterKey masterKey = new MasterKeyPBImpl();
-    masterKey.setKeyId(123);
-    masterKey.setBytes(ByteBuffer.wrap(new byte[] { new Integer(123)
-      .byteValue() }));
-    context.getContainerTokenSecretManager().setMasterKey(masterKey);
-    context.getNMTokenSecretManager().setMasterKey(masterKey);
-
     // add an application by starting a container
     String appUser = "app_user1";
     String modUser = "modify_user1";
@@ -155,9 +151,7 @@ public class TestContainerManagerRecovery {
 
     // reset container manager and verify app recovered with proper acls
     cm.stop();
-    context = new NMContext(new NMContainerTokenSecretManager(
-        conf), new NMTokenSecretManagerInNM(), null,
-        new ApplicationACLsManager(conf), stateStore);
+    context = createContext(conf, stateStore);
     cm = createContainerManager(context);
     cm.init(conf);
     cm.start();
@@ -201,9 +195,7 @@ public class TestContainerManagerRecovery {
 
     // restart and verify app is marked for finishing
     cm.stop();
-    context = new NMContext(new NMContainerTokenSecretManager(
-        conf), new NMTokenSecretManagerInNM(), null,
-        new ApplicationACLsManager(conf), stateStore);
+    context = createContext(conf, stateStore);
     cm = createContainerManager(context);
     cm.init(conf);
     cm.start();
@@ -233,9 +225,7 @@ public class TestContainerManagerRecovery {
 
     // restart and verify app is no longer present after recovery
     cm.stop();
-    context = new NMContext(new NMContainerTokenSecretManager(
-        conf), new NMTokenSecretManagerInNM(), null,
-        new ApplicationACLsManager(conf), stateStore);
+    context = createContext(conf, stateStore);
     cm = createContainerManager(context);
     cm.init(conf);
     cm.start();
@@ -243,6 +233,95 @@ public class TestContainerManagerRecovery {
     cm.stop();
   }
 
+  @Test
+  public void testContainerCleanupOnShutdown() throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(0, 1);
+    ApplicationAttemptId attemptId =
+        ApplicationAttemptId.newInstance(appId, 1);
+    ContainerId cid = ContainerId.newContainerId(attemptId, 1);
+    Map<String, LocalResource> localResources = Collections.emptyMap();
+    Map<String, String> containerEnv = Collections.emptyMap();
+    List<String> containerCmds = Collections.emptyList();
+    Map<String, ByteBuffer> serviceData = Collections.emptyMap();
+    Credentials containerCreds = new Credentials();
+    DataOutputBuffer dob = new DataOutputBuffer();
+    containerCreds.writeTokenStorageToStream(dob);
+    ByteBuffer containerTokens = ByteBuffer.wrap(dob.getData(), 0,
+        dob.getLength());
+    Map<ApplicationAccessType, String> acls = Collections.emptyMap();
+    ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
+        localResources, containerEnv, containerCmds, serviceData,
+        containerTokens, acls);
+    // create the logAggregationContext
+    LogAggregationContext logAggregationContext =
+        LogAggregationContext.newInstance("includePattern", "excludePattern");
+
+    // verify containers are stopped on shutdown without recovery
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, false);
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, false);
+    conf.set(YarnConfiguration.NM_ADDRESS, "localhost:1234");
+    Context context = createContext(conf, new NMNullStateStoreService());
+    ContainerManagerImpl cm = spy(createContainerManager(context));
+    cm.init(conf);
+    cm.start();
+    StartContainersResponse startResponse = startContainer(context, cm, cid,
+        clc, logAggregationContext);
+    assertEquals(1, startResponse.getSuccessfullyStartedContainers().size());
+    cm.stop();
+    verify(cm).handle(isA(CMgrCompletedAppsEvent.class));
+
+    // verify containers are stopped on shutdown with unsupervised recovery
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, false);
+    NMMemoryStateStoreService memStore = new NMMemoryStateStoreService();
+    memStore.init(conf);
+    memStore.start();
+    context = createContext(conf, memStore);
+    cm = spy(createContainerManager(context));
+    cm.init(conf);
+    cm.start();
+    startResponse = startContainer(context, cm, cid,
+        clc, logAggregationContext);
+    assertEquals(1, startResponse.getSuccessfullyStartedContainers().size());
+    cm.stop();
+    memStore.close();
+    verify(cm).handle(isA(CMgrCompletedAppsEvent.class));
+
+    // verify containers are not stopped on shutdown with supervised recovery
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true);
+    memStore = new NMMemoryStateStoreService();
+    memStore.init(conf);
+    memStore.start();
+    context = createContext(conf, memStore);
+    cm = spy(createContainerManager(context));
+    cm.init(conf);
+    cm.start();
+    startResponse = startContainer(context, cm, cid,
+        clc, logAggregationContext);
+    assertEquals(1, startResponse.getSuccessfullyStartedContainers().size());
+    cm.stop();
+    memStore.close();
+    verify(cm, never()).handle(isA(CMgrCompletedAppsEvent.class));
+  }
+
+  private NMContext createContext(YarnConfiguration conf,
+      NMStateStoreService stateStore) {
+    NMContext context = new NMContext(new NMContainerTokenSecretManager(
+        conf), new NMTokenSecretManagerInNM(), null,
+        new ApplicationACLsManager(conf), stateStore);
+
+    // simulate registration with RM
+    MasterKey masterKey = new MasterKeyPBImpl();
+    masterKey.setKeyId(123);
+    masterKey.setBytes(ByteBuffer.wrap(new byte[] { new Integer(123)
+      .byteValue() }));
+    context.getContainerTokenSecretManager().setMasterKey(masterKey);
+    context.getNMTokenSecretManager().setMasterKey(masterKey);
+    return context;
+  }
+
   private StartContainersResponse startContainer(Context context,
       final ContainerManagerImpl cm, ContainerId cid,
       ContainerLaunchContext clc, LogAggregationContext logAggregationContext)