You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2014/11/25 21:50:43 UTC
[2/2] hadoop git commit: YARN-2404. Removed ApplicationAttemptState and ApplicationState class in RMStateStore. Contributed by Tsuyoshi OZAWA
YARN-2404. Removed ApplicationAttemptState and ApplicationState class in RMStateStore. Contributed by Tsuyoshi OZAWA
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5805a81e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5805a81e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5805a81e
Branch: refs/heads/trunk
Commit: 5805a81efbc024024d8172489dfdc6cf77879416
Parents: 61a2510
Author: Jian He <ji...@apache.org>
Authored: Tue Nov 25 12:48:22 2014 -0800
Committer: Jian He <ji...@apache.org>
Committed: Tue Nov 25 12:48:22 2014 -0800
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../server/resourcemanager/RMAppManager.java | 13 +-
.../recovery/FileSystemRMStateStore.java | 74 ++----
.../recovery/MemoryRMStateStore.java | 86 ++-----
.../recovery/NullRMStateStore.java | 2 +-
.../resourcemanager/recovery/RMStateStore.java | 232 +++----------------
.../recovery/RMStateStoreAppAttemptEvent.java | 8 +-
.../recovery/RMStateStoreAppEvent.java | 8 +-
.../recovery/RMStateStoreRemoveAppEvent.java | 8 +-
.../recovery/RMStateUpdateAppAttemptEvent.java | 9 +-
.../recovery/RMStateUpdateAppEvent.java | 8 +-
.../recovery/ZKRMStateStore.java | 46 +---
.../records/ApplicationAttemptStateData.java | 36 +--
.../recovery/records/ApplicationStateData.java | 25 +-
.../pb/ApplicationAttemptStateDataPBImpl.java | 60 ++++-
.../server/resourcemanager/rmapp/RMAppImpl.java | 16 +-
.../rmapp/attempt/RMAppAttemptImpl.java | 25 +-
.../yarn/server/resourcemanager/TestRMHA.java | 4 +-
.../server/resourcemanager/TestRMRestart.java | 84 +++----
.../applicationsmanager/TestAMRestart.java | 8 +-
.../recovery/RMStateStoreTestBase.java | 100 ++++----
.../rmapp/TestRMAppTransitions.java | 31 ++-
.../attempt/TestRMAppAttemptTransitions.java | 4 +-
23 files changed, 353 insertions(+), 537 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5805a81e/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index e528c67..a19ba29 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -92,6 +92,9 @@ Release 2.7.0 - UNRELEASED
YARN-2669. FairScheduler: queue names shouldn't allow periods
(Wei Yan via Sandy Ryza)
+ YARN-2404. Removed ApplicationAttemptState and ApplicationState class in
+ RMStateStore. (Tsuyoshi OZAWA via jianhe)
+
OPTIMIZATIONS
BUG FIXES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5805a81e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
index ab8df62..f38e128 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
@@ -40,9 +40,9 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@@ -306,11 +306,11 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
}
}
- protected void recoverApplication(ApplicationState appState, RMState rmState)
- throws Exception {
+ protected void recoverApplication(ApplicationStateData appState,
+ RMState rmState) throws Exception {
ApplicationSubmissionContext appContext =
appState.getApplicationSubmissionContext();
- ApplicationId appId = appState.getAppId();
+ ApplicationId appId = appContext.getApplicationId();
// create and recover app.
RMAppImpl application =
@@ -414,9 +414,10 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
RMStateStore store = rmContext.getStateStore();
assert store != null;
// recover applications
- Map<ApplicationId, ApplicationState> appStates = state.getApplicationState();
+ Map<ApplicationId, ApplicationStateData> appStates =
+ state.getApplicationState();
LOG.info("Recovering " + appStates.size() + " applications");
- for (ApplicationState appState : appStates.values()) {
+ for (ApplicationStateData appState : appStates.values()) {
recoverApplication(appState, state);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5805a81e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
index 2bbc5c2..2996392 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
@@ -38,9 +38,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -223,8 +221,8 @@ public class FileSystemRMStateStore extends RMStateStore {
private void loadRMAppState(RMState rmState) throws Exception {
try {
- List<ApplicationAttemptState> attempts =
- new ArrayList<ApplicationAttemptState>();
+ List<ApplicationAttemptStateData> attempts =
+ new ArrayList<ApplicationAttemptStateData>();
for (FileStatus appDir : fs.listStatus(rmAppRoot)) {
checkAndResumeUpdateOperation(appDir.getPath());
@@ -241,19 +239,11 @@ public class FileSystemRMStateStore extends RMStateStore {
if (LOG.isDebugEnabled()) {
LOG.debug("Loading application from node: " + childNodeName);
}
- ApplicationId appId = ConverterUtils.toApplicationId(childNodeName);
- ApplicationStateDataPBImpl appStateData =
+ ApplicationStateDataPBImpl appState =
new ApplicationStateDataPBImpl(
ApplicationStateDataProto.parseFrom(childData));
- ApplicationState appState =
- new ApplicationState(appStateData.getSubmitTime(),
- appStateData.getStartTime(),
- appStateData.getApplicationSubmissionContext(),
- appStateData.getUser(),
- appStateData.getState(),
- appStateData.getDiagnostics(), appStateData.getFinishTime());
- // assert child node name is same as actual applicationId
- assert appId.equals(appState.context.getApplicationId());
+ ApplicationId appId =
+ appState.getApplicationSubmissionContext().getApplicationId();
rmState.appState.put(appId, appState);
} else if (childNodeName
.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
@@ -262,33 +252,9 @@ public class FileSystemRMStateStore extends RMStateStore {
LOG.debug("Loading application attempt from node: "
+ childNodeName);
}
- ApplicationAttemptId attemptId =
- ConverterUtils.toApplicationAttemptId(childNodeName);
- ApplicationAttemptStateDataPBImpl attemptStateData =
+ ApplicationAttemptStateDataPBImpl attemptState =
new ApplicationAttemptStateDataPBImpl(
ApplicationAttemptStateDataProto.parseFrom(childData));
- Credentials credentials = null;
- if (attemptStateData.getAppAttemptTokens() != null) {
- credentials = new Credentials();
- DataInputByteBuffer dibb = new DataInputByteBuffer();
- dibb.reset(attemptStateData.getAppAttemptTokens());
- credentials.readTokenStorageStream(dibb);
- }
- ApplicationAttemptState attemptState =
- new ApplicationAttemptState(attemptId,
- attemptStateData.getMasterContainer(), credentials,
- attemptStateData.getStartTime(),
- attemptStateData.getState(),
- attemptStateData.getFinalTrackingUrl(),
- attemptStateData.getDiagnostics(),
- attemptStateData.getFinalApplicationStatus(),
- attemptStateData.getAMContainerExitStatus(),
- attemptStateData.getFinishTime(),
- attemptStateData.getMemorySeconds(),
- attemptStateData.getVcoreSeconds());
-
- // assert child node name is same as application attempt id
- assert attemptId.equals(attemptState.getAttemptId());
attempts.add(attemptState);
} else {
LOG.info("Unknown child node with name: " + childNodeName);
@@ -299,9 +265,9 @@ public class FileSystemRMStateStore extends RMStateStore {
// go through all attempts and add them to their apps, Ideally, each
// attempt node must have a corresponding app node, because remove
// directory operation remove both at the same time
- for (ApplicationAttemptState attemptState : attempts) {
+ for (ApplicationAttemptStateData attemptState : attempts) {
ApplicationId appId = attemptState.getAttemptId().getApplicationId();
- ApplicationState appState = rmState.appState.get(appId);
+ ApplicationStateData appState = rmState.appState.get(appId);
assert appState != null;
appState.attempts.put(attemptState.getAttemptId(), attemptState);
}
@@ -398,10 +364,9 @@ public class FileSystemRMStateStore extends RMStateStore {
@Override
public synchronized void storeApplicationStateInternal(ApplicationId appId,
ApplicationStateData appStateDataPB) throws Exception {
- String appIdStr = appId.toString();
- Path appDirPath = getAppDir(rmAppRoot, appIdStr);
+ Path appDirPath = getAppDir(rmAppRoot, appId);
fs.mkdirs(appDirPath);
- Path nodeCreatePath = getNodePath(appDirPath, appIdStr);
+ Path nodeCreatePath = getNodePath(appDirPath, appId.toString());
LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath);
byte[] appStateData = appStateDataPB.getProto().toByteArray();
@@ -418,9 +383,8 @@ public class FileSystemRMStateStore extends RMStateStore {
@Override
public synchronized void updateApplicationStateInternal(ApplicationId appId,
ApplicationStateData appStateDataPB) throws Exception {
- String appIdStr = appId.toString();
- Path appDirPath = getAppDir(rmAppRoot, appIdStr);
- Path nodeCreatePath = getNodePath(appDirPath, appIdStr);
+ Path appDirPath = getAppDir(rmAppRoot, appId);
+ Path nodeCreatePath = getNodePath(appDirPath, appId.toString());
LOG.info("Updating info for app: " + appId + " at: " + nodeCreatePath);
byte[] appStateData = appStateDataPB.getProto().toByteArray();
@@ -440,7 +404,7 @@ public class FileSystemRMStateStore extends RMStateStore {
ApplicationAttemptStateData attemptStateDataPB)
throws Exception {
Path appDirPath =
- getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString());
+ getAppDir(rmAppRoot, appAttemptId.getApplicationId());
Path nodeCreatePath = getNodePath(appDirPath, appAttemptId.toString());
LOG.info("Storing info for attempt: " + appAttemptId + " at: "
+ nodeCreatePath);
@@ -461,7 +425,7 @@ public class FileSystemRMStateStore extends RMStateStore {
ApplicationAttemptStateData attemptStateDataPB)
throws Exception {
Path appDirPath =
- getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString());
+ getAppDir(rmAppRoot, appAttemptId.getApplicationId());
Path nodeCreatePath = getNodePath(appDirPath, appAttemptId.toString());
LOG.info("Updating info for attempt: " + appAttemptId + " at: "
+ nodeCreatePath);
@@ -477,9 +441,11 @@ public class FileSystemRMStateStore extends RMStateStore {
}
@Override
- public synchronized void removeApplicationStateInternal(ApplicationState appState)
+ public synchronized void removeApplicationStateInternal(
+ ApplicationStateData appState)
throws Exception {
- String appId = appState.getAppId().toString();
+ ApplicationId appId =
+ appState.getApplicationSubmissionContext().getApplicationId();
Path nodeRemovePath = getAppDir(rmAppRoot, appId);
LOG.info("Removing info for app: " + appId + " at: " + nodeRemovePath);
deleteFile(nodeRemovePath);
@@ -572,8 +538,8 @@ public class FileSystemRMStateStore extends RMStateStore {
}
}
- private Path getAppDir(Path root, String appId) {
- return getNodePath(root, appId);
+ private Path getAppDir(Path root, ApplicationId appId) {
+ return getNodePath(root, appId.toString());
}
// FileSystem related code
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5805a81e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
index a67da2c..917fdc1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
@@ -25,8 +25,6 @@ import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.DataInputByteBuffer;
-import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -93,57 +91,30 @@ public class MemoryRMStateStore extends RMStateStore {
}
@Override
- public void storeApplicationStateInternal(ApplicationId appId,
- ApplicationStateData appStateData)
+ public void storeApplicationStateInternal(
+ ApplicationId appId, ApplicationStateData appState)
throws Exception {
- ApplicationState appState =
- new ApplicationState(appStateData.getSubmitTime(),
- appStateData.getStartTime(),
- appStateData.getApplicationSubmissionContext(),
- appStateData.getUser());
state.appState.put(appId, appState);
}
@Override
public void updateApplicationStateInternal(ApplicationId appId,
- ApplicationStateData appStateData) throws Exception {
- ApplicationState updatedAppState =
- new ApplicationState(appStateData.getSubmitTime(),
- appStateData.getStartTime(),
- appStateData.getApplicationSubmissionContext(),
- appStateData.getUser(), appStateData.getState(),
- appStateData.getDiagnostics(), appStateData.getFinishTime());
- LOG.info("Updating final state " + appStateData.getState() + " for app: "
+ ApplicationStateData appState) throws Exception {
+ LOG.info("Updating final state " + appState.getState() + " for app: "
+ appId);
if (state.appState.get(appId) != null) {
// add the earlier attempts back
- updatedAppState.attempts
- .putAll(state.appState.get(appId).attempts);
+ appState.attempts.putAll(state.appState.get(appId).attempts);
}
- state.appState.put(appId, updatedAppState);
+ state.appState.put(appId, appState);
}
@Override
public synchronized void storeApplicationAttemptStateInternal(
ApplicationAttemptId appAttemptId,
- ApplicationAttemptStateData attemptStateData)
+ ApplicationAttemptStateData attemptState)
throws Exception {
- Credentials credentials = null;
- if(attemptStateData.getAppAttemptTokens() != null){
- DataInputByteBuffer dibb = new DataInputByteBuffer();
- credentials = new Credentials();
- dibb.reset(attemptStateData.getAppAttemptTokens());
- credentials.readTokenStorageStream(dibb);
- }
- ApplicationAttemptState attemptState =
- new ApplicationAttemptState(appAttemptId,
- attemptStateData.getMasterContainer(), credentials,
- attemptStateData.getStartTime(),
- attemptStateData.getMemorySeconds(),
- attemptStateData.getVcoreSeconds());
-
-
- ApplicationState appState = state.getApplicationState().get(
+ ApplicationStateData appState = state.getApplicationState().get(
attemptState.getAttemptId().getApplicationId());
if (appState == null) {
throw new YarnRuntimeException("Application doesn't exist");
@@ -154,44 +125,25 @@ public class MemoryRMStateStore extends RMStateStore {
@Override
public synchronized void updateApplicationAttemptStateInternal(
ApplicationAttemptId appAttemptId,
- ApplicationAttemptStateData attemptStateData)
+ ApplicationAttemptStateData attemptState)
throws Exception {
- Credentials credentials = null;
- if (attemptStateData.getAppAttemptTokens() != null) {
- DataInputByteBuffer dibb = new DataInputByteBuffer();
- credentials = new Credentials();
- dibb.reset(attemptStateData.getAppAttemptTokens());
- credentials.readTokenStorageStream(dibb);
- }
- ApplicationAttemptState updatedAttemptState =
- new ApplicationAttemptState(appAttemptId,
- attemptStateData.getMasterContainer(), credentials,
- attemptStateData.getStartTime(), attemptStateData.getState(),
- attemptStateData.getFinalTrackingUrl(),
- attemptStateData.getDiagnostics(),
- attemptStateData.getFinalApplicationStatus(),
- attemptStateData.getAMContainerExitStatus(),
- attemptStateData.getFinishTime(),
- attemptStateData.getMemorySeconds(),
- attemptStateData.getVcoreSeconds());
-
- ApplicationState appState =
- state.getApplicationState().get(
- updatedAttemptState.getAttemptId().getApplicationId());
+ ApplicationStateData appState =
+ state.getApplicationState().get(appAttemptId.getApplicationId());
if (appState == null) {
throw new YarnRuntimeException("Application doesn't exist");
}
- LOG.info("Updating final state " + updatedAttemptState.getState()
- + " for attempt: " + updatedAttemptState.getAttemptId());
- appState.attempts.put(updatedAttemptState.getAttemptId(),
- updatedAttemptState);
+ LOG.info("Updating final state " + attemptState.getState()
+ + " for attempt: " + attemptState.getAttemptId());
+ appState.attempts.put(attemptState.getAttemptId(), attemptState);
}
@Override
public synchronized void removeApplicationStateInternal(
- ApplicationState appState) throws Exception {
- ApplicationId appId = appState.getAppId();
- ApplicationState removed = state.appState.remove(appId);
+ ApplicationStateData appState) throws Exception {
+ ApplicationId appId =
+ appState.getApplicationSubmissionContext().getApplicationId();
+ ApplicationStateData removed = state.appState.remove(appId);
+
if (removed == null) {
throw new YarnRuntimeException("Removing non-exsisting application state");
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5805a81e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
index b957d12..f80c497 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
@@ -71,7 +71,7 @@ public class NullRMStateStore extends RMStateStore {
}
@Override
- protected void removeApplicationStateInternal(ApplicationState appState)
+ protected void removeApplicationStateInternal(ApplicationStateData appState)
throws Exception {
// Do nothing
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5805a81e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
index 8948b54..35a54c3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
@@ -38,9 +38,6 @@ import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
@@ -56,12 +53,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Applicatio
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
@@ -129,13 +124,13 @@ public abstract class RMStateStore extends AbstractService {
LOG.error("Illegal event type: " + event.getClass());
return;
}
- ApplicationState appState = ((RMStateStoreAppEvent) event).getAppState();
- ApplicationId appId = appState.getAppId();
- ApplicationStateData appStateData = ApplicationStateData
- .newInstance(appState);
+ ApplicationStateData appState =
+ ((RMStateStoreAppEvent) event).getAppState();
+ ApplicationId appId =
+ appState.getApplicationSubmissionContext().getApplicationId();
LOG.info("Storing info for app: " + appId);
try {
- store.storeApplicationStateInternal(appId, appStateData);
+ store.storeApplicationStateInternal(appId, appState);
store.notifyApplication(new RMAppEvent(appId,
RMAppEventType.APP_NEW_SAVED));
} catch (Exception e) {
@@ -154,13 +149,13 @@ public abstract class RMStateStore extends AbstractService {
LOG.error("Illegal event type: " + event.getClass());
return;
}
- ApplicationState appState = ((RMStateUpdateAppEvent) event).getAppState();
- ApplicationId appId = appState.getAppId();
- ApplicationStateData appStateData = ApplicationStateData
- .newInstance(appState);
+ ApplicationStateData appState =
+ ((RMStateUpdateAppEvent) event).getAppState();
+ ApplicationId appId =
+ appState.getApplicationSubmissionContext().getApplicationId();
LOG.info("Updating info for app: " + appId);
try {
- store.updateApplicationStateInternal(appId, appStateData);
+ store.updateApplicationStateInternal(appId, appState);
store.notifyApplication(new RMAppEvent(appId,
RMAppEventType.APP_UPDATE_SAVED));
} catch (Exception e) {
@@ -179,9 +174,10 @@ public abstract class RMStateStore extends AbstractService {
LOG.error("Illegal event type: " + event.getClass());
return;
}
- ApplicationState appState = ((RMStateStoreRemoveAppEvent) event)
- .getAppState();
- ApplicationId appId = appState.getAppId();
+ ApplicationStateData appState =
+ ((RMStateStoreRemoveAppEvent) event).getAppState();
+ ApplicationId appId =
+ appState.getApplicationSubmissionContext().getApplicationId();
LOG.info("Removing info for app: " + appId);
try {
store.removeApplicationStateInternal(appState);
@@ -201,16 +197,14 @@ public abstract class RMStateStore extends AbstractService {
LOG.error("Illegal event type: " + event.getClass());
return;
}
- ApplicationAttemptState attemptState =
+ ApplicationAttemptStateData attemptState =
((RMStateStoreAppAttemptEvent) event).getAppAttemptState();
try {
- ApplicationAttemptStateData attemptStateData =
- ApplicationAttemptStateData.newInstance(attemptState);
if (LOG.isDebugEnabled()) {
LOG.debug("Storing info for attempt: " + attemptState.getAttemptId());
}
store.storeApplicationAttemptStateInternal(attemptState.getAttemptId(),
- attemptStateData);
+ attemptState);
store.notifyApplicationAttempt(new RMAppAttemptEvent
(attemptState.getAttemptId(),
RMAppAttemptEventType.ATTEMPT_NEW_SAVED));
@@ -230,16 +224,14 @@ public abstract class RMStateStore extends AbstractService {
LOG.error("Illegal event type: " + event.getClass());
return;
}
- ApplicationAttemptState attemptState =
+ ApplicationAttemptStateData attemptState =
((RMStateUpdateAppAttemptEvent) event).getAppAttemptState();
try {
- ApplicationAttemptStateData attemptStateData = ApplicationAttemptStateData
- .newInstance(attemptState);
if (LOG.isDebugEnabled()) {
LOG.debug("Updating info for attempt: " + attemptState.getAttemptId());
}
store.updateApplicationAttemptStateInternal(attemptState.getAttemptId(),
- attemptStateData);
+ attemptState);
store.notifyApplicationAttempt(new RMAppAttemptEvent
(attemptState.getAttemptId(),
RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED));
@@ -255,153 +247,6 @@ public abstract class RMStateStore extends AbstractService {
stateMachine = stateMachineFactory.make(this);
}
- /**
- * State of an application attempt
- */
- public static class ApplicationAttemptState {
- final ApplicationAttemptId attemptId;
- final Container masterContainer;
- final Credentials appAttemptCredentials;
- long startTime = 0;
- long finishTime = 0;
- // fields set when attempt completes
- RMAppAttemptState state;
- String finalTrackingUrl = "N/A";
- String diagnostics;
- int exitStatus = ContainerExitStatus.INVALID;
- FinalApplicationStatus amUnregisteredFinalStatus;
- long memorySeconds;
- long vcoreSeconds;
-
- public ApplicationAttemptState(ApplicationAttemptId attemptId,
- Container masterContainer, Credentials appAttemptCredentials,
- long startTime, long memorySeconds, long vcoreSeconds) {
- this(attemptId, masterContainer, appAttemptCredentials, startTime, null,
- null, "", null, ContainerExitStatus.INVALID, 0, memorySeconds, vcoreSeconds);
- }
-
- public ApplicationAttemptState(ApplicationAttemptId attemptId,
- Container masterContainer, Credentials appAttemptCredentials,
- long startTime, RMAppAttemptState state, String finalTrackingUrl,
- String diagnostics, FinalApplicationStatus amUnregisteredFinalStatus,
- int exitStatus, long finishTime, long memorySeconds,
- long vcoreSeconds) {
- this.attemptId = attemptId;
- this.masterContainer = masterContainer;
- this.appAttemptCredentials = appAttemptCredentials;
- this.startTime = startTime;
- this.state = state;
- this.finalTrackingUrl = finalTrackingUrl;
- this.diagnostics = diagnostics == null ? "" : diagnostics;
- this.amUnregisteredFinalStatus = amUnregisteredFinalStatus;
- this.exitStatus = exitStatus;
- this.finishTime = finishTime;
- this.memorySeconds = memorySeconds;
- this.vcoreSeconds = vcoreSeconds;
- }
-
- public Container getMasterContainer() {
- return masterContainer;
- }
- public ApplicationAttemptId getAttemptId() {
- return attemptId;
- }
- public Credentials getAppAttemptCredentials() {
- return appAttemptCredentials;
- }
- public RMAppAttemptState getState(){
- return state;
- }
- public String getFinalTrackingUrl() {
- return finalTrackingUrl;
- }
- public String getDiagnostics() {
- return diagnostics;
- }
- public long getStartTime() {
- return startTime;
- }
- public FinalApplicationStatus getFinalApplicationStatus() {
- return amUnregisteredFinalStatus;
- }
- public int getAMContainerExitStatus(){
- return this.exitStatus;
- }
- public long getMemorySeconds() {
- return memorySeconds;
- }
- public long getVcoreSeconds() {
- return vcoreSeconds;
- }
- public long getFinishTime() {
- return this.finishTime;
- }
- }
-
- /**
- * State of an application application
- */
- public static class ApplicationState {
- final ApplicationSubmissionContext context;
- final long submitTime;
- final long startTime;
- final String user;
- Map<ApplicationAttemptId, ApplicationAttemptState> attempts =
- new HashMap<ApplicationAttemptId, ApplicationAttemptState>();
- // fields set when application completes.
- RMAppState state;
- String diagnostics;
- long finishTime;
-
- public ApplicationState(long submitTime,
- long startTime, ApplicationSubmissionContext context, String user) {
- this(submitTime, startTime, context, user, null, "", 0);
- }
-
- public ApplicationState(long submitTime,
- long startTime,ApplicationSubmissionContext context,
- String user, RMAppState state, String diagnostics, long finishTime) {
- this.submitTime = submitTime;
- this.startTime = startTime;
- this.context = context;
- this.user = user;
- this.state = state;
- this.diagnostics = diagnostics == null ? "" : diagnostics;
- this.finishTime = finishTime;
- }
-
- public ApplicationId getAppId() {
- return context.getApplicationId();
- }
- public long getSubmitTime() {
- return submitTime;
- }
- public long getStartTime() {
- return startTime;
- }
- public int getAttemptCount() {
- return attempts.size();
- }
- public ApplicationSubmissionContext getApplicationSubmissionContext() {
- return context;
- }
- public ApplicationAttemptState getAttempt(ApplicationAttemptId attemptId) {
- return attempts.get(attemptId);
- }
- public String getUser() {
- return user;
- }
- public RMAppState getState() {
- return state;
- }
- public String getDiagnostics() {
- return diagnostics;
- }
- public long getFinishTime() {
- return finishTime;
- }
- }
-
public static class RMDTSecretManagerState {
// DTIdentifier -> renewDate
Map<RMDelegationTokenIdentifier, Long> delegationTokenState =
@@ -429,14 +274,14 @@ public abstract class RMStateStore extends AbstractService {
* State of the ResourceManager
*/
public static class RMState {
- Map<ApplicationId, ApplicationState> appState =
- new TreeMap<ApplicationId, ApplicationState>();
+ Map<ApplicationId, ApplicationStateData> appState =
+ new TreeMap<ApplicationId, ApplicationStateData>();
RMDTSecretManagerState rmSecretManagerState = new RMDTSecretManagerState();
AMRMTokenSecretManagerState amrmTokenSecretManagerState = null;
- public Map<ApplicationId, ApplicationState> getApplicationState() {
+ public Map<ApplicationId, ApplicationStateData> getApplicationState() {
return appState;
}
@@ -575,14 +420,15 @@ public abstract class RMStateStore extends AbstractService {
ApplicationSubmissionContext context = app
.getApplicationSubmissionContext();
assert context instanceof ApplicationSubmissionContextPBImpl;
- ApplicationState appState =
- new ApplicationState(app.getSubmitTime(), app.getStartTime(), context,
- app.getUser());
+ ApplicationStateData appState =
+ ApplicationStateData.newInstance(
+ app.getSubmitTime(), app.getStartTime(), context, app.getUser());
dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState));
}
@SuppressWarnings("unchecked")
- public synchronized void updateApplicationState(ApplicationState appState) {
+ public synchronized void updateApplicationState(
+ ApplicationStateData appState) {
dispatcher.getEventHandler().handle(new RMStateUpdateAppEvent(appState));
}
@@ -609,11 +455,13 @@ public abstract class RMStateStore extends AbstractService {
AggregateAppResourceUsage resUsage =
appAttempt.getRMAppAttemptMetrics().getAggregateAppResourceUsage();
- ApplicationAttemptState attemptState =
- new ApplicationAttemptState(appAttempt.getAppAttemptId(),
- appAttempt.getMasterContainer(), credentials,
- appAttempt.getStartTime(), resUsage.getMemorySeconds(),
- resUsage.getVcoreSeconds());
+ ApplicationAttemptStateData attemptState =
+ ApplicationAttemptStateData.newInstance(
+ appAttempt.getAppAttemptId(),
+ appAttempt.getMasterContainer(),
+ credentials, appAttempt.getStartTime(),
+ resUsage.getMemorySeconds(),
+ resUsage.getVcoreSeconds());
dispatcher.getEventHandler().handle(
new RMStateStoreAppAttemptEvent(attemptState));
@@ -621,7 +469,7 @@ public abstract class RMStateStore extends AbstractService {
@SuppressWarnings("unchecked")
public synchronized void updateApplicationAttemptState(
- ApplicationAttemptState attemptState) {
+ ApplicationAttemptStateData attemptState) {
dispatcher.getEventHandler().handle(
new RMStateUpdateAppAttemptEvent(attemptState));
}
@@ -761,16 +609,12 @@ public abstract class RMStateStore extends AbstractService {
*/
@SuppressWarnings("unchecked")
public synchronized void removeApplication(RMApp app) {
- ApplicationState appState = new ApplicationState(
+ ApplicationStateData appState =
+ ApplicationStateData.newInstance(
app.getSubmitTime(), app.getStartTime(),
app.getApplicationSubmissionContext(), app.getUser());
for(RMAppAttempt appAttempt : app.getAppAttempts().values()) {
- Credentials credentials = getCredentialsFromAppAttempt(appAttempt);
- ApplicationAttemptState attemptState =
- new ApplicationAttemptState(appAttempt.getAppAttemptId(),
- appAttempt.getMasterContainer(), credentials,
- appAttempt.getStartTime(), 0, 0);
- appState.attempts.put(attemptState.getAttemptId(), attemptState);
+ appState.attempts.put(appAttempt.getAppAttemptId(), null);
}
dispatcher.getEventHandler().handle(new RMStateStoreRemoveAppEvent(appState));
@@ -782,7 +626,7 @@ public abstract class RMStateStore extends AbstractService {
* application and its attempts
*/
protected abstract void removeApplicationStateInternal(
- ApplicationState appState) throws Exception;
+ ApplicationStateData appState) throws Exception;
// TODO: This should eventually become cluster-Id + "AM_RM_TOKEN_SERVICE". See
// YARN-1779
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5805a81e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppAttemptEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppAttemptEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppAttemptEvent.java
index c4a04bc..3399431 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppAttemptEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppAttemptEvent.java
@@ -18,17 +18,17 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
public class RMStateStoreAppAttemptEvent extends RMStateStoreEvent {
- ApplicationAttemptState attemptState;
+ ApplicationAttemptStateData attemptState;
- public RMStateStoreAppAttemptEvent(ApplicationAttemptState attemptState) {
+ public RMStateStoreAppAttemptEvent(ApplicationAttemptStateData attemptState) {
super(RMStateStoreEventType.STORE_APP_ATTEMPT);
this.attemptState = attemptState;
}
- public ApplicationAttemptState getAppAttemptState() {
+ public ApplicationAttemptStateData getAppAttemptState() {
return attemptState;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5805a81e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppEvent.java
index 99f8e37..50e59f7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppEvent.java
@@ -18,18 +18,18 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
public class RMStateStoreAppEvent extends RMStateStoreEvent {
- private final ApplicationState appState;
+ private final ApplicationStateData appState;
- public RMStateStoreAppEvent(ApplicationState appState) {
+ public RMStateStoreAppEvent(ApplicationStateData appState) {
super(RMStateStoreEventType.STORE_APP);
this.appState = appState;
}
- public ApplicationState getAppState() {
+ public ApplicationStateData getAppState() {
return appState;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5805a81e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppEvent.java
index 402feb9..fbba64c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppEvent.java
@@ -18,17 +18,17 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
public class RMStateStoreRemoveAppEvent extends RMStateStoreEvent {
- ApplicationState appState;
+ ApplicationStateData appState;
- RMStateStoreRemoveAppEvent(ApplicationState appState) {
+ RMStateStoreRemoveAppEvent(ApplicationStateData appState) {
super(RMStateStoreEventType.REMOVE_APP);
this.appState = appState;
}
- public ApplicationState getAppState() {
+ public ApplicationStateData getAppState() {
return appState;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5805a81e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppAttemptEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppAttemptEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppAttemptEvent.java
index 9ded673..14f8e9d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppAttemptEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppAttemptEvent.java
@@ -18,18 +18,19 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
public class RMStateUpdateAppAttemptEvent extends RMStateStoreEvent {
- ApplicationAttemptState attemptState;
+ ApplicationAttemptStateData attemptState;
- public RMStateUpdateAppAttemptEvent(ApplicationAttemptState attemptState) {
+ public RMStateUpdateAppAttemptEvent(
+ ApplicationAttemptStateData attemptState) {
super(RMStateStoreEventType.UPDATE_APP_ATTEMPT);
this.attemptState = attemptState;
}
- public ApplicationAttemptState getAppAttemptState() {
+ public ApplicationAttemptStateData getAppAttemptState() {
return attemptState;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5805a81e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppEvent.java
index 9bb96e5..cec364c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppEvent.java
@@ -18,17 +18,17 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
public class RMStateUpdateAppEvent extends RMStateStoreEvent {
- private final ApplicationState appState;
+ private final ApplicationStateData appState;
- public RMStateUpdateAppEvent(ApplicationState appState) {
+ public RMStateUpdateAppEvent(ApplicationStateData appState) {
super(RMStateStoreEventType.UPDATE_APP);
this.appState = appState;
}
- public ApplicationState getAppState() {
+ public ApplicationStateData getAppState() {
return appState;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5805a81e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
index ab048ca..a19ed30 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
@@ -34,8 +34,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.DataInputByteBuffer;
-import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ZKUtil;
@@ -562,17 +560,11 @@ public class ZKRMStateStore extends RMStateStore {
LOG.debug("Loading application from znode: " + childNodeName);
}
ApplicationId appId = ConverterUtils.toApplicationId(childNodeName);
- ApplicationStateDataPBImpl appStateData =
+ ApplicationStateDataPBImpl appState =
new ApplicationStateDataPBImpl(
ApplicationStateDataProto.parseFrom(childData));
- ApplicationState appState =
- new ApplicationState(appStateData.getSubmitTime(),
- appStateData.getStartTime(),
- appStateData.getApplicationSubmissionContext(),
- appStateData.getUser(),
- appStateData.getState(),
- appStateData.getDiagnostics(), appStateData.getFinishTime());
- if (!appId.equals(appState.context.getApplicationId())) {
+ if (!appId.equals(
+ appState.getApplicationSubmissionContext().getApplicationId())) {
throw new YarnRuntimeException("The child node name is different " +
"from the application id");
}
@@ -584,7 +576,7 @@ public class ZKRMStateStore extends RMStateStore {
}
}
- private void loadApplicationAttemptState(ApplicationState appState,
+ private void loadApplicationAttemptState(ApplicationStateData appState,
ApplicationId appId)
throws Exception {
String appPath = getNodePath(rmAppRoot, appId.toString());
@@ -594,31 +586,9 @@ public class ZKRMStateStore extends RMStateStore {
String attemptPath = getNodePath(appPath, attemptIDStr);
byte[] attemptData = getDataWithRetries(attemptPath, true);
- ApplicationAttemptId attemptId =
- ConverterUtils.toApplicationAttemptId(attemptIDStr);
- ApplicationAttemptStateDataPBImpl attemptStateData =
+ ApplicationAttemptStateDataPBImpl attemptState =
new ApplicationAttemptStateDataPBImpl(
ApplicationAttemptStateDataProto.parseFrom(attemptData));
- Credentials credentials = null;
- if (attemptStateData.getAppAttemptTokens() != null) {
- credentials = new Credentials();
- DataInputByteBuffer dibb = new DataInputByteBuffer();
- dibb.reset(attemptStateData.getAppAttemptTokens());
- credentials.readTokenStorageStream(dibb);
- }
-
- ApplicationAttemptState attemptState =
- new ApplicationAttemptState(attemptId,
- attemptStateData.getMasterContainer(), credentials,
- attemptStateData.getStartTime(), attemptStateData.getState(),
- attemptStateData.getFinalTrackingUrl(),
- attemptStateData.getDiagnostics(),
- attemptStateData.getFinalApplicationStatus(),
- attemptStateData.getAMContainerExitStatus(),
- attemptStateData.getFinishTime(),
- attemptStateData.getMemorySeconds(),
- attemptStateData.getVcoreSeconds());
-
appState.attempts.put(attemptState.getAttemptId(), attemptState);
}
@@ -705,9 +675,11 @@ public class ZKRMStateStore extends RMStateStore {
}
@Override
- public synchronized void removeApplicationStateInternal(ApplicationState appState)
+ public synchronized void removeApplicationStateInternal(
+ ApplicationStateData appState)
throws Exception {
- String appId = appState.getAppId().toString();
+ String appId = appState.getApplicationSubmissionContext().getApplicationId()
+ .toString();
String appIdRemovePath = getNodePath(rmAppRoot, appId);
ArrayList<Op> opList = new ArrayList<Op>();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5805a81e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java
index 63ef8f6..391783b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java
@@ -18,18 +18,14 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery.records;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProto;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.util.Records;
@@ -41,7 +37,7 @@ import org.apache.hadoop.yarn.util.Records;
public abstract class ApplicationAttemptStateData {
public static ApplicationAttemptStateData newInstance(
ApplicationAttemptId attemptId, Container container,
- ByteBuffer attemptTokens, long startTime, RMAppAttemptState finalState,
+ Credentials attemptTokens, long startTime, RMAppAttemptState finalState,
String finalTrackingUrl, String diagnostics,
FinalApplicationStatus amUnregisteredFinalStatus, int exitStatus,
long finishTime, long memorySeconds, long vcoreSeconds) {
@@ -52,7 +48,7 @@ public abstract class ApplicationAttemptStateData {
attemptStateData.setAppAttemptTokens(attemptTokens);
attemptStateData.setState(finalState);
attemptStateData.setFinalTrackingUrl(finalTrackingUrl);
- attemptStateData.setDiagnostics(diagnostics);
+ attemptStateData.setDiagnostics(diagnostics == null ? "" : diagnostics);
attemptStateData.setStartTime(startTime);
attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus);
attemptStateData.setAMContainerExitStatus(exitStatus);
@@ -63,22 +59,14 @@ public abstract class ApplicationAttemptStateData {
}
public static ApplicationAttemptStateData newInstance(
- ApplicationAttemptState attemptState) throws IOException {
- Credentials credentials = attemptState.getAppAttemptCredentials();
- ByteBuffer appAttemptTokens = null;
- if (credentials != null) {
- DataOutputBuffer dob = new DataOutputBuffer();
- credentials.writeTokenStorageToStream(dob);
- appAttemptTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+ ApplicationAttemptId attemptId, Container masterContainer,
+ Credentials attemptTokens, long startTime, long memorySeconds,
+ long vcoreSeconds) {
+ return newInstance(attemptId, masterContainer, attemptTokens,
+ startTime, null, "N/A", "", null, ContainerExitStatus.INVALID, 0,
+ memorySeconds, vcoreSeconds);
}
- return newInstance(attemptState.getAttemptId(),
- attemptState.getMasterContainer(), appAttemptTokens,
- attemptState.getStartTime(), attemptState.getState(),
- attemptState.getFinalTrackingUrl(), attemptState.getDiagnostics(),
- attemptState.getFinalApplicationStatus(),
- attemptState.getAMContainerExitStatus(), attemptState.getFinishTime(),
- attemptState.getMemorySeconds(), attemptState.getVcoreSeconds());
- }
+
public abstract ApplicationAttemptStateDataProto getProto();
@@ -108,9 +96,9 @@ public abstract class ApplicationAttemptStateData {
*/
@Public
@Unstable
- public abstract ByteBuffer getAppAttemptTokens();
+ public abstract Credentials getAppAttemptTokens();
- public abstract void setAppAttemptTokens(ByteBuffer attemptTokens);
+ public abstract void setAppAttemptTokens(Credentials attemptTokens);
/**
* Get the final state of the application attempt.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5805a81e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java
index eff0445..43046a9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java
@@ -18,14 +18,16 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery.records;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.util.Records;
@@ -36,6 +38,9 @@ import org.apache.hadoop.yarn.util.Records;
@Public
@Unstable
public abstract class ApplicationStateData {
+ public Map<ApplicationAttemptId, ApplicationAttemptStateData> attempts =
+ new HashMap<ApplicationAttemptId, ApplicationAttemptStateData>();
+
public static ApplicationStateData newInstance(long submitTime,
long startTime, String user,
ApplicationSubmissionContext submissionContext,
@@ -51,12 +56,18 @@ public abstract class ApplicationStateData {
return appState;
}
- public static ApplicationStateData newInstance(
- ApplicationState appState) {
- return newInstance(appState.getSubmitTime(), appState.getStartTime(),
- appState.getUser(), appState.getApplicationSubmissionContext(),
- appState.getState(), appState.getDiagnostics(),
- appState.getFinishTime());
+ public static ApplicationStateData newInstance(long submitTime,
+ long startTime, ApplicationSubmissionContext context, String user) {
+ return newInstance(submitTime, startTime, user, context, null, "", 0);
+ }
+
+ public int getAttemptCount() {
+ return attempts.size();
+ }
+
+ public ApplicationAttemptStateData getAttempt(
+ ApplicationAttemptId attemptId) {
+ return attempts.get(attemptId);
}
public abstract ApplicationStateDataProto getProto();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5805a81e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java
index 516af2d..bae3f9c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java
@@ -18,8 +18,15 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb;
+import java.io.IOException;
import java.nio.ByteBuffer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@@ -37,6 +44,8 @@ import com.google.protobuf.TextFormat;
public class ApplicationAttemptStateDataPBImpl extends
ApplicationAttemptStateData {
+ private static Log LOG =
+ LogFactory.getLog(ApplicationAttemptStateDataPBImpl.class);
ApplicationAttemptStateDataProto proto =
ApplicationAttemptStateDataProto.getDefaultInstance();
ApplicationAttemptStateDataProto.Builder builder = null;
@@ -137,26 +146,27 @@ public class ApplicationAttemptStateDataPBImpl extends
}
@Override
- public ByteBuffer getAppAttemptTokens() {
+ public Credentials getAppAttemptTokens() {
ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder;
if(appAttemptTokens != null) {
- return appAttemptTokens;
+ return convertCredentialsFromByteBuffer(appAttemptTokens);
}
if(!p.hasAppAttemptTokens()) {
return null;
}
this.appAttemptTokens = ProtoUtils.convertFromProtoFormat(
p.getAppAttemptTokens());
- return appAttemptTokens;
+ return convertCredentialsFromByteBuffer(appAttemptTokens);
}
@Override
- public void setAppAttemptTokens(ByteBuffer attemptTokens) {
+ public void setAppAttemptTokens(Credentials attemptTokens) {
maybeInitBuilder();
if(attemptTokens == null) {
builder.clearAppAttemptTokens();
+ return;
}
- this.appAttemptTokens = attemptTokens;
+ this.appAttemptTokens = convertCredentialsToByteBuffer(attemptTokens);
}
@Override
@@ -330,4 +340,44 @@ public class ApplicationAttemptStateDataPBImpl extends
maybeInitBuilder();
builder.setFinishTime(finishTime);
}
+
+ private static ByteBuffer convertCredentialsToByteBuffer(
+ Credentials credentials) {
+ ByteBuffer appAttemptTokens = null;
+ DataOutputBuffer dob = new DataOutputBuffer();
+ try {
+ if (credentials != null) {
+ credentials.writeTokenStorageToStream(dob);
+ appAttemptTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+ }
+ return appAttemptTokens;
+ } catch (IOException e) {
+ LOG.error("Failed to convert Credentials to ByteBuffer.");
+ assert false;
+ return null;
+ } finally {
+ IOUtils.closeStream(dob);
+ }
+ }
+
+ private static Credentials convertCredentialsFromByteBuffer(
+ ByteBuffer appAttemptTokens) {
+ DataInputByteBuffer dibb = new DataInputByteBuffer();
+ try {
+ Credentials credentials = null;
+ if (appAttemptTokens != null) {
+ credentials = new Credentials();
+ appAttemptTokens.rewind();
+ dibb.reset(appAttemptTokens);
+ credentials.readTokenStorageStream(dibb);
+ }
+ return credentials;
+ } catch (IOException e) {
+ LOG.error("Failed to convert Credentials from ByteBuffer.");
+ assert false;
+ return null;
+ } finally {
+ IOUtils.closeStream(dibb);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5805a81e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 751dbe4..33b62fe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -66,9 +66,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -728,10 +728,12 @@ public class RMAppImpl implements RMApp, Recoverable {
@Override
public void recover(RMState state) {
- ApplicationState appState = state.getApplicationState().get(getApplicationId());
+ ApplicationStateData appState =
+ state.getApplicationState().get(getApplicationId());
this.recoveredFinalState = appState.getState();
LOG.info("Recovering app: " + getApplicationId() + " with " +
- + appState.getAttemptCount() + " attempts and final state = " + this.recoveredFinalState );
+ + appState.getAttemptCount() + " attempts and final state = "
+ + this.recoveredFinalState );
this.diagnostics.append(appState.getDiagnostics());
this.storedFinishTime = appState.getFinishTime();
this.startTime = appState.getStartTime();
@@ -1019,10 +1021,10 @@ public class RMAppImpl implements RMApp, Recoverable {
default:
break;
}
- ApplicationState appState =
- new ApplicationState(this.submitTime, this.startTime,
- this.submissionContext, this.user, stateToBeStored, diags,
- this.storedFinishTime);
+ ApplicationStateData appState =
+ ApplicationStateData.newInstance(this.submitTime, this.startTime,
+ this.user, this.submissionContext,
+ stateToBeStored, diags, this.storedFinishTime);
this.rmContext.getStateStore().updateApplicationState(appState);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5805a81e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index a80167f..4c52d29 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
import static org.apache.hadoop.yarn.util.StringHelper.pjoin;
-import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
@@ -70,9 +69,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -793,9 +792,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
@Override
public void recover(RMState state) {
- ApplicationState appState =
+ ApplicationStateData appState =
state.getApplicationState().get(getAppAttemptId().getApplicationId());
- ApplicationAttemptState attemptState =
+ ApplicationAttemptStateData attemptState =
appState.getAttempt(getAppAttemptId());
assert attemptState != null;
LOG.info("Recovering attempt: " + getAppAttemptId() + " with final state: "
@@ -806,9 +805,10 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
if (amContainerExitStatus == ContainerExitStatus.PREEMPTED) {
this.attemptMetrics.setIsPreempted();
}
+
+ Credentials credentials = attemptState.getAppAttemptTokens();
setMasterContainer(attemptState.getMasterContainer());
- recoverAppAttemptCredentials(attemptState.getAppAttemptCredentials(),
- attemptState.getState());
+ recoverAppAttemptCredentials(credentials, attemptState.getState());
this.recoveredFinalState = attemptState.getState();
this.originalTrackingUrl = attemptState.getFinalTrackingUrl();
this.proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl);
@@ -1123,10 +1123,13 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
this.attemptMetrics.getAggregateAppResourceUsage();
RMStateStore rmStore = rmContext.getStateStore();
setFinishTime(System.currentTimeMillis());
- ApplicationAttemptState attemptState =
- new ApplicationAttemptState(applicationAttemptId, getMasterContainer(),
- rmStore.getCredentialsFromAppAttempt(this), startTime,
- stateToBeStored, finalTrackingUrl, diags, finalStatus, exitStatus,
+
+ ApplicationAttemptStateData attemptState =
+ ApplicationAttemptStateData.newInstance(
+ applicationAttemptId, getMasterContainer(),
+ rmStore.getCredentialsFromAppAttempt(this),
+ startTime, stateToBeStored, finalTrackingUrl, diags,
+ finalStatus, exitStatus,
getFinishTime(), resUsage.getMemorySeconds(),
resUsage.getVcoreSeconds());
LOG.info("Updating application attempt " + applicationAttemptId
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5805a81e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
index 122eb60..0200e85 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFencedException;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -459,7 +460,8 @@ public class TestRMHA {
MemoryRMStateStore memStore = new MemoryRMStateStore() {
@Override
- public synchronized void updateApplicationState(ApplicationState appState) {
+ public synchronized void updateApplicationState(
+ ApplicationStateData appState) {
notifyStoreOperationFailed(new StoreFencedException());
}
};