You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/04/09 22:16:56 UTC
svn commit: r1466209 - in
/hadoop/common/branches/branch-2/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/
hadoop-yarn/hadoop-yarn-server/hadoop-yarn...
Author: vinodkv
Date: Tue Apr 9 20:16:56 2013
New Revision: 1466209
URL: http://svn.apache.org/r1466209
Log:
YARN-534. Change RM restart recovery to also account for AM max-attempts configuration after the restart. Contributed by Jian He.
svn merge --ignore-ancestry -c 1466208 ../../trunk/
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1466209&r1=1466208&r2=1466209&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Tue Apr 9 20:16:56 2013
@@ -153,6 +153,9 @@ Release 2.0.5-beta - UNRELEASED
YARN-112. Fixed a race condition during localization that fails containers.
(Omkar Vinit Joshi via vinodkv)
+ YARN-534. Change RM restart recovery to also account for AM max-attempts
+ configuration after the restart. (Jian He via vinodkv)
+
Release 2.0.4-alpha - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java?rev=1466209&r1=1466208&r2=1466209&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java Tue Apr 9 20:16:56 2013
@@ -57,6 +57,7 @@ public class RMAppManager implements Eve
private static final Log LOG = LogFactory.getLog(RMAppManager.class);
private int completedAppsMax = YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS;
+ private int globalMaxAppAttempts;
private LinkedList<ApplicationId> completedApps = new LinkedList<ApplicationId>();
private final RMContext rmContext;
@@ -76,6 +77,8 @@ public class RMAppManager implements Eve
setCompletedAppsMax(conf.getInt(
YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS,
YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS));
+ globalMaxAppAttempts = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+ YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
}
/**
@@ -308,6 +311,7 @@ public class RMAppManager implements Eve
Map<ApplicationId, ApplicationState> appStates = state.getApplicationState();
LOG.info("Recovering " + appStates.size() + " applications");
for(ApplicationState appState : appStates.values()) {
+ boolean shouldRecover = true;
// re-submit the application
// this is going to send an app start event but since the async dispatcher
// has not started that event will be queued until we have completed re
@@ -318,16 +322,39 @@ public class RMAppManager implements Eve
// This will need to be changed in work preserving recovery in which
// RM will re-connect with the running AM's instead of restarting them
LOG.info("Not recovering unmanaged application " + appState.getAppId());
- store.removeApplication(appState);
+ shouldRecover = false;
+ }
+ int individualMaxAppAttempts = appState.getApplicationSubmissionContext()
+ .getMaxAppAttempts();
+ int maxAppAttempts;
+ if (individualMaxAppAttempts <= 0 ||
+ individualMaxAppAttempts > globalMaxAppAttempts) {
+ maxAppAttempts = globalMaxAppAttempts;
+ LOG.warn("The specific max attempts: " + individualMaxAppAttempts
+ + " for application: " + appState.getAppId()
+ + " is invalid, because it is out of the range [1, "
+ + globalMaxAppAttempts + "]. Use the global max attempts instead.");
} else {
+ maxAppAttempts = individualMaxAppAttempts;
+ }
+ if(appState.getAttemptCount() >= maxAppAttempts) {
+ LOG.info("Not recovering application " + appState.getAppId() +
+ " due to recovering attempt is beyond maxAppAttempt limit");
+ shouldRecover = false;
+ }
+
+ if(shouldRecover) {
LOG.info("Recovering application " + appState.getAppId());
submitApplication(appState.getApplicationSubmissionContext(),
- appState.getSubmitTime());
+ appState.getSubmitTime());
// re-populate attempt information in application
RMAppImpl appImpl = (RMAppImpl) rmContext.getRMApps().get(
- appState.getAppId());
+ appState.getAppId());
appImpl.recover(state);
}
+ else {
+ store.removeApplication(appState);
+ }
}
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java?rev=1466209&r1=1466208&r2=1466209&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java Tue Apr 9 20:16:56 2013
@@ -128,21 +128,28 @@ public class MockRM extends ResourceMana
// client
public RMApp submitApp(int masterMemory, String name, String user) throws Exception {
- return submitApp(masterMemory, name, user, null, false, null);
+ return submitApp(masterMemory, name, user, null, false, null,
+ super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+ YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
}
public RMApp submitApp(int masterMemory, String name, String user,
Map<ApplicationAccessType, String> acls) throws Exception {
- return submitApp(masterMemory, name, user, acls, false, null);
+ return submitApp(masterMemory, name, user, acls, false, null,
+ super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+ YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
}
public RMApp submitApp(int masterMemory, String name, String user,
Map<ApplicationAccessType, String> acls, String queue) throws Exception {
- return submitApp(masterMemory, name, user, acls, false, queue);
- }
+ return submitApp(masterMemory, name, user, acls, false, queue,
+ super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+ YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
+ }
public RMApp submitApp(int masterMemory, String name, String user,
- Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue) throws Exception {
+ Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
+ int maxAppAttempts) throws Exception {
ClientRMProtocol client = getClientRMService();
GetNewApplicationResponse resp = client.getNewApplication(Records
.newRecord(GetNewApplicationRequest.class));
@@ -155,6 +162,7 @@ public class MockRM extends ResourceMana
sub.setApplicationId(appId);
sub.setApplicationName(name);
sub.setUser(user);
+ sub.setMaxAppAttempts(maxAppAttempts);
if(unmanaged) {
sub.setUnmanagedAM(true);
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1466209&r1=1466208&r2=1466209&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Tue Apr 9 20:16:56 2013
@@ -19,11 +19,13 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
@@ -62,6 +64,7 @@ public class TestRMRestart {
"org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore");
conf.set(YarnConfiguration.RM_SCHEDULER,
"org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler");
+ conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 5);
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
@@ -152,7 +155,9 @@ public class TestRMRestart {
.getApplicationId());
// create unmanaged app
- RMApp appUnmanaged = rm1.submitApp(200, "someApp", "someUser", null, true, null);
+ RMApp appUnmanaged = rm1.submitApp(200, "someApp", "someUser", null, true,
+ null, conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+ YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
ApplicationAttemptId unmanagedAttemptId =
appUnmanaged.getCurrentAppAttempt().getAppAttemptId();
// assert appUnmanaged info is saved
@@ -306,4 +311,74 @@ public class TestRMRestart {
Assert.assertEquals(0, rmAppState.size());
}
+ @Test
+ public void testRMRestartOnMaxAppAttempts() throws Exception {
+ Logger rootLogger = LogManager.getRootLogger();
+ rootLogger.setLevel(Level.DEBUG);
+ ExitUtil.disableSystemExit();
+
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
+ conf.set(YarnConfiguration.RM_STORE,
+ "org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore");
+ conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
+
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(conf);
+ RMState rmState = memStore.getState();
+
+ Map<ApplicationId, ApplicationState> rmAppState =
+ rmState.getApplicationState();
+ MockRM rm1 = new MockRM(conf, memStore);
+ rm1.start();
+ MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
+ nm1.registerNode();
+
+ // submit an app with maxAppAttempts equal to 1
+ RMApp app1 = rm1.submitApp(200, "name", "user",
+ new HashMap<ApplicationAccessType, String>(), false, "default", 1);
+ // submit an app with maxAppAttempts equal to -1
+ RMApp app2 = rm1.submitApp(200, "name", "user",
+ new HashMap<ApplicationAccessType, String>(), false, "default", -1);
+
+ // assert app1 info is saved
+ ApplicationState appState = rmAppState.get(app1.getApplicationId());
+ Assert.assertNotNull(appState);
+ Assert.assertEquals(0, appState.getAttemptCount());
+ Assert.assertEquals(appState.getApplicationSubmissionContext()
+ .getApplicationId(), app1.getApplicationSubmissionContext()
+ .getApplicationId());
+
+ // Allocate the AM
+ nm1.nodeHeartbeat(true);
+ RMAppAttempt attempt = app1.getCurrentAppAttempt();
+ ApplicationAttemptId attemptId1 = attempt.getAppAttemptId();
+ rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED);
+ Assert.assertEquals(1, appState.getAttemptCount());
+ ApplicationAttemptState attemptState =
+ appState.getAttempt(attemptId1);
+ Assert.assertNotNull(attemptState);
+ Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1),
+ attemptState.getMasterContainer().getId());
+ rm1.stop();
+
+ // start new RM
+ MockRM rm2 = new MockRM(conf, memStore);
+ rm2.start();
+
+ // verify that app2's invalid maxAppAttempts (-1) is replaced by the global value
+ Assert.assertEquals(2,
+ rm2.getRMContext().getRMApps().get(app2.getApplicationId())
+ .getMaxAppAttempts());
+
+ // verify that app2 exists and app1 is removed
+ Assert.assertEquals(1, rm2.getRMContext().getRMApps().size());
+ Assert.assertNotNull(rm2.getRMContext().getRMApps()
+ .get(app2.getApplicationId()));
+ Assert.assertNull(rm2.getRMContext().getRMApps()
+ .get(app1.getApplicationId()));
+
+ // stop the RM
+ rm2.stop();
+ }
}