You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by zj...@apache.org on 2014/02/07 01:18:46 UTC

svn commit: r1565497 - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-serve...

Author: zjshen
Date: Fri Feb  7 00:18:46 2014
New Revision: 1565497

URL: http://svn.apache.org/r1565497
Log:
YARN-1689. Made RMAppAttempt get killed when RMApp is at ACCEPTED. Contributed by Vinod Kumar Vavilapalli.

Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1565497&r1=1565496&r2=1565497&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Fri Feb  7 00:18:46 2014
@@ -633,6 +633,9 @@ Release 2.3.0 - UNRELEASED
     YARN-1661. Fixed DS ApplicationMaster to write the correct exit log. (Vinod
     Kumar Vavilapalli via zjshen)
 
+    YARN-1689. Made RMAppAttempt get killed when RMApp is at ACCEPTED. (Vinod
+    Kumar Vavilapalli via zjshen)
+
 Release 2.2.0 - 2013-10-13
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1565497&r1=1565496&r2=1565497&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Fri Feb  7 00:18:46 2014
@@ -196,10 +196,8 @@ public class RMAppImpl implements RMApp,
         // waiting for the previous AM to exit.
         RMAppEventType.ATTEMPT_FAILED,
         new AttemptFailedTransition(RMAppState.ACCEPTED))
-    .addTransition(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING,
-        RMAppEventType.KILL,
-        new FinalSavingTransition(
-          new AppKilledTransition(), RMAppState.KILLED))
+    .addTransition(RMAppState.ACCEPTED, RMAppState.KILLING,
+        RMAppEventType.KILL, new KillAttemptTransition())
     // ACCECPTED state can once again receive APP_ACCEPTED event, because on
     // recovery the app returns ACCEPTED state and the app once again go
     // through the scheduler and triggers one more APP_ACCEPTED event at

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java?rev=1565497&r1=1565496&r2=1565497&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java Fri Feb  7 00:18:46 2014
@@ -482,6 +482,13 @@ public class MockRM extends ResourceMana
     RMAppAttempt attempt = app.getCurrentAppAttempt();
     nm.nodeHeartbeat(true);
     MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
