You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vv...@apache.org on 2016/01/28 07:48:38 UTC
[12/50] [abbrv] hadoop git commit: YARN-4497. RM might fail to
restart when recovering apps whose attempts are missing. (Jun Gong via
rohithsharmaks)
YARN-4497. RM might fail to restart when recovering apps whose attempts are missing. (Jun Gong via rohithsharmaks)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d6258b33
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d6258b33
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d6258b33
Branch: refs/heads/YARN-3926
Commit: d6258b33a7428a0725ead96bc43f4dd444c7c8f1
Parents: 0bae506
Author: rohithsharmaks <ro...@apache.org>
Authored: Fri Jan 22 20:27:38 2016 +0530
Committer: rohithsharmaks <ro...@apache.org>
Committed: Fri Jan 22 20:27:38 2016 +0530
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../server/resourcemanager/rmapp/RMAppImpl.java | 26 ++++++++-
.../rmapp/attempt/RMAppAttemptImpl.java | 8 +++
.../server/resourcemanager/TestRMRestart.java | 61 ++++++++++++++++++++
4 files changed, 96 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6258b33/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 2230b42..b667b5b 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -143,6 +143,9 @@ Release 2.9.0 - UNRELEASED
YARN-4578. Directories that are mounted in docker containers need to be more
restrictive/container-specific. (Sidharta Seethana via vvasudev)
+ YARN-4497. RM might fail to restart when recovering apps whose attempts are missing.
+ (Jun Gong via rohithsharmaks)
+
Release 2.8.0 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6258b33/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 1a390df..10c9edc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -34,6 +34,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
@@ -849,17 +850,32 @@ public class RMAppImpl implements RMApp, Recoverable {
// send the ATS create Event
sendATSCreateEvent(this, this.startTime);
- for(int i=0; i<appState.getAttemptCount(); ++i) {
+ RMAppAttemptImpl preAttempt = null;
+ for (ApplicationAttemptId attemptId :
+ new TreeSet<>(appState.attempts.keySet())) {
// create attempt
- createNewAttempt();
+ createNewAttempt(attemptId);
((RMAppAttemptImpl)this.currentAttempt).recover(state);
+ // If the previous attempt has no recovered final state, we failed to
+ // store that state. Mark it FAILED now because its actual final state
+ // can no longer be determined.
+ if (preAttempt != null && preAttempt.getRecoveredFinalState() == null) {
+ preAttempt.setRecoveredFinalState(RMAppAttemptState.FAILED);
+ }
+ preAttempt = (RMAppAttemptImpl)currentAttempt;
+ }
+ if (currentAttempt != null) {
+ nextAttemptId = currentAttempt.getAppAttemptId().getAttemptId() + 1;
}
}
private void createNewAttempt() {
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(applicationId, nextAttemptId++);
+ createNewAttempt(appAttemptId);
+ }
+ private void createNewAttempt(ApplicationAttemptId appAttemptId) {
BlacklistManager currentAMBlacklist;
if (currentAttempt != null) {
currentAMBlacklist = currentAttempt.getAMBlacklist();
@@ -1803,4 +1819,10 @@ public class RMAppImpl implements RMApp, Recoverable {
public float getAmBlacklistingDisableThreshold() {
return blacklistDisableThreshold;
}
+
+ @Private
+ @VisibleForTesting
+ public int getNextAttemptId() {
+ return nextAttemptId;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6258b33/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index 8aefe9f..3f45cb4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -2113,4 +2113,12 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
public void updateAMLaunchDiagnostics(String amLaunchDiagnostics) {
this.amLaunchDiagnostics = amLaunchDiagnostics;
}
+
+ public RMAppAttemptState getRecoveredFinalState() {
+ return recoveredFinalState;
+ }
+
+ public void setRecoveredFinalState(RMAppAttemptState finalState) {
+ this.recoveredFinalState = finalState;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6258b33/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
index 625c3b8..3bab88a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
@@ -101,6 +101,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreRMDTMa
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -2379,4 +2380,64 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
}
}
+ @Test(timeout = 60000)
+ public void testRMRestartOnMissingAttempts() throws Exception {
+ conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 5);
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(conf);
+
+ // start RM
+ MockRM rm1 = createMockRM(conf, memStore);
+ rm1.start();
+ MockNM nm1 =
+ new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+ nm1.registerNode();
+
+ // submit an app, fail its first three attempts and launch a fourth.
+ RMApp app0 = rm1.submitApp(200);
+ ApplicationStateData app0State = memStore.getState().getApplicationState()
+ .get(app0.getApplicationId());
+
+ MockAM am0 = launchAndFailAM(app0, rm1, nm1);
+ MockAM am1 = launchAndFailAM(app0, rm1, nm1);
+ MockAM am2 = launchAndFailAM(app0, rm1, nm1);
+ MockAM am3 = launchAM(app0, rm1, nm1);
+
+ // am1 is missing from MemoryRMStateStore
+ memStore.removeApplicationAttemptInternal(am1.getApplicationAttemptId());
+ ApplicationAttemptStateData am2State = app0State.getAttempt(
+ am2.getApplicationAttemptId());
+ // am2's state is inconsistent: MemoryRMStateStore saved only its initial
+ // state and failed to store its final state
+ am2State.setState(null);
+
+ // restart rm
+ MockRM rm2 = createMockRM(conf, memStore);
+ rm2.start();
+
+ Assert.assertEquals(1, rm2.getRMContext().getRMApps().size());
+ RMApp recoveredApp0 = rm2.getRMContext().getRMApps().values()
+ .iterator().next();
+ Map<ApplicationAttemptId, RMAppAttempt> recoveredAppAttempts
+ = recoveredApp0.getAppAttempts();
+ Assert.assertEquals(3, recoveredAppAttempts.size());
+ Assert.assertEquals(RMAppAttemptState.FAILED,
+ recoveredAppAttempts.get(
+ am0.getApplicationAttemptId()).getAppAttemptState());
+ Assert.assertEquals(RMAppAttemptState.FAILED,
+ recoveredAppAttempts.get(
+ am2.getApplicationAttemptId()).getAppAttemptState());
+ Assert.assertEquals(RMAppAttemptState.LAUNCHED,
+ recoveredAppAttempts.get(
+ am3.getApplicationAttemptId()).getAppAttemptState());
+ Assert.assertEquals(5, ((RMAppImpl)recoveredApp0).getNextAttemptId());
+ }
+
+ private MockAM launchAndFailAM(RMApp app, MockRM rm, MockNM nm)
+ throws Exception {
+ MockAM am = launchAM(app, rm, nm);
+ nm.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+ am.waitForState(RMAppAttemptState.FAILED);
+ return am;
+ }
}