You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/12/28 02:10:03 UTC

svn commit: r1553773 - in /hadoop/common/branches/branch-2/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/ hadoop-yarn/hadoop-yarn-serv...

Author: vinodkv
Date: Sat Dec 28 01:10:02 2013
New Revision: 1553773

URL: http://svn.apache.org/r1553773
Log:
YARN-1541. Changed ResourceManager to invalidate ApplicationMaster host/port information once an AM crashes. Contributed by Jian He.
svn merge --ignore-ancestry -c 1553772 ../../trunk/

Modified:
    hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1553773&r1=1553772&r2=1553773&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Sat Dec 28 01:10:02 2013
@@ -173,6 +173,9 @@ Release 2.4.0 - UNRELEASED
 
     YARN-1523. Use StandbyException instead of RMNotYetReadyException (kasha)
 
+    YARN-1541. Changed ResourceManager to invalidate ApplicationMaster host/port
+    information once an AM crashes. (Jian He via vinodkv)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1553773&r1=1553772&r2=1553773&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Sat Dec 28 01:10:02 2013
@@ -139,7 +139,7 @@ public class RMAppAttemptImpl implements
 
   private float progress = 0;
   private String host = "N/A";
-  private int rpcPort;
+  private int rpcPort = -1;
   private String originalTrackingUrl = "N/A";
   private String proxiedTrackingUrl = "N/A";
   private long startTime = 0;
@@ -526,6 +526,11 @@ public class RMAppAttemptImpl implements
     proxiedTrackingUrl = originalTrackingUrl;
   }
 
+  private void invalidateAMHostAndPort() {
+    this.host = "N/A";
+    this.rpcPort = -1;
+  }
+
   // This is only used for RMStateStore. Normal operation must invoke the secret
   // manager to get the key and not use the local key directly.
   @Override
