You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2016/06/03 17:49:42 UTC

hadoop git commit: YARN-1815. Work preserving recovery of Unmanaged AMs. Contributed by Subru Krishnan

Repository: hadoop
Updated Branches:
  refs/heads/trunk c58a59f70 -> 097baaaeb


YARN-1815. Work preserving recovery of Unmanaged AMs. Contributed by Subru Krishnan


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/097baaae
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/097baaae
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/097baaae

Branch: refs/heads/trunk
Commit: 097baaaebae021c47bb7d69aa1ff1a2440df5166
Parents: c58a59f
Author: Jian He <ji...@apache.org>
Authored: Fri Jun 3 10:49:30 2016 -0700
Committer: Jian He <ji...@apache.org>
Committed: Fri Jun 3 10:49:30 2016 -0700

----------------------------------------------------------------------
 .../rmapp/attempt/RMAppAttemptImpl.java         | 28 +++---
 .../scheduler/AbstractYarnScheduler.java        |  8 --
 .../yarn/server/resourcemanager/MockRM.java     | 14 +++
 .../TestWorkPreservingRMRestart.java            | 92 ++++++++++++++++++++
 .../attempt/TestRMAppAttemptTransitions.java    |  3 +-
 5 files changed, 122 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/097baaae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index 1e2a293..75090fe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -354,8 +354,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
               RMAppAttemptState.FAILED))
 
        // Transitions from RUNNING State
-      .addTransition(RMAppAttemptState.RUNNING,
-          EnumSet.of(RMAppAttemptState.FINAL_SAVING, RMAppAttemptState.FINISHED),
+      .addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING,
           RMAppAttemptEventType.UNREGISTERED, new AMUnregisteredTransition())
       .addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING,
           RMAppAttemptEventType.STATUS_UPDATE, new StatusUpdateTransition())
@@ -1714,25 +1713,26 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
     }
   }
 
