You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/11/10 21:09:16 UTC
svn commit: r1540535 [2/5] - in
/hadoop/common/branches/YARN-321/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/
hadoop-yarn/ha...
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java Sun Nov 10 20:09:09 2013
@@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
@@ -50,10 +51,14 @@ import org.apache.hadoop.yarn.security.c
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppStoredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRemovedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNewSavedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppUpdateSavedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
@Private
@Unstable
@@ -86,13 +91,32 @@ public abstract class RMStateStore exten
final ApplicationAttemptId attemptId;
final Container masterContainer;
final Credentials appAttemptCredentials;
+ long startTime = 0;
+ // fields set when attempt completes
+ RMAppAttemptState state;
+ String finalTrackingUrl = "N/A";
+ String diagnostics;
+ FinalApplicationStatus amUnregisteredFinalStatus;
public ApplicationAttemptState(ApplicationAttemptId attemptId,
- Container masterContainer,
- Credentials appAttemptCredentials) {
+ Container masterContainer, Credentials appAttemptCredentials,
+ long startTime) {
+ this(attemptId, masterContainer, appAttemptCredentials, startTime, null,
+ null, "", null);
+ }
+
+ public ApplicationAttemptState(ApplicationAttemptId attemptId,
+ Container masterContainer, Credentials appAttemptCredentials,
+ long startTime, RMAppAttemptState state, String finalTrackingUrl,
+ String diagnostics, FinalApplicationStatus amUnregisteredFinalStatus) {
this.attemptId = attemptId;
this.masterContainer = masterContainer;
this.appAttemptCredentials = appAttemptCredentials;
+ this.startTime = startTime;
+ this.state = state;
+ this.finalTrackingUrl = finalTrackingUrl;
+ this.diagnostics = diagnostics == null ? "" : diagnostics;
+ this.amUnregisteredFinalStatus = amUnregisteredFinalStatus;
}
public Container getMasterContainer() {
@@ -104,6 +128,21 @@ public abstract class RMStateStore exten
public Credentials getAppAttemptCredentials() {
return appAttemptCredentials;
}
+ public RMAppAttemptState getState(){
+ return state;
+ }
+ public String getFinalTrackingUrl() {
+ return finalTrackingUrl;
+ }
+ public String getDiagnostics() {
+ return diagnostics;
+ }
+ public long getStartTime() {
+ return startTime;
+ }
+ public FinalApplicationStatus getFinalApplicationStatus() {
+ return amUnregisteredFinalStatus;
+ }
}
/**
@@ -112,15 +151,30 @@ public abstract class RMStateStore exten
public static class ApplicationState {
final ApplicationSubmissionContext context;
final long submitTime;
+ final long startTime;
final String user;
Map<ApplicationAttemptId, ApplicationAttemptState> attempts =
new HashMap<ApplicationAttemptId, ApplicationAttemptState>();
-
- ApplicationState(long submitTime, ApplicationSubmissionContext context,
- String user) {
+ // fields set when application completes.
+ RMAppState state;
+ String diagnostics;
+ long finishTime;
+
+ public ApplicationState(long submitTime,
+ long startTime, ApplicationSubmissionContext context, String user) {
+ this(submitTime, startTime, context, user, null, "", 0);
+ }
+
+ public ApplicationState(long submitTime,
+ long startTime,ApplicationSubmissionContext context,
+ String user, RMAppState state, String diagnostics, long finishTime) {
this.submitTime = submitTime;
+ this.startTime = startTime;
this.context = context;
this.user = user;
+ this.state = state;
+ this.diagnostics = diagnostics == null ? "" : diagnostics;
+ this.finishTime = finishTime;
}
public ApplicationId getAppId() {
@@ -129,6 +183,9 @@ public abstract class RMStateStore exten
public long getSubmitTime() {
return submitTime;
}
+ public long getStartTime() {
+ return startTime;
+ }
public int getAttemptCount() {
return attempts.size();
}
@@ -141,6 +198,15 @@ public abstract class RMStateStore exten
public String getUser() {
return user;
}
+ public RMAppState getState() {
+ return state;
+ }
+ public String getDiagnostics() {
+ return diagnostics;
+ }
+ public long getFinishTime() {
+ return finishTime;
+ }
}
public static class RMDTSecretManagerState {
@@ -195,17 +261,20 @@ public abstract class RMStateStore exten
}
AsyncDispatcher dispatcher;
-
- public synchronized void serviceInit(Configuration conf) throws Exception{
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception{
// create async handler
dispatcher = new AsyncDispatcher();
dispatcher.init(conf);
dispatcher.register(RMStateStoreEventType.class,
new ForwardingEventHandler());
+ dispatcher.setDrainEventsOnStop();
initInternal(conf);
}
-
- protected synchronized void serviceStart() throws Exception {
+
+ @Override
+ protected void serviceStart() throws Exception {
dispatcher.start();
startInternal();
}
@@ -222,11 +291,12 @@ public abstract class RMStateStore exten
*/
protected abstract void startInternal() throws Exception;
- public synchronized void serviceStop() throws Exception {
+ @Override
+ protected void serviceStop() throws Exception {
closeInternal();
dispatcher.stop();
}
-
+
/**
* Derived classes close themselves using this method.
* The base class will be closed and the event dispatcher will be shutdown
@@ -249,23 +319,31 @@ public abstract class RMStateStore exten
* RMAppStoredEvent will be sent on completion to notify the RMApp
*/
@SuppressWarnings("unchecked")
- public synchronized void storeApplication(RMApp app) {
+ public synchronized void storeNewApplication(RMApp app) {
ApplicationSubmissionContext context = app
.getApplicationSubmissionContext();
assert context instanceof ApplicationSubmissionContextPBImpl;
- ApplicationState appState = new ApplicationState(
- app.getSubmitTime(), context, app.getUser());
+ ApplicationState appState =
+ new ApplicationState(app.getSubmitTime(), app.getStartTime(), context,
+ app.getUser());
dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState));
}
-
+
+ @SuppressWarnings("unchecked")
+ public synchronized void updateApplicationState(ApplicationState appState) {
+ dispatcher.getEventHandler().handle(new RMStateUpdateAppEvent(appState));
+ }
+
/**
* Blocking API
* Derived classes must implement this method to store the state of an
* application.
*/
- protected abstract void storeApplicationState(String appId,
- ApplicationStateDataPBImpl appStateData)
- throws Exception;
+ protected abstract void storeApplicationStateInternal(String appId,
+ ApplicationStateDataPBImpl appStateData) throws Exception;
+
+ protected abstract void updateApplicationStateInternal(String appId,
+ ApplicationStateDataPBImpl appStateData) throws Exception;
@SuppressWarnings("unchecked")
/**
@@ -274,26 +352,35 @@ public abstract class RMStateStore exten
* This does not block the dispatcher threads
* RMAppAttemptStoredEvent will be sent on completion to notify the RMAppAttempt
*/
- public synchronized void storeApplicationAttempt(RMAppAttempt appAttempt) {
+ public synchronized void storeNewApplicationAttempt(RMAppAttempt appAttempt) {
Credentials credentials = getCredentialsFromAppAttempt(appAttempt);
ApplicationAttemptState attemptState =
new ApplicationAttemptState(appAttempt.getAppAttemptId(),
- appAttempt.getMasterContainer(), credentials);
+ appAttempt.getMasterContainer(), credentials,
+ appAttempt.getStartTime());
dispatcher.getEventHandler().handle(
new RMStateStoreAppAttemptEvent(attemptState));
}
-
+
+ @SuppressWarnings("unchecked")
+ public synchronized void updateApplicationAttemptState(
+ ApplicationAttemptState attemptState) {
+ dispatcher.getEventHandler().handle(
+ new RMStateUpdateAppAttemptEvent(attemptState));
+ }
+
/**
* Blocking API
* Derived classes must implement this method to store the state of an
* application attempt
*/
- protected abstract void storeApplicationAttemptState(String attemptId,
- ApplicationAttemptStateDataPBImpl attemptStateData)
- throws Exception;
+ protected abstract void storeApplicationAttemptStateInternal(String attemptId,
+ ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception;
+ protected abstract void updateApplicationAttemptStateInternal(String attemptId,
+ ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception;
/**
* RMDTSecretManager call this to store the state of a delegation token
@@ -372,13 +459,14 @@ public abstract class RMStateStore exten
*/
public synchronized void removeApplication(RMApp app) {
ApplicationState appState = new ApplicationState(
- app.getSubmitTime(), app.getApplicationSubmissionContext(),
- app.getUser());
+ app.getSubmitTime(), app.getStartTime(),
+ app.getApplicationSubmissionContext(), app.getUser());
for(RMAppAttempt appAttempt : app.getAppAttempts().values()) {
Credentials credentials = getCredentialsFromAppAttempt(appAttempt);
ApplicationAttemptState attemptState =
new ApplicationAttemptState(appAttempt.getAppAttemptId(),
- appAttempt.getMasterContainer(), credentials);
+ appAttempt.getMasterContainer(), credentials,
+ appAttempt.getStartTime());
appState.attempts.put(attemptState.getAttemptId(), attemptState);
}
@@ -409,7 +497,7 @@ public abstract class RMStateStore exten
public static final Text AM_CLIENT_TOKEN_MASTER_KEY_NAME =
new Text("YARN_CLIENT_TOKEN_MASTER_KEY");
- private Credentials getCredentialsFromAppAttempt(RMAppAttempt appAttempt) {
+ public Credentials getCredentialsFromAppAttempt(RMAppAttempt appAttempt) {
Credentials credentials = new Credentials();
Token<AMRMTokenIdentifier> appToken = appAttempt.getAMRMToken();
if(appToken != null){
@@ -425,94 +513,124 @@ public abstract class RMStateStore exten
}
// Dispatcher related code
-
- private synchronized void handleStoreEvent(RMStateStoreEvent event) {
- switch(event.getType()) {
- case STORE_APP:
- {
- ApplicationState apptState =
- ((RMStateStoreAppEvent) event).getAppState();
- Exception storedException = null;
- ApplicationStateDataPBImpl appStateData =
- new ApplicationStateDataPBImpl();
- appStateData.setSubmitTime(apptState.getSubmitTime());
- appStateData.setApplicationSubmissionContext(
- apptState.getApplicationSubmissionContext());
- appStateData.setUser(apptState.getUser());
- ApplicationId appId =
- apptState.getApplicationSubmissionContext().getApplicationId();
-
- LOG.info("Storing info for app: " + appId);
- try {
- storeApplicationState(appId.toString(), appStateData);
- } catch (Exception e) {
- LOG.error("Error storing app: " + appId, e);
- storedException = e;
- } finally {
- notifyDoneStoringApplication(appId, storedException);
- }
+ protected void handleStoreEvent(RMStateStoreEvent event) {
+ if (event.getType().equals(RMStateStoreEventType.STORE_APP)
+ || event.getType().equals(RMStateStoreEventType.UPDATE_APP)) {
+ ApplicationState appState = null;
+ if (event.getType().equals(RMStateStoreEventType.STORE_APP)) {
+ appState = ((RMStateStoreAppEvent) event).getAppState();
+ } else {
+ assert event.getType().equals(RMStateStoreEventType.UPDATE_APP);
+ appState = ((RMStateUpdateAppEvent) event).getAppState();
+ }
+
+ Exception storedException = null;
+ ApplicationStateDataPBImpl appStateData =
+ (ApplicationStateDataPBImpl) ApplicationStateDataPBImpl
+ .newApplicationStateData(appState.getSubmitTime(),
+ appState.getStartTime(), appState.getUser(),
+ appState.getApplicationSubmissionContext(), appState.getState(),
+ appState.getDiagnostics(), appState.getFinishTime());
+
+ ApplicationId appId =
+ appState.getApplicationSubmissionContext().getApplicationId();
+
+ LOG.info("Storing info for app: " + appId);
+ try {
+ if (event.getType().equals(RMStateStoreEventType.STORE_APP)) {
+ storeApplicationStateInternal(appId.toString(), appStateData);
+ } else {
+ assert event.getType().equals(RMStateStoreEventType.UPDATE_APP);
+ updateApplicationStateInternal(appId.toString(), appStateData);
+ }
+ } catch (Exception e) {
+ LOG.error("Error storing app: " + appId, e);
+ storedException = e;
+ } finally {
+ if (event.getType().equals(RMStateStoreEventType.STORE_APP)) {
+ notifyDoneStoringApplication(appId, storedException);
+ } else {
+ notifyDoneUpdatingApplication(appId, storedException);
}
- break;
- case STORE_APP_ATTEMPT:
- {
- ApplicationAttemptState attemptState =
- ((RMStateStoreAppAttemptEvent) event).getAppAttemptState();
- Exception storedException = null;
-
- Credentials credentials = attemptState.getAppAttemptCredentials();
- ByteBuffer appAttemptTokens = null;
- try {
- if(credentials != null){
- DataOutputBuffer dob = new DataOutputBuffer();
- credentials.writeTokenStorageToStream(dob);
- appAttemptTokens =
- ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
- }
- ApplicationAttemptStateDataPBImpl attemptStateData =
- (ApplicationAttemptStateDataPBImpl) ApplicationAttemptStateDataPBImpl
- .newApplicationAttemptStateData(attemptState.getAttemptId(),
- attemptState.getMasterContainer(), appAttemptTokens);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Storing info for attempt: " + attemptState.getAttemptId());
- }
- storeApplicationAttemptState(attemptState.getAttemptId().toString(),
- attemptStateData);
- } catch (Exception e) {
- LOG.error("Error storing appAttempt: "
- + attemptState.getAttemptId(), e);
- storedException = e;
- } finally {
- notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(),
- storedException);
- }
+ }
+ } else if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)
+ || event.getType().equals(RMStateStoreEventType.UPDATE_APP_ATTEMPT)) {
+
+ ApplicationAttemptState attemptState = null;
+ if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)) {
+ attemptState =
+ ((RMStateStoreAppAttemptEvent) event).getAppAttemptState();
+ } else {
+ assert event.getType().equals(RMStateStoreEventType.UPDATE_APP_ATTEMPT);
+ attemptState =
+ ((RMStateUpdateAppAttemptEvent) event).getAppAttemptState();
+ }
+
+ Exception storedException = null;
+ Credentials credentials = attemptState.getAppAttemptCredentials();
+ ByteBuffer appAttemptTokens = null;
+ try {
+ if (credentials != null) {
+ DataOutputBuffer dob = new DataOutputBuffer();
+ credentials.writeTokenStorageToStream(dob);
+ appAttemptTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
}
- break;
- case REMOVE_APP:
- {
- ApplicationState appState =
- ((RMStateStoreRemoveAppEvent) event).getAppState();
- ApplicationId appId = appState.getAppId();
- Exception removedException = null;
- LOG.info("Removing info for app: " + appId);
- try {
- removeApplicationState(appState);
- } catch (Exception e) {
- LOG.error("Error removing app: " + appId, e);
- removedException = e;
- } finally {
- notifyDoneRemovingApplcation(appId, removedException);
- }
+ ApplicationAttemptStateDataPBImpl attemptStateData =
+ (ApplicationAttemptStateDataPBImpl) ApplicationAttemptStateDataPBImpl
+ .newApplicationAttemptStateData(attemptState.getAttemptId(),
+ attemptState.getMasterContainer(), appAttemptTokens,
+ attemptState.getStartTime(), attemptState.getState(),
+ attemptState.getFinalTrackingUrl(),
+ attemptState.getDiagnostics(),
+ attemptState.getFinalApplicationStatus());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Storing info for attempt: " + attemptState.getAttemptId());
}
- break;
- default:
- LOG.error("Unknown RMStateStoreEvent type: " + event.getType());
+ if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)) {
+ storeApplicationAttemptStateInternal(attemptState.getAttemptId()
+ .toString(), attemptStateData);
+ } else {
+ assert event.getType().equals(
+ RMStateStoreEventType.UPDATE_APP_ATTEMPT);
+ updateApplicationAttemptStateInternal(attemptState.getAttemptId()
+ .toString(), attemptStateData);
+ }
+ } catch (Exception e) {
+ LOG
+ .error("Error storing appAttempt: " + attemptState.getAttemptId(), e);
+ storedException = e;
+ } finally {
+ if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)) {
+ notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(),
+ storedException);
+ } else {
+ notifyDoneUpdatingApplicationAttempt(attemptState.getAttemptId(),
+ storedException);
+ }
+ }
+ } else if (event.getType().equals(RMStateStoreEventType.REMOVE_APP)) {
+ ApplicationState appState =
+ ((RMStateStoreRemoveAppEvent) event).getAppState();
+ ApplicationId appId = appState.getAppId();
+ Exception removedException = null;
+ LOG.info("Removing info for app: " + appId);
+ try {
+ removeApplicationState(appState);
+ } catch (Exception e) {
+ LOG.error("Error removing app: " + appId, e);
+ removedException = e;
+ } finally {
+ notifyDoneRemovingApplcation(appId, removedException);
+ }
+ } else {
+ LOG.error("Unknown RMStateStoreEvent type: " + event.getType());
}
}
@SuppressWarnings("unchecked")
/**
* In (@link handleStoreEvent}, this method is called to notify the
- * application about operation completion
+ * application that the new application has been stored in the state store
* @param appId id of the application that has been saved
* @param storedException the exception that is thrown when storing the
* application
@@ -520,19 +638,33 @@ public abstract class RMStateStore exten
private void notifyDoneStoringApplication(ApplicationId appId,
Exception storedException) {
rmDispatcher.getEventHandler().handle(
- new RMAppStoredEvent(appId, storedException));
+ new RMAppNewSavedEvent(appId, storedException));
}
-
+
+ @SuppressWarnings("unchecked")
+ private void notifyDoneUpdatingApplication(ApplicationId appId,
+ Exception storedException) {
+ rmDispatcher.getEventHandler().handle(
+ new RMAppUpdateSavedEvent(appId, storedException));
+ }
+
@SuppressWarnings("unchecked")
/**
* In (@link handleStoreEvent}, this method is called to notify the
- * application attempt about operation completion
+ * application attempt that the new attempt has been stored in the state store
* @param appAttempt attempt that has been saved
*/
private void notifyDoneStoringApplicationAttempt(ApplicationAttemptId attemptId,
Exception storedException) {
rmDispatcher.getEventHandler().handle(
- new RMAppAttemptStoredEvent(attemptId, storedException));
+ new RMAppAttemptNewSavedEvent(attemptId, storedException));
+ }
+
+ @SuppressWarnings("unchecked")
+ private void notifyDoneUpdatingApplicationAttempt(ApplicationAttemptId attemptId,
+ Exception updatedException) {
+ rmDispatcher.getEventHandler().handle(
+ new RMAppAttemptUpdateSavedEvent(attemptId, updatedException));
}
@SuppressWarnings("unchecked")
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java Sun Nov 10 20:09:09 2013
@@ -21,5 +21,7 @@ package org.apache.hadoop.yarn.server.re
public enum RMStateStoreEventType {
STORE_APP_ATTEMPT,
STORE_APP,
+ UPDATE_APP,
+ UPDATE_APP_ATTEMPT,
REMOVE_APP
}
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java Sun Nov 10 20:09:09 2013
@@ -18,7 +18,14 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
-import com.google.common.annotations.VisibleForTesting;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -27,6 +34,8 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.ZKUtil;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -37,9 +46,6 @@ import org.apache.hadoop.yarn.security.c
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.ZKUtil;
-
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
@@ -51,13 +57,7 @@ import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
+import com.google.common.annotations.VisibleForTesting;
@Private
@Unstable
@@ -224,8 +224,11 @@ public class ZKRMStateStore extends RMSt
ApplicationStateDataProto.parseFrom(childData));
ApplicationState appState =
new ApplicationState(appStateData.getSubmitTime(),
- appStateData.getApplicationSubmissionContext(),
- appStateData.getUser());
+ appStateData.getStartTime(),
+ appStateData.getApplicationSubmissionContext(),
+ appStateData.getUser(),
+ appStateData.getState(),
+ appStateData.getDiagnostics(), appStateData.getFinishTime());
if (!appId.equals(appState.context.getApplicationId())) {
throw new YarnRuntimeException("The child node name is different " +
"from the application id");
@@ -249,7 +252,12 @@ public class ZKRMStateStore extends RMSt
}
ApplicationAttemptState attemptState =
new ApplicationAttemptState(attemptId,
- attemptStateData.getMasterContainer(), credentials);
+ attemptStateData.getMasterContainer(), credentials,
+ attemptStateData.getStartTime(),
+ attemptStateData.getState(),
+ attemptStateData.getFinalTrackingUrl(),
+ attemptStateData.getDiagnostics(),
+ attemptStateData.getFinalApplicationStatus());
if (!attemptId.equals(attemptState.getAttemptId())) {
throw new YarnRuntimeException("The child node name is different " +
"from the application attempt id");
@@ -280,21 +288,34 @@ public class ZKRMStateStore extends RMSt
}
@Override
- public synchronized void storeApplicationState(
- String appId, ApplicationStateDataPBImpl appStateDataPB) throws
- Exception {
+ public synchronized void storeApplicationStateInternal(String appId,
+ ApplicationStateDataPBImpl appStateDataPB) throws Exception {
String nodeCreatePath = getNodePath(rmAppRoot, appId);
if (LOG.isDebugEnabled()) {
LOG.debug("Storing info for app: " + appId + " at: " + nodeCreatePath);
}
byte[] appStateData = appStateDataPB.getProto().toByteArray();
- createWithRetries(
- nodeCreatePath, appStateData, zkAcl, CreateMode.PERSISTENT);
+ createWithRetries(nodeCreatePath, appStateData, zkAcl,
+ CreateMode.PERSISTENT);
+
+ }
+
+ @Override
+ public synchronized void updateApplicationStateInternal(String appId,
+ ApplicationStateDataPBImpl appStateDataPB) throws Exception {
+ String nodeCreatePath = getNodePath(rmAppRoot, appId);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Storing final state info for app: " + appId + " at: "
+ + nodeCreatePath);
+ }
+ byte[] appStateData = appStateDataPB.getProto().toByteArray();
+ setDataWithRetries(nodeCreatePath, appStateData, 0);
}
@Override
- public synchronized void storeApplicationAttemptState(
+ public synchronized void storeApplicationAttemptStateInternal(
String attemptId, ApplicationAttemptStateDataPBImpl attemptStateDataPB)
throws Exception {
String nodeCreatePath = getNodePath(rmAppRoot, attemptId);
@@ -304,7 +325,20 @@ public class ZKRMStateStore extends RMSt
}
byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
createWithRetries(nodeCreatePath, attemptStateData, zkAcl,
- CreateMode.PERSISTENT);
+ CreateMode.PERSISTENT);
+ }
+
+ @Override
+ public synchronized void updateApplicationAttemptStateInternal(
+ String attemptId, ApplicationAttemptStateDataPBImpl attemptStateDataPB)
+ throws Exception {
+ String nodeCreatePath = getNodePath(rmAppRoot, attemptId);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Storing final state info for attempt: " + attemptId + " at: "
+ + nodeCreatePath);
+ }
+ byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
+ setDataWithRetries(nodeCreatePath, attemptStateData, 0);
}
@Override
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java Sun Nov 10 20:09:09 2013
@@ -24,6 +24,8 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
/*
* Contains the state data that needs to be persisted for an ApplicationAttempt
@@ -61,4 +63,50 @@ public interface ApplicationAttemptState
public ByteBuffer getAppAttemptTokens();
public void setAppAttemptTokens(ByteBuffer attemptTokens);
+
+ /**
+ * Get the final state of the application attempt.
+ * @return the final state of the application attempt.
+ */
+ public RMAppAttemptState getState();
+
+ public void setState(RMAppAttemptState state);
+
+ /**
+ * Get the original not-proxied <em>final tracking url</em> for the
+ * application. This is intended to only be used by the proxy itself.
+ *
+ * @return the original not-proxied <em>final tracking url</em> for the
+ * application
+ */
+ public String getFinalTrackingUrl();
+
+ /**
+ * Set the original not-proxied <em>final tracking URL</em> of the AM.
+ * @param url the final tracking URL of the application attempt
+ */
+ public void setFinalTrackingUrl(String url);
+ /**
+ * Get the <em>diagnostic information</em> of the attempt.
+ * @return <em>diagnostic information</em> of the attempt
+ */
+ public String getDiagnostics();
+
+ public void setDiagnostics(String diagnostics);
+
+ /**
+ * Get the <em>start time</em> of the application attempt.
+ * @return <em>start time</em> of the application attempt
+ */
+ public long getStartTime();
+
+ public void setStartTime(long startTime);
+
+ /**
+ * Get the <em>final application status</em> recorded for the application attempt.
+ * @return <em>final application status</em> recorded for the application attempt
+ */
+ public FinalApplicationStatus getFinalApplicationStatus();
+
+ public void setFinalApplicationStatus(FinalApplicationStatus finishState);
}
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java Sun Nov 10 20:09:09 2013
@@ -18,10 +18,13 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery.records;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
/**
* Contains all the state data that needs to be stored persistently
@@ -42,7 +45,19 @@ public interface ApplicationStateData {
@Public
@Unstable
public void setSubmitTime(long submitTime);
-
+
+ /**
+ * Get the <em>start time</em> of the application.
+ * @return <em>start time</em> of the application
+ */
+ @Public
+ @Stable
+ public abstract long getStartTime();
+
+ /**
+ * Set the <em>start time</em> of the application.
+ * @param startTime <em>start time</em> of the application
+ */
+ @Private
+ @Unstable
+ public abstract void setStartTime(long startTime);
+
/**
* The application submitter
*/
@@ -66,6 +81,29 @@ public interface ApplicationStateData {
@Public
@Unstable
public void setApplicationSubmissionContext(
- ApplicationSubmissionContext context);
+ ApplicationSubmissionContext context);
+
+ /**
+ * Get the final state of the application.
+ * @return the final state of the application.
+ */
+ public RMAppState getState();
+
+ /**
+ * Set the final state of the application.
+ * @param state the final state of the application.
+ */
+ public void setState(RMAppState state);
+
+ /**
+ * Get the diagnostics information of the application.
+ * @return the diagnostics information of the application.
+ */
+ public String getDiagnostics();
+
+ /**
+ * Set the diagnostics information of the application.
+ * @param diagnostics the diagnostics information of the application.
+ */
+ public void setDiagnostics(String diagnostics);
+
+ /**
+ * The finish time of the application.
+ * @return the finish time of the application.
+ */
+ public long getFinishTime();
+
+ /**
+ * Set the finish time of the application.
+ * @param finishTime the finish time of the application.
+ */
+ public void setFinishTime(long finishTime);
}
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java Sun Nov 10 20:09:09 2013
@@ -22,14 +22,19 @@ import java.nio.ByteBuffer;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
+import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMAppAttemptStateProto;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
public class ApplicationAttemptStateDataPBImpl
extends ProtoBase<ApplicationAttemptStateDataProto>
@@ -156,14 +161,125 @@ implements ApplicationAttemptStateData {
this.appAttemptTokens = attemptTokens;
}
+ /**
+ * Returns the stored attempt state, or {@code null} when the proto has none.
+ */
+ @Override
+ public RMAppAttemptState getState() {
+ ApplicationAttemptStateDataProtoOrBuilder source =
+ viaProto ? proto : builder;
+ return source.hasAppAttemptState()
+ ? convertFromProtoFormat(source.getAppAttemptState())
+ : null;
+ }
+
+ /**
+ * Stores the attempt state; a {@code null} state clears the proto field.
+ */
+ @Override
+ public void setState(RMAppAttemptState state) {
+ maybeInitBuilder();
+ if (state != null) {
+ builder.setAppAttemptState(convertToProtoFormat(state));
+ } else {
+ builder.clearAppAttemptState();
+ }
+ }
+
+ /**
+ * Returns the stored final tracking url, or {@code null} when unset.
+ */
+ @Override
+ public String getFinalTrackingUrl() {
+ ApplicationAttemptStateDataProtoOrBuilder source =
+ viaProto ? proto : builder;
+ return source.hasFinalTrackingUrl() ? source.getFinalTrackingUrl() : null;
+ }
+
+ /**
+ * Stores the final tracking url; {@code null} clears the proto field.
+ */
+ @Override
+ public void setFinalTrackingUrl(String url) {
+ maybeInitBuilder();
+ if (url != null) {
+ builder.setFinalTrackingUrl(url);
+ } else {
+ builder.clearFinalTrackingUrl();
+ }
+ }
+
+ /**
+ * Returns the stored diagnostics, or {@code null} when unset.
+ */
+ @Override
+ public String getDiagnostics() {
+ ApplicationAttemptStateDataProtoOrBuilder source =
+ viaProto ? proto : builder;
+ return source.hasDiagnostics() ? source.getDiagnostics() : null;
+ }
+
+ /**
+ * Stores the diagnostics; {@code null} clears the proto field.
+ */
+ @Override
+ public void setDiagnostics(String diagnostics) {
+ maybeInitBuilder();
+ if (diagnostics != null) {
+ builder.setDiagnostics(diagnostics);
+ } else {
+ builder.clearDiagnostics();
+ }
+ }
+
+ @Override
+ public long getStartTime() {
+ ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder;
+ return p.getStartTime();
+ }
+
+ @Override
+ public void setStartTime(long startTime) {
+ maybeInitBuilder();
+ builder.setStartTime(startTime);
+ }
+
+ /**
+ * Returns the stored final application status, or {@code null} when unset.
+ */
+ @Override
+ public FinalApplicationStatus getFinalApplicationStatus() {
+ ApplicationAttemptStateDataProtoOrBuilder source =
+ viaProto ? proto : builder;
+ return source.hasFinalApplicationStatus()
+ ? convertFromProtoFormat(source.getFinalApplicationStatus())
+ : null;
+ }
+
+ /**
+ * Stores the final application status; {@code null} clears the proto field.
+ */
+ @Override
+ public void setFinalApplicationStatus(FinalApplicationStatus finishState) {
+ maybeInitBuilder();
+ if (finishState != null) {
+ builder.setFinalApplicationStatus(convertToProtoFormat(finishState));
+ } else {
+ builder.clearFinalApplicationStatus();
+ }
+ }
+
+ /**
+ * Create an {@link ApplicationAttemptStateData} record through the record
+ * factory and populate it with the given values.
+ * @param attemptId id of the attempt
+ * @param container master container of the attempt
+ * @param attemptTokens attempt tokens, as a ByteBuffer
+ * @param startTime start time of the attempt
+ * @param finalState final state of the attempt; may be {@code null}
+ * @param finalTrackingUrl final tracking url; may be {@code null}
+ * @param diagnostics diagnostics of the attempt; may be {@code null}
+ * @param amUnregisteredFinalStatus final application status stored with
+ * the attempt; may be {@code null}
+ * @return the populated record
+ */
public static ApplicationAttemptStateData newApplicationAttemptStateData(
ApplicationAttemptId attemptId, Container container,
- ByteBuffer attemptTokens) {
+ ByteBuffer attemptTokens, long startTime, RMAppAttemptState finalState,
+ String finalTrackingUrl, String diagnostics,
+ FinalApplicationStatus amUnregisteredFinalStatus) {
ApplicationAttemptStateData attemptStateData =
recordFactory.newRecordInstance(ApplicationAttemptStateData.class);
attemptStateData.setAttemptId(attemptId);
attemptStateData.setMasterContainer(container);
attemptStateData.setAppAttemptTokens(attemptTokens);
+ attemptStateData.setState(finalState);
+ attemptStateData.setFinalTrackingUrl(finalTrackingUrl);
+ attemptStateData.setDiagnostics(diagnostics);
+ attemptStateData.setStartTime(startTime);
+ attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus);
return attemptStateData;
}
+
+ private static String RM_APP_ATTEMPT_PREFIX = "RMATTEMPT_";
+ public static RMAppAttemptStateProto convertToProtoFormat(RMAppAttemptState e) {
+ return RMAppAttemptStateProto.valueOf(RM_APP_ATTEMPT_PREFIX + e.name());
+ }
+ public static RMAppAttemptState convertFromProtoFormat(RMAppAttemptStateProto e) {
+ return RMAppAttemptState.valueOf(e.name().replace(RM_APP_ATTEMPT_PREFIX, ""));
+ }
+
+ private FinalApplicationStatusProto convertToProtoFormat(FinalApplicationStatus s) {
+ return ProtoUtils.convertToProtoFormat(s);
+ }
+ private FinalApplicationStatus convertFromProtoFormat(FinalApplicationStatusProto s) {
+ return ProtoUtils.convertFromProtoFormat(s);
+ }
+
}
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java Sun Nov 10 20:09:09 2013
@@ -21,14 +21,20 @@ package org.apache.hadoop.yarn.server.re
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMAppStateProto;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
public class ApplicationStateDataPBImpl
extends ProtoBase<ApplicationStateDataProto>
implements ApplicationStateData {
-
+ // Factory used by newApplicationStateData to instantiate records.
+ private static final RecordFactory recordFactory = RecordFactoryProvider
+ .getRecordFactory(null);
+
ApplicationStateDataProto proto =
ApplicationStateDataProto.getDefaultInstance();
ApplicationStateDataProto.Builder builder = null;
@@ -92,6 +98,18 @@ implements ApplicationStateData {
}
@Override
+ public long getStartTime() {
+ ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder;
+ return p.getStartTime();
+ }
+
+ @Override
+ public void setStartTime(long startTime) {
+ maybeInitBuilder();
+ builder.setStartTime(startTime);
+ }
+
+ @Override
public String getUser() {
ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasUser()) {
@@ -132,4 +150,78 @@ implements ApplicationStateData {
this.applicationSubmissionContext = context;
}
+ /**
+ * Returns the stored application state, or {@code null} when unset.
+ */
+ @Override
+ public RMAppState getState() {
+ ApplicationStateDataProtoOrBuilder source = viaProto ? proto : builder;
+ return source.hasApplicationState()
+ ? convertFromProtoFormat(source.getApplicationState())
+ : null;
+ }
+
+ /**
+ * Stores the application state; {@code null} clears the proto field.
+ */
+ @Override
+ public void setState(RMAppState finalState) {
+ maybeInitBuilder();
+ if (finalState != null) {
+ builder.setApplicationState(convertToProtoFormat(finalState));
+ } else {
+ builder.clearApplicationState();
+ }
+ }
+
+ /**
+ * Returns the stored diagnostics, or {@code null} when unset.
+ */
+ @Override
+ public String getDiagnostics() {
+ ApplicationStateDataProtoOrBuilder source = viaProto ? proto : builder;
+ return source.hasDiagnostics() ? source.getDiagnostics() : null;
+ }
+
+ /**
+ * Stores the diagnostics; {@code null} clears the proto field.
+ */
+ @Override
+ public void setDiagnostics(String diagnostics) {
+ maybeInitBuilder();
+ if (diagnostics != null) {
+ builder.setDiagnostics(diagnostics);
+ } else {
+ builder.clearDiagnostics();
+ }
+ }
+
+ @Override
+ public long getFinishTime() {
+ ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder;
+ return p.getFinishTime();
+ }
+
+ @Override
+ public void setFinishTime(long finishTime) {
+ maybeInitBuilder();
+ builder.setFinishTime(finishTime);
+ }
+
+ /**
+ * Create an {@link ApplicationStateData} record through the record factory
+ * and populate it with the given values.
+ * @param submitTime submit time of the application
+ * @param startTime start time of the application
+ * @param user the application submitter
+ * @param submissionContext submission context of the application
+ * @param state final state of the application; may be {@code null}
+ * @param diagnostics diagnostics of the application; may be {@code null}
+ * @param finishTime finish time of the application
+ * @return the populated record
+ */
+ public static ApplicationStateData newApplicationStateData(long submitTime,
+ long startTime, String user,
+ ApplicationSubmissionContext submissionContext, RMAppState state,
+ String diagnostics, long finishTime) {
+ ApplicationStateData stateData =
+ recordFactory.newRecordInstance(ApplicationStateData.class);
+ stateData.setSubmitTime(submitTime);
+ stateData.setStartTime(startTime);
+ stateData.setUser(user);
+ stateData.setApplicationSubmissionContext(submissionContext);
+ stateData.setState(state);
+ stateData.setDiagnostics(diagnostics);
+ stateData.setFinishTime(finishTime);
+ return stateData;
+ }
+
+ private static String RM_APP_PREFIX = "RMAPP_";
+ public static RMAppStateProto convertToProtoFormat(RMAppState e) {
+ return RMAppStateProto.valueOf(RM_APP_PREFIX + e.name());
+ }
+ public static RMAppState convertFromProtoFormat(RMAppStateProto e) {
+ return RMAppState.valueOf(e.name().replace(RM_APP_PREFIX, ""));
+ }
}
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java Sun Nov 10 20:09:09 2013
@@ -35,6 +35,7 @@ public enum RMAppEventType {
NODE_UPDATE,
// Source: RMStateStore
- APP_SAVED,
+ APP_NEW_SAVED,
+ APP_UPDATE_SAVED,
APP_REMOVED
}
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Sun Nov 10 20:09:09 2013
@@ -54,10 +54,8 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -103,7 +101,8 @@ public class RMAppImpl implements RMApp,
// Mutable fields
private long startTime;
- private long finishTime;
+ private long finishTime = 0;
+ private long storedFinishTime = 0;
private RMAppAttempt currentAttempt;
private String queue;
@SuppressWarnings("rawtypes")
@@ -111,8 +110,11 @@ public class RMAppImpl implements RMApp,
private static final FinalTransition FINAL_TRANSITION = new FinalTransition();
private static final AppFinishedTransition FINISHED_TRANSITION =
new AppFinishedTransition();
- private boolean isAppRemovalRequestSent = false;
- private RMAppState previousStateAtRemoving;
+ // State returned by getState() when the final-state save was requested
+ // (set in rememberTargetTransitionsAndStoreState).
+ private RMAppState stateBeforeFinalSaving;
+ // Event that led to FINAL_SAVING. FinalStateSavedTransition replays
+ // transitionTodo with this event once the store confirms the update.
+ private RMAppEvent eventCausingFinalSaving;
+ // State returned by FinalStateSavedTransition after the save completes.
+ private RMAppState targetedFinalState;
+ // Final state read from the state store in recover(). When non-null,
+ // RMAppRecoveredTransition moves the app straight to it.
+ private RMAppState recoveredFinalState;
+ // Deferred SingleArcTransition/MultipleArcTransition, run by
+ // FinalStateSavedTransition after the final state has been saved.
+ // NOTE(review): package-private unlike its siblings; consider private.
+ Object transitionTodo;
private static final StateMachineFactory<RMAppImpl,
RMAppState,
@@ -129,32 +131,45 @@ public class RMAppImpl implements RMApp,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
.addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
RMAppEventType.START, new RMAppSavingTransition())
- .addTransition(RMAppState.NEW, RMAppState.SUBMITTED,
- RMAppEventType.RECOVER, new StartAppAttemptTransition())
- .addTransition(RMAppState.NEW, RMAppState.KILLED, RMAppEventType.KILL,
- new AppKilledTransition())
- .addTransition(RMAppState.NEW, RMAppState.FAILED,
- RMAppEventType.APP_REJECTED, new AppRejectedTransition())
+ .addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED,
+ RMAppState.FINISHED, RMAppState.FAILED, RMAppState.KILLED,
+ RMAppState.FINAL_SAVING),
+ RMAppEventType.RECOVER, new RMAppRecoveredTransition())
+ .addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING, RMAppEventType.KILL,
+ new FinalSavingTransition(
+ new AppKilledTransition(), RMAppState.KILLED))
+ .addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING,
+ RMAppEventType.APP_REJECTED,
+ new FinalSavingTransition(
+ new AppRejectedTransition(), RMAppState.FAILED))
// Transitions from NEW_SAVING state
.addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
.addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED,
- RMAppEventType.APP_SAVED, new StartAppAttemptTransition())
- .addTransition(RMAppState.NEW_SAVING, RMAppState.KILLED,
- RMAppEventType.KILL, new AppKilledTransition())
- .addTransition(RMAppState.NEW_SAVING, RMAppState.FAILED,
- RMAppEventType.APP_REJECTED, new AppRejectedTransition())
+ RMAppEventType.APP_NEW_SAVED, new StartAppAttemptTransition())
+ .addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING,
+ RMAppEventType.KILL,
+ new FinalSavingTransition(
+ new AppKilledTransition(), RMAppState.KILLED))
+ .addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING,
+ RMAppEventType.APP_REJECTED,
+ new FinalSavingTransition(new AppRejectedTransition(),
+ RMAppState.FAILED))
// Transitions from SUBMITTED state
.addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
- .addTransition(RMAppState.SUBMITTED, RMAppState.FAILED,
- RMAppEventType.APP_REJECTED, new AppRejectedTransition())
+ .addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING,
+ RMAppEventType.APP_REJECTED,
+ new FinalSavingTransition(
+ new AppRejectedTransition(), RMAppState.FAILED))
.addTransition(RMAppState.SUBMITTED, RMAppState.ACCEPTED,
RMAppEventType.APP_ACCEPTED)
- .addTransition(RMAppState.SUBMITTED, RMAppState.KILLED,
- RMAppEventType.KILL, new KillAppAndAttemptTransition())
+ .addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING,
+ RMAppEventType.KILL,
+ new FinalSavingTransition(
+ new KillAppAndAttemptTransition(), RMAppState.KILLED))
// Transitions from ACCEPTED state
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
@@ -162,37 +177,45 @@ public class RMAppImpl implements RMApp,
.addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING,
RMAppEventType.ATTEMPT_REGISTERED)
.addTransition(RMAppState.ACCEPTED,
- EnumSet.of(RMAppState.SUBMITTED, RMAppState.FAILED),
+ EnumSet.of(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING),
RMAppEventType.ATTEMPT_FAILED,
new AttemptFailedTransition(RMAppState.SUBMITTED))
- .addTransition(RMAppState.ACCEPTED, RMAppState.KILLED,
- RMAppEventType.KILL, new KillAppAndAttemptTransition())
+ .addTransition(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING,
+ RMAppEventType.KILL,
+ new FinalSavingTransition(
+ new KillAppAndAttemptTransition(), RMAppState.KILLED))
// Transitions from RUNNING state
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
- .addTransition(RMAppState.RUNNING, RMAppState.REMOVING,
- RMAppEventType.ATTEMPT_UNREGISTERED,
- new RMAppRemovingTransition())
+ .addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING,
+ RMAppEventType.ATTEMPT_UNREGISTERED,
+ new FinalSavingTransition(
+ new AttemptUnregisteredTransition(),
+ RMAppState.FINISHING, RMAppState.FINISHED))
.addTransition(RMAppState.RUNNING, RMAppState.FINISHED,
+ // UnManagedAM directly jumps to finished
RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
.addTransition(RMAppState.RUNNING,
- EnumSet.of(RMAppState.SUBMITTED, RMAppState.FAILED),
+ EnumSet.of(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING),
RMAppEventType.ATTEMPT_FAILED,
new AttemptFailedTransition(RMAppState.SUBMITTED))
- .addTransition(RMAppState.RUNNING, RMAppState.KILLED,
- RMAppEventType.KILL, new KillAppAndAttemptTransition())
-
- // Transitions from REMOVING state
- .addTransition(RMAppState.REMOVING, RMAppState.FINISHING,
- RMAppEventType.APP_REMOVED, new RMAppFinishingTransition())
- .addTransition(RMAppState.REMOVING, RMAppState.FINISHED,
- RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
- .addTransition(RMAppState.REMOVING, RMAppState.KILLED,
- RMAppEventType.KILL, new KillAppAndAttemptTransition())
+ .addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING,
+ RMAppEventType.KILL,
+ new FinalSavingTransition(
+ new KillAppAndAttemptTransition(), RMAppState.KILLED))
+
+ // Transitions from FINAL_SAVING state
+ .addTransition(RMAppState.FINAL_SAVING,
+ EnumSet.of(RMAppState.FINISHING, RMAppState.FAILED,
+ RMAppState.KILLED, RMAppState.FINISHED), RMAppEventType.APP_UPDATE_SAVED,
+ new FinalStateSavedTransition())
+ .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
+ RMAppEventType.ATTEMPT_FINISHED,
+ new AttemptFinishedAtFinalSavingTransition())
// ignorable transitions
- .addTransition(RMAppState.REMOVING, RMAppState.REMOVING,
- RMAppEventType.NODE_UPDATE)
+ .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
+ EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL))
// Transitions from FINISHING state
.addTransition(RMAppState.FINISHING, RMAppState.FINISHED,
@@ -201,7 +224,7 @@ public class RMAppImpl implements RMApp,
RMAppEventType.KILL, new KillAppAndAttemptTransition())
// ignorable transitions
.addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
- EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.APP_REMOVED))
+ EnumSet.of(RMAppEventType.NODE_UPDATE))
// Transitions from FINISHED state
// ignorable transitions
@@ -210,14 +233,12 @@ public class RMAppImpl implements RMApp,
RMAppEventType.NODE_UPDATE,
RMAppEventType.ATTEMPT_UNREGISTERED,
RMAppEventType.ATTEMPT_FINISHED,
- RMAppEventType.KILL,
- RMAppEventType.APP_REMOVED))
+ RMAppEventType.KILL))
// Transitions from FAILED state
// ignorable transitions
.addTransition(RMAppState.FAILED, RMAppState.FAILED,
- EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE,
- RMAppEventType.APP_SAVED, RMAppEventType.APP_REMOVED))
+ EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE))
// Transitions from KILLED state
// ignorable transitions
@@ -227,8 +248,7 @@ public class RMAppImpl implements RMApp,
EnumSet.of(RMAppEventType.APP_ACCEPTED,
RMAppEventType.APP_REJECTED, RMAppEventType.KILL,
RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED,
- RMAppEventType.ATTEMPT_KILLED, RMAppEventType.NODE_UPDATE,
- RMAppEventType.APP_SAVED, RMAppEventType.APP_REMOVED))
+ RMAppEventType.ATTEMPT_KILLED, RMAppEventType.NODE_UPDATE))
.installTopology();
@@ -316,9 +336,8 @@ public class RMAppImpl implements RMApp,
@Override
public RMAppState getState() {
this.readLock.lock();
-
try {
- return this.stateMachine.getCurrentState();
+ return this.stateMachine.getCurrentState();
} finally {
this.readLock.unlock();
}
@@ -398,7 +417,7 @@ public class RMAppImpl implements RMApp,
case SUBMITTED:
case ACCEPTED:
case RUNNING:
- case REMOVING:
+ case FINAL_SAVING:
return FinalApplicationStatus.UNDEFINED;
// finished without a proper final state is the same as failed
case FINISHING:
@@ -586,8 +605,12 @@ public class RMAppImpl implements RMApp,
@Override
public void recover(RMState state) throws Exception{
ApplicationState appState = state.getApplicationState().get(getApplicationId());
+ this.recoveredFinalState = appState.getState();
LOG.info("Recovering app: " + getApplicationId() + " with " +
- + appState.getAttemptCount() + " attempts");
+ + appState.getAttemptCount() + " attempts and final state = " + this.recoveredFinalState );
+ this.diagnostics.append(appState.getDiagnostics());
+ this.storedFinishTime = appState.getFinishTime();
+ this.startTime = appState.getStartTime();
for(int i=0; i<appState.getAttemptCount(); ++i) {
// create attempt
createNewAttempt(false);
@@ -632,60 +655,195 @@ public class RMAppImpl implements RMApp,
nodeUpdateEvent.getNode());
};
}
-
+
+ /**
+ * Handles RECOVER in the NEW state. If a final state was recovered from the
+ * store, this runs FINAL_TRANSITION and moves the app straight to that
+ * state. Otherwise recovery is treated like an ordinary attempt failure.
+ */
+ private static final class RMAppRecoveredTransition implements
+ MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {
+ @Override
+ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
+ if (app.recoveredFinalState == null) {
+ // No final state was stored for this app. Directly call
+ // AttemptFailedTransition, since an application that fails because of
+ // an RM restart is deemed to fail like after a normal AM failure.
+ //
+ // Do not recover unmanaged applications, since the current recovery
+ // mechanism of restarting attempts does not work for them. This will
+ // need to change in work-preserving recovery, in which the RM
+ // re-connects with running AMs instead of restarting them.
+ //
+ // In work-preserving restart, if attemptCount == maxAttempts the job
+ // still needs to be recovered because the last attempt may still be
+ // running.
+ //
+ // As part of YARN-1210, we may return the ACCEPTED state, waiting for
+ // the AM to re-register or fail, and remove the following code.
+ return new AttemptFailedTransition(RMAppState.SUBMITTED).transition(app,
+ event);
+ }
+ FINAL_TRANSITION.transition(app, event);
+ return app.recoveredFinalState;
+ }
+ }
+
+ /**
+ * Handles APP_NEW_SAVED in NEW_SAVING (moving the app to SUBMITTED). If the
+ * state store reported an exception, it is logged and ExitUtil.terminate is
+ * called. Then a new attempt is created via createNewAttempt(true).
+ * The event is cast to RMAppNewSavedEvent unconditionally, which relies on
+ * this transition being registered only for APP_NEW_SAVED.
+ */
private static final class StartAppAttemptTransition extends RMAppTransition {
+ @Override
public void transition(RMAppImpl app, RMAppEvent event) {
- if (event.getType().equals(RMAppEventType.APP_SAVED)) {
- assert app.getState().equals(RMAppState.NEW_SAVING);
- RMAppStoredEvent storeEvent = (RMAppStoredEvent) event;
- if(storeEvent.getStoredException() != null) {
- // For HA this exception needs to be handled by giving up
- // master status if we got fenced
- LOG.error("Failed to store application: "
- + storeEvent.getApplicationId(),
- storeEvent.getStoredException());
- ExitUtil.terminate(1, storeEvent.getStoredException());
- }
+ RMAppNewSavedEvent storeEvent = (RMAppNewSavedEvent) event;
+ if (storeEvent.getStoredException() != null) {
+ // For HA this exception needs to be handled by giving up
+ // master status if we got fenced
+ LOG.error(
+ "Failed to store application: " + storeEvent.getApplicationId(),
+ storeEvent.getStoredException());
+ ExitUtil.terminate(1, storeEvent.getStoredException());
}
-
app.createNewAttempt(true);
};
}
- private static final class RMAppFinishingTransition extends RMAppTransition {
+ private static final class FinalStateSavedTransition implements
+ MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
+ RMAppUpdateSavedEvent storeEvent = (RMAppUpdateSavedEvent) event;
+ if (storeEvent.getUpdatedException() != null) {
+ LOG.error("Failed to update the final state of application"
+ + storeEvent.getApplicationId(), storeEvent.getUpdatedException());
+ ExitUtil.terminate(1, storeEvent.getUpdatedException());
+ }
+
+ if (app.transitionTodo instanceof SingleArcTransition) {
+ ((SingleArcTransition) app.transitionTodo).transition(app,
+ app.eventCausingFinalSaving);
+ } else if (app.transitionTodo instanceof MultipleArcTransition) {
+ ((MultipleArcTransition) app.transitionTodo).transition(app,
+ app.eventCausingFinalSaving);
+ }
+ return app.targetedFinalState;
+
+ }
+ }
+
+ /**
+ * Records the attempt-failure diagnostics (when there are any) and runs
+ * FINAL_TRANSITION.
+ */
+ private static class AttemptFailedFinalStateSavedTransition extends
+ RMAppTransition {
@Override
public void transition(RMAppImpl app, RMAppEvent event) {
- if (event.getType().equals(RMAppEventType.APP_REMOVED)) {
- RMAppRemovedEvent removeEvent = (RMAppRemovedEvent) event;
- if (removeEvent.getRemovedException() != null) {
- LOG.error(
- "Failed to remove application: " + removeEvent.getApplicationId(),
- removeEvent.getRemovedException());
- ExitUtil.terminate(1, removeEvent.getRemovedException());
- }
+ String msg = null;
+ if (event instanceof RMAppFailedAttemptEvent) {
+ msg = app.getAppAttemptFailedDiagnostics(event);
}
- app.finishTime = System.currentTimeMillis();
+ // msg stays null for events that are not attempt failures, and
+ // getAppAttemptFailedDiagnostics returns null for a managed AM with
+ // attempts remaining. Appending a null String would record the literal
+ // text "null" in the app diagnostics, so skip it.
+ if (msg != null) {
+ LOG.info(msg);
+ app.diagnostics.append(msg);
+ }
+ // Inform the node for app-finish
+ FINAL_TRANSITION.transition(app, event);
+ }
+ }
+
+ /**
+ * Build the message for an attempt failure that fails the application:
+ * either the AM is unmanaged, or the number of attempts has reached
+ * maxAppAttempts.
+ * @param event the failure event. It is cast to RMAppFailedAttemptEvent
+ * unconditionally, so callers must pass that type.
+ * @return the message, or {@code null} when neither condition holds
+ */
+ private String getAppAttemptFailedDiagnostics(RMAppEvent event) {
+ String msg = null;
+ RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
+ if (this.submissionContext.getUnmanagedAM()) {
+ // RM does not manage the AM. Do not retry
+ msg = "Unmanaged application " + this.getApplicationId()
+ + " failed due to " + failedEvent.getDiagnostics()
+ + ". Failing the application.";
+ } else if (this.attempts.size() >= this.maxAppAttempts) {
+ msg = "Application " + this.getApplicationId() + " failed "
+ + this.maxAppAttempts + " times due to "
+ + failedEvent.getDiagnostics() + ". Failing the application.";
}
+ return msg;
}
+ /**
+ * Handles START in the NEW state (moving the app to NEW_SAVING) by asking
+ * the state store to persist the new application. The state machine then
+ * waits in NEW_SAVING for APP_NEW_SAVED.
+ */
private static final class RMAppSavingTransition extends RMAppTransition {
@Override
public void transition(RMAppImpl app, RMAppEvent event) {
+
// If recovery is enabled then store the application information in a
// non-blocking call so make sure that RM has stored the information
// needed to restart the AM after RM restart without further client
// communication
LOG.info("Storing application with id " + app.applicationId);
- app.rmContext.getStateStore().storeApplication(app);
+ app.rmContext.getStateStore().storeNewApplication(app);
}
}
- private static final class RMAppRemovingTransition extends RMAppTransition {
+ private void rememberTargetTransitions(RMAppEvent event,
+ Object transitionToDo, RMAppState targetFinalState) {
+ transitionTodo = transitionToDo;
+ targetedFinalState = targetFinalState;
+ eventCausingFinalSaving = event;
+ }
+
+ private void rememberTargetTransitionsAndStoreState(RMAppEvent event,
+ Object transitionToDo, RMAppState targetFinalState,
+ RMAppState stateToBeStored) {
+ rememberTargetTransitions(event, transitionToDo, targetFinalState);
+ this.stateBeforeFinalSaving = getState();
+ this.storedFinishTime = System.currentTimeMillis();
+
+ LOG.info("Updating application " + this.applicationId
+ + " with final state: " + this.targetedFinalState);
+ // we lost attempt_finished diagnostics in app, because attempt_finished
+ // diagnostics is sent after app final state is saved. Later on, we will
+ // create GetApplicationAttemptReport specifically for getting per attempt
+ // info.
+ String diags = null;
+ switch (event.getType()) {
+ case APP_REJECTED:
+ RMAppRejectedEvent rejectedEvent = (RMAppRejectedEvent) event;
+ diags = rejectedEvent.getMessage();
+ break;
+ case ATTEMPT_FINISHED:
+ RMAppFinishedAttemptEvent finishedEvent =
+ (RMAppFinishedAttemptEvent) event;
+ diags = finishedEvent.getDiagnostics();
+ break;
+ case ATTEMPT_FAILED:
+ RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
+ diags = getAppAttemptFailedDiagnostics(failedEvent);
+ break;
+ case KILL:
+ diags = getAppKilledDiagnostics();
+ break;
+ default:
+ break;
+ }
+ ApplicationState appState =
+ new ApplicationState(this.submitTime, this.startTime,
+ this.submissionContext, this.user, stateToBeStored, diags,
+ this.storedFinishTime);
+ this.rmContext.getStateStore().updateApplicationState(appState);
+ }
+
+ private static final class FinalSavingTransition extends RMAppTransition {
+ Object transitionToDo;
+ RMAppState targetedFinalState;
+ RMAppState stateToBeStored;
+
+ public FinalSavingTransition(Object transitionToDo,
+ RMAppState targetedFinalState) {
+ this(transitionToDo, targetedFinalState, targetedFinalState);
+ }
+
+ public FinalSavingTransition(Object transitionToDo,
+ RMAppState targetedFinalState, RMAppState stateToBeStored) {
+ this.transitionToDo = transitionToDo;
+ this.targetedFinalState = targetedFinalState;
+ this.stateToBeStored = stateToBeStored;
+ }
+
@Override
public void transition(RMAppImpl app, RMAppEvent event) {
- LOG.info("Removing application with id " + app.applicationId);
- app.removeApplicationState();
- app.previousStateAtRemoving = app.getState();
+ app.rememberTargetTransitionsAndStoreState(event, transitionToDo,
+ targetedFinalState, stateToBeStored);
+ }
+ }
+
+ private static class AttemptUnregisteredTransition extends RMAppTransition {
+ @Override
+ public void transition(RMAppImpl app, RMAppEvent event) {
+ app.finishTime = app.storedFinishTime;
}
}
@@ -698,6 +856,40 @@ public class RMAppImpl implements RMApp,
};
}
+ private static class AttemptFinishedAtFinalSavingTransition extends
+ RMAppTransition {
+ @Override
+ public void transition(RMAppImpl app, RMAppEvent event) {
+ if (app.targetedFinalState.equals(RMAppState.FAILED)
+ || app.targetedFinalState.equals(RMAppState.KILLED)) {
+ // Ignore the ATTEMPT_FINISHED event if we were supposed to reach the
+ // FAILED or KILLED state.
+ return;
+ }
+
+ // Pass in the earlier attempt_unregistered event, because
+ // AppFinishedFinalStateSavedTransition needs it later on.
+ app.rememberTargetTransitions(event,
+ new AppFinishedFinalStateSavedTransition(app.eventCausingFinalSaving),
+ RMAppState.FINISHED);
+ }
+ }
+
+ private static class AppFinishedFinalStateSavedTransition extends
+ RMAppTransition {
+ RMAppEvent attemptUnregistered;
+
+ public AppFinishedFinalStateSavedTransition(RMAppEvent attemptUnregistered) {
+ this.attemptUnregistered = attemptUnregistered;
+ }
+ @Override
+ public void transition(RMAppImpl app, RMAppEvent event) {
+ new AttemptUnregisteredTransition().transition(app, attemptUnregistered);
+ FINISHED_TRANSITION.transition(app, event);
+ }
+ }
+
+
private static class AppKilledTransition extends FinalTransition {
@Override
public void transition(RMAppImpl app, RMAppEvent event) {
@@ -706,6 +898,10 @@ public class RMAppImpl implements RMApp,
};
}
+ private static String getAppKilledDiagnostics() {
+ return "Application killed by user.";
+ }
+
private static class KillAppAndAttemptTransition extends AppKilledTransition {
@SuppressWarnings("unchecked")
@Override
@@ -741,12 +937,10 @@ public class RMAppImpl implements RMApp,
app.handler.handle(
new RMNodeCleanAppEvent(nodeId, app.applicationId));
}
- if (app.getState() != RMAppState.FINISHING) {
+ app.finishTime = app.storedFinishTime;
+ if (app.finishTime == 0 ) {
app.finishTime = System.currentTimeMillis();
}
- // application completely done and remove from state store.
- app.removeApplicationState();
-
app.handler.handle(
new RMAppManagerEvent(app.applicationId,
RMAppManagerEventType.APP_COMPLETED));
@@ -764,32 +958,15 @@ public class RMAppImpl implements RMApp,
@Override
public RMAppState transition(RMAppImpl app, RMAppEvent event) {
-
- RMAppFailedAttemptEvent failedEvent = ((RMAppFailedAttemptEvent) event);
- boolean retryApp = true;
- String msg = null;
- if (app.submissionContext.getUnmanagedAM()) {
- // RM does not manage the AM. Do not retry
- retryApp = false;
- msg = "Unmanaged application " + app.getApplicationId()
- + " failed due to " + failedEvent.getDiagnostics()
- + ". Failing the application.";
- } else if (app.attempts.size() >= app.maxAppAttempts) {
- retryApp = false;
- msg = "Application " + app.getApplicationId() + " failed "
- + app.maxAppAttempts + " times due to " + failedEvent.getDiagnostics()
- + ". Failing the application.";
- }
-
- if (retryApp) {
+ if (!app.submissionContext.getUnmanagedAM()
+ && app.attempts.size() < app.maxAppAttempts) {
app.createNewAttempt(true);
return initialState;
} else {
- LOG.info(msg);
- app.diagnostics.append(msg);
- // Inform the node for app-finish
- FINAL_TRANSITION.transition(app, event);
- return RMAppState.FAILED;
+ app.rememberTargetTransitionsAndStoreState(event,
+ new AttemptFailedFinalStateSavedTransition(), RMAppState.FAILED,
+ RMAppState.FAILED);
+ return RMAppState.FINAL_SAVING;
}
}
@@ -814,9 +991,9 @@ public class RMAppImpl implements RMApp,
@Override
public YarnApplicationState createApplicationState() {
RMAppState rmAppState = getState();
- // If App is in REMOVING state, return its previous state.
- if (rmAppState.equals(RMAppState.REMOVING)) {
- rmAppState = previousStateAtRemoving;
+ // If App is in FINAL_SAVING state, return its previous state.
+ if (rmAppState.equals(RMAppState.FINAL_SAVING)) {
+ rmAppState = stateBeforeFinalSaving;
}
switch (rmAppState) {
case NEW:
@@ -840,11 +1017,4 @@ public class RMAppImpl implements RMApp,
throw new YarnRuntimeException("Unknown state passed!");
}
}
-
- private void removeApplicationState(){
- if (!isAppRemovalRequestSent) {
- rmContext.getStateStore().removeApplication(this);
- isAppRemovalRequestSent = true;
- }
- }
}
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java Sun Nov 10 20:09:09 2013
@@ -24,7 +24,7 @@ public enum RMAppState {
SUBMITTED,
ACCEPTED,
RUNNING,
- REMOVING,
+ FINAL_SAVING,
FINISHING,
FINISHED,
FAILED,
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java Sun Nov 10 20:09:09 2013
@@ -41,7 +41,8 @@ public enum RMAppAttemptEventType {
CONTAINER_FINISHED,
// Source: RMStateStore
- ATTEMPT_SAVED,
+ ATTEMPT_NEW_SAVED,
+ ATTEMPT_UPDATE_SAVED,
// Source: Scheduler
APP_REJECTED,