+    rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED);
+    return am;
+  }
+
+  public static MockAM launchAndRegisterAM(RMApp app, MockRM rm, MockNM nm)
+      throws Exception {
+    MockAM am = launchAM(app, rm, nm);
     am.registerAppAttempt();
     rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
     return am;

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java?rev=1565497&r1=1565496&r2=1565497&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java Fri Feb  7 00:18:46 2014
@@ -18,6 +18,10 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.spy;
+
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -33,6 +37,7 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -44,9 +49,17 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
@@ -54,7 +67,9 @@ import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.junit.Test;
+import org.mockito.ArgumentMatcher;
 
+@SuppressWarnings({"unchecked", "rawtypes"})
 public class TestRM {
 
   private static final Log LOG = LogFactory.getLog(TestRM.class);
@@ -397,19 +412,19 @@ public class TestRM {
     MockNM nm1 =
         new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
     nm1.registerNode();
-    MockAM am1 = MockRM.launchAM(app1, rm1, nm1);
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
     MockRM.finishApplicationMaster(app1, rm1, nm1, am1);
 
     // a failed app
     RMApp app2 = rm1.submitApp(200);
-    MockAM am2 = MockRM.launchAM(app2, rm1, nm1);
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
     nm1.nodeHeartbeat(am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
     am2.waitForState(RMAppAttemptState.FAILED);
     rm1.waitForState(app2.getApplicationId(), RMAppState.FAILED);
 
     // a killed app
     RMApp app3 = rm1.submitApp(200);
-    MockAM am3 = MockRM.launchAM(app3, rm1, nm1);
+    MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm1);
     rm1.killApp(app3.getApplicationId());
     rm1.waitForState(app3.getApplicationId(), RMAppState.KILLED);
     rm1.waitForState(am3.getApplicationAttemptId(), RMAppAttemptState.KILLED);
@@ -449,7 +464,7 @@ public class TestRM {
 
     // a failed app
     RMApp app2 = rm1.submitApp(200);
-    MockAM am2 = MockRM.launchAM(app2, rm1, nm1);
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
     nm1
       .nodeHeartbeat(am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
     am2.waitForState(RMAppAttemptState.FAILED);
@@ -466,10 +481,88 @@ public class TestRM {
     Assert.assertEquals(-1, report1.getRpcPort());
   }
 
+  /**
+   * Validate killing an application when it is at accepted state.
+   * @throws Exception exception
+   */
+  @Test (timeout = 60000)
+  public void testApplicationKillAtAcceptedState() throws Exception {
+
+    YarnConfiguration conf = new YarnConfiguration();
+    final Dispatcher dispatcher = new AsyncDispatcher() {
+      @Override
+      public EventHandler getEventHandler() {
+
+        class EventArgMatcher extends ArgumentMatcher<AbstractEvent> {
+          @Override
+          public boolean matches(Object argument) {
+            if (argument instanceof RMAppAttemptEvent) {
+              if (((RMAppAttemptEvent) argument).getType().equals(
+                RMAppAttemptEventType.KILL)) {
+                return true;
+              }
+            }
+            return false;
+          }
+        }
+
+        EventHandler handler = spy(super.getEventHandler());
+        doNothing().when(handler).handle(argThat(new EventArgMatcher()));
+        return handler;
+      }
+    };
+
+    MockRM rm = new MockRM(conf) {
+      @Override
+      protected Dispatcher createDispatcher() {
+        return dispatcher;
+      }
+    };
+
+    rm.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
+    nm1.registerNode();
+
+    // an app that will be killed while it is still in ACCEPTED state
+    RMApp application = rm.submitApp(200);
+    MockAM am = MockRM.launchAM(application, rm, nm1);
+    am.waitForState(RMAppAttemptState.LAUNCHED);
+    nm1.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.RUNNING);
+    rm.waitForState(application.getApplicationId(), RMAppState.ACCEPTED);
+
+    // Now kill the application while it is still in ACCEPTED state, i.e.
+    // before the AM has registered with the RM.
+    KillApplicationRequest request =
+        KillApplicationRequest.newInstance(application.getApplicationId());
+    rm.getClientRMService().forceKillApplication(request);
+
+    // Specific test for YARN-1689 follows
+    // Now let's say a race causes AM to register now. This should not crash RM.
+    am.registerAppAttempt(false);
+
+    // We explicitly intercepted the kill-event to RMAppAttempt, so app should
+    // still be in KILLING state.
+    rm.waitForState(application.getApplicationId(), RMAppState.KILLING);
+    // The AM attempt should now be in RUNNING state
+    rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.RUNNING);
+
+    // Simulate that appAttempt is killed.
+    rm.getRMContext().getDispatcher().getEventHandler().handle(
+        new RMAppEvent(application.getApplicationId(),
+          RMAppEventType.ATTEMPT_KILLED));
+    rm.waitForState(application.getApplicationId(), RMAppState.KILLED);
+  }
+
   public static void main(String[] args) throws Exception {
     TestRM t = new TestRM();
     t.testGetNewAppId();
     t.testAppWithNoContainers();
     t.testAppOnMultiNode();
+    t.testNMToken();
+    t.testActivatingApplicationAfterAddingNM();
+    t.testInvalidateAMHostPortWhenAMFailedOrKilled();
+    t.testInvalidatedAMHostPortOnAMRestart();
+    t.testApplicationKillAtAcceptedState();
   }
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java?rev=1565497&r1=1565496&r2=1565497&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java Fri Feb  7 00:18:46 2014
@@ -69,7 +69,7 @@ public class TestAMRestart {
         new MockNM("127.0.0.1:2351", 4089, rm1.getResourceTrackerService());
     nm2.registerNode();
 
-    MockAM am1 = MockRM.launchAM(app1, rm1, nm1);
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
     int NUM_CONTAINERS = 3;
     // allocate NUM_CONTAINERS containers
     am1.allocate("127.0.0.1", 1024, NUM_CONTAINERS,

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java?rev=1565497&r1=1565496&r2=1565497&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java Fri Feb  7 00:18:46 2014
@@ -639,6 +639,13 @@ public class TestRMAppTransitions {
         RMAppEventType.KILL);
     application.handle(event);
     rmDispatcher.await();
+
+    assertAppState(RMAppState.KILLING, application);
+    RMAppEvent appAttemptKilled =
+        new RMAppEvent(application.getApplicationId(),
+          RMAppEventType.ATTEMPT_KILLED);
+    application.handle(appAttemptKilled);
+    assertAppState(RMAppState.FINAL_SAVING, application);
     sendAppUpdateSavedEvent(application);
     assertKilled(application);
     assertAppFinalStateSaved(application);

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java?rev=1565497&r1=1565496&r2=1565497&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java Fri Feb  7 00:18:46 2014
@@ -1389,7 +1389,7 @@ public class TestRMWebServicesApps exten
     rm.start();
     MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 8192);
     RMApp app1 = rm.submitApp(CONTAINER_MB, "testwordcount", "user1");
-    MockAM am = MockRM.launchAM(app1, rm, amNodeManager);
+    MockAM am = MockRM.launchAndRegisterAM(app1, rm, amNodeManager);
     int maxAppAttempts = rm.getConfig().getInt(
         YarnConfiguration.RM_AM_MAX_ATTEMPTS,
         YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
@@ -1405,7 +1405,7 @@ public class TestRMWebServicesApps exten
       }
       // wait for app to start a new attempt.
       rm.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
-      am = MockRM.launchAM(app1, rm, amNodeManager);
+      am = MockRM.launchAndRegisterAM(app1, rm, amNodeManager);
       numAttempt++;
     }
     assertEquals("incorrect number of attempts", maxAppAttempts,