You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/12/28 02:09:07 UTC
svn commit: r1553772 - in /hadoop/common/trunk/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/
hadoop-yarn/hadoop-yarn-server/hadoop-ya...
Author: vinodkv
Date: Sat Dec 28 01:09:07 2013
New Revision: 1553772
URL: http://svn.apache.org/r1553772
Log:
YARN-1541. Changed ResourceManager to invalidate ApplicationMaster host/port information once an AM crashes. Contributed by Jian He.
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1553772&r1=1553771&r2=1553772&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Sat Dec 28 01:09:07 2013
@@ -191,6 +191,9 @@ Release 2.4.0 - UNRELEASED
YARN-1523. Use StandbyException instead of RMNotYetReadyException (kasha)
+ YARN-1541. Changed ResourceManager to invalidate ApplicationMaster host/port
+ information once an AM crashes. (Jian He via vinodkv)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1553772&r1=1553771&r2=1553772&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Sat Dec 28 01:09:07 2013
@@ -139,7 +139,7 @@ public class RMAppAttemptImpl implements
private float progress = 0;
private String host = "N/A";
- private int rpcPort;
+ private int rpcPort = -1;
private String originalTrackingUrl = "N/A";
private String proxiedTrackingUrl = "N/A";
private long startTime = 0;
@@ -526,6 +526,11 @@ public class RMAppAttemptImpl implements
proxiedTrackingUrl = originalTrackingUrl;
}
+ private void invalidateAMHostAndPort() {
+ this.host = "N/A";
+ this.rpcPort = -1;
+ }
+
// This is only used for RMStateStore. Normal operation must invoke the secret
// manager to get the key and not use the local key directly.
@Override
@@ -1033,6 +1038,7 @@ public class RMAppAttemptImpl implements
{
// don't leave the tracking URL pointing to a non-existent AM
appAttempt.setTrackingUrlToRMAppPage();
+ appAttempt.invalidateAMHostAndPort();
appEvent =
new RMAppFailedAttemptEvent(applicationId,
RMAppEventType.ATTEMPT_KILLED,
@@ -1043,6 +1049,7 @@ public class RMAppAttemptImpl implements
{
// don't leave the tracking URL pointing to a non-existent AM
appAttempt.setTrackingUrlToRMAppPage();
+ appAttempt.invalidateAMHostAndPort();
appEvent =
new RMAppFailedAttemptEvent(applicationId,
RMAppEventType.ATTEMPT_FAILED,
@@ -1059,7 +1066,6 @@ public class RMAppAttemptImpl implements
appAttempt.eventHandler.handle(appEvent);
appAttempt.eventHandler.handle(new AppAttemptRemovedSchedulerEvent(
appAttemptId, finalAttemptState));
-
appAttempt.removeCredentials(appAttempt);
}
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java?rev=1553772&r1=1553771&r2=1553772&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java Sat Dec 28 01:09:07 2013
@@ -19,26 +19,33 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import java.util.ArrayList;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
-import javax.security.auth.login.Configuration;
-
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
@@ -368,6 +375,111 @@ public class TestRM {
rm1.stop();
}
+ // This is to test AM Host and rpc port are invalidated after the am attempt
+ // is killed or failed, so that client doesn't get the wrong information.
+ @Test (timeout = 80000)
+ public void testInvalidateAMHostPortWhenAMFailedOrKilled() throws Exception {
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+ MockRM rm1 = new MockRM(conf);
+ rm1.start();
+
+ // a succeeded app
+ RMApp app1 = rm1.submitApp(200);
+ MockNM nm1 =
+ new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+ nm1.registerNode();
+ MockAM am1 = launchAM(app1, rm1, nm1);
+ finishApplicationMaster(app1, rm1, nm1, am1);
+
+ // a failed app
+ RMApp app2 = rm1.submitApp(200);
+ MockAM am2 = launchAM(app2, rm1, nm1);
+ nm1.nodeHeartbeat(am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+ am2.waitForState(RMAppAttemptState.FAILED);
+ rm1.waitForState(app2.getApplicationId(), RMAppState.FAILED);
+
+ // a killed app
+ RMApp app3 = rm1.submitApp(200);
+ MockAM am3 = launchAM(app3, rm1, nm1);
+ rm1.killApp(app3.getApplicationId());
+ rm1.waitForState(app3.getApplicationId(), RMAppState.KILLED);
+ rm1.waitForState(am3.getApplicationAttemptId(), RMAppAttemptState.KILLED);
+
+ GetApplicationsRequest request1 =
+ GetApplicationsRequest.newInstance(EnumSet.of(
+ YarnApplicationState.FINISHED, YarnApplicationState.KILLED,
+ YarnApplicationState.FAILED));
+ GetApplicationsResponse response1 =
+ rm1.getClientRMService().getApplications(request1);
+ List<ApplicationReport> appList1 = response1.getApplicationList();
+
+ Assert.assertEquals(3, appList1.size());
+ for (ApplicationReport report : appList1) {
+ // killed/failed apps host and rpc port are invalidated.
+ if (report.getApplicationId().equals(app2.getApplicationId())
+ || report.getApplicationId().equals(app3.getApplicationId())) {
+ Assert.assertEquals("N/A", report.getHost());
+ Assert.assertEquals(-1, report.getRpcPort());
+ }
+ // succeeded app's host and rpc port is not invalidated
+ if (report.getApplicationId().equals(app1.getApplicationId())) {
+ Assert.assertFalse(report.getHost().equals("N/A"));
+ Assert.assertTrue(report.getRpcPort() != -1);
+ }
+ }
+ }
+
+ @Test (timeout = 60000)
+ public void testInvalidatedAMHostPortOnAMRestart() throws Exception {
+ YarnConfiguration conf = new YarnConfiguration();
+ MockRM rm1 = new MockRM(conf);
+ rm1.start();
+ MockNM nm1 =
+ new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+ nm1.registerNode();
+
+ // a failed app
+ RMApp app2 = rm1.submitApp(200);
+ MockAM am2 = launchAM(app2, rm1, nm1);
+ nm1
+ .nodeHeartbeat(am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+ am2.waitForState(RMAppAttemptState.FAILED);
+ rm1.waitForState(app2.getApplicationId(), RMAppState.ACCEPTED);
+
+ // before new attempt is launched, the app report returns the invalid AM
+ // host and port.
+ GetApplicationReportRequest request1 =
+ GetApplicationReportRequest.newInstance(app2.getApplicationId());
+ ApplicationReport report1 =
+ rm1.getClientRMService().getApplicationReport(request1)
+ .getApplicationReport();
+ Assert.assertEquals("N/A", report1.getHost());
+ Assert.assertEquals(-1, report1.getRpcPort());
+ }
+
+ private MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
+ throws Exception {
+ RMAppAttempt attempt = app.getCurrentAppAttempt();
+ nm.nodeHeartbeat(true);
+ MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
+ am.registerAppAttempt();
+ rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
+ return am;
+ }
+
+ private void finishApplicationMaster(RMApp rmApp, MockRM rm, MockNM nm,
+ MockAM am) throws Exception {
+ FinishApplicationMasterRequest req =
+ FinishApplicationMasterRequest.newInstance(
+ FinalApplicationStatus.SUCCEEDED, "", "");
+ am.unregisterAppAttempt(req);
+ am.waitForState(RMAppAttemptState.FINISHING);
+ nm.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+ am.waitForState(RMAppAttemptState.FINISHED);
+ rm.waitForState(rmApp.getApplicationId(), RMAppState.FINISHED);
+ }
+
public static void main(String[] args) throws Exception {
TestRM t = new TestRM();
t.testGetNewAppId();
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java?rev=1553772&r1=1553771&r2=1553772&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java Sat Dec 28 01:09:07 2013
@@ -806,6 +806,7 @@ public class TestRMAppAttemptTransitions
applicationAttempt.getAppAttemptId().getApplicationId());
assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl());
assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl());
+ verifyAMHostAndPortInvalidated();
}
@Test
@@ -841,6 +842,7 @@ public class TestRMAppAttemptTransitions
assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl());
assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl());
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
+ verifyAMHostAndPortInvalidated();
}
@Test(timeout=10000)
@@ -878,6 +880,7 @@ public class TestRMAppAttemptTransitions
assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl());
assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl());
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
+ verifyAMHostAndPortInvalidated();
}
@Test
@@ -1125,4 +1128,9 @@ public class TestRMAppAttemptTransitions
verify(store, times(1)).updateApplicationAttemptState(
any(ApplicationAttemptState.class));
}
+
+ private void verifyAMHostAndPortInvalidated() {
+ assertEquals("N/A", applicationAttempt.getHost());
+ assertEquals(-1, applicationAttempt.getRpcPort());
+ }
}