You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:51:01 UTC
svn commit: r1619012 [11/26] - in
/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project: ./
hadoop-yarn/bin/ hadoop-yarn/conf/ hadoop-yarn/dev-support/
hadoop-yarn/hadoop-yarn-api/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api...
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java Tue Aug 19 23:49:39 2014
@@ -32,9 +32,10 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import com.google.common.annotations.VisibleForTesting;
@@ -43,6 +44,8 @@ import com.google.common.annotations.Vis
public class MemoryRMStateStore extends RMStateStore {
RMState state = new RMState();
+ private int epoch = 0;
+
@VisibleForTesting
public RMState getState() {
return state;
@@ -53,6 +56,13 @@ public class MemoryRMStateStore extends
}
@Override
+ public synchronized int getAndIncrementEpoch() throws Exception {
+ int currentEpoch = epoch;
+ epoch = epoch + 1;
+ return currentEpoch;
+ }
+
+ @Override
public synchronized RMState loadState() throws Exception {
// return a copy of the state to allow for modification of the real state
RMState returnState = new RMState();
@@ -63,6 +73,10 @@ public class MemoryRMStateStore extends
state.rmSecretManagerState.getTokenState());
returnState.rmSecretManagerState.dtSequenceNumber =
state.rmSecretManagerState.dtSequenceNumber;
+ returnState.amrmTokenSecretManagerState =
+ state.amrmTokenSecretManagerState == null ? null
+ : AMRMTokenSecretManagerState
+ .newInstance(state.amrmTokenSecretManagerState);
return returnState;
}
@@ -80,7 +94,7 @@ public class MemoryRMStateStore extends
@Override
public void storeApplicationStateInternal(ApplicationId appId,
- ApplicationStateDataPBImpl appStateData)
+ ApplicationStateData appStateData)
throws Exception {
ApplicationState appState =
new ApplicationState(appStateData.getSubmitTime(),
@@ -92,7 +106,7 @@ public class MemoryRMStateStore extends
@Override
public void updateApplicationStateInternal(ApplicationId appId,
- ApplicationStateDataPBImpl appStateData) throws Exception {
+ ApplicationStateData appStateData) throws Exception {
ApplicationState updatedAppState =
new ApplicationState(appStateData.getSubmitTime(),
appStateData.getStartTime(),
@@ -112,7 +126,7 @@ public class MemoryRMStateStore extends
@Override
public synchronized void storeApplicationAttemptStateInternal(
ApplicationAttemptId appAttemptId,
- ApplicationAttemptStateDataPBImpl attemptStateData)
+ ApplicationAttemptStateData attemptStateData)
throws Exception {
Credentials credentials = null;
if(attemptStateData.getAppAttemptTokens() != null){
@@ -137,7 +151,7 @@ public class MemoryRMStateStore extends
@Override
public synchronized void updateApplicationAttemptStateInternal(
ApplicationAttemptId appAttemptId,
- ApplicationAttemptStateDataPBImpl attemptStateData)
+ ApplicationAttemptStateData attemptStateData)
throws Exception {
Credentials credentials = null;
if (attemptStateData.getAppAttemptTokens() != null) {
@@ -152,7 +166,8 @@ public class MemoryRMStateStore extends
attemptStateData.getStartTime(), attemptStateData.getState(),
attemptStateData.getFinalTrackingUrl(),
attemptStateData.getDiagnostics(),
- attemptStateData.getFinalApplicationStatus());
+ attemptStateData.getFinalApplicationStatus(),
+ attemptStateData.getAMContainerExitStatus());
ApplicationState appState =
state.getApplicationState().get(
@@ -244,7 +259,7 @@ public class MemoryRMStateStore extends
}
@Override
- protected RMStateVersion loadVersion() throws Exception {
+ protected Version loadVersion() throws Exception {
return null;
}
@@ -253,8 +268,22 @@ public class MemoryRMStateStore extends
}
@Override
- protected RMStateVersion getCurrentVersion() {
+ protected Version getCurrentVersion() {
return null;
}
+ @Override
+ public void storeOrUpdateAMRMTokenSecretManagerState(
+ AMRMTokenSecretManagerState amrmTokenSecretManagerState,
+ boolean isUpdate) {
+ if (amrmTokenSecretManagerState != null) {
+ state.amrmTokenSecretManagerState = AMRMTokenSecretManagerState
+ .newInstance(amrmTokenSecretManagerState);
+ }
+ }
+
+ @Override
+ public void deleteStore() throws Exception {
+ }
+
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java Tue Aug 19 23:49:39 2014
@@ -25,9 +25,10 @@ import org.apache.hadoop.security.token.
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
@Unstable
public class NullRMStateStore extends RMStateStore {
@@ -48,19 +49,24 @@ public class NullRMStateStore extends RM
}
@Override
+ public synchronized int getAndIncrementEpoch() throws Exception {
+ return 0;
+ }
+
+ @Override
public RMState loadState() throws Exception {
throw new UnsupportedOperationException("Cannot load state from null store");
}
@Override
protected void storeApplicationStateInternal(ApplicationId appId,
- ApplicationStateDataPBImpl appStateData) throws Exception {
+ ApplicationStateData appStateData) throws Exception {
// Do nothing
}
@Override
protected void storeApplicationAttemptStateInternal(ApplicationAttemptId attemptId,
- ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
+ ApplicationAttemptStateData attemptStateData) throws Exception {
// Do nothing
}
@@ -102,13 +108,13 @@ public class NullRMStateStore extends RM
@Override
protected void updateApplicationStateInternal(ApplicationId appId,
- ApplicationStateDataPBImpl appStateData) throws Exception {
+ ApplicationStateData appStateData) throws Exception {
// Do nothing
}
@Override
protected void updateApplicationAttemptStateInternal(ApplicationAttemptId attemptId,
- ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
+ ApplicationAttemptStateData attemptStateData) throws Exception {
}
@Override
@@ -117,7 +123,7 @@ public class NullRMStateStore extends RM
}
@Override
- protected RMStateVersion loadVersion() throws Exception {
+ protected Version loadVersion() throws Exception {
// Do nothing
return null;
}
@@ -128,9 +134,20 @@ public class NullRMStateStore extends RM
}
@Override
- protected RMStateVersion getCurrentVersion() {
+ protected Version getCurrentVersion() {
// Do nothing
return null;
}
+ @Override
+ public void storeOrUpdateAMRMTokenSecretManagerState(
+ AMRMTokenSecretManagerState state, boolean isUpdate) {
+ // Do nothing
+ }
+
+ @Override
+ public void deleteStore() throws Exception {
+ // Do nothing
+ }
+
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java Tue Aug 19 23:49:39 2014
@@ -18,7 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
-import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -31,36 +30,39 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent;
import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNewSavedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppUpdateSavedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
+import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
+import org.apache.hadoop.yarn.state.SingleArcTransition;
+import org.apache.hadoop.yarn.state.StateMachine;
+import org.apache.hadoop.yarn.state.StateMachineFactory;
@Private
@Unstable
@@ -79,12 +81,174 @@ public abstract class RMStateStore exten
protected static final String DELEGATION_TOKEN_PREFIX = "RMDelegationToken_";
protected static final String DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX =
"RMDTSequenceNumber_";
+ protected static final String AMRMTOKEN_SECRET_MANAGER_ROOT =
+ "AMRMTokenSecretManagerRoot";
protected static final String VERSION_NODE = "RMVersionNode";
+ protected static final String EPOCH_NODE = "EpochNode";
public static final Log LOG = LogFactory.getLog(RMStateStore.class);
+ private enum RMStateStoreState {
+ DEFAULT
+ };
+
+ private static final StateMachineFactory<RMStateStore,
+ RMStateStoreState,
+ RMStateStoreEventType,
+ RMStateStoreEvent>
+ stateMachineFactory = new StateMachineFactory<RMStateStore,
+ RMStateStoreState,
+ RMStateStoreEventType,
+ RMStateStoreEvent>(
+ RMStateStoreState.DEFAULT)
+ .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
+ RMStateStoreEventType.STORE_APP, new StoreAppTransition())
+ .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
+ RMStateStoreEventType.UPDATE_APP, new UpdateAppTransition())
+ .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
+ RMStateStoreEventType.REMOVE_APP, new RemoveAppTransition())
+ .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
+ RMStateStoreEventType.STORE_APP_ATTEMPT, new StoreAppAttemptTransition())
+ .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
+ RMStateStoreEventType.UPDATE_APP_ATTEMPT, new UpdateAppAttemptTransition());
+
+ private final StateMachine<RMStateStoreState,
+ RMStateStoreEventType,
+ RMStateStoreEvent> stateMachine;
+
+ private static class StoreAppTransition
+ implements SingleArcTransition<RMStateStore, RMStateStoreEvent> {
+ @Override
+ public void transition(RMStateStore store, RMStateStoreEvent event) {
+ if (!(event instanceof RMStateStoreAppEvent)) {
+ // should never happen
+ LOG.error("Illegal event type: " + event.getClass());
+ return;
+ }
+ ApplicationState appState = ((RMStateStoreAppEvent) event).getAppState();
+ ApplicationId appId = appState.getAppId();
+ ApplicationStateData appStateData = ApplicationStateData
+ .newInstance(appState);
+ LOG.info("Storing info for app: " + appId);
+ try {
+ store.storeApplicationStateInternal(appId, appStateData);
+ store.notifyApplication(new RMAppEvent(appId,
+ RMAppEventType.APP_NEW_SAVED));
+ } catch (Exception e) {
+ LOG.error("Error storing app: " + appId, e);
+ store.notifyStoreOperationFailed(e);
+ }
+ };
+ }
+
+ private static class UpdateAppTransition implements
+ SingleArcTransition<RMStateStore, RMStateStoreEvent> {
+ @Override
+ public void transition(RMStateStore store, RMStateStoreEvent event) {
+ if (!(event instanceof RMStateUpdateAppEvent)) {
+ // should never happen
+ LOG.error("Illegal event type: " + event.getClass());
+ return;
+ }
+ ApplicationState appState = ((RMStateUpdateAppEvent) event).getAppState();
+ ApplicationId appId = appState.getAppId();
+ ApplicationStateData appStateData = ApplicationStateData
+ .newInstance(appState);
+ LOG.info("Updating info for app: " + appId);
+ try {
+ store.updateApplicationStateInternal(appId, appStateData);
+ store.notifyApplication(new RMAppEvent(appId,
+ RMAppEventType.APP_UPDATE_SAVED));
+ } catch (Exception e) {
+ LOG.error("Error updating app: " + appId, e);
+ store.notifyStoreOperationFailed(e);
+ }
+ };
+ }
+
+ private static class RemoveAppTransition implements
+ SingleArcTransition<RMStateStore, RMStateStoreEvent> {
+ @Override
+ public void transition(RMStateStore store, RMStateStoreEvent event) {
+ if (!(event instanceof RMStateStoreRemoveAppEvent)) {
+ // should never happen
+ LOG.error("Illegal event type: " + event.getClass());
+ return;
+ }
+ ApplicationState appState = ((RMStateStoreRemoveAppEvent) event)
+ .getAppState();
+ ApplicationId appId = appState.getAppId();
+ LOG.info("Removing info for app: " + appId);
+ try {
+ store.removeApplicationStateInternal(appState);
+ } catch (Exception e) {
+ LOG.error("Error removing app: " + appId, e);
+ store.notifyStoreOperationFailed(e);
+ }
+ };
+ }
+
+ private static class StoreAppAttemptTransition implements
+ SingleArcTransition<RMStateStore, RMStateStoreEvent> {
+ @Override
+ public void transition(RMStateStore store, RMStateStoreEvent event) {
+ if (!(event instanceof RMStateStoreAppAttemptEvent)) {
+ // should never happen
+ LOG.error("Illegal event type: " + event.getClass());
+ return;
+ }
+ ApplicationAttemptState attemptState =
+ ((RMStateStoreAppAttemptEvent) event).getAppAttemptState();
+ try {
+ ApplicationAttemptStateData attemptStateData =
+ ApplicationAttemptStateData.newInstance(attemptState);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Storing info for attempt: " + attemptState.getAttemptId());
+ }
+ store.storeApplicationAttemptStateInternal(attemptState.getAttemptId(),
+ attemptStateData);
+ store.notifyApplicationAttempt(new RMAppAttemptEvent
+ (attemptState.getAttemptId(),
+ RMAppAttemptEventType.ATTEMPT_NEW_SAVED));
+ } catch (Exception e) {
+ LOG.error("Error storing appAttempt: " + attemptState.getAttemptId(), e);
+ store.notifyStoreOperationFailed(e);
+ }
+ };
+ }
+
+ private static class UpdateAppAttemptTransition implements
+ SingleArcTransition<RMStateStore, RMStateStoreEvent> {
+ @Override
+ public void transition(RMStateStore store, RMStateStoreEvent event) {
+ if (!(event instanceof RMStateUpdateAppAttemptEvent)) {
+ // should never happen
+ LOG.error("Illegal event type: " + event.getClass());
+ return;
+ }
+ ApplicationAttemptState attemptState =
+ ((RMStateUpdateAppAttemptEvent) event).getAppAttemptState();
+ try {
+ ApplicationAttemptStateData attemptStateData = ApplicationAttemptStateData
+ .newInstance(attemptState);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Updating info for attempt: " + attemptState.getAttemptId());
+ }
+ store.updateApplicationAttemptStateInternal(attemptState.getAttemptId(),
+ attemptStateData);
+ store.notifyApplicationAttempt(new RMAppAttemptEvent
+ (attemptState.getAttemptId(),
+ RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED));
+ } catch (Exception e) {
+ LOG.error("Error updating appAttempt: " + attemptState.getAttemptId(), e);
+ store.notifyStoreOperationFailed(e);
+ }
+ };
+ }
+
public RMStateStore() {
super(RMStateStore.class.getName());
+ stateMachine = stateMachineFactory.make(this);
}
/**
@@ -99,19 +263,21 @@ public abstract class RMStateStore exten
RMAppAttemptState state;
String finalTrackingUrl = "N/A";
String diagnostics;
+ int exitStatus = ContainerExitStatus.INVALID;
FinalApplicationStatus amUnregisteredFinalStatus;
public ApplicationAttemptState(ApplicationAttemptId attemptId,
Container masterContainer, Credentials appAttemptCredentials,
long startTime) {
this(attemptId, masterContainer, appAttemptCredentials, startTime, null,
- null, "", null);
+ null, "", null, ContainerExitStatus.INVALID);
}
public ApplicationAttemptState(ApplicationAttemptId attemptId,
Container masterContainer, Credentials appAttemptCredentials,
long startTime, RMAppAttemptState state, String finalTrackingUrl,
- String diagnostics, FinalApplicationStatus amUnregisteredFinalStatus) {
+ String diagnostics, FinalApplicationStatus amUnregisteredFinalStatus,
+ int exitStatus) {
this.attemptId = attemptId;
this.masterContainer = masterContainer;
this.appAttemptCredentials = appAttemptCredentials;
@@ -120,6 +286,7 @@ public abstract class RMStateStore exten
this.finalTrackingUrl = finalTrackingUrl;
this.diagnostics = diagnostics == null ? "" : diagnostics;
this.amUnregisteredFinalStatus = amUnregisteredFinalStatus;
+ this.exitStatus = exitStatus;
}
public Container getMasterContainer() {
@@ -146,6 +313,9 @@ public abstract class RMStateStore exten
public FinalApplicationStatus getFinalApplicationStatus() {
return amUnregisteredFinalStatus;
}
+ public int getAMContainerExitStatus(){
+ return this.exitStatus;
+ }
}
/**
@@ -244,6 +414,8 @@ public abstract class RMStateStore exten
RMDTSecretManagerState rmSecretManagerState = new RMDTSecretManagerState();
+ AMRMTokenSecretManagerState amrmTokenSecretManagerState = null;
+
public Map<ApplicationId, ApplicationState> getApplicationState() {
return appState;
}
@@ -251,6 +423,10 @@ public abstract class RMStateStore exten
public RMDTSecretManagerState getRMDTSecretManagerState() {
return rmSecretManagerState;
}
+
+ public AMRMTokenSecretManagerState getAMRMTokenSecretManagerState() {
+ return amrmTokenSecretManagerState;
+ }
}
private Dispatcher rmDispatcher;
@@ -319,14 +495,14 @@ public abstract class RMStateStore exten
* upgrade RM state.
*/
public void checkVersion() throws Exception {
- RMStateVersion loadedVersion = loadVersion();
+ Version loadedVersion = loadVersion();
LOG.info("Loaded RM state version info " + loadedVersion);
if (loadedVersion != null && loadedVersion.equals(getCurrentVersion())) {
return;
}
// if there is no version info, treat it as 1.0;
if (loadedVersion == null) {
- loadedVersion = RMStateVersion.newInstance(1, 0);
+ loadedVersion = Version.newInstance(1, 0);
}
if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
LOG.info("Storing RM state version info " + getCurrentVersion());
@@ -342,7 +518,7 @@ public abstract class RMStateStore exten
* Derived class use this method to load the version information from state
* store.
*/
- protected abstract RMStateVersion loadVersion() throws Exception;
+ protected abstract Version loadVersion() throws Exception;
/**
* Derived class use this method to store the version information.
@@ -352,8 +528,14 @@ public abstract class RMStateStore exten
/**
* Get the current version of the underlying state store.
*/
- protected abstract RMStateVersion getCurrentVersion();
+ protected abstract Version getCurrentVersion();
+
+ /**
+ * Get the current epoch of RM and increment the value.
+ */
+ public abstract int getAndIncrementEpoch() throws Exception;
+
/**
* Blocking API
* The derived class must recover state from the store and return a new
@@ -390,10 +572,10 @@ public abstract class RMStateStore exten
* application.
*/
protected abstract void storeApplicationStateInternal(ApplicationId appId,
- ApplicationStateDataPBImpl appStateData) throws Exception;
+ ApplicationStateData appStateData) throws Exception;
protected abstract void updateApplicationStateInternal(ApplicationId appId,
- ApplicationStateDataPBImpl appStateData) throws Exception;
+ ApplicationStateData appStateData) throws Exception;
@SuppressWarnings("unchecked")
/**
@@ -428,11 +610,11 @@ public abstract class RMStateStore exten
*/
protected abstract void storeApplicationAttemptStateInternal(
ApplicationAttemptId attemptId,
- ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception;
+ ApplicationAttemptStateData attemptStateData) throws Exception;
protected abstract void updateApplicationAttemptStateInternal(
ApplicationAttemptId attemptId,
- ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception;
+ ApplicationAttemptStateData attemptStateData) throws Exception;
/**
* RMDTSecretManager call this to store the state of a delegation token
@@ -540,6 +722,14 @@ public abstract class RMStateStore exten
throws Exception;
/**
+ * Blocking API. Derived classes must implement this method to store or update
+ * the state of the AMRMToken master key.
+ */
+ public abstract void storeOrUpdateAMRMTokenSecretManagerState(
+ AMRMTokenSecretManagerState amrmTokenSecretManagerState,
+ boolean isUpdate);
+
+ /**
* Non-blocking API
* ResourceManager services call this to remove an application from the state
* store
@@ -581,10 +771,7 @@ public abstract class RMStateStore exten
public Credentials getCredentialsFromAppAttempt(RMAppAttempt appAttempt) {
Credentials credentials = new Credentials();
- Token<AMRMTokenIdentifier> appToken = appAttempt.getAMRMToken();
- if(appToken != null){
- credentials.addToken(AM_RM_TOKEN_SERVICE, appToken);
- }
+
SecretKey clientTokenMasterKey =
appAttempt.getClientTokenMasterKey();
if(clientTokenMasterKey != null){
@@ -596,105 +783,10 @@ public abstract class RMStateStore exten
// Dispatcher related code
protected void handleStoreEvent(RMStateStoreEvent event) {
- if (event.getType().equals(RMStateStoreEventType.STORE_APP)
- || event.getType().equals(RMStateStoreEventType.UPDATE_APP)) {
- ApplicationState appState = null;
- if (event.getType().equals(RMStateStoreEventType.STORE_APP)) {
- appState = ((RMStateStoreAppEvent) event).getAppState();
- } else {
- assert event.getType().equals(RMStateStoreEventType.UPDATE_APP);
- appState = ((RMStateUpdateAppEvent) event).getAppState();
- }
-
- Exception storedException = null;
- ApplicationStateDataPBImpl appStateData =
- (ApplicationStateDataPBImpl) ApplicationStateDataPBImpl
- .newApplicationStateData(appState.getSubmitTime(),
- appState.getStartTime(), appState.getUser(),
- appState.getApplicationSubmissionContext(), appState.getState(),
- appState.getDiagnostics(), appState.getFinishTime());
-
- ApplicationId appId =
- appState.getApplicationSubmissionContext().getApplicationId();
-
- LOG.info("Storing info for app: " + appId);
- try {
- if (event.getType().equals(RMStateStoreEventType.STORE_APP)) {
- storeApplicationStateInternal(appId, appStateData);
- notifyDoneStoringApplication(appId, storedException);
- } else {
- assert event.getType().equals(RMStateStoreEventType.UPDATE_APP);
- updateApplicationStateInternal(appId, appStateData);
- notifyDoneUpdatingApplication(appId, storedException);
- }
- } catch (Exception e) {
- LOG.error("Error storing/updating app: " + appId, e);
- notifyStoreOperationFailed(e);
- }
- } else if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)
- || event.getType().equals(RMStateStoreEventType.UPDATE_APP_ATTEMPT)) {
-
- ApplicationAttemptState attemptState = null;
- if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)) {
- attemptState =
- ((RMStateStoreAppAttemptEvent) event).getAppAttemptState();
- } else {
- assert event.getType().equals(RMStateStoreEventType.UPDATE_APP_ATTEMPT);
- attemptState =
- ((RMStateUpdateAppAttemptEvent) event).getAppAttemptState();
- }
-
- Exception storedException = null;
- Credentials credentials = attemptState.getAppAttemptCredentials();
- ByteBuffer appAttemptTokens = null;
- try {
- if (credentials != null) {
- DataOutputBuffer dob = new DataOutputBuffer();
- credentials.writeTokenStorageToStream(dob);
- appAttemptTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
- }
- ApplicationAttemptStateDataPBImpl attemptStateData =
- (ApplicationAttemptStateDataPBImpl) ApplicationAttemptStateDataPBImpl
- .newApplicationAttemptStateData(attemptState.getAttemptId(),
- attemptState.getMasterContainer(), appAttemptTokens,
- attemptState.getStartTime(), attemptState.getState(),
- attemptState.getFinalTrackingUrl(),
- attemptState.getDiagnostics(),
- attemptState.getFinalApplicationStatus());
- if (LOG.isDebugEnabled()) {
- LOG.debug("Storing info for attempt: " + attemptState.getAttemptId());
- }
- if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)) {
- storeApplicationAttemptStateInternal(attemptState.getAttemptId(),
- attemptStateData);
- notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(),
- storedException);
- } else {
- assert event.getType().equals(
- RMStateStoreEventType.UPDATE_APP_ATTEMPT);
- updateApplicationAttemptStateInternal(attemptState.getAttemptId(),
- attemptStateData);
- notifyDoneUpdatingApplicationAttempt(attemptState.getAttemptId(),
- storedException);
- }
- } catch (Exception e) {
- LOG.error(
- "Error storing/updating appAttempt: " + attemptState.getAttemptId(), e);
- notifyStoreOperationFailed(e);
- }
- } else if (event.getType().equals(RMStateStoreEventType.REMOVE_APP)) {
- ApplicationState appState =
- ((RMStateStoreRemoveAppEvent) event).getAppState();
- ApplicationId appId = appState.getAppId();
- LOG.info("Removing info for app: " + appId);
- try {
- removeApplicationStateInternal(appState);
- } catch (Exception e) {
- LOG.error("Error removing app: " + appId, e);
- notifyStoreOperationFailed(e);
- }
- } else {
- LOG.error("Unknown RMStateStoreEvent type: " + event.getType());
+ try {
+ this.stateMachine.doTransition(event.getType(), event);
+ } catch (InvalidStateTransitonException e) {
+ LOG.error("Can't handle this event at current state", e);
}
}
@@ -713,47 +805,28 @@ public abstract class RMStateStore exten
}
rmDispatcher.getEventHandler().handle(new RMFatalEvent(type, failureCause));
}
-
+
@SuppressWarnings("unchecked")
/**
- * In (@link handleStoreEvent}, this method is called to notify the
- * application that new application is stored in state store
- * @param appId id of the application that has been saved
- * @param storedException the exception that is thrown when storing the
- * application
- */
- private void notifyDoneStoringApplication(ApplicationId appId,
- Exception storedException) {
- rmDispatcher.getEventHandler().handle(
- new RMAppNewSavedEvent(appId, storedException));
- }
-
- @SuppressWarnings("unchecked")
- private void notifyDoneUpdatingApplication(ApplicationId appId,
- Exception storedException) {
- rmDispatcher.getEventHandler().handle(
- new RMAppUpdateSavedEvent(appId, storedException));
+ * This method is called to notify the application that a
+ * new application has been stored or updated in the state store.
+ * @param event App event containing the app id and event type
+ */
+ private void notifyApplication(RMAppEvent event) {
+ rmDispatcher.getEventHandler().handle(event);
}
-
+
@SuppressWarnings("unchecked")
/**
- * In (@link handleStoreEvent}, this method is called to notify the
- * application attempt that new attempt is stored in state store
- * @param appAttempt attempt that has been saved
- */
- private void notifyDoneStoringApplicationAttempt(ApplicationAttemptId attemptId,
- Exception storedException) {
- rmDispatcher.getEventHandler().handle(
- new RMAppAttemptNewSavedEvent(attemptId, storedException));
- }
-
- @SuppressWarnings("unchecked")
- private void notifyDoneUpdatingApplicationAttempt(ApplicationAttemptId attemptId,
- Exception updatedException) {
- rmDispatcher.getEventHandler().handle(
- new RMAppAttemptUpdateSavedEvent(attemptId, updatedException));
+ * This method is called to notify the application attempt
+ * that a new attempt has been stored or updated in the state store.
+ * @param event App attempt event containing the app attempt
+ * id and event type
+ */
+ private void notifyApplicationAttempt(RMAppAttemptEvent event) {
+ rmDispatcher.getEventHandler().handle(event);
}
-
+
/**
* EventHandler implementation which forwards events to the FSRMStateStore.
* This hides the EventHandler methods of the store from its public interface
@@ -766,4 +839,10 @@ public abstract class RMStateStore exten
handleStoreEvent(event);
}
}
+
+ /**
+ * Derived classes must implement this method to delete the state store.
+ * @throws Exception if the state store cannot be deleted
+ */
+ public abstract void deleteStore() throws Exception;
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java Tue Aug 19 23:49:39 2014
@@ -44,15 +44,23 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRMTokenSecretManagerStateProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
-import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.EpochProto;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
+import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.resourcemanager.RMZKUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Epoch;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.AMRMTokenSecretManagerStatePBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -70,6 +78,11 @@ import org.apache.zookeeper.server.auth.
import com.google.common.annotations.VisibleForTesting;
+/**
+ * Changes from 1.1 to 1.2: AMRMTokenSecretManager state is now saved
+ * separately; the currentMasterKey and nextMasterKey are stored.
+ * Also, AMRMToken has been removed from ApplicationAttemptState.
+ */
@Private
@Unstable
public class ZKRMStateStore extends RMStateStore {
@@ -78,8 +91,8 @@ public class ZKRMStateStore extends RMSt
private final SecureRandom random = new SecureRandom();
protected static final String ROOT_ZNODE_NAME = "ZKRMStateRoot";
- protected static final RMStateVersion CURRENT_VERSION_INFO = RMStateVersion
- .newInstance(1, 0);
+ protected static final Version CURRENT_VERSION_INFO = Version
+ .newInstance(1, 2);
private static final String RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME =
"RMDelegationTokensRoot";
private static final String RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME =
@@ -90,7 +103,9 @@ public class ZKRMStateStore extends RMSt
private String zkHostPort = null;
private int zkSessionTimeout;
- private long zkRetryInterval;
+
+ @VisibleForTesting
+ long zkRetryInterval;
private List<ACL> zkAcl;
private List<ZKUtil.ZKAuthInfo> zkAuths;
@@ -98,6 +113,7 @@ public class ZKRMStateStore extends RMSt
*
* ROOT_DIR_PATH
* |--- VERSION_INFO
+ * |--- EPOCH_NODE
* |--- RM_ZK_FENCING_LOCK
* |--- RM_APP_ROOT
* | |----- (#ApplicationId1)
@@ -118,6 +134,9 @@ public class ZKRMStateStore extends RMSt
* | |----- Key_1
* | |----- Key_2
* ....
+ * |--- AMRMTOKEN_SECRET_MANAGER_ROOT
+ * |----- currentMasterKey
+ * |----- nextMasterKey
*
*/
private String zkRootNodePath;
@@ -126,6 +145,7 @@ public class ZKRMStateStore extends RMSt
private String dtMasterKeysRootPath;
private String delegationTokensRootPath;
private String dtSequenceNumberPath;
+ private String amrmTokenSecretManagerRoot;
@VisibleForTesting
protected String znodeWorkingPath;
@@ -199,9 +219,14 @@ public class ZKRMStateStore extends RMSt
zkSessionTimeout =
conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
- zkRetryInterval =
- conf.getLong(YarnConfiguration.RM_ZK_RETRY_INTERVAL_MS,
- YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS);
+
+ if (HAUtil.isHAEnabled(conf)) {
+ zkRetryInterval = zkSessionTimeout / numRetries;
+ } else {
+ zkRetryInterval =
+ conf.getLong(YarnConfiguration.RM_ZK_RETRY_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS);
+ }
zkAcl = RMZKUtils.getZKAcls(conf);
zkAuths = RMZKUtils.getZKAuths(conf);
@@ -240,6 +265,8 @@ public class ZKRMStateStore extends RMSt
RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME);
dtSequenceNumberPath = getNodePath(rmDTSecretManagerRoot,
RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME);
+ amrmTokenSecretManagerRoot =
+ getNodePath(zkRootNodePath, AMRMTOKEN_SECRET_MANAGER_ROOT);
}
@Override
@@ -260,24 +287,26 @@ public class ZKRMStateStore extends RMSt
createRootDir(dtMasterKeysRootPath);
createRootDir(delegationTokensRootPath);
createRootDir(dtSequenceNumberPath);
+ createRootDir(amrmTokenSecretManagerRoot);
}
private void createRootDir(final String rootPath) throws Exception {
// For root dirs, we shouldn't use the doMulti helper methods
- try {
- new ZKAction<String>() {
- @Override
- public String run() throws KeeperException, InterruptedException {
+ new ZKAction<String>() {
+ @Override
+ public String run() throws KeeperException, InterruptedException {
+ try {
return zkClient.create(rootPath, null, zkAcl, CreateMode.PERSISTENT);
+ } catch (KeeperException ke) {
+ if (ke.code() == Code.NODEEXISTS) {
+ LOG.debug(rootPath + "znode already exists!");
+ return null;
+ } else {
+ throw ke;
+ }
}
- }.runWithRetries();
- } catch (KeeperException ke) {
- if (ke.code() == Code.NODEEXISTS) {
- LOG.debug(rootPath + "znode already exists!");
- } else {
- throw ke;
}
- }
+ }.runWithRetries();
}
private void logRootNodeAcls(String prefix) throws Exception {
@@ -353,7 +382,7 @@ public class ZKRMStateStore extends RMSt
}
@Override
- protected RMStateVersion getCurrentVersion() {
+ protected Version getCurrentVersion() {
return CURRENT_VERSION_INFO;
}
@@ -361,7 +390,7 @@ public class ZKRMStateStore extends RMSt
protected synchronized void storeVersion() throws Exception {
String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE);
byte[] data =
- ((RMStateVersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
+ ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
if (existsWithRetries(versionNodePath, true) != null) {
setDataWithRetries(versionNodePath, data, -1);
} else {
@@ -370,28 +399,68 @@ public class ZKRMStateStore extends RMSt
}
@Override
- protected synchronized RMStateVersion loadVersion() throws Exception {
+ protected synchronized Version loadVersion() throws Exception {
String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE);
if (existsWithRetries(versionNodePath, true) != null) {
byte[] data = getDataWithRetries(versionNodePath, true);
- RMStateVersion version =
- new RMStateVersionPBImpl(RMStateVersionProto.parseFrom(data));
+ Version version =
+ new VersionPBImpl(VersionProto.parseFrom(data));
return version;
}
return null;
}
@Override
+ public synchronized int getAndIncrementEpoch() throws Exception {
+ String epochNodePath = getNodePath(zkRootNodePath, EPOCH_NODE);
+ int currentEpoch = 0;
+ if (existsWithRetries(epochNodePath, true) != null) {
+ // load current epoch
+ byte[] data = getDataWithRetries(epochNodePath, true);
+ Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(data));
+ currentEpoch = epoch.getEpoch();
+ // increment epoch and store it
+ byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
+ .toByteArray();
+ setDataWithRetries(epochNodePath, storeData, -1);
+ } else {
+ // initialize epoch node with 1 for the next time.
+ byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
+ .toByteArray();
+ createWithRetries(epochNodePath, storeData, zkAcl, CreateMode.PERSISTENT);
+ }
+ return currentEpoch;
+ }
+
+ @Override
public synchronized RMState loadState() throws Exception {
RMState rmState = new RMState();
// recover DelegationTokenSecretManager
loadRMDTSecretManagerState(rmState);
// recover RM applications
loadRMAppState(rmState);
+ // recover AMRMTokenSecretManager
+ loadAMRMTokenSecretManagerState(rmState);
return rmState;
}
+ private void loadAMRMTokenSecretManagerState(RMState rmState)
+ throws Exception {
+ byte[] data = getDataWithRetries(amrmTokenSecretManagerRoot, true);
+ if (data == null) {
+ LOG.warn("There is no data saved");
+ return;
+ }
+ AMRMTokenSecretManagerStatePBImpl stateData =
+ new AMRMTokenSecretManagerStatePBImpl(
+ AMRMTokenSecretManagerStateProto.parseFrom(data));
+ rmState.amrmTokenSecretManagerState =
+ AMRMTokenSecretManagerState.newInstance(
+ stateData.getCurrentMasterKey(), stateData.getNextMasterKey());
+
+ }
+
private synchronized void loadRMDTSecretManagerState(RMState rmState)
throws Exception {
loadRMDelegationKeyState(rmState);
@@ -529,22 +598,22 @@ public class ZKRMStateStore extends RMSt
ApplicationAttemptState attemptState =
new ApplicationAttemptState(attemptId,
- attemptStateData.getMasterContainer(), credentials,
- attemptStateData.getStartTime(),
- attemptStateData.getState(),
- attemptStateData.getFinalTrackingUrl(),
- attemptStateData.getDiagnostics(),
- attemptStateData.getFinalApplicationStatus());
+ attemptStateData.getMasterContainer(), credentials,
+ attemptStateData.getStartTime(), attemptStateData.getState(),
+ attemptStateData.getFinalTrackingUrl(),
+ attemptStateData.getDiagnostics(),
+ attemptStateData.getFinalApplicationStatus(),
+ attemptStateData.getAMContainerExitStatus());
appState.attempts.put(attemptState.getAttemptId(), attemptState);
}
}
- LOG.info("Done Loading applications from ZK state store");
+ LOG.debug("Done Loading applications from ZK state store");
}
@Override
public synchronized void storeApplicationStateInternal(ApplicationId appId,
- ApplicationStateDataPBImpl appStateDataPB) throws Exception {
+ ApplicationStateData appStateDataPB) throws Exception {
String nodeCreatePath = getNodePath(rmAppRoot, appId.toString());
if (LOG.isDebugEnabled()) {
@@ -558,7 +627,7 @@ public class ZKRMStateStore extends RMSt
@Override
public synchronized void updateApplicationStateInternal(ApplicationId appId,
- ApplicationStateDataPBImpl appStateDataPB) throws Exception {
+ ApplicationStateData appStateDataPB) throws Exception {
String nodeUpdatePath = getNodePath(rmAppRoot, appId.toString());
if (LOG.isDebugEnabled()) {
@@ -572,7 +641,7 @@ public class ZKRMStateStore extends RMSt
} else {
createWithRetries(nodeUpdatePath, appStateData, zkAcl,
CreateMode.PERSISTENT);
- LOG.info(appId + " znode didn't exist. Created a new znode to"
+ LOG.debug(appId + " znode didn't exist. Created a new znode to"
+ " update the application state.");
}
}
@@ -580,7 +649,7 @@ public class ZKRMStateStore extends RMSt
@Override
public synchronized void storeApplicationAttemptStateInternal(
ApplicationAttemptId appAttemptId,
- ApplicationAttemptStateDataPBImpl attemptStateDataPB)
+ ApplicationAttemptStateData attemptStateDataPB)
throws Exception {
String appDirPath = getNodePath(rmAppRoot,
appAttemptId.getApplicationId().toString());
@@ -598,7 +667,7 @@ public class ZKRMStateStore extends RMSt
@Override
public synchronized void updateApplicationAttemptStateInternal(
ApplicationAttemptId appAttemptId,
- ApplicationAttemptStateDataPBImpl attemptStateDataPB)
+ ApplicationAttemptStateData attemptStateDataPB)
throws Exception {
String appIdStr = appAttemptId.getApplicationId().toString();
String appAttemptIdStr = appAttemptId.toString();
@@ -615,7 +684,7 @@ public class ZKRMStateStore extends RMSt
} else {
createWithRetries(nodeUpdatePath, attemptStateData, zkAcl,
CreateMode.PERSISTENT);
- LOG.info(appAttemptId + " znode didn't exist. Created a new znode to"
+ LOG.debug(appAttemptId + " znode didn't exist. Created a new znode to"
+ " update the application attempt state.");
}
}
@@ -664,7 +733,7 @@ public class ZKRMStateStore extends RMSt
if (existsWithRetries(nodeRemovePath, true) != null) {
opList.add(Op.delete(nodeRemovePath, -1));
} else {
- LOG.info("Attempted to delete a non-existing znode " + nodeRemovePath);
+ LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath);
}
doMultiWithRetries(opList);
}
@@ -681,7 +750,7 @@ public class ZKRMStateStore extends RMSt
// in case znode doesn't exist
addStoreOrUpdateOps(
opList, rmDTIdentifier, renewDate, latestSequenceNumber, false);
- LOG.info("Attempted to update a non-existing znode " + nodeRemovePath);
+ LOG.debug("Attempted to update a non-existing znode " + nodeRemovePath);
} else {
// in case znode exists
addStoreOrUpdateOps(
@@ -763,7 +832,14 @@ public class ZKRMStateStore extends RMSt
if (existsWithRetries(nodeRemovePath, true) != null) {
doMultiWithRetries(Op.delete(nodeRemovePath, -1));
} else {
- LOG.info("Attempted to delete a non-existing znode " + nodeRemovePath);
+ LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath);
+ }
+ }
+
+ @Override
+ public synchronized void deleteStore() throws Exception {
+ if (existsWithRetries(zkRootNodePath, true) != null) {
+ deleteWithRetries(zkRootNodePath, true);
}
}
@@ -816,7 +892,7 @@ public class ZKRMStateStore extends RMSt
case Expired:
// the connection got terminated because of session timeout
// call listener to reconnect
- LOG.info("Session expired");
+ LOG.info("ZKRMStateStore Session expired");
createConnection();
break;
default:
@@ -921,6 +997,29 @@ public class ZKRMStateStore extends RMSt
}.runWithRetries();
}
+ private void deleteWithRetries(
+ final String path, final boolean watch) throws Exception {
+ new ZKAction<Void>() {
+ @Override
+ Void run() throws KeeperException, InterruptedException {
+ recursiveDeleteWithRetriesHelper(path, watch);
+ return null;
+ }
+ }.runWithRetries();
+ }
+
+ /**
+ * Helper method that deletes znodes recursively
+ */
+ private void recursiveDeleteWithRetriesHelper(String path, boolean watch)
+ throws KeeperException, InterruptedException {
+ List<String> children = zkClient.getChildren(path, watch);
+ for (String child : children) {
+ recursiveDeleteWithRetriesHelper(path + "/" + child, false);
+ }
+ zkClient.delete(path, -1);
+ }
+
/**
* Helper class that periodically attempts creating a znode to ensure that
* this RM continues to be the Active.
@@ -991,13 +1090,13 @@ public class ZKRMStateStore extends RMSt
throw new StoreFencedException();
}
} catch (KeeperException ke) {
+ LOG.info("Exception while executing a ZK operation.", ke);
if (shouldRetry(ke.code()) && ++retry < numRetries) {
- LOG.info("Waiting for zookeeper to be connected, retry no. + "
- + retry);
+ LOG.info("Retrying operation on ZK. Retry no. " + retry);
Thread.sleep(zkRetryInterval);
continue;
}
- LOG.debug("Error while doing ZK operation.", ke);
+ LOG.info("Maxed out ZK retries. Giving up!");
throw ke;
}
}
@@ -1044,4 +1143,19 @@ public class ZKRMStateStore extends RMSt
return zk;
}
+ @Override
+ public synchronized void storeOrUpdateAMRMTokenSecretManagerState(
+ AMRMTokenSecretManagerState amrmTokenSecretManagerState,
+ boolean isUpdate) {
+ AMRMTokenSecretManagerState data =
+ AMRMTokenSecretManagerState.newInstance(amrmTokenSecretManagerState);
+ byte[] stateData = data.getProto().toByteArray();
+ try {
+ setDataWithRetries(amrmTokenSecretManagerRoot, stateData, -1);
+ } catch (Exception ex) {
+ LOG.info("Error storing info for AMRMTokenSecretManager", ex);
+ notifyStoreOperationFailed(ex);
+ }
+ }
+
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java Tue Aug 19 23:49:39 2014
@@ -18,31 +18,74 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery.records;
+import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.util.Records;
/*
* Contains the state data that needs to be persisted for an ApplicationAttempt
*/
@Public
@Unstable
-public interface ApplicationAttemptStateData {
-
+public abstract class ApplicationAttemptStateData {
+ public static ApplicationAttemptStateData newInstance(
+ ApplicationAttemptId attemptId, Container container,
+ ByteBuffer attemptTokens, long startTime, RMAppAttemptState finalState,
+ String finalTrackingUrl, String diagnostics,
+ FinalApplicationStatus amUnregisteredFinalStatus, int exitStatus) {
+ ApplicationAttemptStateData attemptStateData =
+ Records.newRecord(ApplicationAttemptStateData.class);
+ attemptStateData.setAttemptId(attemptId);
+ attemptStateData.setMasterContainer(container);
+ attemptStateData.setAppAttemptTokens(attemptTokens);
+ attemptStateData.setState(finalState);
+ attemptStateData.setFinalTrackingUrl(finalTrackingUrl);
+ attemptStateData.setDiagnostics(diagnostics);
+ attemptStateData.setStartTime(startTime);
+ attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus);
+ attemptStateData.setAMContainerExitStatus(exitStatus);
+ return attemptStateData;
+ }
+
+ public static ApplicationAttemptStateData newInstance(
+ ApplicationAttemptState attemptState) throws IOException {
+ Credentials credentials = attemptState.getAppAttemptCredentials();
+ ByteBuffer appAttemptTokens = null;
+ if (credentials != null) {
+ DataOutputBuffer dob = new DataOutputBuffer();
+ credentials.writeTokenStorageToStream(dob);
+ appAttemptTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+ }
+ return newInstance(attemptState.getAttemptId(),
+ attemptState.getMasterContainer(), appAttemptTokens,
+ attemptState.getStartTime(), attemptState.getState(),
+ attemptState.getFinalTrackingUrl(), attemptState.getDiagnostics(),
+ attemptState.getFinalApplicationStatus(),
+ attemptState.getAMContainerExitStatus());
+ }
+
+ public abstract ApplicationAttemptStateDataProto getProto();
+
/**
* The ApplicationAttemptId for the application attempt
* @return ApplicationAttemptId for the application attempt
*/
@Public
@Unstable
- public ApplicationAttemptId getAttemptId();
+ public abstract ApplicationAttemptId getAttemptId();
- public void setAttemptId(ApplicationAttemptId attemptId);
+ public abstract void setAttemptId(ApplicationAttemptId attemptId);
/*
* The master container running the application attempt
@@ -50,9 +93,9 @@ public interface ApplicationAttemptState
*/
@Public
@Unstable
- public Container getMasterContainer();
+ public abstract Container getMasterContainer();
- public void setMasterContainer(Container container);
+ public abstract void setMasterContainer(Container container);
/**
* The application attempt tokens that belong to this attempt
@@ -60,17 +103,17 @@ public interface ApplicationAttemptState
*/
@Public
@Unstable
- public ByteBuffer getAppAttemptTokens();
+ public abstract ByteBuffer getAppAttemptTokens();
- public void setAppAttemptTokens(ByteBuffer attemptTokens);
+ public abstract void setAppAttemptTokens(ByteBuffer attemptTokens);
/**
* Get the final state of the application attempt.
* @return the final state of the application attempt.
*/
- public RMAppAttemptState getState();
+ public abstract RMAppAttemptState getState();
- public void setState(RMAppAttemptState state);
+ public abstract void setState(RMAppAttemptState state);
/**
* Get the original not-proxied <em>final tracking url</em> for the
@@ -79,34 +122,39 @@ public interface ApplicationAttemptState
* @return the original not-proxied <em>final tracking url</em> for the
* application
*/
- public String getFinalTrackingUrl();
+ public abstract String getFinalTrackingUrl();
/**
* Set the final tracking Url of the AM.
* @param url
*/
- public void setFinalTrackingUrl(String url);
+ public abstract void setFinalTrackingUrl(String url);
/**
* Get the <em>diagnostic information</em> of the attempt
* @return <em>diagnostic information</em> of the attempt
*/
- public String getDiagnostics();
+ public abstract String getDiagnostics();
- public void setDiagnostics(String diagnostics);
+ public abstract void setDiagnostics(String diagnostics);
/**
* Get the <em>start time</em> of the application.
* @return <em>start time</em> of the application
*/
- public long getStartTime();
+ public abstract long getStartTime();
- public void setStartTime(long startTime);
+ public abstract void setStartTime(long startTime);
/**
* Get the <em>final finish status</em> of the application.
* @return <em>final finish status</em> of the application
*/
- public FinalApplicationStatus getFinalApplicationStatus();
+ public abstract FinalApplicationStatus getFinalApplicationStatus();
+
+ public abstract void setFinalApplicationStatus(
+ FinalApplicationStatus finishState);
+
+ public abstract int getAMContainerExitStatus();
- public void setFinalApplicationStatus(FinalApplicationStatus finishState);
+ public abstract void setAMContainerExitStatus(int exitStatus);
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java Tue Aug 19 23:49:39 2014
@@ -24,7 +24,10 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.util.Records;
/**
* Contains all the state data that needs to be stored persistently
@@ -32,19 +35,43 @@ import org.apache.hadoop.yarn.server.res
*/
@Public
@Unstable
-public interface ApplicationStateData {
-
+public abstract class ApplicationStateData {
+ public static ApplicationStateData newInstance(long submitTime,
+ long startTime, String user,
+ ApplicationSubmissionContext submissionContext,
+ RMAppState state, String diagnostics, long finishTime) {
+ ApplicationStateData appState = Records.newRecord(ApplicationStateData.class);
+ appState.setSubmitTime(submitTime);
+ appState.setStartTime(startTime);
+ appState.setUser(user);
+ appState.setApplicationSubmissionContext(submissionContext);
+ appState.setState(state);
+ appState.setDiagnostics(diagnostics);
+ appState.setFinishTime(finishTime);
+ return appState;
+ }
+
+ public static ApplicationStateData newInstance(
+ ApplicationState appState) {
+ return newInstance(appState.getSubmitTime(), appState.getStartTime(),
+ appState.getUser(), appState.getApplicationSubmissionContext(),
+ appState.getState(), appState.getDiagnostics(),
+ appState.getFinishTime());
+ }
+
+ public abstract ApplicationStateDataProto getProto();
+
/**
* The time at which the application was received by the Resource Manager
* @return submitTime
*/
@Public
@Unstable
- public long getSubmitTime();
+ public abstract long getSubmitTime();
@Public
@Unstable
- public void setSubmitTime(long submitTime);
+ public abstract void setSubmitTime(long submitTime);
/**
* Get the <em>start time</em> of the application.
@@ -63,11 +90,11 @@ public interface ApplicationStateData {
*/
@Public
@Unstable
- public void setUser(String user);
+ public abstract void setUser(String user);
@Public
@Unstable
- public String getUser();
+ public abstract String getUser();
/**
* The {@link ApplicationSubmissionContext} for the application
@@ -76,34 +103,34 @@ public interface ApplicationStateData {
*/
@Public
@Unstable
- public ApplicationSubmissionContext getApplicationSubmissionContext();
+ public abstract ApplicationSubmissionContext getApplicationSubmissionContext();
@Public
@Unstable
- public void setApplicationSubmissionContext(
+ public abstract void setApplicationSubmissionContext(
ApplicationSubmissionContext context);
/**
* Get the final state of the application.
* @return the final state of the application.
*/
- public RMAppState getState();
+ public abstract RMAppState getState();
- public void setState(RMAppState state);
+ public abstract void setState(RMAppState state);
/**
* Get the diagnostics information for the application master.
* @return the diagnostics information for the application master.
*/
- public String getDiagnostics();
+ public abstract String getDiagnostics();
- public void setDiagnostics(String diagnostics);
+ public abstract void setDiagnostics(String diagnostics);
/**
* The finish time of the application.
* @return the finish time of the application.
*/
- public long getFinishTime();
+ public abstract long getFinishTime();
- public void setFinishTime(long finishTime);
+ public abstract void setFinishTime(long finishTime);
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java Tue Aug 19 23:49:39 2014
@@ -25,10 +25,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
-import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProtoOrBuilder;
@@ -36,12 +33,10 @@ import org.apache.hadoop.yarn.proto.Yarn
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
-public class ApplicationAttemptStateDataPBImpl
-extends ProtoBase<ApplicationAttemptStateDataProto>
-implements ApplicationAttemptStateData {
- private static final RecordFactory recordFactory = RecordFactoryProvider
- .getRecordFactory(null);
+import com.google.protobuf.TextFormat;
+public class ApplicationAttemptStateDataPBImpl extends
+ ApplicationAttemptStateData {
ApplicationAttemptStateDataProto proto =
ApplicationAttemptStateDataProto.getDefaultInstance();
ApplicationAttemptStateDataProto.Builder builder = null;
@@ -60,7 +55,8 @@ implements ApplicationAttemptStateData {
this.proto = proto;
viaProto = true;
}
-
+
+ @Override
public ApplicationAttemptStateDataProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
@@ -76,7 +72,8 @@ implements ApplicationAttemptStateData {
builder.setMasterContainer(((ContainerPBImpl)masterContainer).getProto());
}
if(this.appAttemptTokens != null) {
- builder.setAppAttemptTokens(convertToProtoFormat(this.appAttemptTokens));
+ builder.setAppAttemptTokens(ProtoUtils.convertToProtoFormat(
+ this.appAttemptTokens));
}
}
@@ -148,7 +145,8 @@ implements ApplicationAttemptStateData {
if(!p.hasAppAttemptTokens()) {
return null;
}
- this.appAttemptTokens = convertFromProtoFormat(p.getAppAttemptTokens());
+ this.appAttemptTokens = ProtoUtils.convertFromProtoFormat(
+ p.getAppAttemptTokens());
return appAttemptTokens;
}
@@ -249,24 +247,39 @@ implements ApplicationAttemptStateData {
builder.setFinalApplicationStatus(convertToProtoFormat(finishState));
}
- public static ApplicationAttemptStateData newApplicationAttemptStateData(
- ApplicationAttemptId attemptId, Container container,
- ByteBuffer attemptTokens, long startTime, RMAppAttemptState finalState,
- String finalTrackingUrl, String diagnostics,
- FinalApplicationStatus amUnregisteredFinalStatus) {
- ApplicationAttemptStateData attemptStateData =
- recordFactory.newRecordInstance(ApplicationAttemptStateData.class);
- attemptStateData.setAttemptId(attemptId);
- attemptStateData.setMasterContainer(container);
- attemptStateData.setAppAttemptTokens(attemptTokens);
- attemptStateData.setState(finalState);
- attemptStateData.setFinalTrackingUrl(finalTrackingUrl);
- attemptStateData.setDiagnostics(diagnostics);
- attemptStateData.setStartTime(startTime);
- attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus);
- return attemptStateData;
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
}
+ @Override
+ public int getAMContainerExitStatus() {
+ ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder;
+ return p.getAmContainerExitStatus();
+ }
+
+ @Override
+ public void setAMContainerExitStatus(int exitStatus) {
+ maybeInitBuilder();
+ builder.setAmContainerExitStatus(exitStatus);
+ }
+
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null)
+ return false;
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return TextFormat.shortDebugString(getProto());
+ }
+
private static String RM_APP_ATTEMPT_PREFIX = "RMATTEMPT_";
public static RMAppAttemptStateProto convertToProtoFormat(RMAppAttemptState e) {
return RMAppAttemptStateProto.valueOf(RM_APP_ATTEMPT_PREFIX + e.name());
@@ -281,5 +294,4 @@ implements ApplicationAttemptStateData {
private FinalApplicationStatus convertFromProtoFormat(FinalApplicationStatusProto s) {
return ProtoUtils.convertFromProtoFormat(s);
}
-
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java Tue Aug 19 23:49:39 2014
@@ -20,21 +20,15 @@ package org.apache.hadoop.yarn.server.re
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
-import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMAppStateProto;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
-public class ApplicationStateDataPBImpl
-extends ProtoBase<ApplicationStateDataProto>
-implements ApplicationStateData {
- private static final RecordFactory recordFactory = RecordFactoryProvider
- .getRecordFactory(null);
+import com.google.protobuf.TextFormat;
+public class ApplicationStateDataPBImpl extends ApplicationStateData {
ApplicationStateDataProto proto =
ApplicationStateDataProto.getDefaultInstance();
ApplicationStateDataProto.Builder builder = null;
@@ -51,7 +45,8 @@ implements ApplicationStateData {
this.proto = proto;
viaProto = true;
}
-
+
+ @Override
public ApplicationStateDataProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
@@ -136,7 +131,7 @@ implements ApplicationStateData {
}
applicationSubmissionContext =
new ApplicationSubmissionContextPBImpl(
- p.getApplicationSubmissionContext());
+ p.getApplicationSubmissionContext());
return applicationSubmissionContext;
}
@@ -200,21 +195,24 @@ implements ApplicationStateData {
builder.setFinishTime(finishTime);
}
- public static ApplicationStateData newApplicationStateData(long submitTime,
- long startTime, String user,
- ApplicationSubmissionContext submissionContext, RMAppState state,
- String diagnostics, long finishTime) {
-
- ApplicationStateData appState =
- recordFactory.newRecordInstance(ApplicationStateData.class);
- appState.setSubmitTime(submitTime);
- appState.setStartTime(startTime);
- appState.setUser(user);
- appState.setApplicationSubmissionContext(submissionContext);
- appState.setState(state);
- appState.setDiagnostics(diagnostics);
- appState.setFinishTime(finishTime);
- return appState;
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null)
+ return false;
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return TextFormat.shortDebugString(getProto());
}
private static String RM_APP_PREFIX = "RMAPP_";
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java Tue Aug 19 23:49:39 2014
@@ -19,16 +19,16 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
import java.util.Collection;
-
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -208,6 +208,14 @@ public interface RMApp extends EventHand
* @return the flag indicating whether the applications's state is stored.
*/
boolean isAppFinalStateStored();
+
+
+ /**
+ * Nodes on which the containers for this {@link RMApp} ran.
+ * @return the set of nodes that ran any containers from this {@link RMApp};
+ * further nodes are added as containers for this {@link RMApp} run on them
+ */
+ Set<NodeId> getRanNodes();
/**
* Create the external user-facing state of ApplicationMaster from the
@@ -215,4 +223,11 @@ public interface RMApp extends EventHand
* @return the external user-facing state of ApplicationMaster.
*/
YarnApplicationState createApplicationState();
+
+ /**
+ * Get RMAppMetrics of the {@link RMApp}.
+ *
+ * @return metrics
+ */
+ RMAppMetrics getRMAppMetrics();
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java Tue Aug 19 23:49:39 2014
@@ -38,6 +38,9 @@ public enum RMAppEventType {
ATTEMPT_FAILED,
ATTEMPT_KILLED,
NODE_UPDATE,
+
+ // Source: Container and ResourceTracker
+ APP_RUNNING_ON_NODE,
// Source: RMStateStore
APP_NEW_SAVED,