-  private static final class AMUnregisteredTransition implements
-      MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
+  private static final class AMUnregisteredTransition extends BaseTransition {
 
     @Override
-    public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
+    public void transition(RMAppAttemptImpl appAttempt,
         RMAppAttemptEvent event) {
       // Tell the app
       if (appAttempt.getSubmissionContext().getUnmanagedAM()) {
+        // YARN-1815: Saving the attempt final state so that we do not recover
+        // the finished Unmanaged AM post RM failover
         // Unmanaged AMs have no container to wait for, so they skip
         // the FINISHING state and go straight to FINISHED.
-        appAttempt.updateInfoOnAMUnregister(event);
-        new FinalTransition(RMAppAttemptState.FINISHED).transition(
-            appAttempt, event);
-        return RMAppAttemptState.FINISHED;
+        appAttempt.rememberTargetTransitionsAndStoreState(event,
+            new AMFinishedAfterFinalSavingTransition(event),
+            RMAppAttemptState.FINISHED, RMAppAttemptState.FINISHED);
+      } else {
+        // Saving the attempt final state
+        appAttempt.rememberTargetTransitionsAndStoreState(event,
+            new FinalStateSavedAfterAMUnregisterTransition(),
+            RMAppAttemptState.FINISHING, RMAppAttemptState.FINISHED);
       }
-      // Saving the attempt final state
-      appAttempt.rememberTargetTransitionsAndStoreState(event,
-        new FinalStateSavedAfterAMUnregisterTransition(),
-        RMAppAttemptState.FINISHING, RMAppAttemptState.FINISHED);
       ApplicationId applicationId =
           appAttempt.getAppAttemptId().getApplicationId();
 
@@ -1743,7 +1743,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
       // AppAttempt to App after this point of time is AM/AppAttempt Finished.
       appAttempt.eventHandler.handle(new RMAppEvent(applicationId,
         RMAppEventType.ATTEMPT_UNREGISTERED));
-      return RMAppAttemptState.FINAL_SAVING;
+      return;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/097baaae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 8f03de2..354dcb2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -354,14 +354,6 @@ public abstract class AbstractYarnScheduler
         continue;
       }
 
-      // Unmanaged AM recovery is addressed in YARN-1815
-      if (rmApp.getApplicationSubmissionContext().getUnmanagedAM()) {
-        LOG.info("Skip recovering container " + container + " for unmanaged AM."
-            + rmApp.getApplicationId());
-        killOrphanContainerOnNode(nm, container);
-        continue;
-      }
-
       SchedulerApplication<T> schedulerApp = applications.get(appId);
       if (schedulerApp == null) {
         LOG.info("Skip recovering container  " + container

http://git-wip-us.apache.org/repos/asf/hadoop/blob/097baaae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 982724e..dc749be 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -947,6 +947,20 @@ public class MockRM extends ResourceManager {
     return am;
   }
 
+  public static MockAM launchUAM(RMApp app, MockRM rm, MockNM nm)
+      throws Exception {
+    // UAMs go directly to LAUNCHED state
+    rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
+    RMAppAttempt attempt = app.getCurrentAppAttempt();
+    waitForSchedulerAppAttemptAdded(attempt.getAppAttemptId(), rm);
+    System.out.println("Launch AM " + attempt.getAppAttemptId());
+    nm.nodeHeartbeat(true);
+    MockAM am = new MockAM(rm.getRMContext(), rm.masterService,
+        attempt.getAppAttemptId());
+    rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED);
+    return am;
+  }
+
   public static RMAppAttempt waitForAttemptScheduled(RMApp app, MockRM rm)
       throws Exception {
     rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/097baaae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
index 458e8c3..7e5915b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
@@ -1434,4 +1434,96 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
     // check that attempt state is recovered correctly.
     assertEquals(RMAppAttemptState.FINISHED, recoveredApp1.getCurrentAppAttempt().getState());
   }
+
+  @Test(timeout = 600000)
+  public void testUAMRecoveryOnRMWorkPreservingRestart() throws Exception {
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    // start RM
+    rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+    nm1.registerNode();
+
+    // create app and launch the UAM
+    RMApp app0 = rm1.submitApp(200, true);
+    MockAM am0 = MockRM.launchUAM(app0, rm1, nm1);
+    am0.registerAppAttempt();
+
+    // Allocate containers to UAM
+    int numContainers = 2;
+    am0.allocate("127.0.0.1", 1000, numContainers,
+        new ArrayList<ContainerId>());
+    nm1.nodeHeartbeat(true);
+    List<Container> conts = am0.allocate(new ArrayList<ResourceRequest>(),
+        new ArrayList<ContainerId>()).getAllocatedContainers();
+    Assert.assertTrue(conts.isEmpty());
+    while (conts.size() == 0) {
+      nm1.nodeHeartbeat(true);
+      conts.addAll(am0.allocate(new ArrayList<ResourceRequest>(),
+          new ArrayList<ContainerId>()).getAllocatedContainers());
+      Thread.sleep(500);
+    }
+    Assert.assertFalse(conts.isEmpty());
+
+    // start new RM
+    rm2 = new MockRM(conf, memStore);
+    rm2.start();
+    rm2.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED);
+    rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.LAUNCHED);
+
+    // recover app
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+    RMApp recoveredApp =
+        rm2.getRMContext().getRMApps().get(app0.getApplicationId());
+    NMContainerStatus container1 = TestRMRestart
+        .createNMContainerStatus(am0.getApplicationAttemptId(), 1,
+            ContainerState.RUNNING);
+    NMContainerStatus container2 = TestRMRestart
+        .createNMContainerStatus(am0.getApplicationAttemptId(), 2,
+            ContainerState.RUNNING);
+    nm1.registerNode(Arrays.asList(container1, container2), null);
+
+    // Wait for RM to settle down on recovering containers;
+    waitForNumContainersToRecover(2, rm2, am0.getApplicationAttemptId());
+
+    // retry registerApplicationMaster() after RM restart.
+    am0.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
+    am0.registerAppAttempt(true);
+
+    // Check if UAM is correctly recovered on restart
+    rm2.waitForState(app0.getApplicationId(), RMAppState.RUNNING);
+    rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.RUNNING);
+
+    // Check if containers allocated to UAM are recovered
+    Map<ApplicationId, SchedulerApplication> schedulerApps =
+        ((AbstractYarnScheduler) rm2.getResourceScheduler())
+            .getSchedulerApplications();
+    SchedulerApplication schedulerApp =
+        schedulerApps.get(recoveredApp.getApplicationId());
+    SchedulerApplicationAttempt schedulerAttempt =
+        schedulerApp.getCurrentAppAttempt();
+    Assert.assertEquals(numContainers,
+        schedulerAttempt.getLiveContainers().size());
+
+    // Check if UAM is able to heart beat
+    Assert.assertNotNull(am0.doHeartbeat());
+
+    // Complete the UAM
+    am0.unregisterAppAttempt(false);
+    rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.FINISHED);
+    rm2.waitForState(app0.getApplicationId(), RMAppState.FINISHED);
+    Assert.assertEquals(FinalApplicationStatus.SUCCEEDED,
+        recoveredApp.getFinalApplicationStatus());
+
+    // Restart RM once more to check UAM is not re-run
+    MockRM rm3 = new MockRM(conf, memStore);
+    rm3.start();
+    recoveredApp = rm3.getRMContext().getRMApps().get(app0.getApplicationId());
+    Assert.assertEquals(RMAppState.FINISHED, recoveredApp.getState());
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/097baaae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
index 93f17e7..3143b94 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
@@ -596,8 +596,8 @@ public class TestRMAppAttemptTransitions {
     } else {
       assertEquals(getProxyUrl(applicationAttempt),
           applicationAttempt.getTrackingUrl());
-      verifyAttemptFinalStateSaved();
     }
+    verifyAttemptFinalStateSaved();
     assertEquals(finishedContainerCount, applicationAttempt
         .getJustFinishedContainers().size());
     Assert.assertEquals(0, getFinishedContainersSentToAM(applicationAttempt)
@@ -735,6 +735,7 @@ public class TestRMAppAttemptTransitions {
     applicationAttempt.handle(new RMAppAttemptUnregistrationEvent(
         applicationAttempt.getAppAttemptId(), url, finalStatus,
         diagnostics));
+    sendAttemptUpdateSavedEvent(applicationAttempt);
     testAppAttemptFinishedState(null, finalStatus, url, diagnostics, 1,
         true);
     assertFalse(transferStateFromPreviousAttempt);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org