You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/04/09 22:16:56 UTC

svn commit: r1466209 - in /hadoop/common/branches/branch-2/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ hadoop-yarn/hadoop-yarn-server/hadoop-yarn...

Author: vinodkv
Date: Tue Apr  9 20:16:56 2013
New Revision: 1466209

URL: http://svn.apache.org/r1466209
Log:
YARN-534. Change RM restart recovery to also account for AM max-attempts configuration after the restart. Contributed by Jian He.
svn merge --ignore-ancestry -c 1466208 ../../trunk/

Modified:
    hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1466209&r1=1466208&r2=1466209&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Tue Apr  9 20:16:56 2013
@@ -153,6 +153,9 @@ Release 2.0.5-beta - UNRELEASED
     YARN-112. Fixed a race condition during localization that fails containers.
     (Omkar Vinit Joshi via vinodkv)
 
+    YARN-534. Change RM restart recovery to also account for AM max-attempts 
+    configuration after the restart. (Jian He via vinodkv)
+
 Release 2.0.4-alpha - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java?rev=1466209&r1=1466208&r2=1466209&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java Tue Apr  9 20:16:56 2013
@@ -57,6 +57,7 @@ public class RMAppManager implements Eve
   private static final Log LOG = LogFactory.getLog(RMAppManager.class);
 
   private int completedAppsMax = YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS;
+  private int globalMaxAppAttempts;
   private LinkedList<ApplicationId> completedApps = new LinkedList<ApplicationId>();
 
   private final RMContext rmContext;
@@ -76,6 +77,8 @@ public class RMAppManager implements Eve
     setCompletedAppsMax(conf.getInt(
         YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS,
         YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS));
