You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ro...@apache.org on 2017/09/26 09:08:26 UTC

hadoop git commit: YARN-65. Reduce RM app memory footprint once app has completed. Contributed by Manikandan R.

Repository: hadoop
Updated Branches:
  refs/heads/trunk d08b8c801 -> 06e5a7b5c


YARN-65. Reduce RM app memory footprint once app has completed. Contributed by Manikandan R.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/06e5a7b5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/06e5a7b5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/06e5a7b5

Branch: refs/heads/trunk
Commit: 06e5a7b5cf141420d3a411088b87acba72e68cad
Parents: d08b8c8
Author: Rohith Sharma K S <ro...@apache.org>
Authored: Tue Sep 26 14:33:48 2017 +0530
Committer: Rohith Sharma K S <ro...@apache.org>
Committed: Tue Sep 26 14:34:09 2017 +0530

----------------------------------------------------------------------
 .../server/resourcemanager/rmapp/RMAppImpl.java |  11 +-
 .../resourcemanager/MockMemoryRMStateStore.java | 130 ++++++++++++
 .../yarn/server/resourcemanager/MockRM.java     |   2 +-
 .../resourcemanager/MockRMMemoryStateStore.java |  32 ---
 .../resourcemanager/TestApplicationCleanup.java |   4 +-
 .../TestContainerResourceUsage.java             |   4 +-
 .../yarn/server/resourcemanager/TestRMHA.java   |   4 +-
 .../server/resourcemanager/TestRMRestart.java   |  47 +++--
 .../TestWorkPreservingRMRestart.java            |  51 +++--
 .../applicationsmanager/TestAMRestart.java      |  13 +-
 .../rmapp/TestRMAppTransitions.java             | 199 +++++++++++++++++--
 .../security/TestRMDelegationTokens.java        |   6 +-
 12 files changed, 414 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/06e5a7b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 98192ca..09381f9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -1459,7 +1459,7 @@ public class RMAppImpl implements RMApp, Recoverable {
       app.rmContext.getSystemMetricsPublisher()
           .appFinished(app, finalState, app.finishTime);
       // set the memory free
-      app.submissionContext.getAMContainerSpec().setTokensConf(null);
+      app.clearUnusedFields();
     };
   }
 
