You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vv...@apache.org on 2016/01/28 07:48:38 UTC

[12/50] [abbrv] hadoop git commit: YARN-4497. RM might fail to restart when recovering apps whose attempts are missing. (Jun Gong via rohithsharmaks)

YARN-4497. RM might fail to restart when recovering apps whose attempts are missing. (Jun Gong via rohithsharmaks)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d6258b33
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d6258b33
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d6258b33

Branch: refs/heads/YARN-3926
Commit: d6258b33a7428a0725ead96bc43f4dd444c7c8f1
Parents: 0bae506
Author: rohithsharmaks <ro...@apache.org>
Authored: Fri Jan 22 20:27:38 2016 +0530
Committer: rohithsharmaks <ro...@apache.org>
Committed: Fri Jan 22 20:27:38 2016 +0530

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  3 +
 .../server/resourcemanager/rmapp/RMAppImpl.java | 26 ++++++++-
 .../rmapp/attempt/RMAppAttemptImpl.java         |  8 +++
 .../server/resourcemanager/TestRMRestart.java   | 61 ++++++++++++++++++++
 4 files changed, 96 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6258b33/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 2230b42..b667b5b 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -143,6 +143,9 @@ Release 2.9.0 - UNRELEASED
     YARN-4578. Directories that are mounted in docker containers need to be more
     restrictive/container-specific. (Sidharta Seethana via vvasudev)
 
+    YARN-4497. RM might fail to restart when recovering apps whose attempts are missing.
+    (Jun Gong via rohithsharmaks)
+
 Release 2.8.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6258b33/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 1a390df..10c9edc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -34,6 +34,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.TreeSet;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
@@ -849,17 +850,32 @@ public class RMAppImpl implements RMApp, Recoverable {
     // send the ATS create Event
     sendATSCreateEvent(this, this.startTime);
 
-    for(int i=0; i<appState.getAttemptCount(); ++i) {
+    RMAppAttemptImpl preAttempt = null;
+    for (ApplicationAttemptId attemptId :
+        new TreeSet<>(appState.attempts.keySet())) {
       // create attempt
-      createNewAttempt();
+      createNewAttempt(attemptId);
       ((RMAppAttemptImpl)this.currentAttempt).recover(state);
+      // If the previous attempt has no recovered final state, we failed to
+      // store its final state. Mark it FAILED now, because its actual final
+      // state cannot be determined.
+      if (preAttempt != null && preAttempt.getRecoveredFinalState() == null) {
+        preAttempt.setRecoveredFinalState(RMAppAttemptState.FAILED);
+      }
+      preAttempt = (RMAppAttemptImpl)currentAttempt;
+    }
+    if (currentAttempt != null) {
+      nextAttemptId = currentAttempt.getAppAttemptId().getAttemptId() + 1;
     }
   }
 
   private void createNewAttempt() {
     ApplicationAttemptId appAttemptId =
         ApplicationAttemptId.newInstance(applicationId, nextAttemptId++);
+    createNewAttempt(appAttemptId);
+  }
 
+  private void createNewAttempt(ApplicationAttemptId appAttemptId) {
     BlacklistManager currentAMBlacklist;
     if (currentAttempt != null) {
       currentAMBlacklist = currentAttempt.getAMBlacklist();
@@ -1803,4 +1819,10 @@ public class RMAppImpl implements RMApp, Recoverable {
   public float getAmBlacklistingDisableThreshold() {
     return blacklistDisableThreshold;
   }
+
+  @Private
+  @VisibleForTesting
+  public int getNextAttemptId() {
+    return nextAttemptId;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6258b33/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index 8aefe9f..3f45cb4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -2113,4 +2113,12 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
   public void updateAMLaunchDiagnostics(String amLaunchDiagnostics) {
     this.amLaunchDiagnostics = amLaunchDiagnostics;
   }
+
+  public RMAppAttemptState getRecoveredFinalState() {
+    return recoveredFinalState;
+  }
+
+  public void setRecoveredFinalState(RMAppAttemptState finalState) {
+    this.recoveredFinalState = finalState;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6258b33/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
index 625c3b8..3bab88a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
@@ -101,6 +101,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreRMDTMa
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -2379,4 +2380,64 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
     }
   }
 
+  @Test(timeout = 60000)
+  public void testRMRestartOnMissingAttempts() throws Exception {
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 5);
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    // start RM
+    MockRM rm1 = createMockRM(conf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+    nm1.registerNode();
+
+    // create an app; three attempts are failed below and a fourth is launched.
+    RMApp app0 = rm1.submitApp(200);
+    ApplicationStateData app0State = memStore.getState().getApplicationState()
+        .get(app0.getApplicationId());
+
+    MockAM am0 = launchAndFailAM(app0, rm1, nm1);
+    MockAM am1 = launchAndFailAM(app0, rm1, nm1);
+    MockAM am2 = launchAndFailAM(app0, rm1, nm1);
+    MockAM am3 = launchAM(app0, rm1, nm1);
+
+    // am1 is missing from MemoryRMStateStore
+    memStore.removeApplicationAttemptInternal(am1.getApplicationAttemptId());
+    ApplicationAttemptStateData am2State = app0State.getAttempt(
+        am2.getApplicationAttemptId());
+    // am2's state is not consistent: MemoryRMStateStore just saved its initial
+    // state and failed to store its final state
+    am2State.setState(null);
+
+    // restart rm
+    MockRM rm2 = createMockRM(conf, memStore);
+    rm2.start();
+
+    Assert.assertEquals(1, rm2.getRMContext().getRMApps().size());
+    RMApp recoveredApp0 = rm2.getRMContext().getRMApps().values()
+        .iterator().next();
+    Map<ApplicationAttemptId, RMAppAttempt> recoveredAppAttempts
+        = recoveredApp0.getAppAttempts();
+    Assert.assertEquals(3, recoveredAppAttempts.size());
+    Assert.assertEquals(RMAppAttemptState.FAILED,
+        recoveredAppAttempts.get(
+            am0.getApplicationAttemptId()).getAppAttemptState());
+    Assert.assertEquals(RMAppAttemptState.FAILED,
+        recoveredAppAttempts.get(
+            am2.getApplicationAttemptId()).getAppAttemptState());
+    Assert.assertEquals(RMAppAttemptState.LAUNCHED,
+        recoveredAppAttempts.get(
+            am3.getApplicationAttemptId()).getAppAttemptState());
+    Assert.assertEquals(5, ((RMAppImpl)app0).getNextAttemptId());
+  }
+
+  private MockAM launchAndFailAM(RMApp app, MockRM rm, MockNM nm)
+      throws Exception {
+    MockAM am = launchAM(app, rm, nm);
+    nm.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+    am.waitForState(RMAppAttemptState.FAILED);
+    return am;
+  }
 }