+    globalMaxAppAttempts = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+        YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
   }
 
   /**
@@ -308,6 +311,7 @@ public class RMAppManager implements Eve
     Map<ApplicationId, ApplicationState> appStates = state.getApplicationState();
     LOG.info("Recovering " + appStates.size() + " applications");
     for(ApplicationState appState : appStates.values()) {
+      boolean shouldRecover = true;
       // re-submit the application
       // this is going to send an app start event but since the async dispatcher 
       // has not started that event will be queued until we have completed re
@@ -318,16 +322,39 @@ public class RMAppManager implements Eve
         // This will need to be changed in work preserving recovery in which 
         // RM will re-connect with the running AM's instead of restarting them
         LOG.info("Not recovering unmanaged application " + appState.getAppId());
-        store.removeApplication(appState);
+        shouldRecover = false;
+      }
+      int individualMaxAppAttempts = appState.getApplicationSubmissionContext()
+          .getMaxAppAttempts();
+      int maxAppAttempts;
+      if (individualMaxAppAttempts <= 0 ||
+          individualMaxAppAttempts > globalMaxAppAttempts) {
+        maxAppAttempts = globalMaxAppAttempts;
+        LOG.warn("The specific max attempts: " + individualMaxAppAttempts
+            + " for application: " + appState.getAppId()
+            + " is invalid, because it is out of the range [1, "
+            + globalMaxAppAttempts + "]. Use the global max attempts instead.");
       } else {
+        maxAppAttempts = individualMaxAppAttempts;
+      }
+      if(appState.getAttemptCount() >= maxAppAttempts) {
+        LOG.info("Not recovering application " + appState.getAppId() +
+            " due to recovering attempt is beyond maxAppAttempt limit");
+        shouldRecover = false;
+      }
+
+      if(shouldRecover) {
         LOG.info("Recovering application " + appState.getAppId());
         submitApplication(appState.getApplicationSubmissionContext(), 
-                          appState.getSubmitTime());
+                        appState.getSubmitTime());
         // re-populate attempt information in application
         RMAppImpl appImpl = (RMAppImpl) rmContext.getRMApps().get(
-                                                          appState.getAppId());
+                                                        appState.getAppId());
         appImpl.recover(state);
       }
+      else {
+        store.removeApplication(appState);
+      }
     }
   }
 

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java?rev=1466209&r1=1466208&r2=1466209&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java Tue Apr  9 20:16:56 2013
@@ -128,21 +128,28 @@ public class MockRM extends ResourceMana
 
   // client
   public RMApp submitApp(int masterMemory, String name, String user) throws Exception {
-    return submitApp(masterMemory, name, user, null, false, null);
+    return submitApp(masterMemory, name, user, null, false, null,
+      super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+        YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
   }
   
   public RMApp submitApp(int masterMemory, String name, String user,
       Map<ApplicationAccessType, String> acls) throws Exception {
-    return submitApp(masterMemory, name, user, acls, false, null);
+    return submitApp(masterMemory, name, user, acls, false, null,
+      super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+        YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
   }  
 
   public RMApp submitApp(int masterMemory, String name, String user,
       Map<ApplicationAccessType, String> acls, String queue) throws Exception {
-    return submitApp(masterMemory, name, user, acls, false, queue);
-  }  
+    return submitApp(masterMemory, name, user, acls, false, queue,
+      super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+        YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
+  }
 
   public RMApp submitApp(int masterMemory, String name, String user,
-      Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue) throws Exception {
+      Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
+      int maxAppAttempts) throws Exception {
     ClientRMProtocol client = getClientRMService();
     GetNewApplicationResponse resp = client.getNewApplication(Records
         .newRecord(GetNewApplicationRequest.class));
@@ -155,6 +162,7 @@ public class MockRM extends ResourceMana
     sub.setApplicationId(appId);
     sub.setApplicationName(name);
     sub.setUser(user);
+    sub.setMaxAppAttempts(maxAppAttempts);
     if(unmanaged) {
       sub.setUnmanagedAM(true);
     }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1466209&r1=1466208&r2=1466209&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Tue Apr  9 20:16:56 2013
@@ -19,11 +19,13 @@
 package org.apache.hadoop.yarn.server.resourcemanager;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -62,6 +64,7 @@ public class TestRMRestart {
     "org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore");
     conf.set(YarnConfiguration.RM_SCHEDULER, 
     "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler");
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 5);
 
     MemoryRMStateStore memStore = new MemoryRMStateStore();
     memStore.init(conf);
@@ -152,7 +155,9 @@ public class TestRMRestart {
         .getApplicationId());
     
     // create unmanaged app
-    RMApp appUnmanaged = rm1.submitApp(200, "someApp", "someUser", null, true, null);
+    RMApp appUnmanaged = rm1.submitApp(200, "someApp", "someUser", null, true,
+        null, conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+          YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
     ApplicationAttemptId unmanagedAttemptId = 
                         appUnmanaged.getCurrentAppAttempt().getAppAttemptId();
     // assert appUnmanaged info is saved
@@ -306,4 +311,74 @@ public class TestRMRestart {
     Assert.assertEquals(0, rmAppState.size());
  }
   
+  @Test
+  public void testRMRestartOnMaxAppAttempts() throws Exception {
+    Logger rootLogger = LogManager.getRootLogger();
+    rootLogger.setLevel(Level.DEBUG);
+    ExitUtil.disableSystemExit();
+
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
+    conf.set(YarnConfiguration.RM_STORE, 
+    "org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore");
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
+
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+    RMState rmState = memStore.getState();
+
+    Map<ApplicationId, ApplicationState> rmAppState =
+        rmState.getApplicationState();  
+    MockRM rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
+    nm1.registerNode();
+
+    // submit an app with maxAppAttempts equals to 1
+    RMApp app1 = rm1.submitApp(200, "name", "user",
+        new HashMap<ApplicationAccessType, String>(), false, "default", 1);
+    // submit an app with maxAppAttempts equals to -1
+    RMApp app2 = rm1.submitApp(200, "name", "user",
+        new HashMap<ApplicationAccessType, String>(), false, "default", -1);
+
+    // assert app1 info is saved
+    ApplicationState appState = rmAppState.get(app1.getApplicationId());
+    Assert.assertNotNull(appState);
+    Assert.assertEquals(0, appState.getAttemptCount());
+    Assert.assertEquals(appState.getApplicationSubmissionContext()
+        .getApplicationId(), app1.getApplicationSubmissionContext()
+        .getApplicationId());
+
+    // Allocate the AM
+    nm1.nodeHeartbeat(true);
+    RMAppAttempt attempt = app1.getCurrentAppAttempt();
+    ApplicationAttemptId attemptId1 = attempt.getAppAttemptId();
+    rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED);
+    Assert.assertEquals(1, appState.getAttemptCount());
+    ApplicationAttemptState attemptState = 
+                                appState.getAttempt(attemptId1);
+    Assert.assertNotNull(attemptState);
+    Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1), 
+                        attemptState.getMasterContainer().getId());
+    rm1.stop();
+
+    // start new RM   
+    MockRM rm2 = new MockRM(conf, memStore);
+    rm2.start();
+
+    // verify that maxAppAttempts is set to global value
+    Assert.assertEquals(2, 
+        rm2.getRMContext().getRMApps().get(app2.getApplicationId())
+        .getMaxAppAttempts());
+
+    // verify that app2 exists  app1 is removed
+    Assert.assertEquals(1, rm2.getRMContext().getRMApps().size());
+    Assert.assertNotNull(rm2.getRMContext().getRMApps()
+        .get(app2.getApplicationId()));
+    Assert.assertNull(rm2.getRMContext().getRMApps()
+        .get(app1.getApplicationId()));
+
+    // stop the RM
+    rm2.stop();
+  }
 }