You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2014/11/25 21:51:15 UTC
[1/2] hadoop git commit: YARN-2404. Removed ApplicationAttemptState
and ApplicationState class in RMStateStore. Contributed by Tsuyoshi OZAWA
(cherry picked from commit 5805a81efbc024024d8172489dfdc6cf77879416)
Repository: hadoop
Updated Branches:
refs/heads/branch-2 4b62d6d2f -> 286305653
http://git-wip-us.apache.org/repos/asf/hadoop/blob/28630565/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
index a0f8627..a42170b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
@@ -84,8 +84,6 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreEvent;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
@@ -162,7 +160,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
RMState rmState = memStore.getState();
- Map<ApplicationId, ApplicationState> rmAppState =
+ Map<ApplicationId, ApplicationStateData> rmAppState =
rmState.getApplicationState();
@@ -194,7 +192,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
// create app that gets launched and does allocate before RM restart
RMApp app1 = rm1.submitApp(200);
// assert app1 info is saved
- ApplicationState appState = rmAppState.get(app1.getApplicationId());
+ ApplicationStateData appState = rmAppState.get(app1.getApplicationId());
Assert.assertNotNull(appState);
Assert.assertEquals(0, appState.getAttemptCount());
Assert.assertEquals(appState.getApplicationSubmissionContext()
@@ -209,7 +207,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
ApplicationAttemptId attemptId1 = attempt1.getAppAttemptId();
rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED);
Assert.assertEquals(1, appState.getAttemptCount());
- ApplicationAttemptState attemptState =
+ ApplicationAttemptStateData attemptState =
appState.getAttempt(attemptId1);
Assert.assertNotNull(attemptState);
Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1),
@@ -429,7 +427,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
RMState rmState = memStore.getState();
- Map<ApplicationId, ApplicationState> rmAppState =
+ Map<ApplicationId, ApplicationStateData> rmAppState =
rmState.getApplicationState();
// start RM
@@ -450,7 +448,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
am0.waitForState(RMAppAttemptState.FAILED);
- ApplicationState appState = rmAppState.get(app0.getApplicationId());
+ ApplicationStateData appState = rmAppState.get(app0.getApplicationId());
// assert the AM failed state is saved.
Assert.assertEquals(RMAppAttemptState.FAILED,
appState.getAttempt(am0.getApplicationAttemptId()).getState());
@@ -486,7 +484,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
RMState rmState = memStore.getState();
- Map<ApplicationId, ApplicationState> rmAppState =
+ Map<ApplicationId, ApplicationStateData> rmAppState =
rmState.getApplicationState();
// start RM
@@ -650,7 +648,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
};
memStore.init(conf);
RMState rmState = memStore.getState();
- Map<ApplicationId, ApplicationState> rmAppState =
+ Map<ApplicationId, ApplicationStateData> rmAppState =
rmState.getApplicationState();
// start RM
@@ -689,7 +687,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
RMState rmState = memStore.getState();
- Map<ApplicationId, ApplicationState> rmAppState =
+ Map<ApplicationId, ApplicationStateData> rmAppState =
rmState.getApplicationState();
// start RM
@@ -709,7 +707,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
rm1.waitForState(app0.getApplicationId(), RMAppState.FAILED);
// assert the app/attempt failed state is saved.
- ApplicationState appState = rmAppState.get(app0.getApplicationId());
+ ApplicationStateData appState = rmAppState.get(app0.getApplicationId());
Assert.assertEquals(RMAppState.FAILED, appState.getState());
Assert.assertEquals(RMAppAttemptState.FAILED,
appState.getAttempt(am0.getApplicationAttemptId()).getState());
@@ -737,7 +735,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
RMState rmState = memStore.getState();
- Map<ApplicationId, ApplicationState> rmAppState =
+ Map<ApplicationId, ApplicationStateData> rmAppState =
rmState.getApplicationState();
// start RM
@@ -757,7 +755,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
rm1.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.KILLED);
// killed state is saved.
- ApplicationState appState = rmAppState.get(app0.getApplicationId());
+ ApplicationStateData appState = rmAppState.get(app0.getApplicationId());
Assert.assertEquals(RMAppState.KILLED, appState.getState());
Assert.assertEquals(RMAppAttemptState.KILLED,
appState.getAttempt(am0.getApplicationAttemptId()).getState());
@@ -823,7 +821,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
RMState rmState = memStore.getState();
- Map<ApplicationId, ApplicationState> rmAppState =
+ Map<ApplicationId, ApplicationStateData> rmAppState =
rmState.getApplicationState();
// start RM
@@ -844,8 +842,8 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
finishApplicationMaster(app0, rm1, nm1, am0, req);
// check the state store about the unregistered info.
- ApplicationState appState = rmAppState.get(app0.getApplicationId());
- ApplicationAttemptState attemptState0 =
+ ApplicationStateData appState = rmAppState.get(app0.getApplicationId());
+ ApplicationAttemptStateData attemptState0 =
appState.getAttempt(am0.getApplicationAttemptId());
Assert.assertEquals("diagnostics", attemptState0.getDiagnostics());
Assert.assertEquals(FinalApplicationStatus.SUCCEEDED,
@@ -995,7 +993,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
MockAM am, FinishApplicationMasterRequest req) throws Exception {
RMState rmState =
((MemoryRMStateStore) rm.getRMContext().getStateStore()).getState();
- Map<ApplicationId, ApplicationState> rmAppState =
+ Map<ApplicationId, ApplicationStateData> rmAppState =
rmState.getApplicationState();
am.unregisterAppAttempt(req,true);
am.waitForState(RMAppAttemptState.FINISHING);
@@ -1003,7 +1001,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
am.waitForState(RMAppAttemptState.FINISHED);
rm.waitForState(rmApp.getApplicationId(), RMAppState.FINISHED);
// check that app/attempt is saved with the final state
- ApplicationState appState = rmAppState.get(rmApp.getApplicationId());
+ ApplicationStateData appState = rmAppState.get(rmApp.getApplicationId());
Assert
.assertEquals(RMAppState.FINISHED, appState.getState());
Assert.assertEquals(RMAppAttemptState.FINISHED,
@@ -1019,7 +1017,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
memStore.init(conf);
RMState rmState = memStore.getState();
- Map<ApplicationId, ApplicationState> rmAppState =
+ Map<ApplicationId, ApplicationStateData> rmAppState =
rmState.getApplicationState();
MockRM rm1 = createMockRM(conf, memStore);
rm1.start();
@@ -1037,7 +1035,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
null);
// assert app1 info is saved
- ApplicationState appState = rmAppState.get(app1.getApplicationId());
+ ApplicationStateData appState = rmAppState.get(app1.getApplicationId());
Assert.assertNotNull(appState);
Assert.assertEquals(0, appState.getAttemptCount());
Assert.assertEquals(appState.getApplicationSubmissionContext()
@@ -1050,7 +1048,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
ApplicationAttemptId attemptId1 = attempt.getAppAttemptId();
rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED);
Assert.assertEquals(1, appState.getAttemptCount());
- ApplicationAttemptState attemptState =
+ ApplicationAttemptStateData attemptState =
appState.getAttempt(attemptId1);
Assert.assertNotNull(attemptState);
Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1),
@@ -1092,7 +1090,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
memStore.init(conf);
RMState rmState = memStore.getState();
- Map<ApplicationId, ApplicationState> rmAppState =
+ Map<ApplicationId, ApplicationStateData> rmAppState =
rmState.getApplicationState();
MockRM rm1 = new TestSecurityMockRM(conf, memStore);
rm1.start();
@@ -1131,7 +1129,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
new HashMap<ApplicationAccessType, String>(), false, "default", 1, ts);
// assert app info is saved
- ApplicationState appState = rmAppState.get(app.getApplicationId());
+ ApplicationStateData appState = rmAppState.get(app.getApplicationId());
Assert.assertNotNull(appState);
// assert delegation tokens exist in rm1 DelegationTokenRenewr
@@ -1187,7 +1185,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
memStore.init(conf);
RMState rmState = memStore.getState();
- Map<ApplicationId, ApplicationState> rmAppState =
+ Map<ApplicationId, ApplicationStateData> rmAppState =
rmState.getApplicationState();
MockRM rm1 = new TestSecurityMockRM(conf, memStore);
rm1.start();
@@ -1201,7 +1199,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
new HashMap<ApplicationAccessType, String>(), "default");
// assert app info is saved
- ApplicationState appState = rmAppState.get(app1.getApplicationId());
+ ApplicationStateData appState = rmAppState.get(app1.getApplicationId());
Assert.assertNotNull(appState);
// Allocate the AM
@@ -1211,7 +1209,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED);
// assert attempt info is saved
- ApplicationAttemptState attemptState = appState.getAttempt(attemptId1);
+ ApplicationAttemptStateData attemptState = appState.getAttempt(attemptId1);
Assert.assertNotNull(attemptState);
Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1),
attemptState.getMasterContainer().getId());
@@ -1222,7 +1220,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
attempt1.getClientTokenMasterKey().getEncoded();
// assert application credentials are saved
- Credentials savedCredentials = attemptState.getAppAttemptCredentials();
+ Credentials savedCredentials = attemptState.getAppAttemptTokens();
Assert.assertArrayEquals("client token master key not saved",
clientTokenMasterKey, savedCredentials.getSecretKey(
RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME));
@@ -1268,7 +1266,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
memStore.init(conf);
RMState rmState = memStore.getState();
- Map<ApplicationId, ApplicationState> rmAppState =
+ Map<ApplicationId, ApplicationStateData> rmAppState =
rmState.getApplicationState();
Map<RMDelegationTokenIdentifier, Long> rmDTState =
rmState.getRMDTSecretManagerState().getTokenState();
@@ -1305,7 +1303,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
new HashMap<ApplicationAccessType, String>(), false, "default", 1, ts);
// assert app info is saved
- ApplicationState appState = rmAppState.get(app.getApplicationId());
+ ApplicationStateData appState = rmAppState.get(app.getApplicationId());
Assert.assertNotNull(appState);
// assert all master keys are saved
@@ -1479,7 +1477,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
// queue, and will be processed once rm.stop() is called.
// Nothing exist in state store before stop is called.
- Map<ApplicationId, ApplicationState> rmAppState =
+ Map<ApplicationId, ApplicationStateData> rmAppState =
memStore.getState().getApplicationState();
Assert.assertTrue(rmAppState.size() == 0);
@@ -1489,7 +1487,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
// Assert app info is still saved even if stop is called with pending saving
// request on dispatcher.
for (RMApp app : appList) {
- ApplicationState appState = rmAppState.get(app.getApplicationId());
+ ApplicationStateData appState = rmAppState.get(app.getApplicationId());
Assert.assertNotNull(appState);
Assert.assertEquals(0, appState.getAttemptCount());
Assert.assertEquals(appState.getApplicationSubmissionContext()
@@ -1523,7 +1521,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
nm1 = rm2.registerNode("127.0.0.1:1234", 15120);
- Map<ApplicationId, ApplicationState> rmAppState =
+ Map<ApplicationId, ApplicationStateData> rmAppState =
rmState.getApplicationState();
// app0 exits in both state store and rmContext
@@ -1658,10 +1656,15 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
memStore.init(conf);
for (int i = 10; i > 0; i--) {
- ApplicationState appState = mock(ApplicationState.class);
- when(appState.getAppId()).thenReturn(ApplicationId.newInstance(1234, i));
- memStore.getState().getApplicationState()
- .put(appState.getAppId(), appState);
+ ApplicationStateData appState = mock(ApplicationStateData.class);
+ ApplicationSubmissionContext context =
+ mock(ApplicationSubmissionContext.class);
+ when(appState.getApplicationSubmissionContext()).thenReturn(context);
+ when(context.getApplicationId()).thenReturn(
+ ApplicationId.newInstance(1234, i));
+ memStore.getState().getApplicationState().put(
+ appState.getApplicationSubmissionContext().getApplicationId(),
+ appState);
}
MockRM rm1 = new MockRM(conf, memStore) {
@@ -1681,12 +1684,14 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
}
@Override
- protected void recoverApplication(ApplicationState appState,
+ protected void recoverApplication(ApplicationStateData appState,
RMState rmState) throws Exception {
// check application is recovered in order.
Assert.assertTrue(rmState.getApplicationState().size() > 0);
- Assert.assertTrue(appState.getAppId().compareTo(prevId) > 0);
- prevId = appState.getAppId();
+ Assert.assertTrue(appState.getApplicationSubmissionContext()
+ .getApplicationId().compareTo(prevId) > 0);
+ prevId =
+ appState.getApplicationSubmissionContext().getApplicationId();
}
}
};
@@ -2030,4 +2035,5 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
// Do nothing.
}
}
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/28630565/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
index a93123e..49b1841 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
@@ -42,7 +42,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
@@ -386,7 +386,7 @@ public class TestAMRestart {
am1.waitForState(RMAppAttemptState.FAILED);
Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry());
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
- ApplicationState appState =
+ ApplicationStateData appState =
memStore.getState().getApplicationState().get(app1.getApplicationId());
// AM should be restarted even though max-am-attempt is 1.
MockAM am2 =
@@ -497,7 +497,7 @@ public class TestAMRestart {
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
// state store has 1 attempt stored.
- ApplicationState appState =
+ ApplicationStateData appState =
memStore.getState().getApplicationState().get(app1.getApplicationId());
Assert.assertEquals(1, appState.getAttemptCount());
// attempt stored has the preempted container exit status.
@@ -555,7 +555,7 @@ public class TestAMRestart {
// Restart rm.
MockRM rm2 = new MockRM(conf, memStore);
rm2.start();
- ApplicationState appState =
+ ApplicationStateData appState =
memStore.getState().getApplicationState().get(app1.getApplicationId());
// re-register the NM
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/28630565/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
index 00b60d3..8d6a7b6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
@@ -58,8 +58,8 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMDTSecretManagerState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
@@ -243,6 +243,7 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
when(mockRemovedApp.getSubmitTime()).thenReturn(submitTime);
when(mockRemovedApp.getApplicationSubmissionContext()).thenReturn(context);
when(mockRemovedApp.getAppAttempts()).thenReturn(attempts);
+ when(mockRemovedApp.getUser()).thenReturn("user1");
RMAppAttempt mockRemovedAttempt = mock(RMAppAttempt.class);
when(mockRemovedAttempt.getAppAttemptId()).thenReturn(attemptIdRemoved);
when(mockRemovedAttempt.getRMAppAttemptMetrics())
@@ -269,10 +270,10 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
store = stateStoreHelper.getRMStateStore();
store.setRMDispatcher(dispatcher);
RMState state = store.loadState();
- Map<ApplicationId, ApplicationState> rmAppState =
+ Map<ApplicationId, ApplicationStateData> rmAppState =
state.getApplicationState();
- ApplicationState appState = rmAppState.get(appId1);
+ ApplicationStateData appState = rmAppState.get(appId1);
// app is loaded
assertNotNull(appState);
// app is loaded correctly
@@ -281,7 +282,7 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
// submission context is loaded correctly
assertEquals(appId1,
appState.getApplicationSubmissionContext().getApplicationId());
- ApplicationAttemptState attemptState = appState.getAttempt(attemptId1);
+ ApplicationAttemptStateData attemptState = appState.getAttempt(attemptId1);
// attempt1 is loaded correctly
assertNotNull(attemptState);
assertEquals(attemptId1, attemptState.getAttemptId());
@@ -289,9 +290,10 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
// attempt1 container is loaded correctly
assertEquals(containerId1, attemptState.getMasterContainer().getId());
// attempt1 client token master key is loaded correctly
- assertArrayEquals(clientTokenKey1.getEncoded(),
- attemptState.getAppAttemptCredentials()
- .getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME));
+ assertArrayEquals(
+ clientTokenKey1.getEncoded(),
+ attemptState.getAppAttemptTokens()
+ .getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME));
attemptState = appState.getAttempt(attemptId2);
// attempt2 is loaded correctly
@@ -300,27 +302,30 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
// attempt2 container is loaded correctly
assertEquals(containerId2, attemptState.getMasterContainer().getId());
// attempt2 client token master key is loaded correctly
- assertArrayEquals(clientTokenKey2.getEncoded(),
- attemptState.getAppAttemptCredentials()
- .getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME));
+ assertArrayEquals(
+ clientTokenKey2.getEncoded(),
+ attemptState.getAppAttemptTokens()
+ .getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME));
//******* update application/attempt state *******//
- ApplicationState appState2 =
- new ApplicationState(appState.submitTime, appState.startTime,
- appState.context, appState.user, RMAppState.FINISHED,
- "appDiagnostics", 1234);
+ ApplicationStateData appState2 =
+ ApplicationStateData.newInstance(appState.getSubmitTime(),
+ appState.getStartTime(), appState.getUser(),
+ appState.getApplicationSubmissionContext(), RMAppState.FINISHED,
+ "appDiagnostics", 1234);
appState2.attempts.putAll(appState.attempts);
store.updateApplicationState(appState2);
- ApplicationAttemptState oldAttemptState = attemptState;
- ApplicationAttemptState newAttemptState =
- new ApplicationAttemptState(oldAttemptState.getAttemptId(),
- oldAttemptState.getMasterContainer(),
- oldAttemptState.getAppAttemptCredentials(),
- oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
- "myTrackingUrl", "attemptDiagnostics",
- FinalApplicationStatus.SUCCEEDED, 100,
- oldAttemptState.getFinishTime(), 0, 0);
+ ApplicationAttemptStateData oldAttemptState = attemptState;
+ ApplicationAttemptStateData newAttemptState =
+ ApplicationAttemptStateData.newInstance(
+ oldAttemptState.getAttemptId(),
+ oldAttemptState.getMasterContainer(),
+ oldAttemptState.getAppAttemptTokens(),
+ oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
+ "myTrackingUrl", "attemptDiagnostics",
+ FinalApplicationStatus.SUCCEEDED, 100,
+ oldAttemptState.getFinishTime(), 0, 0);
store.updateApplicationAttemptState(newAttemptState);
// test updating the state of an app/attempt whose initial state was not
@@ -329,22 +334,22 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
ApplicationSubmissionContext dummyContext =
new ApplicationSubmissionContextPBImpl();
dummyContext.setApplicationId(dummyAppId);
- ApplicationState dummyApp =
- new ApplicationState(appState.submitTime, appState.startTime,
- dummyContext, appState.user, RMAppState.FINISHED, "appDiagnostics",
- 1234);
+ ApplicationStateData dummyApp =
+ ApplicationStateData.newInstance(appState.getSubmitTime(),
+ appState.getStartTime(), appState.getUser(), dummyContext,
+ RMAppState.FINISHED, "appDiagnostics", 1234);
store.updateApplicationState(dummyApp);
ApplicationAttemptId dummyAttemptId =
ApplicationAttemptId.newInstance(dummyAppId, 6);
- ApplicationAttemptState dummyAttempt =
- new ApplicationAttemptState(dummyAttemptId,
- oldAttemptState.getMasterContainer(),
- oldAttemptState.getAppAttemptCredentials(),
- oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
- "myTrackingUrl", "attemptDiagnostics",
- FinalApplicationStatus.SUCCEEDED, 111,
- oldAttemptState.getFinishTime(), 0, 0);
+ ApplicationAttemptStateData dummyAttempt =
+ ApplicationAttemptStateData.newInstance(dummyAttemptId,
+ oldAttemptState.getMasterContainer(),
+ oldAttemptState.getAppAttemptTokens(),
+ oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
+ "myTrackingUrl", "attemptDiagnostics",
+ FinalApplicationStatus.SUCCEEDED, 111,
+ oldAttemptState.getFinishTime(), 0, 0);
store.updateApplicationAttemptState(dummyAttempt);
// let things settle down
@@ -355,11 +360,13 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
store = stateStoreHelper.getRMStateStore();
store.setRMDispatcher(dispatcher);
RMState newRMState = store.loadState();
- Map<ApplicationId, ApplicationState> newRMAppState =
+ Map<ApplicationId, ApplicationStateData> newRMAppState =
newRMState.getApplicationState();
- assertNotNull(newRMAppState.get(dummyApp.getAppId()));
- ApplicationState updatedAppState = newRMAppState.get(appId1);
- assertEquals(appState.getAppId(),updatedAppState.getAppId());
+ assertNotNull(newRMAppState.get(
+ dummyApp.getApplicationSubmissionContext().getApplicationId()));
+ ApplicationStateData updatedAppState = newRMAppState.get(appId1);
+ assertEquals(appState.getApplicationSubmissionContext().getApplicationId(),
+ updatedAppState.getApplicationSubmissionContext().getApplicationId());
assertEquals(appState.getSubmitTime(), updatedAppState.getSubmitTime());
assertEquals(appState.getStartTime(), updatedAppState.getStartTime());
assertEquals(appState.getUser(), updatedAppState.getUser());
@@ -369,16 +376,17 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
assertEquals(1234, updatedAppState.getFinishTime());
// check updated attempt state
- assertNotNull(newRMAppState.get(dummyApp.getAppId()).getAttempt(
- dummyAttemptId));
- ApplicationAttemptState updatedAttemptState =
+ assertNotNull(newRMAppState.get(dummyApp.getApplicationSubmissionContext
+ ().getApplicationId()).getAttempt(dummyAttemptId));
+ ApplicationAttemptStateData updatedAttemptState =
updatedAppState.getAttempt(newAttemptState.getAttemptId());
assertEquals(oldAttemptState.getAttemptId(),
updatedAttemptState.getAttemptId());
assertEquals(containerId2, updatedAttemptState.getMasterContainer().getId());
- assertArrayEquals(clientTokenKey2.getEncoded(),
- updatedAttemptState.getAppAttemptCredentials().getSecretKey(
- RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME));
+ assertArrayEquals(
+ clientTokenKey2.getEncoded(),
+ updatedAttemptState.getAppAttemptTokens()
+ .getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME));
// new attempt state fields
assertEquals(RMAppAttemptState.FINISHED, updatedAttemptState.getState());
assertEquals("myTrackingUrl", updatedAttemptState.getFinalTrackingUrl());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/28630565/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
index 0a2f0d4..72f1dff 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.doReturn;
@@ -62,7 +63,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -326,11 +326,13 @@ public class TestRMAppTransitions {
}
private void assertAppFinalStateSaved(RMApp application){
- verify(store, times(1)).updateApplicationState(any(ApplicationState.class));
+ verify(store, times(1)).updateApplicationState(
+ any(ApplicationStateData.class));
}
private void assertAppFinalStateNotSaved(RMApp application){
- verify(store, times(0)).updateApplicationState(any(ApplicationState.class));
+ verify(store, times(0)).updateApplicationState(
+ any(ApplicationStateData.class));
}
private void assertKilled(RMApp application) {
@@ -395,11 +397,13 @@ public class TestRMAppTransitions {
RMApp application = createNewTestApp(submissionContext);
// NEW => SUBMITTED event RMAppEventType.RECOVER
RMState state = new RMState();
- ApplicationState appState = new ApplicationState(123, 123, null, "user");
+ ApplicationStateData appState =
+ ApplicationStateData.newInstance(123, 123, null, "user");
state.getApplicationState().put(application.getApplicationId(), appState);
RMAppEvent event =
new RMAppRecoverEvent(application.getApplicationId(), state);
+
application.handle(event);
assertStartTimeSet(application);
assertAppState(RMAppState.SUBMITTED, application);
@@ -946,22 +950,25 @@ public class TestRMAppTransitions {
@Test(timeout = 30000)
public void testAppsRecoveringStates() throws Exception {
RMState state = new RMState();
- Map<ApplicationId, ApplicationState> applicationState =
+ Map<ApplicationId, ApplicationStateData> applicationState =
state.getApplicationState();
createRMStateForApplications(applicationState, RMAppState.FINISHED);
createRMStateForApplications(applicationState, RMAppState.KILLED);
createRMStateForApplications(applicationState, RMAppState.FAILED);
- for (ApplicationState appState : applicationState.values()) {
+ for (ApplicationStateData appState : applicationState.values()) {
testRecoverApplication(appState, state);
}
}
- public void testRecoverApplication(ApplicationState appState, RMState rmState)
+ public void testRecoverApplication(ApplicationStateData appState,
+ RMState rmState)
throws Exception {
ApplicationSubmissionContext submissionContext =
appState.getApplicationSubmissionContext();
RMAppImpl application =
- new RMAppImpl(appState.getAppId(), rmContext, conf,
+ new RMAppImpl(
+ appState.getApplicationSubmissionContext().getApplicationId(),
+ rmContext, conf,
submissionContext.getApplicationName(), null,
submissionContext.getQueue(), submissionContext, null, null,
appState.getSubmitTime(), submissionContext.getApplicationType(),
@@ -986,12 +993,12 @@ public class TestRMAppTransitions {
}
public void createRMStateForApplications(
- Map<ApplicationId, ApplicationState> applicationState,
+ Map<ApplicationId, ApplicationStateData> applicationState,
RMAppState rmAppState) {
RMApp app = createNewTestApp(null);
- ApplicationState appState =
- new ApplicationState(app.getSubmitTime(), app.getStartTime(),
- app.getApplicationSubmissionContext(), app.getUser(), rmAppState,
+ ApplicationStateData appState =
+ ApplicationStateData.newInstance(app.getSubmitTime(), app.getStartTime(),
+ app.getUser(), app.getApplicationSubmissionContext(), rmAppState,
null, app.getFinishTime());
applicationState.put(app.getApplicationId(), appState);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/28630565/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
index 2b5c2b8..fc653dc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
@@ -81,7 +81,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventT
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@@ -1520,7 +1520,7 @@ public class TestRMAppAttemptTransitions {
private void verifyAttemptFinalStateSaved() {
verify(store, times(1)).updateApplicationAttemptState(
- any(ApplicationAttemptState.class));
+ any(ApplicationAttemptStateData.class));
}
private void verifyAMHostAndPortInvalidated() {
[2/2] hadoop git commit: YARN-2404. Removed ApplicationAttemptState
and ApplicationState class in RMStateStore. Contributed by Tsuyoshi OZAWA
(cherry picked from commit 5805a81efbc024024d8172489dfdc6cf77879416)
Posted by ji...@apache.org.
YARN-2404. Removed ApplicationAttemptState and ApplicationState class in RMStateStore. Contributed by Tsuyoshi OZAWA
(cherry picked from commit 5805a81efbc024024d8172489dfdc6cf77879416)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/28630565
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/28630565
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/28630565
Branch: refs/heads/branch-2
Commit: 286305653082baf514a4e4a11318ff9cb0be0ba5
Parents: 4b62d6d
Author: Jian He <ji...@apache.org>
Authored: Tue Nov 25 12:48:22 2014 -0800
Committer: Jian He <ji...@apache.org>
Committed: Tue Nov 25 12:51:03 2014 -0800
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../server/resourcemanager/RMAppManager.java | 13 +-
.../recovery/FileSystemRMStateStore.java | 74 ++----
.../recovery/MemoryRMStateStore.java | 86 ++-----
.../recovery/NullRMStateStore.java | 2 +-
.../resourcemanager/recovery/RMStateStore.java | 232 +++----------------
.../recovery/RMStateStoreAppAttemptEvent.java | 8 +-
.../recovery/RMStateStoreAppEvent.java | 8 +-
.../recovery/RMStateStoreRemoveAppEvent.java | 8 +-
.../recovery/RMStateUpdateAppAttemptEvent.java | 9 +-
.../recovery/RMStateUpdateAppEvent.java | 8 +-
.../recovery/ZKRMStateStore.java | 46 +---
.../records/ApplicationAttemptStateData.java | 36 +--
.../recovery/records/ApplicationStateData.java | 25 +-
.../pb/ApplicationAttemptStateDataPBImpl.java | 60 ++++-
.../server/resourcemanager/rmapp/RMAppImpl.java | 16 +-
.../rmapp/attempt/RMAppAttemptImpl.java | 25 +-
.../yarn/server/resourcemanager/TestRMHA.java | 4 +-
.../server/resourcemanager/TestRMRestart.java | 84 +++----
.../applicationsmanager/TestAMRestart.java | 8 +-
.../recovery/RMStateStoreTestBase.java | 100 ++++----
.../rmapp/TestRMAppTransitions.java | 31 ++-
.../attempt/TestRMAppAttemptTransitions.java | 4 +-
23 files changed, 353 insertions(+), 537 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/28630565/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 511bb6e..be8fbed 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -65,6 +65,9 @@ Release 2.7.0 - UNRELEASED
YARN-2669. FairScheduler: queue names shouldn't allow periods
(Wei Yan via Sandy Ryza)
+ YARN-2404. Removed ApplicationAttemptState and ApplicationState class in
+ RMStateStore. (Tsuyoshi OZAWA via jianhe)
+
OPTIMIZATIONS
BUG FIXES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/28630565/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
index ab8df62..f38e128 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
@@ -40,9 +40,9 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@@ -306,11 +306,11 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
}
}
- protected void recoverApplication(ApplicationState appState, RMState rmState)
- throws Exception {
+ protected void recoverApplication(ApplicationStateData appState,
+ RMState rmState) throws Exception {
ApplicationSubmissionContext appContext =
appState.getApplicationSubmissionContext();
- ApplicationId appId = appState.getAppId();
+ ApplicationId appId = appContext.getApplicationId();
// create and recover app.
RMAppImpl application =
@@ -414,9 +414,10 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
RMStateStore store = rmContext.getStateStore();
assert store != null;
// recover applications
- Map<ApplicationId, ApplicationState> appStates = state.getApplicationState();
+ Map<ApplicationId, ApplicationStateData> appStates =
+ state.getApplicationState();
LOG.info("Recovering " + appStates.size() + " applications");
- for (ApplicationState appState : appStates.values()) {
+ for (ApplicationStateData appState : appStates.values()) {
recoverApplication(appState, state);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/28630565/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
index 2bbc5c2..2996392 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
@@ -38,9 +38,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -223,8 +221,8 @@ public class FileSystemRMStateStore extends RMStateStore {
private void loadRMAppState(RMState rmState) throws Exception {
try {
- List<ApplicationAttemptState> attempts =
- new ArrayList<ApplicationAttemptState>();
+ List<ApplicationAttemptStateData> attempts =
+ new ArrayList<ApplicationAttemptStateData>();
for (FileStatus appDir : fs.listStatus(rmAppRoot)) {
checkAndResumeUpdateOperation(appDir.getPath());
@@ -241,19 +239,11 @@ public class FileSystemRMStateStore extends RMStateStore {
if (LOG.isDebugEnabled()) {
LOG.debug("Loading application from node: " + childNodeName);
}
- ApplicationId appId = ConverterUtils.toApplicationId(childNodeName);
- ApplicationStateDataPBImpl appStateData =
+ ApplicationStateDataPBImpl appState =
new ApplicationStateDataPBImpl(
ApplicationStateDataProto.parseFrom(childData));
- ApplicationState appState =
- new ApplicationState(appStateData.getSubmitTime(),
- appStateData.getStartTime(),
- appStateData.getApplicationSubmissionContext(),
- appStateData.getUser(),
- appStateData.getState(),
- appStateData.getDiagnostics(), appStateData.getFinishTime());
- // assert child node name is same as actual applicationId
- assert appId.equals(appState.context.getApplicationId());
+ ApplicationId appId =
+ appState.getApplicationSubmissionContext().getApplicationId();
rmState.appState.put(appId, appState);
} else if (childNodeName
.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
@@ -262,33 +252,9 @@ public class FileSystemRMStateStore extends RMStateStore {
LOG.debug("Loading application attempt from node: "
+ childNodeName);
}
- ApplicationAttemptId attemptId =
- ConverterUtils.toApplicationAttemptId(childNodeName);
- ApplicationAttemptStateDataPBImpl attemptStateData =
+ ApplicationAttemptStateDataPBImpl attemptState =
new ApplicationAttemptStateDataPBImpl(
ApplicationAttemptStateDataProto.parseFrom(childData));
- Credentials credentials = null;
- if (attemptStateData.getAppAttemptTokens() != null) {
- credentials = new Credentials();
- DataInputByteBuffer dibb = new DataInputByteBuffer();
- dibb.reset(attemptStateData.getAppAttemptTokens());
- credentials.readTokenStorageStream(dibb);
- }
- ApplicationAttemptState attemptState =
- new ApplicationAttemptState(attemptId,
- attemptStateData.getMasterContainer(), credentials,
- attemptStateData.getStartTime(),
- attemptStateData.getState(),
- attemptStateData.getFinalTrackingUrl(),
- attemptStateData.getDiagnostics(),
- attemptStateData.getFinalApplicationStatus(),
- attemptStateData.getAMContainerExitStatus(),
- attemptStateData.getFinishTime(),
- attemptStateData.getMemorySeconds(),
- attemptStateData.getVcoreSeconds());
-
- // assert child node name is same as application attempt id
- assert attemptId.equals(attemptState.getAttemptId());
attempts.add(attemptState);
} else {
LOG.info("Unknown child node with name: " + childNodeName);
@@ -299,9 +265,9 @@ public class FileSystemRMStateStore extends RMStateStore {
// go through all attempts and add them to their apps, Ideally, each
// attempt node must have a corresponding app node, because remove
// directory operation remove both at the same time
- for (ApplicationAttemptState attemptState : attempts) {
+ for (ApplicationAttemptStateData attemptState : attempts) {
ApplicationId appId = attemptState.getAttemptId().getApplicationId();
- ApplicationState appState = rmState.appState.get(appId);
+ ApplicationStateData appState = rmState.appState.get(appId);
assert appState != null;
appState.attempts.put(attemptState.getAttemptId(), attemptState);
}
@@ -398,10 +364,9 @@ public class FileSystemRMStateStore extends RMStateStore {
@Override
public synchronized void storeApplicationStateInternal(ApplicationId appId,
ApplicationStateData appStateDataPB) throws Exception {
- String appIdStr = appId.toString();
- Path appDirPath = getAppDir(rmAppRoot, appIdStr);
+ Path appDirPath = getAppDir(rmAppRoot, appId);
fs.mkdirs(appDirPath);
- Path nodeCreatePath = getNodePath(appDirPath, appIdStr);
+ Path nodeCreatePath = getNodePath(appDirPath, appId.toString());
LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath);
byte[] appStateData = appStateDataPB.getProto().toByteArray();
@@ -418,9 +383,8 @@ public class FileSystemRMStateStore extends RMStateStore {
@Override
public synchronized void updateApplicationStateInternal(ApplicationId appId,
ApplicationStateData appStateDataPB) throws Exception {
- String appIdStr = appId.toString();
- Path appDirPath = getAppDir(rmAppRoot, appIdStr);
- Path nodeCreatePath = getNodePath(appDirPath, appIdStr);
+ Path appDirPath = getAppDir(rmAppRoot, appId);
+ Path nodeCreatePath = getNodePath(appDirPath, appId.toString());
LOG.info("Updating info for app: " + appId + " at: " + nodeCreatePath);
byte[] appStateData = appStateDataPB.getProto().toByteArray();
@@ -440,7 +404,7 @@ public class FileSystemRMStateStore extends RMStateStore {
ApplicationAttemptStateData attemptStateDataPB)
throws Exception {
Path appDirPath =
- getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString());
+ getAppDir(rmAppRoot, appAttemptId.getApplicationId());
Path nodeCreatePath = getNodePath(appDirPath, appAttemptId.toString());
LOG.info("Storing info for attempt: " + appAttemptId + " at: "
+ nodeCreatePath);
@@ -461,7 +425,7 @@ public class FileSystemRMStateStore extends RMStateStore {
ApplicationAttemptStateData attemptStateDataPB)
throws Exception {
Path appDirPath =
- getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString());
+ getAppDir(rmAppRoot, appAttemptId.getApplicationId());
Path nodeCreatePath = getNodePath(appDirPath, appAttemptId.toString());
LOG.info("Updating info for attempt: " + appAttemptId + " at: "
+ nodeCreatePath);
@@ -477,9 +441,11 @@ public class FileSystemRMStateStore extends RMStateStore {
}
@Override
- public synchronized void removeApplicationStateInternal(ApplicationState appState)
+ public synchronized void removeApplicationStateInternal(
+ ApplicationStateData appState)
throws Exception {
- String appId = appState.getAppId().toString();
+ ApplicationId appId =
+ appState.getApplicationSubmissionContext().getApplicationId();
Path nodeRemovePath = getAppDir(rmAppRoot, appId);
LOG.info("Removing info for app: " + appId + " at: " + nodeRemovePath);
deleteFile(nodeRemovePath);
@@ -572,8 +538,8 @@ public class FileSystemRMStateStore extends RMStateStore {
}
}
- private Path getAppDir(Path root, String appId) {
- return getNodePath(root, appId);
+ private Path getAppDir(Path root, ApplicationId appId) {
+ return getNodePath(root, appId.toString());
}
// FileSystem related code
http://git-wip-us.apache.org/repos/asf/hadoop/blob/28630565/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
index a67da2c..917fdc1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
@@ -25,8 +25,6 @@ import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.DataInputByteBuffer;
-import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -93,57 +91,30 @@ public class MemoryRMStateStore extends RMStateStore {
}
@Override
- public void storeApplicationStateInternal(ApplicationId appId,
- ApplicationStateData appStateData)
+ public void storeApplicationStateInternal(
+ ApplicationId appId, ApplicationStateData appState)
throws Exception {
- ApplicationState appState =
- new ApplicationState(appStateData.getSubmitTime(),
- appStateData.getStartTime(),
- appStateData.getApplicationSubmissionContext(),
- appStateData.getUser());
state.appState.put(appId, appState);
}
@Override
public void updateApplicationStateInternal(ApplicationId appId,
- ApplicationStateData appStateData) throws Exception {
- ApplicationState updatedAppState =
- new ApplicationState(appStateData.getSubmitTime(),
- appStateData.getStartTime(),
- appStateData.getApplicationSubmissionContext(),
- appStateData.getUser(), appStateData.getState(),
- appStateData.getDiagnostics(), appStateData.getFinishTime());
- LOG.info("Updating final state " + appStateData.getState() + " for app: "
+ ApplicationStateData appState) throws Exception {
+ LOG.info("Updating final state " + appState.getState() + " for app: "
+ appId);
if (state.appState.get(appId) != null) {
// add the earlier attempts back
- updatedAppState.attempts
- .putAll(state.appState.get(appId).attempts);
+ appState.attempts.putAll(state.appState.get(appId).attempts);
}
- state.appState.put(appId, updatedAppState);
+ state.appState.put(appId, appState);
}
@Override
public synchronized void storeApplicationAttemptStateInternal(
ApplicationAttemptId appAttemptId,
- ApplicationAttemptStateData attemptStateData)
+ ApplicationAttemptStateData attemptState)
throws Exception {
- Credentials credentials = null;
- if(attemptStateData.getAppAttemptTokens() != null){
- DataInputByteBuffer dibb = new DataInputByteBuffer();
- credentials = new Credentials();
- dibb.reset(attemptStateData.getAppAttemptTokens());
- credentials.readTokenStorageStream(dibb);
- }
- ApplicationAttemptState attemptState =
- new ApplicationAttemptState(appAttemptId,
- attemptStateData.getMasterContainer(), credentials,
- attemptStateData.getStartTime(),
- attemptStateData.getMemorySeconds(),
- attemptStateData.getVcoreSeconds());
-
-
- ApplicationState appState = state.getApplicationState().get(
+ ApplicationStateData appState = state.getApplicationState().get(
attemptState.getAttemptId().getApplicationId());
if (appState == null) {
throw new YarnRuntimeException("Application doesn't exist");
@@ -154,44 +125,25 @@ public class MemoryRMStateStore extends RMStateStore {
@Override
public synchronized void updateApplicationAttemptStateInternal(
ApplicationAttemptId appAttemptId,
- ApplicationAttemptStateData attemptStateData)
+ ApplicationAttemptStateData attemptState)
throws Exception {
- Credentials credentials = null;
- if (attemptStateData.getAppAttemptTokens() != null) {
- DataInputByteBuffer dibb = new DataInputByteBuffer();
- credentials = new Credentials();
- dibb.reset(attemptStateData.getAppAttemptTokens());
- credentials.readTokenStorageStream(dibb);
- }
- ApplicationAttemptState updatedAttemptState =
- new ApplicationAttemptState(appAttemptId,
- attemptStateData.getMasterContainer(), credentials,
- attemptStateData.getStartTime(), attemptStateData.getState(),
- attemptStateData.getFinalTrackingUrl(),
- attemptStateData.getDiagnostics(),
- attemptStateData.getFinalApplicationStatus(),
- attemptStateData.getAMContainerExitStatus(),
- attemptStateData.getFinishTime(),
- attemptStateData.getMemorySeconds(),
- attemptStateData.getVcoreSeconds());
-
- ApplicationState appState =
- state.getApplicationState().get(
- updatedAttemptState.getAttemptId().getApplicationId());
+ ApplicationStateData appState =
+ state.getApplicationState().get(appAttemptId.getApplicationId());
if (appState == null) {
throw new YarnRuntimeException("Application doesn't exist");
}
- LOG.info("Updating final state " + updatedAttemptState.getState()
- + " for attempt: " + updatedAttemptState.getAttemptId());
- appState.attempts.put(updatedAttemptState.getAttemptId(),
- updatedAttemptState);
+ LOG.info("Updating final state " + attemptState.getState()
+ + " for attempt: " + attemptState.getAttemptId());
+ appState.attempts.put(attemptState.getAttemptId(), attemptState);
}
@Override
public synchronized void removeApplicationStateInternal(
- ApplicationState appState) throws Exception {
- ApplicationId appId = appState.getAppId();
- ApplicationState removed = state.appState.remove(appId);
+ ApplicationStateData appState) throws Exception {
+ ApplicationId appId =
+ appState.getApplicationSubmissionContext().getApplicationId();
+ ApplicationStateData removed = state.appState.remove(appId);
+
if (removed == null) {
throw new YarnRuntimeException("Removing non-exsisting application state");
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/28630565/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
index b957d12..f80c497 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
@@ -71,7 +71,7 @@ public class NullRMStateStore extends RMStateStore {
}
@Override
- protected void removeApplicationStateInternal(ApplicationState appState)
+ protected void removeApplicationStateInternal(ApplicationStateData appState)
throws Exception {
// Do nothing
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/28630565/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
index 8948b54..35a54c3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
@@ -38,9 +38,6 @@ import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
@@ -56,12 +53,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Applicatio
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
@@ -129,13 +124,13 @@ public abstract class RMStateStore extends AbstractService {
LOG.error("Illegal event type: " + event.getClass());
return;
}
- ApplicationState appState = ((RMStateStoreAppEvent) event).getAppState();
- ApplicationId appId = appState.getAppId();
- ApplicationStateData appStateData = ApplicationStateData
- .newInstance(appState);
+ ApplicationStateData appState =
+ ((RMStateStoreAppEvent) event).getAppState();
+ ApplicationId appId =
+ appState.getApplicationSubmissionContext().getApplicationId();
LOG.info("Storing info for app: " + appId);
try {
- store.storeApplicationStateInternal(appId, appStateData);
+ store.storeApplicationStateInternal(appId, appState);
store.notifyApplication(new RMAppEvent(appId,
RMAppEventType.APP_NEW_SAVED));
} catch (Exception e) {
@@ -154,13 +149,13 @@ public abstract class RMStateStore extends AbstractService {
LOG.error("Illegal event type: " + event.getClass());
return;
}
- ApplicationState appState = ((RMStateUpdateAppEvent) event).getAppState();
- ApplicationId appId = appState.getAppId();
- ApplicationStateData appStateData = ApplicationStateData
- .newInstance(appState);
+ ApplicationStateData appState =
+ ((RMStateUpdateAppEvent) event).getAppState();
+ ApplicationId appId =
+ appState.getApplicationSubmissionContext().getApplicationId();
LOG.info("Updating info for app: " + appId);
try {
- store.updateApplicationStateInternal(appId, appStateData);
+ store.updateApplicationStateInternal(appId, appState);
store.notifyApplication(new RMAppEvent(appId,
RMAppEventType.APP_UPDATE_SAVED));
} catch (Exception e) {
@@ -179,9 +174,10 @@ public abstract class RMStateStore extends AbstractService {
LOG.error("Illegal event type: " + event.getClass());
return;
}
- ApplicationState appState = ((RMStateStoreRemoveAppEvent) event)
- .getAppState();
- ApplicationId appId = appState.getAppId();
+ ApplicationStateData appState =
+ ((RMStateStoreRemoveAppEvent) event).getAppState();
+ ApplicationId appId =
+ appState.getApplicationSubmissionContext().getApplicationId();
LOG.info("Removing info for app: " + appId);
try {
store.removeApplicationStateInternal(appState);
@@ -201,16 +197,14 @@ public abstract class RMStateStore extends AbstractService {
LOG.error("Illegal event type: " + event.getClass());
return;
}
- ApplicationAttemptState attemptState =
+ ApplicationAttemptStateData attemptState =
((RMStateStoreAppAttemptEvent) event).getAppAttemptState();
try {
- ApplicationAttemptStateData attemptStateData =
- ApplicationAttemptStateData.newInstance(attemptState);
if (LOG.isDebugEnabled()) {
LOG.debug("Storing info for attempt: " + attemptState.getAttemptId());
}
store.storeApplicationAttemptStateInternal(attemptState.getAttemptId(),
- attemptStateData);
+ attemptState);
store.notifyApplicationAttempt(new RMAppAttemptEvent
(attemptState.getAttemptId(),
RMAppAttemptEventType.ATTEMPT_NEW_SAVED));
@@ -230,16 +224,14 @@ public abstract class RMStateStore extends AbstractService {
LOG.error("Illegal event type: " + event.getClass());
return;
}
- ApplicationAttemptState attemptState =
+ ApplicationAttemptStateData attemptState =
((RMStateUpdateAppAttemptEvent) event).getAppAttemptState();
try {
- ApplicationAttemptStateData attemptStateData = ApplicationAttemptStateData
- .newInstance(attemptState);
if (LOG.isDebugEnabled()) {
LOG.debug("Updating info for attempt: " + attemptState.getAttemptId());
}
store.updateApplicationAttemptStateInternal(attemptState.getAttemptId(),
- attemptStateData);
+ attemptState);
store.notifyApplicationAttempt(new RMAppAttemptEvent
(attemptState.getAttemptId(),
RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED));
@@ -255,153 +247,6 @@ public abstract class RMStateStore extends AbstractService {
stateMachine = stateMachineFactory.make(this);
}
- /**
- * State of an application attempt
- */
- public static class ApplicationAttemptState {
- final ApplicationAttemptId attemptId;
- final Container masterContainer;
- final Credentials appAttemptCredentials;
- long startTime = 0;
- long finishTime = 0;
- // fields set when attempt completes
- RMAppAttemptState state;
- String finalTrackingUrl = "N/A";
- String diagnostics;
- int exitStatus = ContainerExitStatus.INVALID;
- FinalApplicationStatus amUnregisteredFinalStatus;
- long memorySeconds;
- long vcoreSeconds;
-
- public ApplicationAttemptState(ApplicationAttemptId attemptId,
- Container masterContainer, Credentials appAttemptCredentials,
- long startTime, long memorySeconds, long vcoreSeconds) {
- this(attemptId, masterContainer, appAttemptCredentials, startTime, null,
- null, "", null, ContainerExitStatus.INVALID, 0, memorySeconds, vcoreSeconds);
- }
-
- public ApplicationAttemptState(ApplicationAttemptId attemptId,
- Container masterContainer, Credentials appAttemptCredentials,
- long startTime, RMAppAttemptState state, String finalTrackingUrl,
- String diagnostics, FinalApplicationStatus amUnregisteredFinalStatus,
- int exitStatus, long finishTime, long memorySeconds,
- long vcoreSeconds) {
- this.attemptId = attemptId;
- this.masterContainer = masterContainer;
- this.appAttemptCredentials = appAttemptCredentials;
- this.startTime = startTime;
- this.state = state;
- this.finalTrackingUrl = finalTrackingUrl;
- this.diagnostics = diagnostics == null ? "" : diagnostics;
- this.amUnregisteredFinalStatus = amUnregisteredFinalStatus;
- this.exitStatus = exitStatus;
- this.finishTime = finishTime;
- this.memorySeconds = memorySeconds;
- this.vcoreSeconds = vcoreSeconds;
- }
-
- public Container getMasterContainer() {
- return masterContainer;
- }
- public ApplicationAttemptId getAttemptId() {
- return attemptId;
- }
- public Credentials getAppAttemptCredentials() {
- return appAttemptCredentials;
- }
- public RMAppAttemptState getState(){
- return state;
- }
- public String getFinalTrackingUrl() {
- return finalTrackingUrl;
- }
- public String getDiagnostics() {
- return diagnostics;
- }
- public long getStartTime() {
- return startTime;
- }
- public FinalApplicationStatus getFinalApplicationStatus() {
- return amUnregisteredFinalStatus;
- }
- public int getAMContainerExitStatus(){
- return this.exitStatus;
- }
- public long getMemorySeconds() {
- return memorySeconds;
- }
- public long getVcoreSeconds() {
- return vcoreSeconds;
- }
- public long getFinishTime() {
- return this.finishTime;
- }
- }
-
- /**
- * State of an application application
- */
- public static class ApplicationState {
- final ApplicationSubmissionContext context;
- final long submitTime;
- final long startTime;
- final String user;
- Map<ApplicationAttemptId, ApplicationAttemptState> attempts =
- new HashMap<ApplicationAttemptId, ApplicationAttemptState>();
- // fields set when application completes.
- RMAppState state;
- String diagnostics;
- long finishTime;
-
- public ApplicationState(long submitTime,
- long startTime, ApplicationSubmissionContext context, String user) {
- this(submitTime, startTime, context, user, null, "", 0);
- }
-
- public ApplicationState(long submitTime,
- long startTime,ApplicationSubmissionContext context,
- String user, RMAppState state, String diagnostics, long finishTime) {
- this.submitTime = submitTime;
- this.startTime = startTime;
- this.context = context;
- this.user = user;
- this.state = state;
- this.diagnostics = diagnostics == null ? "" : diagnostics;
- this.finishTime = finishTime;
- }
-
- public ApplicationId getAppId() {
- return context.getApplicationId();
- }
- public long getSubmitTime() {
- return submitTime;
- }
- public long getStartTime() {
- return startTime;
- }
- public int getAttemptCount() {
- return attempts.size();
- }
- public ApplicationSubmissionContext getApplicationSubmissionContext() {
- return context;
- }
- public ApplicationAttemptState getAttempt(ApplicationAttemptId attemptId) {
- return attempts.get(attemptId);
- }
- public String getUser() {
- return user;
- }
- public RMAppState getState() {
- return state;
- }
- public String getDiagnostics() {
- return diagnostics;
- }
- public long getFinishTime() {
- return finishTime;
- }
- }
-
public static class RMDTSecretManagerState {
// DTIdentifier -> renewDate
Map<RMDelegationTokenIdentifier, Long> delegationTokenState =
@@ -429,14 +274,14 @@ public abstract class RMStateStore extends AbstractService {
* State of the ResourceManager
*/
public static class RMState {
- Map<ApplicationId, ApplicationState> appState =
- new TreeMap<ApplicationId, ApplicationState>();
+ Map<ApplicationId, ApplicationStateData> appState =
+ new TreeMap<ApplicationId, ApplicationStateData>();
RMDTSecretManagerState rmSecretManagerState = new RMDTSecretManagerState();
AMRMTokenSecretManagerState amrmTokenSecretManagerState = null;
- public Map<ApplicationId, ApplicationState> getApplicationState() {
+ public Map<ApplicationId, ApplicationStateData> getApplicationState() {
return appState;
}
@@ -575,14 +420,15 @@ public abstract class RMStateStore extends AbstractService {
ApplicationSubmissionContext context = app
.getApplicationSubmissionContext();
assert context instanceof ApplicationSubmissionContextPBImpl;
- ApplicationState appState =
- new ApplicationState(app.getSubmitTime(), app.getStartTime(), context,
- app.getUser());
+ ApplicationStateData appState =
+ ApplicationStateData.newInstance(
+ app.getSubmitTime(), app.getStartTime(), context, app.getUser());
dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState));
}
@SuppressWarnings("unchecked")
- public synchronized void updateApplicationState(ApplicationState appState) {
+ public synchronized void updateApplicationState(
+ ApplicationStateData appState) {
dispatcher.getEventHandler().handle(new RMStateUpdateAppEvent(appState));
}
@@ -609,11 +455,13 @@ public abstract class RMStateStore extends AbstractService {
AggregateAppResourceUsage resUsage =
appAttempt.getRMAppAttemptMetrics().getAggregateAppResourceUsage();
- ApplicationAttemptState attemptState =
- new ApplicationAttemptState(appAttempt.getAppAttemptId(),
- appAttempt.getMasterContainer(), credentials,
- appAttempt.getStartTime(), resUsage.getMemorySeconds(),
- resUsage.getVcoreSeconds());
+ ApplicationAttemptStateData attemptState =
+ ApplicationAttemptStateData.newInstance(
+ appAttempt.getAppAttemptId(),
+ appAttempt.getMasterContainer(),
+ credentials, appAttempt.getStartTime(),
+ resUsage.getMemorySeconds(),
+ resUsage.getVcoreSeconds());
dispatcher.getEventHandler().handle(
new RMStateStoreAppAttemptEvent(attemptState));
@@ -621,7 +469,7 @@ public abstract class RMStateStore extends AbstractService {
@SuppressWarnings("unchecked")
public synchronized void updateApplicationAttemptState(
- ApplicationAttemptState attemptState) {
+ ApplicationAttemptStateData attemptState) {
dispatcher.getEventHandler().handle(
new RMStateUpdateAppAttemptEvent(attemptState));
}
@@ -761,16 +609,12 @@ public abstract class RMStateStore extends AbstractService {
*/
@SuppressWarnings("unchecked")
public synchronized void removeApplication(RMApp app) {
- ApplicationState appState = new ApplicationState(
+ ApplicationStateData appState =
+ ApplicationStateData.newInstance(
app.getSubmitTime(), app.getStartTime(),
app.getApplicationSubmissionContext(), app.getUser());
for(RMAppAttempt appAttempt : app.getAppAttempts().values()) {
- Credentials credentials = getCredentialsFromAppAttempt(appAttempt);
- ApplicationAttemptState attemptState =
- new ApplicationAttemptState(appAttempt.getAppAttemptId(),
- appAttempt.getMasterContainer(), credentials,
- appAttempt.getStartTime(), 0, 0);
- appState.attempts.put(attemptState.getAttemptId(), attemptState);
+ appState.attempts.put(appAttempt.getAppAttemptId(), null);
}
dispatcher.getEventHandler().handle(new RMStateStoreRemoveAppEvent(appState));
@@ -782,7 +626,7 @@ public abstract class RMStateStore extends AbstractService {
* application and its attempts
*/
protected abstract void removeApplicationStateInternal(
- ApplicationState appState) throws Exception;
+ ApplicationStateData appState) throws Exception;
// TODO: This should eventually become cluster-Id + "AM_RM_TOKEN_SERVICE". See
// YARN-1779
http://git-wip-us.apache.org/repos/asf/hadoop/blob/28630565/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppAttemptEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppAttemptEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppAttemptEvent.java
index c4a04bc..3399431 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppAttemptEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppAttemptEvent.java
@@ -18,17 +18,17 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
public class RMStateStoreAppAttemptEvent extends RMStateStoreEvent {
- ApplicationAttemptState attemptState;
+ ApplicationAttemptStateData attemptState;
- public RMStateStoreAppAttemptEvent(ApplicationAttemptState attemptState) {
+ public RMStateStoreAppAttemptEvent(ApplicationAttemptStateData attemptState) {
super(RMStateStoreEventType.STORE_APP_ATTEMPT);
this.attemptState = attemptState;
}
- public ApplicationAttemptState getAppAttemptState() {
+ public ApplicationAttemptStateData getAppAttemptState() {
return attemptState;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/28630565/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppEvent.java
index 99f8e37..50e59f7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppEvent.java
@@ -18,18 +18,18 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
public class RMStateStoreAppEvent extends RMStateStoreEvent {
- private final ApplicationState appState;
+ private final ApplicationStateData appState;
- public RMStateStoreAppEvent(ApplicationState appState) {
+ public RMStateStoreAppEvent(ApplicationStateData appState) {
super(RMStateStoreEventType.STORE_APP);
this.appState = appState;
}
- public ApplicationState getAppState() {
+ public ApplicationStateData getAppState() {
return appState;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/28630565/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppEvent.java
index 402feb9..fbba64c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppEvent.java
@@ -18,17 +18,17 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
public class RMStateStoreRemoveAppEvent extends RMStateStoreEvent {
- ApplicationState appState;
+ ApplicationStateData appState;
- RMStateStoreRemoveAppEvent(ApplicationState appState) {
+ RMStateStoreRemoveAppEvent(ApplicationStateData appState) {
super(RMStateStoreEventType.REMOVE_APP);
this.appState = appState;
}
- public ApplicationState getAppState() {
+ public ApplicationStateData getAppState() {
return appState;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/28630565/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppAttemptEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppAttemptEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppAttemptEvent.java
index 9ded673..14f8e9d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppAttemptEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppAttemptEvent.java
@@ -18,18 +18,19 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
public class RMStateUpdateAppAttemptEvent extends RMStateStoreEvent {
- ApplicationAttemptState attemptState;
+ ApplicationAttemptStateData attemptState;
- public RMStateUpdateAppAttemptEvent(ApplicationAttemptState attemptState) {
+ public RMStateUpdateAppAttemptEvent(
+ ApplicationAttemptStateData attemptState) {
super(RMStateStoreEventType.UPDATE_APP_ATTEMPT);
this.attemptState = attemptState;
}
- public ApplicationAttemptState getAppAttemptState() {
+ public ApplicationAttemptStateData getAppAttemptState() {
return attemptState;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/28630565/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppEvent.java
index 9bb96e5..cec364c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppEvent.java
@@ -18,17 +18,17 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
public class RMStateUpdateAppEvent extends RMStateStoreEvent {
- private final ApplicationState appState;
+ private final ApplicationStateData appState;
- public RMStateUpdateAppEvent(ApplicationState appState) {
+ public RMStateUpdateAppEvent(ApplicationStateData appState) {
super(RMStateStoreEventType.UPDATE_APP);
this.appState = appState;
}
- public ApplicationState getAppState() {
+ public ApplicationStateData getAppState() {
return appState;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/28630565/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
index ab048ca..a19ed30 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
@@ -34,8 +34,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.DataInputByteBuffer;
-import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ZKUtil;
@@ -562,17 +560,11 @@ public class ZKRMStateStore extends RMStateStore {
LOG.debug("Loading application from znode: " + childNodeName);
}
ApplicationId appId = ConverterUtils.toApplicationId(childNodeName);
- ApplicationStateDataPBImpl appStateData =
+ ApplicationStateDataPBImpl appState =
new ApplicationStateDataPBImpl(
ApplicationStateDataProto.parseFrom(childData));
- ApplicationState appState =
- new ApplicationState(appStateData.getSubmitTime(),
- appStateData.getStartTime(),
- appStateData.getApplicationSubmissionContext(),
- appStateData.getUser(),
- appStateData.getState(),
- appStateData.getDiagnostics(), appStateData.getFinishTime());
- if (!appId.equals(appState.context.getApplicationId())) {
+ if (!appId.equals(
+ appState.getApplicationSubmissionContext().getApplicationId())) {
throw new YarnRuntimeException("The child node name is different " +
"from the application id");
}
@@ -584,7 +576,7 @@ public class ZKRMStateStore extends RMStateStore {
}
}
- private void loadApplicationAttemptState(ApplicationState appState,
+ private void loadApplicationAttemptState(ApplicationStateData appState,
ApplicationId appId)
throws Exception {
String appPath = getNodePath(rmAppRoot, appId.toString());
@@ -594,31 +586,9 @@ public class ZKRMStateStore extends RMStateStore {
String attemptPath = getNodePath(appPath, attemptIDStr);
byte[] attemptData = getDataWithRetries(attemptPath, true);
- ApplicationAttemptId attemptId =
- ConverterUtils.toApplicationAttemptId(attemptIDStr);
- ApplicationAttemptStateDataPBImpl attemptStateData =
+ ApplicationAttemptStateDataPBImpl attemptState =
new ApplicationAttemptStateDataPBImpl(
ApplicationAttemptStateDataProto.parseFrom(attemptData));
- Credentials credentials = null;
- if (attemptStateData.getAppAttemptTokens() != null) {
- credentials = new Credentials();
- DataInputByteBuffer dibb = new DataInputByteBuffer();
- dibb.reset(attemptStateData.getAppAttemptTokens());
- credentials.readTokenStorageStream(dibb);
- }
-
- ApplicationAttemptState attemptState =
- new ApplicationAttemptState(attemptId,
- attemptStateData.getMasterContainer(), credentials,
- attemptStateData.getStartTime(), attemptStateData.getState(),
- attemptStateData.getFinalTrackingUrl(),
- attemptStateData.getDiagnostics(),
- attemptStateData.getFinalApplicationStatus(),
- attemptStateData.getAMContainerExitStatus(),
- attemptStateData.getFinishTime(),
- attemptStateData.getMemorySeconds(),
- attemptStateData.getVcoreSeconds());
-
appState.attempts.put(attemptState.getAttemptId(), attemptState);
}
@@ -705,9 +675,11 @@ public class ZKRMStateStore extends RMStateStore {
}
@Override
- public synchronized void removeApplicationStateInternal(ApplicationState appState)
+ public synchronized void removeApplicationStateInternal(
+ ApplicationStateData appState)
throws Exception {
- String appId = appState.getAppId().toString();
+ String appId = appState.getApplicationSubmissionContext().getApplicationId()
+ .toString();
String appIdRemovePath = getNodePath(rmAppRoot, appId);
ArrayList<Op> opList = new ArrayList<Op>();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/28630565/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java
index 63ef8f6..391783b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java
@@ -18,18 +18,14 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery.records;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProto;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.util.Records;
@@ -41,7 +37,7 @@ import org.apache.hadoop.yarn.util.Records;
public abstract class ApplicationAttemptStateData {
public static ApplicationAttemptStateData newInstance(
ApplicationAttemptId attemptId, Container container,
- ByteBuffer attemptTokens, long startTime, RMAppAttemptState finalState,
+ Credentials attemptTokens, long startTime, RMAppAttemptState finalState,
String finalTrackingUrl, String diagnostics,
FinalApplicationStatus amUnregisteredFinalStatus, int exitStatus,
long finishTime, long memorySeconds, long vcoreSeconds) {
@@ -52,7 +48,7 @@ public abstract class ApplicationAttemptStateData {
attemptStateData.setAppAttemptTokens(attemptTokens);
attemptStateData.setState(finalState);
attemptStateData.setFinalTrackingUrl(finalTrackingUrl);
- attemptStateData.setDiagnostics(diagnostics);
+ attemptStateData.setDiagnostics(diagnostics == null ? "" : diagnostics);
attemptStateData.setStartTime(startTime);
attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus);
attemptStateData.setAMContainerExitStatus(exitStatus);
@@ -63,22 +59,14 @@ public abstract class ApplicationAttemptStateData {
}
public static ApplicationAttemptStateData newInstance(
- ApplicationAttemptState attemptState) throws IOException {
- Credentials credentials = attemptState.getAppAttemptCredentials();
- ByteBuffer appAttemptTokens = null;
- if (credentials != null) {
- DataOutputBuffer dob = new DataOutputBuffer();
- credentials.writeTokenStorageToStream(dob);
- appAttemptTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+ ApplicationAttemptId attemptId, Container masterContainer,
+ Credentials attemptTokens, long startTime, long memorySeconds,
+ long vcoreSeconds) {
+ return newInstance(attemptId, masterContainer, attemptTokens,
+ startTime, null, "N/A", "", null, ContainerExitStatus.INVALID, 0,
+ memorySeconds, vcoreSeconds);
}
- return newInstance(attemptState.getAttemptId(),
- attemptState.getMasterContainer(), appAttemptTokens,
- attemptState.getStartTime(), attemptState.getState(),
- attemptState.getFinalTrackingUrl(), attemptState.getDiagnostics(),
- attemptState.getFinalApplicationStatus(),
- attemptState.getAMContainerExitStatus(), attemptState.getFinishTime(),
- attemptState.getMemorySeconds(), attemptState.getVcoreSeconds());
- }
+
public abstract ApplicationAttemptStateDataProto getProto();
@@ -108,9 +96,9 @@ public abstract class ApplicationAttemptStateData {
*/
@Public
@Unstable
- public abstract ByteBuffer getAppAttemptTokens();
+ public abstract Credentials getAppAttemptTokens();
- public abstract void setAppAttemptTokens(ByteBuffer attemptTokens);
+ public abstract void setAppAttemptTokens(Credentials attemptTokens);
/**
* Get the final state of the application attempt.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/28630565/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java
index eff0445..43046a9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java
@@ -18,14 +18,16 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery.records;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.util.Records;
@@ -36,6 +38,9 @@ import org.apache.hadoop.yarn.util.Records;
@Public
@Unstable
public abstract class ApplicationStateData {
+ public Map<ApplicationAttemptId, ApplicationAttemptStateData> attempts =
+ new HashMap<ApplicationAttemptId, ApplicationAttemptStateData>();
+
public static ApplicationStateData newInstance(long submitTime,
long startTime, String user,
ApplicationSubmissionContext submissionContext,
@@ -51,12 +56,18 @@ public abstract class ApplicationStateData {
return appState;
}
- public static ApplicationStateData newInstance(
- ApplicationState appState) {
- return newInstance(appState.getSubmitTime(), appState.getStartTime(),
- appState.getUser(), appState.getApplicationSubmissionContext(),
- appState.getState(), appState.getDiagnostics(),
- appState.getFinishTime());
+ public static ApplicationStateData newInstance(long submitTime,
+ long startTime, ApplicationSubmissionContext context, String user) {
+ return newInstance(submitTime, startTime, user, context, null, "", 0);
+ }
+
+ public int getAttemptCount() {
+ return attempts.size();
+ }
+
+ public ApplicationAttemptStateData getAttempt(
+ ApplicationAttemptId attemptId) {
+ return attempts.get(attemptId);
}
public abstract ApplicationStateDataProto getProto();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/28630565/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java
index 516af2d..bae3f9c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java
@@ -18,8 +18,15 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb;
+import java.io.IOException;
import java.nio.ByteBuffer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@@ -37,6 +44,8 @@ import com.google.protobuf.TextFormat;
public class ApplicationAttemptStateDataPBImpl extends
ApplicationAttemptStateData {
+ private static Log LOG =
+ LogFactory.getLog(ApplicationAttemptStateDataPBImpl.class);
ApplicationAttemptStateDataProto proto =
ApplicationAttemptStateDataProto.getDefaultInstance();
ApplicationAttemptStateDataProto.Builder builder = null;
@@ -137,26 +146,27 @@ public class ApplicationAttemptStateDataPBImpl extends
}
@Override
- public ByteBuffer getAppAttemptTokens() {
+ public Credentials getAppAttemptTokens() {
ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder;
if(appAttemptTokens != null) {
- return appAttemptTokens;
+ return convertCredentialsFromByteBuffer(appAttemptTokens);
}
if(!p.hasAppAttemptTokens()) {
return null;
}
this.appAttemptTokens = ProtoUtils.convertFromProtoFormat(
p.getAppAttemptTokens());
- return appAttemptTokens;
+ return convertCredentialsFromByteBuffer(appAttemptTokens);
}
+ /**
+ * Stores the given credentials as this attempt's tokens, keeping them
+ * internally in serialized ByteBuffer form.
+ *
+ * @param attemptTokens the credentials to store; null clears the tokens
+ */
@Override
- public void setAppAttemptTokens(ByteBuffer attemptTokens) {
+ public void setAppAttemptTokens(Credentials attemptTokens) {
maybeInitBuilder();
if(attemptTokens == null) {
builder.clearAppAttemptTokens();
+ // Also drop the locally cached buffer: getAppAttemptTokens() returns the
+ // cached value first, so leaving it set would keep handing out the old
+ // tokens after they were cleared.
+ this.appAttemptTokens = null;
+ return;
}
- this.appAttemptTokens = attemptTokens;
+ this.appAttemptTokens = convertCredentialsToByteBuffer(attemptTokens);
}
@Override
@@ -330,4 +340,44 @@ public class ApplicationAttemptStateDataPBImpl extends
maybeInitBuilder();
builder.setFinishTime(finishTime);
}
+
+ /**
+ * Serializes the given credentials into a ByteBuffer by writing their
+ * token storage to an in-memory buffer.
+ *
+ * @param credentials the credentials to serialize; may be null
+ * @return a buffer wrapping the serialized bytes, or null when
+ * credentials is null or the conversion fails with an IOException
+ */
+ private static ByteBuffer convertCredentialsToByteBuffer(
+ Credentials credentials) {
+ ByteBuffer appAttemptTokens = null;
+ DataOutputBuffer dob = new DataOutputBuffer();
+ try {
+ if (credentials != null) {
+ credentials.writeTokenStorageToStream(dob);
+ // Wrap only the bytes actually written, not the whole backing array.
+ appAttemptTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+ }
+ return appAttemptTokens;
+ } catch (IOException e) {
+ // Log the exception itself so the cause and stack trace are not lost.
+ LOG.error("Failed to convert Credentials to ByteBuffer.", e);
+ assert false;
+ return null;
+ } finally {
+ IOUtils.closeStream(dob);
+ }
+ }
+
+ /**
+ * Deserializes credentials from a ByteBuffer holding a serialized token
+ * storage stream.
+ *
+ * @param appAttemptTokens the serialized tokens; may be null
+ * @return the decoded credentials, or null when appAttemptTokens is null
+ * or the conversion fails with an IOException
+ */
+ private static Credentials convertCredentialsFromByteBuffer(
+ ByteBuffer appAttemptTokens) {
+ DataInputByteBuffer dibb = new DataInputByteBuffer();
+ try {
+ Credentials credentials = null;
+ if (appAttemptTokens != null) {
+ credentials = new Credentials();
+ // Reset the position to the start before reading the buffer.
+ appAttemptTokens.rewind();
+ dibb.reset(appAttemptTokens);
+ credentials.readTokenStorageStream(dibb);
+ }
+ return credentials;
+ } catch (IOException e) {
+ // Log the exception itself so the cause and stack trace are not lost.
+ LOG.error("Failed to convert Credentials from ByteBuffer.", e);
+ assert false;
+ return null;
+ } finally {
+ IOUtils.closeStream(dibb);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/28630565/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 751dbe4..33b62fe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -66,9 +66,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -728,10 +728,12 @@ public class RMAppImpl implements RMApp, Recoverable {
@Override
public void recover(RMState state) {
- ApplicationState appState = state.getApplicationState().get(getApplicationId());
+ ApplicationStateData appState =
+ state.getApplicationState().get(getApplicationId());
this.recoveredFinalState = appState.getState();
LOG.info("Recovering app: " + getApplicationId() + " with " +
- + appState.getAttemptCount() + " attempts and final state = " + this.recoveredFinalState );
+ + appState.getAttemptCount() + " attempts and final state = "
+ + this.recoveredFinalState );
this.diagnostics.append(appState.getDiagnostics());
this.storedFinishTime = appState.getFinishTime();
this.startTime = appState.getStartTime();
@@ -1019,10 +1021,10 @@ public class RMAppImpl implements RMApp, Recoverable {
default:
break;
}
- ApplicationState appState =
- new ApplicationState(this.submitTime, this.startTime,
- this.submissionContext, this.user, stateToBeStored, diags,
- this.storedFinishTime);
+ ApplicationStateData appState =
+ ApplicationStateData.newInstance(this.submitTime, this.startTime,
+ this.user, this.submissionContext,
+ stateToBeStored, diags, this.storedFinishTime);
this.rmContext.getStateStore().updateApplicationState(appState);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/28630565/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index a80167f..4c52d29 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
import static org.apache.hadoop.yarn.util.StringHelper.pjoin;
-import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
@@ -70,9 +69,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -793,9 +792,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
@Override
public void recover(RMState state) {
- ApplicationState appState =
+ ApplicationStateData appState =
state.getApplicationState().get(getAppAttemptId().getApplicationId());
- ApplicationAttemptState attemptState =
+ ApplicationAttemptStateData attemptState =
appState.getAttempt(getAppAttemptId());
assert attemptState != null;
LOG.info("Recovering attempt: " + getAppAttemptId() + " with final state: "
@@ -806,9 +805,10 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
if (amContainerExitStatus == ContainerExitStatus.PREEMPTED) {
this.attemptMetrics.setIsPreempted();
}
+
+ Credentials credentials = attemptState.getAppAttemptTokens();
setMasterContainer(attemptState.getMasterContainer());
- recoverAppAttemptCredentials(attemptState.getAppAttemptCredentials(),
- attemptState.getState());
+ recoverAppAttemptCredentials(credentials, attemptState.getState());
this.recoveredFinalState = attemptState.getState();
this.originalTrackingUrl = attemptState.getFinalTrackingUrl();
this.proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl);
@@ -1123,10 +1123,13 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
this.attemptMetrics.getAggregateAppResourceUsage();
RMStateStore rmStore = rmContext.getStateStore();
setFinishTime(System.currentTimeMillis());
- ApplicationAttemptState attemptState =
- new ApplicationAttemptState(applicationAttemptId, getMasterContainer(),
- rmStore.getCredentialsFromAppAttempt(this), startTime,
- stateToBeStored, finalTrackingUrl, diags, finalStatus, exitStatus,
+
+ ApplicationAttemptStateData attemptState =
+ ApplicationAttemptStateData.newInstance(
+ applicationAttemptId, getMasterContainer(),
+ rmStore.getCredentialsFromAppAttempt(this),
+ startTime, stateToBeStored, finalTrackingUrl, diags,
+ finalStatus, exitStatus,
getFinishTime(), resUsage.getMemorySeconds(),
resUsage.getVcoreSeconds());
LOG.info("Updating application attempt " + applicationAttemptId
http://git-wip-us.apache.org/repos/asf/hadoop/blob/28630565/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
index 122eb60..0200e85 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFencedException;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -459,7 +460,8 @@ public class TestRMHA {
MemoryRMStateStore memStore = new MemoryRMStateStore() {
@Override
- public synchronized void updateApplicationState(ApplicationState appState) {
+ public synchronized void updateApplicationState(
+ ApplicationStateData appState) {
notifyStoreOperationFailed(new StoreFencedException());
}
};