@@ -1033,6 +1038,7 @@ public class RMAppAttemptImpl implements
         {
           // don't leave the tracking URL pointing to a non-existent AM
           appAttempt.setTrackingUrlToRMAppPage();
+          appAttempt.invalidateAMHostAndPort();
           appEvent =
               new RMAppFailedAttemptEvent(applicationId,
                   RMAppEventType.ATTEMPT_KILLED,
@@ -1043,6 +1049,7 @@ public class RMAppAttemptImpl implements
         {
           // don't leave the tracking URL pointing to a non-existent AM
           appAttempt.setTrackingUrlToRMAppPage();
+          appAttempt.invalidateAMHostAndPort();
           appEvent =
               new RMAppFailedAttemptEvent(applicationId,
                   RMAppEventType.ATTEMPT_FAILED,
@@ -1059,7 +1066,6 @@ public class RMAppAttemptImpl implements
       appAttempt.eventHandler.handle(appEvent);
       appAttempt.eventHandler.handle(new AppAttemptRemovedSchedulerEvent(
         appAttemptId, finalAttemptState));
-
       appAttempt.removeCredentials(appAttempt);
     }
   }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java?rev=1553773&r1=1553772&r2=1553773&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java Sat Dec 28 01:10:02 2013
@@ -19,26 +19,33 @@
 package org.apache.hadoop.yarn.server.resourcemanager;
 
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 
-import javax.security.auth.login.Configuration;
-
 import junit.framework.Assert;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
@@ -368,6 +375,111 @@ public class TestRM {
     rm1.stop();
   }
 
+  // Tests that the AM host and RPC port are invalidated after the AM attempt
+  // is killed or fails, so that clients do not receive stale information.
+  @Test (timeout = 80000)
+  public void testInvalidateAMHostPortWhenAMFailedOrKilled() throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+    MockRM rm1 = new MockRM(conf);
+    rm1.start();
+
+    // a succeeded app
+    RMApp app1 = rm1.submitApp(200);
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    MockAM am1 = launchAM(app1, rm1, nm1);
+    finishApplicationMaster(app1, rm1, nm1, am1);
+
+    // a failed app
+    RMApp app2 = rm1.submitApp(200);
+    MockAM am2 = launchAM(app2, rm1, nm1);
+    nm1.nodeHeartbeat(am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+    am2.waitForState(RMAppAttemptState.FAILED);
+    rm1.waitForState(app2.getApplicationId(), RMAppState.FAILED);
+
+    // a killed app
+    RMApp app3 = rm1.submitApp(200);
+    MockAM am3 = launchAM(app3, rm1, nm1);
+    rm1.killApp(app3.getApplicationId());
+    rm1.waitForState(app3.getApplicationId(), RMAppState.KILLED);
+    rm1.waitForState(am3.getApplicationAttemptId(), RMAppAttemptState.KILLED);
+
+    GetApplicationsRequest request1 =
+        GetApplicationsRequest.newInstance(EnumSet.of(
+          YarnApplicationState.FINISHED, YarnApplicationState.KILLED,
+          YarnApplicationState.FAILED));
+    GetApplicationsResponse response1 =
+        rm1.getClientRMService().getApplications(request1);
+    List<ApplicationReport> appList1 = response1.getApplicationList();
+
+    Assert.assertEquals(3, appList1.size());
+    for (ApplicationReport report : appList1) {
+      // killed/failed apps' host and rpc port are invalidated.
+      if (report.getApplicationId().equals(app2.getApplicationId())
+          || report.getApplicationId().equals(app3.getApplicationId())) {
+        Assert.assertEquals("N/A", report.getHost());
+        Assert.assertEquals(-1, report.getRpcPort());
+      }
+      // succeeded app's host and rpc port are not invalidated
+      if (report.getApplicationId().equals(app1.getApplicationId())) {
+        Assert.assertFalse(report.getHost().equals("N/A"));
+        Assert.assertTrue(report.getRpcPort() != -1);
+      }
+    }
+  }
+
+  @Test (timeout = 60000)
+  public void testInvalidatedAMHostPortOnAMRestart() throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    MockRM rm1 = new MockRM(conf);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+    nm1.registerNode();
+
+    // a failed app
+    RMApp app2 = rm1.submitApp(200);
+    MockAM am2 = launchAM(app2, rm1, nm1);
+    nm1
+      .nodeHeartbeat(am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+    am2.waitForState(RMAppAttemptState.FAILED);
+    rm1.waitForState(app2.getApplicationId(), RMAppState.ACCEPTED);
+
+    // Before a new attempt is launched, the app report returns the
+    // invalidated AM host and port.
+    GetApplicationReportRequest request1 =
+        GetApplicationReportRequest.newInstance(app2.getApplicationId());
+    ApplicationReport report1 =
+        rm1.getClientRMService().getApplicationReport(request1)
+          .getApplicationReport();
+    Assert.assertEquals("N/A", report1.getHost());
+    Assert.assertEquals(-1, report1.getRpcPort());
+  }
+
+  private MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
+      throws Exception {
+    RMAppAttempt attempt = app.getCurrentAppAttempt();
+    nm.nodeHeartbeat(true);
+    MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
+    am.registerAppAttempt();
+    rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
+    return am;
+  }
+
+  private void finishApplicationMaster(RMApp rmApp, MockRM rm, MockNM nm,
+      MockAM am) throws Exception {
+    FinishApplicationMasterRequest req =
+        FinishApplicationMasterRequest.newInstance(
+          FinalApplicationStatus.SUCCEEDED, "", "");
+    am.unregisterAppAttempt(req);
+    am.waitForState(RMAppAttemptState.FINISHING);
+    nm.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+    am.waitForState(RMAppAttemptState.FINISHED);
+    rm.waitForState(rmApp.getApplicationId(), RMAppState.FINISHED);
+  }
+
   public static void main(String[] args) throws Exception {
     TestRM t = new TestRM();
     t.testGetNewAppId();

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java?rev=1553773&r1=1553772&r2=1553773&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java Sat Dec 28 01:10:02 2013
@@ -806,6 +806,7 @@ public class TestRMAppAttemptTransitions
         applicationAttempt.getAppAttemptId().getApplicationId());
     assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl());
     assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl());
+    verifyAMHostAndPortInvalidated();
   }
 
   @Test
@@ -841,6 +842,7 @@ public class TestRMAppAttemptTransitions
     assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl());
     assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl());
     verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
+    verifyAMHostAndPortInvalidated();
   }
 
   @Test(timeout=10000)
@@ -878,6 +880,7 @@ public class TestRMAppAttemptTransitions
     assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl());
     assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl());
     verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
+    verifyAMHostAndPortInvalidated();
   }
 
   @Test 
@@ -1125,4 +1128,9 @@ public class TestRMAppAttemptTransitions
     verify(store, times(1)).updateApplicationAttemptState(
       any(ApplicationAttemptState.class));
   }
+
+  private void verifyAMHostAndPortInvalidated() {
+    assertEquals("N/A", applicationAttempt.getHost());
+    assertEquals(-1, applicationAttempt.getRpcPort());
+  }
 }