You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/04/24 21:14:04 UTC

hadoop git commit: YARN-3387. Previous AM's container completed status couldn't pass to current AM if AM and RM restarted during the same time. Contributed by Sandflee

Repository: hadoop
Updated Branches:
  refs/heads/trunk c7d9ad68e -> d03dcb963


YARN-3387. Previous AM's container completed status couldn't pass to current AM if AM and RM restarted during the same time. Contributed by Sandflee


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d03dcb96
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d03dcb96
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d03dcb96

Branch: refs/heads/trunk
Commit: d03dcb9635dbd79a45d229d1cab5fd28e5e49f49
Parents: c7d9ad6
Author: Jian He <ji...@apache.org>
Authored: Fri Apr 24 12:12:28 2015 -0700
Committer: Jian He <ji...@apache.org>
Committed: Fri Apr 24 12:13:29 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  3 +
 .../server/resourcemanager/rmapp/RMAppImpl.java |  2 +-
 .../rmapp/attempt/RMAppAttemptImpl.java         |  9 ++-
 .../TestWorkPreservingRMRestart.java            | 60 ++++++++++++++++++++
 4 files changed, 72 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d03dcb96/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 3311a2e..19e3e27 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -262,6 +262,9 @@ Release 2.8.0 - UNRELEASED
     YARN-3516. killing ContainerLocalizer action doesn't take effect when
     private localizer receives FETCH_FAILURE status.(zhihai xu via xgong)
 
+    YARN-3387. Previous AM's container completed status couldn't pass to current
+    AM if AM and RM restarted during the same time. (sandflee via jianhe)
+
 Release 2.7.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d03dcb96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index b4e4965..8abc478 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -1273,7 +1273,7 @@ public class RMAppImpl implements RMApp, Recoverable {
         // finished containers so that they can be acked to NM,
         // but when pulling finished container we will check this flag again.
         ((RMAppAttemptImpl) app.currentAttempt)
-          .transferStateFromPreviousAttempt(oldAttempt);
+          .transferStateFromAttempt(oldAttempt);
         return initialState;
       } else {
         if (numberOfFailure >= app.maxAppAttempts) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d03dcb96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index 913d06b..8abc65a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -845,7 +845,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
         attemptState.getMemorySeconds(),attemptState.getVcoreSeconds());
   }
 
-  public void transferStateFromPreviousAttempt(RMAppAttempt attempt) {
+  public void transferStateFromAttempt(RMAppAttempt attempt) {
     this.justFinishedContainers = attempt.getJustFinishedContainersReference();
     this.finishedContainersSentToAM =
         attempt.getFinishedContainersSentToAMReference();
@@ -1044,6 +1044,13 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
         appAttempt.progress = 1.0f;
         RMApp rmApp =appAttempt.rmContext.getRMApps().get(
             appAttempt.getAppAttemptId().getApplicationId());
+
+        if (appAttempt.submissionContext
+            .getKeepContainersAcrossApplicationAttempts()
+            && !appAttempt.submissionContext.getUnmanagedAM()
+            && rmApp.getCurrentAppAttempt() != appAttempt) {
+          appAttempt.transferStateFromAttempt(rmApp.getCurrentAppAttempt());
+        }
         // We will replay the final attempt only if last attempt is in final
         // state but application is not in final state.
         if (rmApp.getCurrentAppAttempt() == appAttempt

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d03dcb96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
index 0566f3d..c6fe371 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
@@ -1037,4 +1037,64 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
     nm1.setResourceTrackerService(rm2.getResourceTrackerService());
     rm2.start();
   }
+
+  @Test(timeout = 20000)
+  public void testContainerCompleteMsgNotLostAfterAMFailedAndRMRestart() throws Exception {
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+    rm1 = new MockRM(conf, memStore);
+    rm1.start();
+
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
+    nm1.registerNode();
+
+    // submit app with keepContainersAcrossApplicationAttempts true
+    RMApp app0 = rm1.submitApp(200, "", UserGroupInformation.getCurrentUser()
+        .getShortUserName(), null, false, null, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS, 
+        null, null, true, true, false, null, 0, null, true);
+    MockAM am0 = MockRM.launchAndRegisterAM(app0, rm1, nm1);
+
+    am0.allocate("127.0.0.1", 1000, 2, new ArrayList<ContainerId>());
+    nm1.nodeHeartbeat(true);
+    List<Container> conts = am0.allocate(new ArrayList<ResourceRequest>(),
+        new ArrayList<ContainerId>()).getAllocatedContainers();
+    while (conts.size() == 0) {
+      nm1.nodeHeartbeat(true);
+      conts.addAll(am0.allocate(new ArrayList<ResourceRequest>(),
+          new ArrayList<ContainerId>()).getAllocatedContainers());
+      Thread.sleep(500);
+    }
+
+    // am failed,and relaunch it
+    nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+    rm1.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED);
+    MockAM am1 = MockRM.launchAndRegisterAM(app0, rm1, nm1);
+
+    // rm failover
+    rm2 = new MockRM(conf, memStore);
+    rm2.start();
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+
+    // container launched by first am completed
+    NMContainerStatus amContainer =
+        TestRMRestart.createNMContainerStatus(am0.getApplicationAttemptId(), 1,
+          ContainerState.RUNNING);
+    NMContainerStatus completedContainer=
+        TestRMRestart.createNMContainerStatus(am0.getApplicationAttemptId(), 2,
+          ContainerState.COMPLETE);
+    NMContainerStatus runningContainer =
+        TestRMRestart.createNMContainerStatus(am0.getApplicationAttemptId(), 3,
+          ContainerState.RUNNING);
+    nm1.registerNode(Arrays.asList(amContainer, runningContainer,
+        completedContainer), null);
+    Thread.sleep(200);
+
+    // check whether current am could get containerCompleteMsg
+    RMApp recoveredApp0 =
+        rm2.getRMContext().getRMApps().get(app0.getApplicationId());
+    RMAppAttempt loadedAttempt1 = recoveredApp0.getCurrentAppAttempt();
+    assertEquals(1,loadedAttempt1.getJustFinishedContainers().size());
+  }
+
 }