@@ -2021,4 +2021,13 @@ public class RMAppImpl implements RMApp, Recoverable {
   public void setApplicationPriority(Priority applicationPriority) {
     this.applicationPriority = applicationPriority;
   }
+
+  /**
+   * Clear unused fields to free memory: drops the AM container spec and
+   * the log aggregation context from the submission context.
+   */
+  private void clearUnusedFields() {
+    this.submissionContext.setAMContainerSpec(null);
+    this.submissionContext.setLogAggregationContext(null);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/06e5a7b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockMemoryRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockMemoryRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockMemoryRMStateStore.java
new file mode 100644
index 0000000..698f1c6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockMemoryRMStateStore.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Test helper for MemoryRMStateStore; keeps copies of app submission contexts.
+ */
+public class MockMemoryRMStateStore extends MemoryRMStateStore {
+
+  private Map<ApplicationId, ApplicationSubmissionContext> appSubCtxtCopy =
+      new HashMap<ApplicationId, ApplicationSubmissionContext>();
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  protected EventHandler getRMStateStoreEventHandler() {
+    return rmStateStoreEventHandler;
+  }
+
+  @Override
+  public synchronized RMState loadState() throws Exception {
+
+    RMState cloneState = super.loadState();
+
+    for(Entry<ApplicationId, ApplicationStateData> state :
+        cloneState.getApplicationState().entrySet()) {
+      ApplicationStateData oldStateData = state.getValue();
+      oldStateData.setApplicationSubmissionContext(
+          this.appSubCtxtCopy.get(state.getKey()));
+      cloneState.getApplicationState().put(state.getKey(), oldStateData);
+    }
+    return cloneState;
+  }
+
+  @Override
+  public synchronized void storeApplicationStateInternal(
+      ApplicationId appId, ApplicationStateData appState)
+      throws Exception {
+    // Clone Application Submission Context
+    this.cloneAppSubmissionContext(appState);
+    super.storeApplicationStateInternal(appId, appState);
+  }
+
+  @Override
+  public synchronized void updateApplicationStateInternal(
+      ApplicationId appId, ApplicationStateData appState)
+      throws Exception {
+    // Clone Application Submission Context
+    this.cloneAppSubmissionContext(appState);
+    super.updateApplicationStateInternal(appId, appState);
+  }
+
+  /**
+   * Clone Application Submission Context and Store in Map for
+   * later use.
+   *
+   * @param appState
+   */
+  private void cloneAppSubmissionContext(ApplicationStateData appState) {
+    ApplicationSubmissionContext oldAppSubCtxt =
+        appState.getApplicationSubmissionContext();
+    ApplicationSubmissionContext context =
+        ApplicationSubmissionContext.newInstance(
+            oldAppSubCtxt.getApplicationId(),
+            oldAppSubCtxt.getApplicationName(),
+            oldAppSubCtxt.getQueue(),
+            oldAppSubCtxt.getPriority(),
+            oldAppSubCtxt.getAMContainerSpec(),
+            oldAppSubCtxt.getUnmanagedAM(),
+            oldAppSubCtxt.getCancelTokensWhenComplete(),
+            oldAppSubCtxt.getMaxAppAttempts(),
+            oldAppSubCtxt.getResource()
+            );
+    context.setAttemptFailuresValidityInterval(
+        oldAppSubCtxt.getAttemptFailuresValidityInterval());
+    context.setKeepContainersAcrossApplicationAttempts(
+        oldAppSubCtxt.getKeepContainersAcrossApplicationAttempts());
+    context.setAMContainerResourceRequests(
+        oldAppSubCtxt.getAMContainerResourceRequests());
+    context.setLogAggregationContext(oldAppSubCtxt.getLogAggregationContext());
+    context.setApplicationType(oldAppSubCtxt.getApplicationType());
+    this.appSubCtxtCopy.put(oldAppSubCtxt.getApplicationId(), context);
+  }
+
+  /**
+   * Traverse each app state and replace its submission context
+   * with the cloned copy.
+   *
+   * @param actualState
+   * @return actualState
+   */
+  @VisibleForTesting
+  public RMState reloadStateWithClonedAppSubCtxt(RMState actualState) {
+    for(Entry<ApplicationId, ApplicationStateData> state :
+        actualState.getApplicationState().entrySet()) {
+      ApplicationStateData oldStateData = state.getValue();
+      oldStateData.setApplicationSubmissionContext(
+          this.appSubCtxtCopy.get(state.getKey()));
+      actualState.getApplicationState().put(state.getKey(),
+          oldStateData);
+    }
+    return actualState;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/06e5a7b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index b772e80..973f487 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -164,7 +164,7 @@ public class MockRM extends ResourceManager {
     } else {
       Class storeClass = getRMContext().getStateStore().getClass();
       if (storeClass.equals(MemoryRMStateStore.class)) {
-        MockRMMemoryStateStore mockStateStore = new MockRMMemoryStateStore();
+        MockMemoryRMStateStore mockStateStore = new MockMemoryRMStateStore();
         mockStateStore.init(conf);
         setRMStateStore(mockStateStore);
       } else if (storeClass.equals(NullRMStateStore.class)) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/06e5a7b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRMMemoryStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRMMemoryStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRMMemoryStateStore.java
deleted file mode 100644
index d88ee1e..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRMMemoryStateStore.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.resourcemanager;
-
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
-
-/**
- * Test helper for MemoryRMStateStore will make sure the event.
- */
-public class MockRMMemoryStateStore extends MemoryRMStateStore {
-  @SuppressWarnings("rawtypes")
-  @Override
-  protected EventHandler getRMStateStoreEventHandler() {
-    return rmStateStoreEventHandler;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/06e5a7b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
index ebca7a3..c12ae33 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
@@ -293,6 +293,8 @@ public class TestApplicationCleanup {
     // start RM
     MockRM rm1 = new MockRM(conf);
     rm1.start();
+    MockMemoryRMStateStore memStore =
+        (MockMemoryRMStateStore) rm1.getRMStateStore();
     MockNM nm1 =
         new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
     nm1.registerNode();
@@ -304,7 +306,7 @@ public class TestApplicationCleanup {
     rm1.waitForState(app0.getApplicationId(), RMAppState.FAILED);
 
     // start new RM
-    MockRM rm2 = new MockRM(conf, rm1.getRMStateStore());
+    MockRM rm2 = new MockRM(conf, memStore);
     rm2.start();
     
     // nm1 register to rm2, and do a heartbeat

http://git-wip-us.apache.org/repos/asf/hadoop/blob/06e5a7b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestContainerResourceUsage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestContainerResourceUsage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestContainerResourceUsage.java
index 9ed4978..3508ab4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestContainerResourceUsage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestContainerResourceUsage.java
@@ -143,6 +143,8 @@ public class TestContainerResourceUsage {
     conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
     MockRM rm0 = new MockRM(conf);
     rm0.start();
+    MockMemoryRMStateStore memStore =
+        (MockMemoryRMStateStore) rm0.getRMStateStore();
     MockNM nm =
         new MockNM("127.0.0.1:1234", 65536, rm0.getResourceTrackerService());
     nm.registerNode();
@@ -229,7 +231,7 @@ public class TestContainerResourceUsage {
         vcoreSeconds, metricsBefore.getVcoreSeconds());
 
     // create new RM to represent RM restart. Load up the state store.
-    MockRM rm1 = new MockRM(conf, rm0.getRMStateStore());
+    MockRM rm1 = new MockRM(conf, memStore);
     rm1.start();
     RMApp app0After =
         rm1.getRMContext().getRMApps().get(app0.getApplicationId());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/06e5a7b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
index b5293a5..20e9ff4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
@@ -415,7 +415,7 @@ public class TestRMHA {
     configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
     Configuration conf = new YarnConfiguration(configuration);
 
-    MemoryRMStateStore memStore = new MockRMMemoryStateStore() {
+    MemoryRMStateStore memStore = new MockMemoryRMStateStore() {
       int count = 0;
 
       @Override
@@ -465,7 +465,7 @@ public class TestRMHA {
     configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
     Configuration conf = new YarnConfiguration(configuration);
 
-    MemoryRMStateStore memStore = new MockRMMemoryStateStore() {
+    MemoryRMStateStore memStore = new MockMemoryRMStateStore() {
       @Override
       public void updateApplicationState(ApplicationStateData appState) {
         notifyStoreOperationFailed(new StoreFencedException());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/06e5a7b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
index 0346f4f..f0f51f3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
@@ -199,7 +199,8 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
 
     // PHASE 1: create RM and get state
     MockRM rm1 = createMockRM(conf);
-    MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
+    MockMemoryRMStateStore memStore =
+        (MockMemoryRMStateStore) rm1.getRMStateStore();
     Map<ApplicationId, ApplicationStateData> rmAppState =
         memStore.getState().getApplicationState();
 
@@ -679,7 +680,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
   @Test (timeout = 60000)
   public void testRMRestartWaitForPreviousSucceededAttempt() throws Exception {
     conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
-    MemoryRMStateStore memStore = new MockRMMemoryStateStore() {
+    MemoryRMStateStore memStore = new MockMemoryRMStateStore() {
       int count = 0;
 
       @Override
@@ -734,7 +735,8 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
     conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
     // create RM
     MockRM rm1 = createMockRM(conf);
-    MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
+    MockMemoryRMStateStore memStore =
+        (MockMemoryRMStateStore) rm1.getRMStateStore();
     Map<ApplicationId, ApplicationStateData> rmAppState =
         memStore.getState().getApplicationState();
     // start RM
@@ -780,7 +782,8 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
       YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
     // create RM
     MockRM rm1 = createMockRM(conf);
-    MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
+    MockMemoryRMStateStore memStore =
+        (MockMemoryRMStateStore) rm1.getRMStateStore();
     Map<ApplicationId, ApplicationStateData> rmAppState =
         memStore.getState().getApplicationState();
     // start RM
@@ -824,18 +827,18 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
 
   @Test (timeout = 60000)
   public void testRMRestartKilledAppWithNoAttempts() throws Exception {
-    MemoryRMStateStore memStore = new MockRMMemoryStateStore() {
+    MockMemoryRMStateStore memStore = new MockMemoryRMStateStore() {
       @Override
       public synchronized void storeApplicationAttemptStateInternal(
-          ApplicationAttemptId attemptId,
-          ApplicationAttemptStateData attemptStateData) throws Exception {
+          ApplicationAttemptId appAttemptId,
+          ApplicationAttemptStateData attemptState) throws Exception {
         // ignore attempt saving request.
       }
 
       @Override
       public synchronized void updateApplicationAttemptStateInternal(
-          ApplicationAttemptId attemptId,
-          ApplicationAttemptStateData attemptStateData) throws Exception {
+          ApplicationAttemptId appAttemptId,
+          ApplicationAttemptStateData attemptState) throws Exception {
         // ignore attempt saving request.
       }
     };
@@ -868,7 +871,8 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
       YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
     // PHASE 1: create RM and get state
     MockRM rm1 = createMockRM(conf);
-    MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
+    MockMemoryRMStateStore memStore =
+        (MockMemoryRMStateStore) rm1.getRMStateStore();
     Map<ApplicationId, ApplicationStateData> rmAppState =
         memStore.getState().getApplicationState();
 
@@ -926,6 +930,9 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
         new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
     nm1.registerNode();
 
+    MockMemoryRMStateStore memStore =
+        (MockMemoryRMStateStore) rm1.getRMStateStore();
+
     // a succeeded app.
     RMApp app0 = rm1.submitApp(200, "name", "user", null,
       false, "default", 1, null, "myType");
@@ -953,7 +960,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
     .appCreated(any(RMApp.class), anyLong());
     // restart rm
 
-    MockRM rm2 = new MockRM(conf, rm1.getRMStateStore()) {
+    MockRM rm2 = new MockRM(conf, memStore) {
       @Override
       protected RMAppManager createRMAppManager() {
         return spy(super.createRMAppManager());
@@ -1625,7 +1632,8 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
     // start RM
     MockRM rm1 = createMockRM(conf);
     rm1.start();
-    MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
+    MockMemoryRMStateStore memStore =
+        (MockMemoryRMStateStore) rm1.getRMStateStore();
     RMState rmState = memStore.getState();
     MockNM nm1 =
         new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
@@ -1664,7 +1672,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
   // This is to test RM does not get hang on shutdown.
   @Test (timeout = 10000)
   public void testRMShutdown() throws Exception {
-    MemoryRMStateStore memStore = new MockRMMemoryStateStore() {
+    MemoryRMStateStore memStore = new MockMemoryRMStateStore() {
       @Override
       public synchronized void checkVersion()
           throws Exception {
@@ -1743,7 +1751,8 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
       }
     };
     rm1.start();
-    MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
+    MockMemoryRMStateStore memStore =
+        (MockMemoryRMStateStore) rm1.getRMStateStore();
     RMApp app1 = null;
     try {
        app1 = rm1.submitApp(200, "name", "user",
@@ -1767,7 +1776,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
 
   @Test (timeout = 20000)
   public void testAppRecoveredInOrderOnRMRestart() throws Exception {
-    MemoryRMStateStore memStore = new MockRMMemoryStateStore();
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
     memStore.init(conf);
 
     for (int i = 10; i > 0; i--) {
@@ -2405,6 +2414,8 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
     MockRM rm1 = new MockRM(conf);
     rm1.start();
     CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    MockMemoryRMStateStore memStore =
+        (MockMemoryRMStateStore) rm1.getRMStateStore();
 
     MockNM nm1 =
         new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
@@ -2441,7 +2452,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
     MockRM rm2 = null;
     // start RM2
     try {
-      rm2 = new MockRM(conf, rm1.getRMStateStore());
+      rm2 = new MockRM(conf, memStore);
       rm2.start();
       Assert.assertTrue("RM start successfully", true);
     } catch (Exception e) {
@@ -2542,6 +2553,8 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
       }
     };
     rm1.start();
+    MockMemoryRMStateStore memStore =
+        (MockMemoryRMStateStore) rm1.getRMStateStore();
 
     // add node label "x" and set node to label mapping
     Set<String> clusterNodeLabels = new HashSet<String>();
@@ -2568,7 +2581,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
     conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, false);
     MockRM rm2 = new MockRM(
         TestUtils.getConfigurationWithDefaultQueueLabels(conf),
-        rm1.getRMStateStore()) {
+        memStore) {
       @Override
       protected RMNodeLabelsManager createNodeLabelManager() {
         RMNodeLabelsManager mgr = new RMNodeLabelsManager();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/06e5a7b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
index a13cae7..c1cb4c1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
@@ -572,6 +572,8 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
     conf.set(YarnConfiguration.YARN_ADMIN_ACL, "");
     rm1 = new MockRM(conf);
     rm1.start();
+    MockMemoryRMStateStore memStore =
+        (MockMemoryRMStateStore) rm1.getRMStateStore();
     MockNM nm1 =
         new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
     nm1.registerNode();
@@ -583,7 +585,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
     csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[]{QUEUE_DOESNT_EXIST});
     final String noQueue = CapacitySchedulerConfiguration.ROOT + "." + QUEUE_DOESNT_EXIST;
     csConf.setCapacity(noQueue, 100);
-    rm2 = new MockRM(csConf, rm1.getRMStateStore());
+    rm2 = new MockRM(csConf, memStore);
 
     rm2.start();
     UserGroupInformation user2 = UserGroupInformation.createRemoteUser("user2");
@@ -721,11 +723,15 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
 
   private void verifyAppRecoveryWithWrongQueueConfig(
       CapacitySchedulerConfiguration csConf, RMApp app, String diagnostics,
-      MemoryRMStateStore memStore, RMState state) throws Exception {
+      MockMemoryRMStateStore memStore, RMState state) throws Exception {
     // Restart RM with fail-fast as false. App should be killed.
     csConf.setBoolean(YarnConfiguration.RM_FAIL_FAST, false);
     rm2 = new MockRM(csConf, memStore);
     rm2.start();
+
+    MockMemoryRMStateStore memStore2 =
+        (MockMemoryRMStateStore) rm2.getRMStateStore();
+
     // Wait for app to be killed.
     rm2.waitForState(app.getApplicationId(), RMAppState.KILLED);
     ApplicationReport report = rm2.getApplicationReport(app.getApplicationId());
@@ -734,24 +740,27 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
     assertEquals(report.getYarnApplicationState(), YarnApplicationState.KILLED);
     assertEquals(report.getDiagnostics(), diagnostics);
 
+    //Reload previous state with cloned app sub context object
+    RMState newState = memStore2.reloadStateWithClonedAppSubCtxt(state);
+
     // Remove updated app info(app being KILLED) from state store and reinstate
     // state store to previous state i.e. which indicates app is RUNNING.
     // This is to simulate app recovery with fail fast config as true.
     for(Map.Entry<ApplicationId, ApplicationStateData> entry :
-        state.getApplicationState().entrySet()) {
+        newState.getApplicationState().entrySet()) {
       ApplicationStateData appState = mock(ApplicationStateData.class);
       ApplicationSubmissionContext ctxt =
           mock(ApplicationSubmissionContext.class);
       when(appState.getApplicationSubmissionContext()).thenReturn(ctxt);
       when(ctxt.getApplicationId()).thenReturn(entry.getKey());
-      memStore.removeApplicationStateInternal(appState);
-      memStore.storeApplicationStateInternal(
+      memStore2.removeApplicationStateInternal(appState);
+      memStore2.storeApplicationStateInternal(
           entry.getKey(), entry.getValue());
     }
 
     // Now restart RM with fail-fast as true. QueueException should be thrown.
     csConf.setBoolean(YarnConfiguration.RM_FAIL_FAST, true);
-    MockRM rm = new MockRM(csConf, memStore);
+    MockRM rm = new MockRM(csConf, memStore2);
     try {
       rm.start();
       Assert.fail("QueueException must have been thrown");
@@ -781,6 +790,9 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
     setupQueueConfiguration(csConf);
     rm1 = new MockRM(csConf);
     rm1.start();
+
+    MockMemoryRMStateStore memStore =
+        (MockMemoryRMStateStore) rm1.getRMStateStore();
     MockNM nm =
         new MockNM("127.1.1.1:4321", 8192, rm1.getResourceTrackerService());
     nm.registerNode();
@@ -801,7 +813,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
     String diags = "Application killed on recovery as it was submitted to " +
         "queue QueueB which is no longer a leaf queue after restart.";
     verifyAppRecoveryWithWrongQueueConfig(csConf, app, diags,
-        (MemoryRMStateStore) rm1.getRMStateStore(), state);
+        memStore, state);
   }
 
   //Test behavior of an app if queue is removed during recovery. Test case does
@@ -826,6 +838,8 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
     setupQueueConfiguration(csConf);
     rm1 = new MockRM(csConf);
     rm1.start();
+    MockMemoryRMStateStore memStore =
+        (MockMemoryRMStateStore) rm1.getRMStateStore();
     MockNM nm1 =
         new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
     MockNM nm2 =
@@ -853,7 +867,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
     rm1.clearQueueMetrics(app2);
 
     // Take a copy of state store so that it can be reset to this state.
-    RMState state = rm1.getRMStateStore().loadState();
+    RMState state = memStore.loadState();
 
     // Set new configuration with QueueB removed.
     csConf = new CapacitySchedulerConfiguration(conf);
@@ -862,7 +876,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
     String diags = "Application killed on recovery as it was submitted to " +
         "queue QueueB which no longer exists after restart.";
     verifyAppRecoveryWithWrongQueueConfig(csConf, app2, diags,
-        (MemoryRMStateStore) rm1.getRMStateStore(), state);
+        memStore, state);
   }
 
   private void checkParentQueue(ParentQueue parentQueue, int numContainers,
@@ -931,6 +945,8 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
   public void testContainersNotRecoveredForCompletedApps() throws Exception {
     rm1 = new MockRM(conf);
     rm1.start();
+    MockMemoryRMStateStore memStore =
+        (MockMemoryRMStateStore) rm1.getRMStateStore();
     MockNM nm1 =
         new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
     nm1.registerNode();
@@ -938,7 +954,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
     MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
     MockRM.finishAMAndVerifyAppState(app1, rm1, nm1, am1);
 
-    rm2 = new MockRM(conf, rm1.getRMStateStore());
+    rm2 = new MockRM(conf, memStore);
     rm2.start();
     nm1.setResourceTrackerService(rm2.getResourceTrackerService());
     NMContainerStatus runningContainer =
@@ -1212,6 +1228,9 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
     // start RM
     rm1 = new MockRM(conf);
     rm1.start();
+
+    MockMemoryRMStateStore memStore =
+        (MockMemoryRMStateStore) rm1.getRMStateStore();
     MockNM nm1 =
         new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
     nm1.registerNode();
@@ -1230,7 +1249,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
 
 
     // start new RM
-    rm2 = new MockRM(conf, rm1.getRMStateStore());
+    rm2 = new MockRM(conf, memStore);
     rm2.start();
 
     am0.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
@@ -1370,7 +1389,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
   // RM should start correctly.
   @Test (timeout = 20000)
   public void testAppStateSavedButAttemptStateNotSaved() throws Exception {
-    MemoryRMStateStore memStore = new MemoryRMStateStore() {
+    MockMemoryRMStateStore memStore = new MockMemoryRMStateStore() {
       @Override public synchronized void updateApplicationAttemptStateInternal(
           ApplicationAttemptId appAttemptId,
           ApplicationAttemptStateData attemptState) {
@@ -1414,6 +1433,8 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
     // start RM
     rm1 = new MockRM(conf);
     rm1.start();
+    MockMemoryRMStateStore memStore =
+        (MockMemoryRMStateStore) rm1.getRMStateStore();
     MockNM nm1 =
         new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
     nm1.registerNode();
@@ -1438,8 +1459,10 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
     }
 
     // start new RM
-    rm2 = new MockRM(conf, rm1.getRMStateStore());
+    rm2 = new MockRM(conf, memStore);
     rm2.start();
+    MockMemoryRMStateStore memStore2 =
+        (MockMemoryRMStateStore) rm2.getRMStateStore();
     rm2.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED);
     rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.LAUNCHED);
 
@@ -1488,7 +1511,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
         recoveredApp.getFinalApplicationStatus());
 
     // Restart RM once more to check UAM is not re-run
-    MockRM rm3 = new MockRM(conf, rm1.getRMStateStore());
+    MockRM rm3 = new MockRM(conf, memStore2);
     rm3.start();
     recoveredApp = rm3.getRMContext().getRMApps().get(app0.getApplicationId());
     Assert.assertEquals(RMAppState.FINISHED, recoveredApp.getState());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/06e5a7b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
index 528afac..9d0d879 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockMemoryRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -702,8 +703,11 @@ public class TestAMRestart {
     // explicitly set max-am-retry count as 2.
     conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
     MockRM rm1 = new MockRM(conf);
-    MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
     rm1.start();
+
+    MockMemoryRMStateStore memStore =
+        (MockMemoryRMStateStore) rm1.getRMStateStore();
+
     MockNM nm1 =
         new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
     nm1.registerNode();
@@ -735,7 +739,6 @@ public class TestAMRestart {
     RMAppImpl app1 = (RMAppImpl)rm1.submitApp(200, 10000, false);
     app1.setSystemClock(clock);
     MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
     // Fail attempt1 normally
     nm1.nodeHeartbeat(am1.getApplicationAttemptId(),
       1, ContainerState.COMPLETE);
@@ -771,8 +774,12 @@ public class TestAMRestart {
     @SuppressWarnings("resource")
     MockRM rm2 = new MockRM(conf, memStore);
     rm2.start();
+
+    MockMemoryRMStateStore memStore1 =
+        (MockMemoryRMStateStore) rm2.getRMStateStore();
     ApplicationStateData app1State =
-        memStore.getState().getApplicationState().get(app1.getApplicationId());
+        memStore1.getState().getApplicationState().
+        get(app1.getApplicationId());
     Assert.assertEquals(1, app1State.getFirstAttemptId());
 
     // re-register the NM

http://git-wip-us.apache.org/repos/asf/hadoop/blob/06e5a7b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
index 7c54b60..344f8bb7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
 
+import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.eq;
@@ -28,17 +29,23 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.security.Credentials;
@@ -46,14 +53,22 @@ import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.hadoop.yarn.MockApps;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
+import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.LogAggregationContext;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
@@ -251,7 +266,113 @@ public class TestRMAppTransitions {
     rmDispatcher.start();
   }
 
-  protected RMApp createNewTestApp(ApplicationSubmissionContext submissionContext) {
+  /** Serializes an empty {@link Credentials} set into a byte buffer. */
+  private ByteBuffer getTokens() throws IOException {
+    DataOutputBuffer serialized = new DataOutputBuffer();
+    Credentials emptyCredentials = new Credentials();
+    emptyCredentials.writeTokenStorageToStream(serialized);
+    // Wrap the range [0, getLength()) of the buffer's data.
+    return ByteBuffer.wrap(serialized.getData(), 0, serialized.getLength());
+  }
+
+  /**
+   * Builds a three-entry configuration (no defaults loaded) and serializes
+   * it into a byte buffer.
+   */
+  private ByteBuffer getTokensConf() throws IOException {
+    Configuration settings = new Configuration(false);
+    settings.clear();
+    settings.set("dfs.nameservices", "mycluster1,mycluster2");
+    settings.set("dfs.namenode.rpc-address.mycluster2.nn1", "123.0.0.1");
+    settings.set("dfs.namenode.rpc-address.mycluster3.nn2", "123.0.0.2");
+    DataOutputBuffer serialized = new DataOutputBuffer();
+    settings.write(serialized);
+    // Wrap the range [0, getLength()) of the buffer's data.
+    return ByteBuffer.wrap(serialized.getData(), 0, serialized.getLength());
+  }
+
+  /**
+   * Builds a single-entry local resource map: key "dest_file" maps to a
+   * FILE resource with APPLICATION visibility for target/scriptFile.sh.
+   */
+  private Map<String, LocalResource> getLocalResources()
+      throws UnsupportedFileSystemException {
+    File script = new File(new File("target"), "scriptFile.sh");
+    Path qualifiedScript = FileContext.getLocalFSFileContext()
+        .makeQualified(new Path(script.getAbsolutePath()));
+
+    LocalResource resource = Records.newRecord(LocalResource.class);
+    resource.setResource(URL.fromPath(qualifiedScript));
+    resource.setSize(-1);
+    resource.setVisibility(LocalResourceVisibility.APPLICATION);
+    resource.setType(LocalResourceType.FILE);
+    resource.setTimestamp(script.lastModified());
+
+    Map<String, LocalResource> resources = new HashMap<>();
+    resources.put("dest_file", resource);
+    return resources;
+  }
+
+  /** Builds an environment map whose values are "user_set_*" markers. */
+  private Map<String, String> getEnvironment() {
+    Map<String, String> env = new HashMap<>();
+    env.put(Environment.CONTAINER_ID.name(), "user_set_container_id");
+    env.put(Environment.NM_HOST.name(), "user_set_NM_HOST");
+    env.put(Environment.NM_PORT.name(), "user_set_NM_PORT");
+    env.put(Environment.NM_HTTP_PORT.name(), "user_set_NM_HTTP_PORT");
+    env.put(Environment.LOCAL_DIRS.name(), "user_set_LOCAL_DIR");
+    env.put(Environment.USER.key(), "user_set_" + Environment.USER.key());
+    env.put(Environment.LOGNAME.name(), "user_set_LOGNAME");
+    env.put(Environment.PWD.name(), "user_set_PWD");
+    env.put(Environment.HOME.name(), "user_set_HOME");
+    return env;
+  }
+
+  /** Creates a RETRY_ON_SPECIFIC_ERROR_CODES context listing code 111. */
+  private ContainerRetryContext getContainerRetryContext() {
+    HashSet<Integer> retryCodes = new HashSet<>(Arrays.asList(111));
+    return ContainerRetryContext.newInstance(
+        ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODES,
+        retryCodes, 0, 0);
+  }
+
+  /** Maps "non_exist_auxService" to a buffer wrapping that name's bytes. */
+  private Map<String, ByteBuffer> getServiceData() {
+    final String auxService = "non_exist_auxService";
+    ByteBuffer payload = ByteBuffer.wrap(auxService.getBytes());
+    return new HashMap<>(Collections.singletonMap(auxService, payload));
+  }
+
+  /** Builds a launch context; tokens are set only if security is on. */
+  private ContainerLaunchContext prepareContainerLaunchContext()
+      throws IOException {
+    ContainerLaunchContext launchContext =
+        Records.newRecord(ContainerLaunchContext.class);
+    launchContext.setCommands(Arrays.asList("/bin/sleep 5"));
+    if (UserGroupInformation.isSecurityEnabled()) {
+      launchContext.setTokens(getTokens());
+      launchContext.setTokensConf(getTokensConf());
+    }
+    launchContext.setLocalResources(getLocalResources());
+    launchContext.setEnvironment(getEnvironment());
+    launchContext.setContainerRetryContext(getContainerRetryContext());
+    launchContext.setServiceData(getServiceData());
+    return launchContext;
+  }
+
+  /**
+   * Creates a log aggregation context from six fixed placeholder strings,
+   * passed to newInstance in the same order as before.
+   */
+  private LogAggregationContext getLogAggregationContext() {
+    return LogAggregationContext.newInstance(
+        "includePattern", "excludePattern",
+        "rolledLogsIncludePattern", "rolledLogsExcludePattern",
+        "policyClass", "policyParameters");
+  }
+
+  protected RMApp createNewTestApp(ApplicationSubmissionContext
+      submissionContext) throws IOException {
     ApplicationId applicationId = MockApps.newAppID(appId++);
     String user = MockApps.newUserName();
     String name = MockApps.newAppName();
@@ -270,7 +391,9 @@ public class TestRMAppTransitions {
     // but applicationId is still set for safety
     submissionContext.setApplicationId(applicationId);
     submissionContext.setPriority(Priority.newInstance(0));
-    submissionContext.setAMContainerSpec(mock(ContainerLaunchContext.class));
+    submissionContext.setAMContainerSpec(prepareContainerLaunchContext());
+    submissionContext.setLogAggregationContext(getLogAggregationContext());
+
     RMApp application = new RMAppImpl(applicationId, rmContext, conf, name,
         user, queue, submissionContext, scheduler, masterService,
         System.currentTimeMillis(), "YARN", null,
@@ -405,6 +528,7 @@ public class TestRMAppTransitions {
     // verify sendATSCreateEvent() is get called during
     // AddApplicationToSchedulerTransition.
     verify(publisher).appCreated(eq(application), anyLong());
+    verifyRMAppFieldsForNonFinalTransitions(application);
     return application;
   }
 
@@ -422,6 +546,7 @@ public class TestRMAppTransitions {
     application.handle(event);
     assertStartTimeSet(application);
     assertAppState(RMAppState.SUBMITTED, application);
+    verifyRMAppFieldsForNonFinalTransitions(application);
     return application;
   }
 
@@ -530,6 +655,7 @@ public class TestRMAppTransitions {
     assertFailed(application,
         ".*Unmanaged application.*Failing the application.*");
     assertAppFinalStateSaved(application);
+    verifyRMAppFieldsForFinalTransitions(application);
   }
   
   @Test
@@ -539,6 +665,7 @@ public class TestRMAppTransitions {
     RMApp application = testCreateAppFinished(null, diagMsg);
     Assert.assertTrue("Finished application missing diagnostics",
         application.getDiagnostics().indexOf(diagMsg) != -1);
+    verifyRMAppFieldsForFinalTransitions(application);
   }
 
   @Test (timeout = 30000)
@@ -546,15 +673,7 @@ public class TestRMAppTransitions {
     LOG.info("--- START: testAppRecoverPath ---");
     ApplicationSubmissionContext sub =
         Records.newRecord(ApplicationSubmissionContext.class);
-    ContainerLaunchContext clc =
-        Records.newRecord(ContainerLaunchContext.class);
-    Credentials credentials = new Credentials();
-    DataOutputBuffer dob = new DataOutputBuffer();
-    credentials.writeTokenStorageToStream(dob);
-    ByteBuffer securityTokens =
-        ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
-    clc.setTokens(securityTokens);
-    sub.setAMContainerSpec(clc);
+    sub.setAMContainerSpec(prepareContainerLaunchContext());
     testCreateAppSubmittedRecovery(sub);
   }
 
@@ -577,6 +696,7 @@ public class TestRMAppTransitions {
     assertAppFinalStateNotSaved(application);
     verifyApplicationFinished(RMAppState.KILLED);
     verifyAppRemovedSchedulerEvent(RMAppState.KILLED);
+    verifyRMAppFieldsForFinalTransitions(application);
   }
 
   @Test
@@ -594,6 +714,7 @@ public class TestRMAppTransitions {
     assertFailed(application, rejectedText);
     assertAppFinalStateSaved(application);
     verifyApplicationFinished(RMAppState.FAILED);
+    verifyRMAppFieldsForFinalTransitions(application);
   }
 
   @Test (timeout = 30000)
@@ -611,6 +732,7 @@ public class TestRMAppTransitions {
     assertFailed(application, rejectedText);
     assertAppFinalStateSaved(application);
     verifyApplicationFinished(RMAppState.FAILED);
+    verifyRMAppFieldsForFinalTransitions(application);
     rmContext.getStateStore().removeApplication(application);
   }
 
@@ -633,6 +755,7 @@ public class TestRMAppTransitions {
     assertKilled(application);
     verifyApplicationFinished(RMAppState.KILLED);
     verifyAppRemovedSchedulerEvent(RMAppState.KILLED);
+    verifyRMAppFieldsForFinalTransitions(application);
   }
 
   @Test (timeout = 30000)
@@ -650,6 +773,7 @@ public class TestRMAppTransitions {
     assertFailed(application, rejectedText);
     assertAppFinalStateSaved(application);
     verifyApplicationFinished(RMAppState.FAILED);
+    verifyRMAppFieldsForFinalTransitions(application);
   }
 
   @Test (timeout = 30000)
@@ -684,6 +808,7 @@ public class TestRMAppTransitions {
     assertFailed(application, rejectedText);
     assertAppFinalStateSaved(application);
     verifyApplicationFinished(RMAppState.FAILED);
+    verifyRMAppFieldsForFinalTransitions(application);
   }
 
   @Test
@@ -706,6 +831,7 @@ public class TestRMAppTransitions {
     assertAppFinalStateSaved(application);
     verifyApplicationFinished(RMAppState.KILLED);
     verifyAppRemovedSchedulerEvent(RMAppState.KILLED);
+    verifyRMAppFieldsForFinalTransitions(application);
   }
 
   @Test
@@ -769,8 +895,9 @@ public class TestRMAppTransitions {
     assertAppFinalStateSaved(application);
     verifyApplicationFinished(RMAppState.KILLED);
     verifyAppRemovedSchedulerEvent(RMAppState.KILLED);
+    verifyRMAppFieldsForFinalTransitions(application);
   }
-  
+
   @Test
   public void testAppAcceptedAttemptKilled() throws IOException,
       InterruptedException {
@@ -816,6 +943,7 @@ public class TestRMAppTransitions {
     assertKilled(application);
     verifyApplicationFinished(RMAppState.KILLED);
     verifyAppRemovedSchedulerEvent(RMAppState.KILLED);
+    verifyRMAppFieldsForFinalTransitions(application);
   }
 
   @Test
@@ -873,6 +1001,7 @@ public class TestRMAppTransitions {
     assertFailed(application, ".*Failing the application.*");
     assertAppFinalStateSaved(application);
     verifyApplicationFinished(RMAppState.FAILED);
+    verifyRMAppFieldsForFinalTransitions(application);
   }
 
   @Test
@@ -914,6 +1043,7 @@ public class TestRMAppTransitions {
     assertFinalAppStatus(FinalApplicationStatus.FAILED, application);
     Assert.assertTrue("Finished app missing diagnostics", application
       .getDiagnostics().indexOf(diagMsg) != -1);
+    verifyRMAppFieldsForFinalTransitions(application);
   }
 
   @Test
@@ -933,6 +1063,7 @@ public class TestRMAppTransitions {
     Assert.assertEquals("application diagnostics is not correct",
         "", diag.toString());
     verifyApplicationFinished(RMAppState.FINISHED);
+    verifyRMAppFieldsForFinalTransitions(application);
   }
 
   @Test (timeout = 30000)
@@ -962,6 +1093,7 @@ public class TestRMAppTransitions {
 
     assertTimesAtFinish(application);
     assertAppState(RMAppState.FAILED, application);
+    verifyRMAppFieldsForFinalTransitions(application);
   }
 
   @Test (timeout = 30000)
@@ -1016,6 +1148,7 @@ public class TestRMAppTransitions {
 
     assertTimesAtFinish(application);
     assertAppState(RMAppState.KILLED, application);
+    verifyRMAppFieldsForFinalTransitions(application);
   }
   
   @Test(timeout = 30000)
@@ -1061,11 +1194,12 @@ public class TestRMAppTransitions {
     RMAppState finalState = appState.getState();
     Assert.assertEquals("Application is not in finalState.", finalState,
         application.getState());
+    verifyRMAppFieldsForFinalTransitions(application);
   }
   
   public void createRMStateForApplications(
       Map<ApplicationId, ApplicationStateData> applicationState,
-      RMAppState rmAppState) {
+      RMAppState rmAppState) throws IOException {
     RMApp app = createNewTestApp(null);
     ApplicationStateData appState =
         ApplicationStateData.newInstance(app.getSubmitTime(), app.getStartTime(),
@@ -1075,7 +1209,7 @@ public class TestRMAppTransitions {
   }
   
   @Test
-  public void testGetAppReport() {
+  public void testGetAppReport() throws IOException {
     RMApp app = createNewTestApp(null);
     assertAppState(RMAppState.NEW, app);
     ApplicationReport report = app.createAndGetApplicationReport(null, true);
@@ -1109,4 +1243,41 @@ public class TestRMAppTransitions {
       Assert.assertEquals(finalState, appRemovedEvent.getFinalState());
     }
   }
+
+  /**
+   * Asserts that the application's submission context still carries the
+   * AM container spec and log aggregation context produced by the fixture
+   * helpers of this class. Token fields are compared only when security
+   * is enabled.
+   */
+  private void verifyRMAppFieldsForNonFinalTransitions(RMApp application)
+      throws IOException {
+    // Look the context up once; every assertion below reads from it.
+    ApplicationSubmissionContext context =
+        application.getApplicationSubmissionContext();
+    ContainerLaunchContext amSpec = context.getAMContainerSpec();
+
+    assertEquals(Arrays.asList("/bin/sleep 5"), amSpec.getCommands());
+    assertEquals(getLocalResources(), amSpec.getLocalResources());
+    // prepareContainerLaunchContext() sets tokens only in secure mode.
+    if (UserGroupInformation.isSecurityEnabled()) {
+      assertEquals(getTokens(), amSpec.getTokens());
+      assertEquals(getTokensConf(), amSpec.getTokensConf());
+    }
+    assertEquals(getEnvironment(), amSpec.getEnvironment());
+    assertEquals(getContainerRetryContext(),
+        amSpec.getContainerRetryContext());
+    assertEquals(getServiceData(), amSpec.getServiceData());
+
+    // The log aggregation context is read from the submission context.
+    assertEquals(getLogAggregationContext(),
+        context.getLogAggregationContext());
+  }
+
+  private void verifyRMAppFieldsForFinalTransitions(RMApp application) {
+    ApplicationSubmissionContext context =
+        application.getApplicationSubmissionContext();
+    assertEquals(null, context.getAMContainerSpec()); // expected cleared
+    assertEquals(null, context.getLogAggregationContext()); // likewise
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/06e5a7b5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java
index 640293c..2c52377 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java
@@ -38,7 +38,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
-import org.apache.hadoop.yarn.server.resourcemanager.MockRMMemoryStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.MockMemoryRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMSecretManagerService;
 import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityMockRM;
@@ -78,7 +78,7 @@ public class TestRMDelegationTokens {
     UserGroupInformation.getLoginUser()
         .setAuthenticationMethod(AuthenticationMethod.KERBEROS);
 
-    MemoryRMStateStore memStore = new MockRMMemoryStateStore();
+    MemoryRMStateStore memStore = new MockMemoryRMStateStore();
     memStore.init(conf);
     RMState rmState = memStore.getState();
 
@@ -132,7 +132,7 @@ public class TestRMDelegationTokens {
   // Test all expired keys are removed from state-store.
   @Test(timeout = 15000)
   public void testRemoveExpiredMasterKeyInRMStateStore() throws Exception {
-    MemoryRMStateStore memStore = new MockRMMemoryStateStore();
+    MemoryRMStateStore memStore = new MockMemoryRMStateStore();
     memStore.init(testConf);
     RMState rmState = memStore.getState();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org