You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by cu...@apache.org on 2014/08/20 03:34:59 UTC
svn commit: r1619019 [6/10] - in
/hadoop/common/branches/YARN-1051/hadoop-yarn-project: ./ hadoop-yarn/bin/
hadoop-yarn/conf/ hadoop-yarn/dev-support/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/
hadoop-yarn/had...
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java Wed Aug 20 01:34:29 2014
@@ -32,7 +32,6 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -45,24 +44,21 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos;
-import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent;
import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNewSavedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppUpdateSavedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
@@ -85,6 +81,8 @@ public abstract class RMStateStore exten
protected static final String DELEGATION_TOKEN_PREFIX = "RMDelegationToken_";
protected static final String DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX =
"RMDTSequenceNumber_";
+ protected static final String AMRMTOKEN_SECRET_MANAGER_ROOT =
+ "AMRMTokenSecretManagerRoot";
protected static final String VERSION_NODE = "RMVersionNode";
protected static final String EPOCH_NODE = "EpochNode";
@@ -134,7 +132,8 @@ public abstract class RMStateStore exten
LOG.info("Storing info for app: " + appId);
try {
store.storeApplicationStateInternal(appId, appStateData);
- store.notifyDoneStoringApplication(appId, null);
+ store.notifyApplication(new RMAppEvent(appId,
+ RMAppEventType.APP_NEW_SAVED));
} catch (Exception e) {
LOG.error("Error storing app: " + appId, e);
store.notifyStoreOperationFailed(e);
@@ -158,7 +157,8 @@ public abstract class RMStateStore exten
LOG.info("Updating info for app: " + appId);
try {
store.updateApplicationStateInternal(appId, appStateData);
- store.notifyDoneUpdatingApplication(appId, null);
+ store.notifyApplication(new RMAppEvent(appId,
+ RMAppEventType.APP_UPDATE_SAVED));
} catch (Exception e) {
LOG.error("Error updating app: " + appId, e);
store.notifyStoreOperationFailed(e);
@@ -207,8 +207,9 @@ public abstract class RMStateStore exten
}
store.storeApplicationAttemptStateInternal(attemptState.getAttemptId(),
attemptStateData);
- store.notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(),
- null);
+ store.notifyApplicationAttempt(new RMAppAttemptEvent
+ (attemptState.getAttemptId(),
+ RMAppAttemptEventType.ATTEMPT_NEW_SAVED));
} catch (Exception e) {
LOG.error("Error storing appAttempt: " + attemptState.getAttemptId(), e);
store.notifyStoreOperationFailed(e);
@@ -235,8 +236,9 @@ public abstract class RMStateStore exten
}
store.updateApplicationAttemptStateInternal(attemptState.getAttemptId(),
attemptStateData);
- store.notifyDoneUpdatingApplicationAttempt(attemptState.getAttemptId(),
- null);
+ store.notifyApplicationAttempt(new RMAppAttemptEvent
+ (attemptState.getAttemptId(),
+ RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED));
} catch (Exception e) {
LOG.error("Error updating appAttempt: " + attemptState.getAttemptId(), e);
store.notifyStoreOperationFailed(e);
@@ -412,6 +414,8 @@ public abstract class RMStateStore exten
RMDTSecretManagerState rmSecretManagerState = new RMDTSecretManagerState();
+ AMRMTokenSecretManagerState amrmTokenSecretManagerState = null;
+
public Map<ApplicationId, ApplicationState> getApplicationState() {
return appState;
}
@@ -419,6 +423,10 @@ public abstract class RMStateStore exten
public RMDTSecretManagerState getRMDTSecretManagerState() {
return rmSecretManagerState;
}
+
+ public AMRMTokenSecretManagerState getAMRMTokenSecretManagerState() {
+ return amrmTokenSecretManagerState;
+ }
}
private Dispatcher rmDispatcher;
@@ -487,14 +495,14 @@ public abstract class RMStateStore exten
* upgrade RM state.
*/
public void checkVersion() throws Exception {
- RMStateVersion loadedVersion = loadVersion();
+ Version loadedVersion = loadVersion();
LOG.info("Loaded RM state version info " + loadedVersion);
if (loadedVersion != null && loadedVersion.equals(getCurrentVersion())) {
return;
}
// if there is no version info, treat it as 1.0;
if (loadedVersion == null) {
- loadedVersion = RMStateVersion.newInstance(1, 0);
+ loadedVersion = Version.newInstance(1, 0);
}
if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
LOG.info("Storing RM state version info " + getCurrentVersion());
@@ -510,7 +518,7 @@ public abstract class RMStateStore exten
* Derived class use this method to load the version information from state
* store.
*/
- protected abstract RMStateVersion loadVersion() throws Exception;
+ protected abstract Version loadVersion() throws Exception;
/**
* Derived class use this method to store the version information.
@@ -520,7 +528,7 @@ public abstract class RMStateStore exten
/**
* Get the current version of the underlying state store.
*/
- protected abstract RMStateVersion getCurrentVersion();
+ protected abstract Version getCurrentVersion();
/**
@@ -714,6 +722,14 @@ public abstract class RMStateStore exten
throws Exception;
/**
+ * Blocking API. Derived classes must implement this method to store or update
+ * the state of the AMRMToken master key.
+ */
+ public abstract void storeOrUpdateAMRMTokenSecretManagerState(
+ AMRMTokenSecretManagerState amrmTokenSecretManagerState,
+ boolean isUpdate);
+
+ /**
* Non-blocking API
* ResourceManager services call this to remove an application from the state
* store
@@ -755,10 +771,7 @@ public abstract class RMStateStore exten
public Credentials getCredentialsFromAppAttempt(RMAppAttempt appAttempt) {
Credentials credentials = new Credentials();
- Token<AMRMTokenIdentifier> appToken = appAttempt.getAMRMToken();
- if(appToken != null){
- credentials.addToken(AM_RM_TOKEN_SERVICE, appToken);
- }
+
SecretKey clientTokenMasterKey =
appAttempt.getClientTokenMasterKey();
if(clientTokenMasterKey != null){
@@ -792,47 +805,28 @@ public abstract class RMStateStore exten
}
rmDispatcher.getEventHandler().handle(new RMFatalEvent(type, failureCause));
}
-
+
@SuppressWarnings("unchecked")
/**
- * In (@link handleStoreEvent}, this method is called to notify the
- * application that new application is stored in state store
- * @param appId id of the application that has been saved
- * @param storedException the exception that is thrown when storing the
- * application
- */
- private void notifyDoneStoringApplication(ApplicationId appId,
- Exception storedException) {
- rmDispatcher.getEventHandler().handle(
- new RMAppNewSavedEvent(appId, storedException));
- }
-
- @SuppressWarnings("unchecked")
- private void notifyDoneUpdatingApplication(ApplicationId appId,
- Exception storedException) {
- rmDispatcher.getEventHandler().handle(
- new RMAppUpdateSavedEvent(appId, storedException));
+ * This method is called to notify the application that
+ * a new application is stored or updated in the state store
+ * @param event App event containing the app id and event type
+ */
+ private void notifyApplication(RMAppEvent event) {
+ rmDispatcher.getEventHandler().handle(event);
}
-
+
@SuppressWarnings("unchecked")
/**
- * In (@link handleStoreEvent}, this method is called to notify the
- * application attempt that new attempt is stored in state store
- * @param appAttempt attempt that has been saved
- */
- private void notifyDoneStoringApplicationAttempt(ApplicationAttemptId attemptId,
- Exception storedException) {
- rmDispatcher.getEventHandler().handle(
- new RMAppAttemptNewSavedEvent(attemptId, storedException));
- }
-
- @SuppressWarnings("unchecked")
- private void notifyDoneUpdatingApplicationAttempt(ApplicationAttemptId attemptId,
- Exception updatedException) {
- rmDispatcher.getEventHandler().handle(
- new RMAppAttemptUpdateSavedEvent(attemptId, updatedException));
+ * This method is called to notify the application attempt
+ * that a new attempt is stored or updated in the state store
+ * @param event App attempt event containing the app attempt
+ * id and event type
+ */
+ private void notifyApplicationAttempt(RMAppAttemptEvent event) {
+ rmDispatcher.getEventHandler().handle(event);
}
-
+
/**
* EventHandler implementation which forward events to the FSRMStateStore
* This hides the EventHandle methods of the store from its public interface
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java Wed Aug 20 01:34:29 2014
@@ -44,22 +44,23 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRMTokenSecretManagerStateProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
-import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.EpochProto;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
+import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.resourcemanager.RMZKUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
-
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Epoch;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.AMRMTokenSecretManagerStatePBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -77,6 +78,11 @@ import org.apache.zookeeper.server.auth.
import com.google.common.annotations.VisibleForTesting;
+/**
+ * Changes from 1.1 to 1.2: AMRMTokenSecretManager state is now saved
+ * separately, storing both the currentMasterKey and the nextMasterKey.
+ * Also, AMRMToken has been removed from ApplicationAttemptState.
+ */
@Private
@Unstable
public class ZKRMStateStore extends RMStateStore {
@@ -85,8 +91,8 @@ public class ZKRMStateStore extends RMSt
private final SecureRandom random = new SecureRandom();
protected static final String ROOT_ZNODE_NAME = "ZKRMStateRoot";
- protected static final RMStateVersion CURRENT_VERSION_INFO = RMStateVersion
- .newInstance(1, 1);
+ protected static final Version CURRENT_VERSION_INFO = Version
+ .newInstance(1, 2);
private static final String RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME =
"RMDelegationTokensRoot";
private static final String RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME =
@@ -128,6 +134,9 @@ public class ZKRMStateStore extends RMSt
* | |----- Key_1
* | |----- Key_2
* ....
+ * |--- AMRMTOKEN_SECRET_MANAGER_ROOT
+ * |----- currentMasterKey
+ * |----- nextMasterKey
*
*/
private String zkRootNodePath;
@@ -136,6 +145,7 @@ public class ZKRMStateStore extends RMSt
private String dtMasterKeysRootPath;
private String delegationTokensRootPath;
private String dtSequenceNumberPath;
+ private String amrmTokenSecretManagerRoot;
@VisibleForTesting
protected String znodeWorkingPath;
@@ -255,6 +265,8 @@ public class ZKRMStateStore extends RMSt
RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME);
dtSequenceNumberPath = getNodePath(rmDTSecretManagerRoot,
RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME);
+ amrmTokenSecretManagerRoot =
+ getNodePath(zkRootNodePath, AMRMTOKEN_SECRET_MANAGER_ROOT);
}
@Override
@@ -275,6 +287,7 @@ public class ZKRMStateStore extends RMSt
createRootDir(dtMasterKeysRootPath);
createRootDir(delegationTokensRootPath);
createRootDir(dtSequenceNumberPath);
+ createRootDir(amrmTokenSecretManagerRoot);
}
private void createRootDir(final String rootPath) throws Exception {
@@ -369,7 +382,7 @@ public class ZKRMStateStore extends RMSt
}
@Override
- protected RMStateVersion getCurrentVersion() {
+ protected Version getCurrentVersion() {
return CURRENT_VERSION_INFO;
}
@@ -377,7 +390,7 @@ public class ZKRMStateStore extends RMSt
protected synchronized void storeVersion() throws Exception {
String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE);
byte[] data =
- ((RMStateVersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
+ ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
if (existsWithRetries(versionNodePath, true) != null) {
setDataWithRetries(versionNodePath, data, -1);
} else {
@@ -386,13 +399,13 @@ public class ZKRMStateStore extends RMSt
}
@Override
- protected synchronized RMStateVersion loadVersion() throws Exception {
+ protected synchronized Version loadVersion() throws Exception {
String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE);
if (existsWithRetries(versionNodePath, true) != null) {
byte[] data = getDataWithRetries(versionNodePath, true);
- RMStateVersion version =
- new RMStateVersionPBImpl(RMStateVersionProto.parseFrom(data));
+ Version version =
+ new VersionPBImpl(VersionProto.parseFrom(data));
return version;
}
return null;
@@ -427,9 +440,27 @@ public class ZKRMStateStore extends RMSt
loadRMDTSecretManagerState(rmState);
// recover RM applications
loadRMAppState(rmState);
+ // recover AMRMTokenSecretManager
+ loadAMRMTokenSecretManagerState(rmState);
return rmState;
}
+ private void loadAMRMTokenSecretManagerState(RMState rmState)
+ throws Exception {
+ byte[] data = getDataWithRetries(amrmTokenSecretManagerRoot, true);
+ if (data == null) {
+ LOG.warn("There is no data saved");
+ return;
+ }
+ AMRMTokenSecretManagerStatePBImpl stateData =
+ new AMRMTokenSecretManagerStatePBImpl(
+ AMRMTokenSecretManagerStateProto.parseFrom(data));
+ rmState.amrmTokenSecretManagerState =
+ AMRMTokenSecretManagerState.newInstance(
+ stateData.getCurrentMasterKey(), stateData.getNextMasterKey());
+
+ }
+
private synchronized void loadRMDTSecretManagerState(RMState rmState)
throws Exception {
loadRMDelegationKeyState(rmState);
@@ -1112,4 +1143,19 @@ public class ZKRMStateStore extends RMSt
return zk;
}
+ @Override
+ public synchronized void storeOrUpdateAMRMTokenSecretManagerState(
+ AMRMTokenSecretManagerState amrmTokenSecretManagerState,
+ boolean isUpdate) {
+ AMRMTokenSecretManagerState data =
+ AMRMTokenSecretManagerState.newInstance(amrmTokenSecretManagerState);
+ byte[] stateData = data.getProto().toByteArray();
+ try {
+ setDataWithRetries(amrmTokenSecretManagerRoot, stateData, -1);
+ } catch (Exception ex) {
+ LOG.info("Error storing info for AMRMTokenSecretManager", ex);
+ notifyStoreOperationFailed(ex);
+ }
+ }
+
}
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Wed Aug 20 01:34:29 2014
@@ -166,6 +166,8 @@ public class RMAppImpl implements RMApp,
RMAppEventType.APP_REJECTED,
new FinalSavingTransition(new AppRejectedTransition(),
RMAppState.FAILED))
+ .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
+ RMAppEventType.MOVE, new RMAppMoveTransition())
// Transitions from SUBMITTED state
.addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
@@ -243,7 +245,7 @@ public class RMAppImpl implements RMApp,
// ignorable transitions
.addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL,
- RMAppEventType.APP_NEW_SAVED))
+ RMAppEventType.APP_NEW_SAVED, RMAppEventType.MOVE))
// Transitions from FINISHING state
.addTransition(RMAppState.FINISHING, RMAppState.FINISHED,
@@ -254,9 +256,9 @@ public class RMAppImpl implements RMApp,
// ignorable transitions
.addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
EnumSet.of(RMAppEventType.NODE_UPDATE,
- // ignore Kill as we have already saved the final Finished state in
- // state store.
- RMAppEventType.KILL))
+ // ignore Kill/Move as we have already saved the final Finished state
+ // in state store.
+ RMAppEventType.KILL, RMAppEventType.MOVE))
// Transitions from KILLING state
.addTransition(RMAppState.KILLING, RMAppState.KILLING,
@@ -274,7 +276,7 @@ public class RMAppImpl implements RMApp,
RMAppEventType.ATTEMPT_FINISHED,
RMAppEventType.ATTEMPT_FAILED,
RMAppEventType.APP_UPDATE_SAVED,
- RMAppEventType.KILL))
+ RMAppEventType.KILL, RMAppEventType.MOVE))
// Transitions from FINISHED state
// ignorable transitions
@@ -286,7 +288,7 @@ public class RMAppImpl implements RMApp,
RMAppEventType.NODE_UPDATE,
RMAppEventType.ATTEMPT_UNREGISTERED,
RMAppEventType.ATTEMPT_FINISHED,
- RMAppEventType.KILL))
+ RMAppEventType.KILL, RMAppEventType.MOVE))
// Transitions from FAILED state
// ignorable transitions
@@ -294,7 +296,8 @@ public class RMAppImpl implements RMApp,
RMAppEventType.APP_RUNNING_ON_NODE,
new AppRunningOnNodeTransition())
.addTransition(RMAppState.FAILED, RMAppState.FAILED,
- EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE))
+ EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE,
+ RMAppEventType.MOVE))
// Transitions from KILLED state
// ignorable transitions
@@ -307,7 +310,7 @@ public class RMAppImpl implements RMApp,
EnumSet.of(RMAppEventType.APP_ACCEPTED,
RMAppEventType.APP_REJECTED, RMAppEventType.KILL,
RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED,
- RMAppEventType.NODE_UPDATE))
+ RMAppEventType.NODE_UPDATE, RMAppEventType.MOVE))
.installTopology();
@@ -820,17 +823,6 @@ public class RMAppImpl implements RMApp,
RMAppTransition {
@Override
public void transition(RMAppImpl app, RMAppEvent event) {
- if (event instanceof RMAppNewSavedEvent) {
- RMAppNewSavedEvent storeEvent = (RMAppNewSavedEvent) event;
- // For HA this exception needs to be handled by giving up
- // master status if we got fenced
- if (((RMAppNewSavedEvent) event).getStoredException() != null) {
- LOG.error(
- "Failed to store application: " + storeEvent.getApplicationId(),
- storeEvent.getStoredException());
- ExitUtil.terminate(1, storeEvent.getStoredException());
- }
- }
app.handler.handle(new AppAddedSchedulerEvent(app.applicationId,
app.submissionContext.getQueue(), app.user));
}
@@ -848,13 +840,6 @@ public class RMAppImpl implements RMApp,
@Override
public RMAppState transition(RMAppImpl app, RMAppEvent event) {
- RMAppUpdateSavedEvent storeEvent = (RMAppUpdateSavedEvent) event;
- if (storeEvent.getUpdatedException() != null) {
- LOG.error("Failed to update the final state of application"
- + storeEvent.getApplicationId(), storeEvent.getUpdatedException());
- ExitUtil.terminate(1, storeEvent.getUpdatedException());
- }
-
if (app.transitionTodo instanceof SingleArcTransition) {
((SingleArcTransition) app.transitionTodo).transition(app,
app.eventCausingFinalSaving);
@@ -1191,6 +1176,9 @@ public class RMAppImpl implements RMApp,
public static boolean isAppInFinalState(RMApp rmApp) {
RMAppState appState = ((RMAppImpl) rmApp).getRecoveredFinalState();
+ if (appState == null) {
+ appState = rmApp.getState();
+ }
return appState == RMAppState.FAILED || appState == RMAppState.FINISHED
|| appState == RMAppState.KILLED;
}
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Wed Aug 20 01:34:29 2014
@@ -37,6 +37,7 @@ import org.apache.commons.lang.StringUti
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
@@ -79,11 +80,9 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@@ -217,7 +216,13 @@ public class RMAppAttemptImpl implements
RMAppAttemptEventType.KILL,
new FinalSavingTransition(new BaseFinalTransition(
RMAppAttemptState.KILLED), RMAppAttemptState.KILLED))
-
+ .addTransition(RMAppAttemptState.SCHEDULED,
+ RMAppAttemptState.FINAL_SAVING,
+ RMAppAttemptEventType.CONTAINER_FINISHED,
+ new FinalSavingTransition(
+ new AMContainerCrashedBeforeRunningTransition(),
+ RMAppAttemptState.FAILED))
+
// Transitions from ALLOCATED_SAVING State
.addTransition(RMAppAttemptState.ALLOCATED_SAVING,
RMAppAttemptState.ALLOCATED,
@@ -391,7 +396,6 @@ public class RMAppAttemptImpl implements
RMAppAttemptState.KILLED,
RMAppAttemptState.KILLED,
EnumSet.of(RMAppAttemptEventType.ATTEMPT_ADDED,
- RMAppAttemptEventType.EXPIRE,
RMAppAttemptEventType.LAUNCHED,
RMAppAttemptEventType.LAUNCH_FAILED,
RMAppAttemptEventType.EXPIRE,
@@ -553,7 +557,22 @@ public class RMAppAttemptImpl implements
@Override
public Token<AMRMTokenIdentifier> getAMRMToken() {
- return this.amrmToken;
+ this.readLock.lock();
+ try {
+ return this.amrmToken;
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ @Private
+ public void setAMRMToken(Token<AMRMTokenIdentifier> lastToken) {
+ this.writeLock.lock();
+ try {
+ this.amrmToken = lastToken;
+ } finally {
+ this.writeLock.unlock();
+ }
}
@Override
@@ -707,7 +726,8 @@ public class RMAppAttemptImpl implements
this.attemptMetrics.setIsPreempted();
}
setMasterContainer(attemptState.getMasterContainer());
- recoverAppAttemptCredentials(attemptState.getAppAttemptCredentials());
+ recoverAppAttemptCredentials(attemptState.getAppAttemptCredentials(),
+ attemptState.getState());
this.recoveredFinalState = attemptState.getState();
this.originalTrackingUrl = attemptState.getFinalTrackingUrl();
this.proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl);
@@ -719,9 +739,11 @@ public class RMAppAttemptImpl implements
this.justFinishedContainers = attempt.getJustFinishedContainers();
}
- private void recoverAppAttemptCredentials(Credentials appAttemptTokens)
- throws IOException {
- if (appAttemptTokens == null) {
+ private void recoverAppAttemptCredentials(Credentials appAttemptTokens,
+ RMAppAttemptState state) throws IOException {
+ if (appAttemptTokens == null || state == RMAppAttemptState.FAILED
+ || state == RMAppAttemptState.FINISHED
+ || state == RMAppAttemptState.KILLED) {
return;
}
@@ -732,12 +754,9 @@ public class RMAppAttemptImpl implements
.registerMasterKey(applicationAttemptId, clientTokenMasterKeyBytes);
}
- // Only one AMRMToken is stored per-attempt, so this should be fine. Can't
- // use TokenSelector as service may change - think fail-over.
this.amrmToken =
- (Token<AMRMTokenIdentifier>) appAttemptTokens
- .getToken(RMStateStore.AM_RM_TOKEN_SERVICE);
- rmContext.getAMRMTokenSecretManager().addPersistedPassword(this.amrmToken);
+ rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
+ applicationAttemptId);
}
private static class BaseTransition implements
@@ -773,11 +792,6 @@ public class RMAppAttemptImpl implements
.createMasterKey(appAttempt.applicationAttemptId);
}
- // create AMRMToken
- appAttempt.amrmToken =
- appAttempt.rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
- appAttempt.applicationAttemptId);
-
// Add the applicationAttempt to the scheduler and inform the scheduler
// whether to transfer the state from previous attempt.
appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent(
@@ -889,7 +903,6 @@ public class RMAppAttemptImpl implements
@Override
public void transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
- appAttempt.checkAttemptStoreError(event);
appAttempt.launchAttempt();
}
}
@@ -1041,14 +1054,6 @@ public class RMAppAttemptImpl implements
@Override
public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
- RMAppAttemptUpdateSavedEvent storeEvent = (RMAppAttemptUpdateSavedEvent) event;
- if (storeEvent.getUpdatedException() != null) {
- LOG.error("Failed to update the final state of application attempt: "
- + storeEvent.getApplicationAttemptId(),
- storeEvent.getUpdatedException());
- ExitUtil.terminate(1, storeEvent.getUpdatedException());
- }
-
RMAppAttemptEvent causeEvent = appAttempt.eventCausingFinalSaving;
if (appAttempt.transitionTodo instanceof SingleArcTransition) {
@@ -1178,12 +1183,11 @@ public class RMAppAttemptImpl implements
@Override
public void transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
- appAttempt.checkAttemptStoreError(event);
- // TODO Today unmanaged AM client is waiting for app state to be Accepted to
- // launch the AM. This is broken since we changed to start the attempt
- // after the application is Accepted. We may need to introduce an attempt
- // report that client can rely on to query the attempt state and choose to
- // launch the unmanaged AM.
+ // create AMRMToken
+ appAttempt.amrmToken =
+ appAttempt.rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
+ appAttempt.applicationAttemptId);
+
super.transition(appAttempt, event);
}
}
@@ -1671,18 +1675,6 @@ public class RMAppAttemptImpl implements
rmContext.getAMLivelinessMonitor().register(getAppAttemptId());
}
- private void checkAttemptStoreError(RMAppAttemptEvent event) {
- RMAppAttemptNewSavedEvent storeEvent = (RMAppAttemptNewSavedEvent) event;
- if(storeEvent.getStoredException() != null)
- {
- // This needs to be handled for HA and give up master status if we got
- // fenced
- LOG.error("Failed to store attempt: " + getAppAttemptId(),
- storeEvent.getStoredException());
- ExitUtil.terminate(1, storeEvent.getStoredException());
- }
- }
-
private void storeAttempt() {
// store attempt data in a non-blocking manner to prevent dispatcher
// thread starvation and wait for state to be saved
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java Wed Aug 20 01:34:29 2014
@@ -93,9 +93,9 @@ public class RMNodeImpl implements RMNod
private final RMContext context;
private final String hostName;
private final int commandPort;
- private final int httpPort;
+ private int httpPort;
private final String nodeAddress; // The containerManager address
- private final String httpAddress;
+ private String httpAddress;
private volatile ResourceOption resourceOption;
private final Node node;
@@ -456,6 +456,24 @@ public class RMNodeImpl implements RMNod
}
}
+ private static void handleRunningAppOnNode(RMNodeImpl rmNode,
+ RMContext context, ApplicationId appId, NodeId nodeId) {
+ RMApp app = context.getRMApps().get(appId);
+
+ // if we failed getting app by appId, maybe something wrong happened, just
+ // add the app to the finishedApplications list so that the app can be
+ // cleaned up on the NM
+ if (null == app) {
+ LOG.warn("Cannot get RMApp by appId=" + appId
+ + ", just added it to finishedApplications list for cleanup");
+ rmNode.finishedApplications.add(appId);
+ return;
+ }
+
+ context.getDispatcher().getEventHandler()
+ .handle(new RMAppRunningOnNodeEvent(appId, nodeId));
+ }
+
public static class AddNodeTransition implements
SingleArcTransition<RMNodeImpl, RMNodeEvent> {
@@ -496,24 +514,6 @@ public class RMNodeImpl implements RMNod
new NodesListManagerEvent(
NodesListManagerEventType.NODE_USABLE, rmNode));
}
-
- void handleRunningAppOnNode(RMNodeImpl rmNode, RMContext context,
- ApplicationId appId, NodeId nodeId) {
- RMApp app = context.getRMApps().get(appId);
-
- // if we failed getting app by appId, maybe something wrong happened, just
- // add the app to the finishedApplications list so that the app can be
- // cleaned up on the NM
- if (null == app) {
- LOG.warn("Cannot get RMApp by appId=" + appId
- + ", just added it to finishedApplications list for cleanup");
- rmNode.finishedApplications.add(appId);
- return;
- }
-
- context.getDispatcher().getEventHandler()
- .handle(new RMAppRunningOnNodeEvent(appId, nodeId));
- }
}
public static class ReconnectNodeTransition implements
@@ -521,36 +521,22 @@ public class RMNodeImpl implements RMNod
@Override
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
- // Kill containers since node is rejoining.
- rmNode.nodeUpdateQueue.clear();
- rmNode.context.getDispatcher().getEventHandler().handle(
- new NodeRemovedSchedulerEvent(rmNode));
-
- RMNode newNode = ((RMNodeReconnectEvent)event).getReconnectedNode();
+ RMNodeReconnectEvent reconnectEvent = (RMNodeReconnectEvent) event;
+ RMNode newNode = reconnectEvent.getReconnectedNode();
rmNode.nodeManagerVersion = newNode.getNodeManagerVersion();
- if (rmNode.getTotalCapability().equals(newNode.getTotalCapability())
- && rmNode.getHttpPort() == newNode.getHttpPort()) {
- // Reset heartbeat ID since node just restarted.
- rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
- if (rmNode.getState() != NodeState.UNHEALTHY) {
- // Only add new node if old state is not UNHEALTHY
- rmNode.context.getDispatcher().getEventHandler().handle(
- new NodeAddedSchedulerEvent(rmNode));
- }
- } else {
- // Reconnected node differs, so replace old node and start new node
- switch (rmNode.getState()) {
- case RUNNING:
- ClusterMetrics.getMetrics().decrNumActiveNodes();
- break;
- case UNHEALTHY:
- ClusterMetrics.getMetrics().decrNumUnhealthyNMs();
- break;
+ rmNode.httpPort = newNode.getHttpPort();
+ rmNode.httpAddress = newNode.getHttpAddress();
+ rmNode.resourceOption = newNode.getResourceOption();
+
+ // Reset heartbeat ID since node just restarted.
+ rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
+
+ if (null != reconnectEvent.getRunningApplications()) {
+ for (ApplicationId appId : reconnectEvent.getRunningApplications()) {
+ handleRunningAppOnNode(rmNode, rmNode.context, appId, rmNode.nodeId);
}
- rmNode.context.getRMNodes().put(newNode.getNodeID(), newNode);
- rmNode.context.getDispatcher().getEventHandler().handle(
- new RMNodeStartedEvent(newNode.getNodeID(), null, null));
}
+
rmNode.context.getDispatcher().getEventHandler().handle(
new NodesListManagerEvent(
NodesListManagerEventType.NODE_USABLE, rmNode));
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java Wed Aug 20 01:34:29 2014
@@ -18,17 +18,27 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
public class RMNodeReconnectEvent extends RMNodeEvent {
private RMNode reconnectedNode;
+ private List<ApplicationId> runningApplications;
- public RMNodeReconnectEvent(NodeId nodeId, RMNode newNode) {
+ public RMNodeReconnectEvent(NodeId nodeId, RMNode newNode,
+ List<ApplicationId> runningApps) {
super(nodeId, RMNodeEventType.RECONNECTED);
reconnectedNode = newNode;
+ runningApplications = runningApps;
}
public RMNode getReconnectedNode() {
return reconnectedNode;
}
+
+ public List<ApplicationId> getRunningApplications() {
+ return runningApplications;
+ }
}
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java Wed Aug 20 01:34:29 2014
@@ -18,14 +18,19 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -33,21 +38,34 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
+import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.util.resource.Resources;
+import com.google.common.util.concurrent.SettableFuture;
+
+
@SuppressWarnings("unchecked")
public abstract class AbstractYarnScheduler
<T extends SchedulerApplicationAttempt, N extends SchedulerNode>
@@ -66,6 +84,7 @@ public abstract class AbstractYarnSchedu
protected RMContext rmContext;
protected Map<ApplicationId, SchedulerApplication<T>> applications;
+ protected int nmExpireInterval;
protected final static List<Container> EMPTY_CONTAINER_LIST =
new ArrayList<Container>();
@@ -81,6 +100,15 @@ public abstract class AbstractYarnSchedu
super(name);
}
+ @Override
+ public void serviceInit(Configuration conf) throws Exception {
+ nmExpireInterval =
+ conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
+ createReleaseCache();
+ super.serviceInit(conf);
+ }
+
public synchronized List<Container> getTransferredContainers(
ApplicationAttemptId currentAttempt) {
ApplicationId appId = currentAttempt.getApplicationId();
@@ -275,6 +303,19 @@ public abstract class AbstractYarnSchedu
((RMContainerImpl)rmContainer).setAMContainer(true);
}
}
+
+ synchronized (schedulerAttempt) {
+ Set<ContainerId> releases = schedulerAttempt.getPendingRelease();
+ if (releases.contains(container.getContainerId())) {
+ // release the container
+ rmContainer.handle(new RMContainerFinishedEvent(container
+ .getContainerId(), SchedulerUtils.createAbnormalContainerStatus(
+ container.getContainerId(), SchedulerUtils.RELEASED_CONTAINER),
+ RMContainerEventType.RELEASED));
+ releases.remove(container.getContainerId());
+ LOG.info(container.getContainerId() + " is released by application.");
+ }
+ }
}
}
@@ -314,7 +355,109 @@ public abstract class AbstractYarnSchedu
}
}
+ protected void createReleaseCache() {
+ // Cleanup the cache after nm expire interval.
+ new Timer().schedule(new TimerTask() {
+ @Override
+ public void run() {
+ for (SchedulerApplication<T> app : applications.values()) {
+
+ T attempt = app.getCurrentAppAttempt();
+ synchronized (attempt) {
+ for (ContainerId containerId : attempt.getPendingRelease()) {
+ RMAuditLogger.logFailure(
+ app.getUser(),
+ AuditConstants.RELEASE_CONTAINER,
+ "Unauthorized access or invalid container",
+ "Scheduler",
+ "Trying to release container not owned by app or with invalid id.",
+ attempt.getApplicationId(), containerId);
+ }
+ attempt.getPendingRelease().clear();
+ }
+ }
+ LOG.info("Release request cache is cleaned up");
+ }
+ }, nmExpireInterval);
+ }
+
+ // clean up a completed container
+ protected abstract void completedContainer(RMContainer rmContainer,
+ ContainerStatus containerStatus, RMContainerEventType event);
+
+ protected void releaseContainers(List<ContainerId> containers,
+ SchedulerApplicationAttempt attempt) {
+ for (ContainerId containerId : containers) {
+ RMContainer rmContainer = getRMContainer(containerId);
+ if (rmContainer == null) {
+ if (System.currentTimeMillis() - ResourceManager.getClusterTimeStamp()
+ < nmExpireInterval) {
+ LOG.info(containerId + " doesn't exist. Add the container"
+ + " to the release request cache as it maybe on recovery.");
+ synchronized (attempt) {
+ attempt.getPendingRelease().add(containerId);
+ }
+ } else {
+ RMAuditLogger.logFailure(attempt.getUser(),
+ AuditConstants.RELEASE_CONTAINER,
+ "Unauthorized access or invalid container", "Scheduler",
+ "Trying to release container not owned by app or with invalid id.",
+ attempt.getApplicationId(), containerId);
+ }
+ }
+ completedContainer(rmContainer,
+ SchedulerUtils.createAbnormalContainerStatus(containerId,
+ SchedulerUtils.RELEASED_CONTAINER), RMContainerEventType.RELEASED);
+ }
+ }
+
public SchedulerNode getSchedulerNode(NodeId nodeId) {
return nodes.get(nodeId);
}
+
+ @Override
+ public synchronized void moveAllApps(String sourceQueue, String destQueue)
+ throws YarnException {
+ // check if destination queue is a valid leaf queue
+ try {
+ getQueueInfo(destQueue, false, false);
+ } catch (IOException e) {
+ LOG.warn(e);
+ throw new YarnException(e);
+ }
+ // check if source queue is valid
+ List<ApplicationAttemptId> apps = getAppsInQueue(sourceQueue);
+ if (apps == null) {
+ String errMsg = "The specified Queue: " + sourceQueue + " doesn't exist";
+ LOG.warn(errMsg);
+ throw new YarnException(errMsg);
+ }
+ // generate move events for each pending/running app
+ for (ApplicationAttemptId app : apps) {
+ SettableFuture<Object> future = SettableFuture.create();
+ this.rmContext
+ .getDispatcher()
+ .getEventHandler()
+ .handle(new RMAppMoveEvent(app.getApplicationId(), destQueue, future));
+ }
+ }
+
+ @Override
+ public synchronized void killAllAppsInQueue(String queueName)
+ throws YarnException {
+ // check if queue is valid
+ List<ApplicationAttemptId> apps = getAppsInQueue(queueName);
+ if (apps == null) {
+ String errMsg = "The specified Queue: " + queueName + " doesn't exist";
+ LOG.warn(errMsg);
+ throw new YarnException(errMsg);
+ }
+ // generate kill events for each pending/running app
+ for (ApplicationAttemptId app : apps) {
+ this.rmContext
+ .getDispatcher()
+ .getEventHandler()
+ .handle(new RMAppEvent(app.getApplicationId(), RMAppEventType.KILL));
+ }
+ }
}
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java Wed Aug 20 01:34:29 2014
@@ -54,7 +54,7 @@ public class AppSchedulingInfo {
private static final Log LOG = LogFactory.getLog(AppSchedulingInfo.class);
private final ApplicationAttemptId applicationAttemptId;
final ApplicationId applicationId;
- private final String queueName;
+ private String queueName;
Queue queue;
final String user;
// TODO making containerIdCounter long
@@ -360,7 +360,7 @@ public class AppSchedulingInfo {
List<ResourceRequest> resourceRequests) {
// Update future requirements
decrementOutstanding(offSwitchRequest);
- // Update cloned RackLocal and OffRack requests for recovery
+ // Update cloned OffRack requests for recovery
resourceRequests.add(cloneResourceRequest(offSwitchRequest));
}
@@ -410,6 +410,7 @@ public class AppSchedulingInfo {
activeUsersManager = newQueue.getActiveUsersManager();
activeUsersManager.activateApplication(user, applicationId);
this.queue = newQueue;
+ this.queueName = newQueue.getQueueName();
}
synchronized public void stop(RMAppAttemptState rmAppAttemptFinalState) {
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java Wed Aug 20 01:34:29 2014
@@ -17,13 +17,14 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
-import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -41,7 +42,6 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.util.resource.Resources;
+import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multiset;
@@ -87,6 +88,13 @@ public class SchedulerApplicationAttempt
protected List<RMContainer> newlyAllocatedContainers =
new ArrayList<RMContainer>();
+ // This pendingRelease is used in work-preserving recovery scenario to keep
+ // track of the AM's outstanding release requests. RM on recovery could
+ // receive the release request from AM before it receives the container status
+ // from NM for recovery. In this case, the to-be-recovered containers reported
+ // by NM should not be recovered.
+ private Set<ContainerId> pendingRelease = null;
+
/**
* Count how many times the application has been given an opportunity
* to schedule a task at each priority. Each time the scheduler
@@ -114,7 +122,7 @@ public class SchedulerApplicationAttempt
new AppSchedulingInfo(applicationAttemptId, user, queue,
activeUsersManager, rmContext.getEpoch());
this.queue = queue;
-
+ this.pendingRelease = new HashSet<ContainerId>();
if (rmContext.getRMApps() != null &&
rmContext.getRMApps()
.containsKey(applicationAttemptId.getApplicationId())) {
@@ -163,6 +171,10 @@ public class SchedulerApplicationAttempt
return appSchedulingInfo.getResourceRequests(priority);
}
+ public Set<ContainerId> getPendingRelease() {
+ return this.pendingRelease;
+ }
+
public int getNewContainerId() {
return appSchedulingInfo.getNewContainerId();
}
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java Wed Aug 20 01:34:29 2014
@@ -153,14 +153,17 @@ public class SchedulerUtils {
* @param rmNode RMNode with new resource view
* @param clusterResource the cluster's resource that need to update
* @param log Scheduler's log for resource change
+ * @return true if the resources have changed
*/
- public static void updateResourceIfChanged(SchedulerNode node,
+ public static boolean updateResourceIfChanged(SchedulerNode node,
RMNode rmNode, Resource clusterResource, Log log) {
+ boolean result = false;
Resource oldAvailableResource = node.getAvailableResource();
Resource newAvailableResource = Resources.subtract(
rmNode.getTotalCapability(), node.getUsedResource());
if (!newAvailableResource.equals(oldAvailableResource)) {
+ result = true;
Resource deltaResource = Resources.subtract(newAvailableResource,
oldAvailableResource);
// Reflect resource change to scheduler node.
@@ -176,6 +179,8 @@ public class SchedulerUtils {
+ " with delta: CPU: " + deltaResource.getMemory() + "core, Memory: "
+ deltaResource.getMemory() +"MB");
}
+
+ return result;
}
/**
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java Wed Aug 20 01:34:29 2014
@@ -202,4 +202,22 @@ public interface YarnScheduler extends E
@Evolving
public String moveApplication(ApplicationId appId, String newQueue)
throws YarnException;
+
+ /**
+ * Completely drain sourceQueue of applications, by moving all of them to
+ * destQueue.
+ *
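+ * @param sourceQueue the name of the queue whose applications are moved out
+ * @param destQueue the name of the queue that receives the applications
+ * @throws YarnException if the applications cannot be moved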
+ */
+ void moveAllApps(String sourceQueue, String destQueue) throws YarnException;
+
+ /**
+ * Terminate all applications in the specified queue.
+ *
+ * @param queueName the name of queue to be drained
+ * @throws YarnException
+ */
+ void killAllAppsInQueue(String queueName) throws YarnException;
}
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java Wed Aug 20 01:34:29 2014
@@ -238,4 +238,22 @@ extends org.apache.hadoop.yarn.server.re
* @param apps the collection to add the applications to
*/
public void collectSchedulerApplications(Collection<ApplicationAttemptId> apps);
+
+ /**
+ * Detach a container from this queue
+ * @param clusterResource the current cluster resource
+ * @param application application to which the container was assigned
+ * @param container the container to detach
+ */
+ public void detachContainer(Resource clusterResource,
+ FiCaSchedulerApp application, RMContainer container);
+
+ /**
+ * Attach a container to this queue
+ * @param clusterResource the current cluster resource
+ * @param application application to which the container was assigned
+ * @param container the container to attach
+ */
+ public void attachContainer(Resource clusterResource,
+ FiCaSchedulerApp application, RMContainer container);
}
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java Wed Aug 20 01:34:29 2014
@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -24,6 +27,8 @@ import org.apache.hadoop.yarn.util.resou
class CSQueueUtils {
+ private static final Log LOG = LogFactory.getLog(CSQueueUtils.class);
+
final static float EPSILON = 0.0001f;
public static void checkMaxCapacity(String queueName,
@@ -113,4 +118,52 @@ class CSQueueUtils {
)
);
}
+
+ public static float getAbsoluteMaxAvailCapacity(
+ ResourceCalculator resourceCalculator, Resource clusterResource, CSQueue queue) {
+ CSQueue parent = queue.getParent();
+ if (parent == null) {
+ return queue.getAbsoluteMaximumCapacity();
+ }
+
+ //Get my parent's max avail, needed to determine my own
+ float parentMaxAvail = getAbsoluteMaxAvailCapacity(
+ resourceCalculator, clusterResource, parent);
+ //...and as a resource
+ Resource parentResource = Resources.multiply(clusterResource, parentMaxAvail);
+
+ //check for no resources parent before dividing, if so, max avail is none
+ if (Resources.isInvalidDivisor(resourceCalculator, parentResource)) {
+ return 0.0f;
+ }
+ //sibling used is parent used - my used...
+ float siblingUsedCapacity = Resources.ratio(
+ resourceCalculator,
+ Resources.subtract(parent.getUsedResources(), queue.getUsedResources()),
+ parentResource);
+ //my max avail is the lesser of my max capacity and what is unused from my parent
+ //by my siblings (if they are beyond their base capacity)
+ float maxAvail = Math.min(
+ queue.getMaximumCapacity(),
+ 1.0f - siblingUsedCapacity);
+ //and, multiply by parent to get absolute (cluster relative) value
+ float absoluteMaxAvail = maxAvail * parentMaxAvail;
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("qpath " + queue.getQueuePath());
+ LOG.debug("parentMaxAvail " + parentMaxAvail);
+ LOG.debug("siblingUsedCapacity " + siblingUsedCapacity);
+ LOG.debug("getAbsoluteMaximumCapacity " + queue.getAbsoluteMaximumCapacity());
+ LOG.debug("maxAvail " + maxAvail);
+ LOG.debug("absoluteMaxAvail " + absoluteMaxAvail);
+ }
+
+ if (absoluteMaxAvail < 0.0f) {
+ absoluteMaxAvail = 0.0f;
+ } else if (absoluteMaxAvail > 1.0f) {
+ absoluteMaxAvail = 1.0f;
+ }
+
+ return absoluteMaxAvail;
+ }
}
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Wed Aug 20 01:34:29 2014
@@ -18,8 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
-import com.google.common.base.Preconditions;
-
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
@@ -41,6 +39,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.Groups;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -53,15 +52,11 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
-import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.*;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -76,6 +71,8 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping.MappingType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@@ -93,6 +90,7 @@ import org.apache.hadoop.yarn.util.resou
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
@LimitedPrivate("yarn")
@Evolving
@@ -198,6 +196,16 @@ public class CapacityScheduler extends
+ ".scheduling-interval-ms";
private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5;
+ private boolean overrideWithQueueMappings = false;
+ private List<QueueMapping> mappings = null;
+ private Groups groups;
+
+ @VisibleForTesting
+ public synchronized String getMappedQueueForTest(String user)
+ throws IOException {
+ return getMappedQueue(user);
+ }
+
public CapacityScheduler() {
super(CapacityScheduler.class.getName());
}
@@ -262,7 +270,6 @@ public class CapacityScheduler extends
this.applications =
new ConcurrentHashMap<ApplicationId,
SchedulerApplication<FiCaSchedulerApp>>();
-
initializeQueues(this.conf);
scheduleAsynchronously = this.conf.getScheduleAynschronously();
@@ -401,7 +408,32 @@ public class CapacityScheduler extends
}
}
private static final QueueHook noop = new QueueHook();
-
+
+ private void initializeQueueMappings() throws IOException {
+ overrideWithQueueMappings = conf.getOverrideWithQueueMappings();
+ LOG.info("Initialized queue mappings, override: "
+ + overrideWithQueueMappings);
+ // Get new user/group mappings
+ List<QueueMapping> newMappings = conf.getQueueMappings();
+ //check if mappings refer to valid queues
+ for (QueueMapping mapping : newMappings) {
+ if (!mapping.queue.equals(CURRENT_USER_MAPPING) &&
+ !mapping.queue.equals(PRIMARY_GROUP_MAPPING)) {
+ CSQueue queue = queues.get(mapping.queue);
+ if (queue == null || !(queue instanceof LeafQueue)) {
+ throw new IOException(
+ "mapping contains invalid or non-leaf queue " + mapping.queue);
+ }
+ }
+ }
+ //apply the new mappings since they are valid
+ mappings = newMappings;
+ // initialize groups if mappings are present
+ if (mappings.size() > 0) {
+ groups = new Groups(conf);
+ }
+ }
+
@Lock(CapacityScheduler.class)
private void initializeQueues(CapacitySchedulerConfiguration conf)
throws IOException {
@@ -409,7 +441,9 @@ public class CapacityScheduler extends
root =
parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT,
queues, queues, noop);
+
LOG.info("Initialized root queue " + root);
+ initializeQueueMappings();
}
@Lock(CapacityScheduler.class)
@@ -429,6 +463,7 @@ public class CapacityScheduler extends
// Re-configure queues
root.reinitialize(newRoot, clusterResource);
+ initializeQueueMappings();
}
/**
@@ -516,12 +551,73 @@ public class CapacityScheduler extends
}
synchronized CSQueue getQueue(String queueName) {
+ if (queueName == null) {
+ return null;
+ }
return queues.get(queueName);
}
+ private static final String CURRENT_USER_MAPPING = "%user";
+
+ private static final String PRIMARY_GROUP_MAPPING = "%primary_group";
+
+ private String getMappedQueue(String user) throws IOException {
+ for (QueueMapping mapping : mappings) {
+ if (mapping.type == MappingType.USER) {
+ if (mapping.source.equals(CURRENT_USER_MAPPING)) {
+ if (mapping.queue.equals(CURRENT_USER_MAPPING)) {
+ return user;
+ }
+ else if (mapping.queue.equals(PRIMARY_GROUP_MAPPING)) {
+ return groups.getGroups(user).get(0);
+ }
+ else {
+ return mapping.queue;
+ }
+ }
+ if (user.equals(mapping.source)) {
+ return mapping.queue;
+ }
+ }
+ if (mapping.type == MappingType.GROUP) {
+ for (String userGroups : groups.getGroups(user)) {
+ if (userGroups.equals(mapping.source)) {
+ return mapping.queue;
+ }
+ }
+ }
+ }
+ return null;
+ }
+
private synchronized void addApplication(ApplicationId applicationId,
- String queueName, String user, boolean isAppRecovering) {
- // santiy checks.
+ String queueName, String user, boolean isAppRecovering) {
+
+ if (mappings != null && mappings.size() > 0) {
+ try {
+ String mappedQueue = getMappedQueue(user);
+ if (mappedQueue != null) {
+ // We have a mapping, should we use it?
+ if (queueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME)
+ || overrideWithQueueMappings) {
+ LOG.info("Application " + applicationId + " user " + user
+ + " mapping [" + queueName + "] to [" + mappedQueue
+ + "] override " + overrideWithQueueMappings);
+ queueName = mappedQueue;
+ RMApp rmApp = rmContext.getRMApps().get(applicationId);
+ rmApp.setQueue(queueName);
+ }
+ }
+ } catch (IOException ioex) {
+ String message = "Failed to submit application " + applicationId +
+ " submitted by user " + user + " reason: " + ioex.getMessage();
+ this.rmContext.getDispatcher().getEventHandler()
+ .handle(new RMAppRejectedEvent(applicationId, message));
+ return;
+ }
+ }
+
+ // sanity checks.
CSQueue queue = getQueue(queueName);
if (queue == null) {
String message = "Application " + applicationId +
@@ -547,6 +643,8 @@ public class CapacityScheduler extends
.handle(new RMAppRejectedEvent(applicationId, ace.toString()));
return;
}
+ // update the metrics
+ queue.getMetrics().submitApp(user);
SchedulerApplication<FiCaSchedulerApp> application =
new SchedulerApplication<FiCaSchedulerApp>(queue, user);
applications.put(applicationId, application);
@@ -689,21 +787,7 @@ public class CapacityScheduler extends
getMinimumResourceCapability(), maximumAllocation);
// Release containers
- for (ContainerId releasedContainerId : release) {
- RMContainer rmContainer = getRMContainer(releasedContainerId);
- if (rmContainer == null) {
- RMAuditLogger.logFailure(application.getUser(),
- AuditConstants.RELEASE_CONTAINER,
- "Unauthorized access or invalid container", "CapacityScheduler",
- "Trying to release container not owned by app or with invalid id",
- application.getApplicationId(), releasedContainerId);
- }
- completedContainer(rmContainer,
- SchedulerUtils.createAbnormalContainerStatus(
- releasedContainerId,
- SchedulerUtils.RELEASED_CONTAINER),
- RMContainerEventType.RELEASED);
- }
+ releaseContainers(release, application);
synchronized (application) {
@@ -783,7 +867,10 @@ public class CapacityScheduler extends
FiCaSchedulerNode node = getNode(nm.getNodeID());
// Update resource if any change
- SchedulerUtils.updateResourceIfChanged(node, nm, clusterResource, LOG);
+ if (SchedulerUtils.updateResourceIfChanged(node, nm, clusterResource,
+ LOG)) {
+ root.updateClusterResource(clusterResource);
+ }
List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
@@ -896,8 +983,8 @@ public class CapacityScheduler extends
{
AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
addApplication(appAddedEvent.getApplicationId(),
- appAddedEvent.getQueue(), appAddedEvent.getUser(),
- appAddedEvent.getIsAppRecovering());
+ appAddedEvent.getQueue(),
+ appAddedEvent.getUser(), appAddedEvent.getIsAppRecovering());
}
break;
case APP_REMOVED:
@@ -995,7 +1082,8 @@ public class CapacityScheduler extends
}
@Lock(CapacityScheduler.class)
- private synchronized void completedContainer(RMContainer rmContainer,
+ @Override
+ protected synchronized void completedContainer(RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event) {
if (rmContainer == null) {
LOG.info("Null container completed...");
@@ -1128,4 +1216,59 @@ public class CapacityScheduler extends
throw new IOException(e);
}
}
+
+ @Override
+ public synchronized String moveApplication(ApplicationId appId,
+ String targetQueueName) throws YarnException {
+ FiCaSchedulerApp app =
+ getApplicationAttempt(ApplicationAttemptId.newInstance(appId, 0));
+ String sourceQueueName = app.getQueue().getQueueName();
+ LeafQueue source = getAndCheckLeafQueue(sourceQueueName);
+ LeafQueue dest = getAndCheckLeafQueue(targetQueueName);
+ // Validation check - ACLs, submission limits for user & queue
+ String user = app.getUser();
+ try {
+ dest.submitApplication(appId, user, targetQueueName);
+ } catch (AccessControlException e) {
+ throw new YarnException(e);
+ }
+ // Move all live containers
+ for (RMContainer rmContainer : app.getLiveContainers()) {
+ source.detachContainer(clusterResource, app, rmContainer);
+ // attach the Container to another queue
+ dest.attachContainer(clusterResource, app, rmContainer);
+ }
+ // Detach the application..
+ source.finishApplicationAttempt(app, sourceQueueName);
+ source.getParent().finishApplication(appId, app.getUser());
+ // Finish app & update metrics
+ app.move(dest);
+ // Submit to a new queue
+ dest.submitApplicationAttempt(app, user);
+ applications.get(appId).setQueue(dest);
+ LOG.info("App: " + app.getApplicationId() + " successfully moved from "
+ + sourceQueueName + " to: " + targetQueueName);
+ return targetQueueName;
+ }
+
+ /**
+ * Check that the String provided as input is the name of an existing
+ * LeafQueue; if successful, returns the queue.
+ *
+ * @param queue
+ * @return the LeafQueue
+ * @throws YarnException
+ */
+ private LeafQueue getAndCheckLeafQueue(String queue) throws YarnException {
+ CSQueue ret = this.getQueue(queue);
+ if (ret == null) {
+ throw new YarnException("The specified Queue: " + queue
+ + " doesn't exist");
+ }
+ if (!(ret instanceof LeafQueue)) {
+ throw new YarnException("The specified Queue: " + queue
+ + " is not a Leaf Queue. Move is supported only for Leaf Queues.");
+ }
+ return (LeafQueue) ret;
+ }
}
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java Wed Aug 20 01:34:29 2014
@@ -18,8 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -145,6 +144,44 @@ public class CapacitySchedulerConfigurat
@Private
public static final boolean DEFAULT_SCHEDULE_ASYNCHRONOUSLY_ENABLE = false;
+
+ @Private
+ public static final String QUEUE_MAPPING = PREFIX + "queue-mappings";
+
+ @Private
+ public static final String ENABLE_QUEUE_MAPPING_OVERRIDE = QUEUE_MAPPING + "-override.enable";
+
+ @Private
+ public static final boolean DEFAULT_ENABLE_QUEUE_MAPPING_OVERRIDE = false;
+
+ @Private
+ public static class QueueMapping {
+
+ public enum MappingType {
+
+ USER("u"),
+ GROUP("g");
+ private final String type;
+ private MappingType(String type) {
+ this.type = type;
+ }
+
+ public String toString() {
+ return type;
+ }
+
+ };
+
+ MappingType type;
+ String source;
+ String queue;
+
+ public QueueMapping(MappingType type, String source, String queue) {
+ this.type = type;
+ this.source = source;
+ this.queue = queue;
+ }
+ }
public CapacitySchedulerConfiguration() {
this(new Configuration());
@@ -378,4 +415,82 @@ public class CapacitySchedulerConfigurat
setBoolean(SCHEDULE_ASYNCHRONOUSLY_ENABLE, async);
}
+ public boolean getOverrideWithQueueMappings() {
+ return getBoolean(ENABLE_QUEUE_MAPPING_OVERRIDE,
+ DEFAULT_ENABLE_QUEUE_MAPPING_OVERRIDE);
+ }
+
+ /**
+ * Returns a collection of strings, trimming leading and trailing whitespace
+ * on each value
+ *
+ * @param str
+ * String to parse
+ * @param delim
+ * delimiter to separate the values
+ * @return Collection of parsed elements.
+ */
+ private static Collection<String> getTrimmedStringCollection(String str,
+ String delim) {
+ List<String> values = new ArrayList<String>();
+ if (str == null)
+ return values;
+ StringTokenizer tokenizer = new StringTokenizer(str, delim);
+ while (tokenizer.hasMoreTokens()) {
+ String next = tokenizer.nextToken();
+ if (next == null || next.trim().isEmpty()) {
+ continue;
+ }
+ values.add(next.trim());
+ }
+ return values;
+ }
+
+ /**
+ * Get user/group mappings to queues.
+ *
+ * @return user/group mappings; throws IllegalArgumentException on illegal configs
+ */
+ public List<QueueMapping> getQueueMappings() {
+ List<QueueMapping> mappings =
+ new ArrayList<CapacitySchedulerConfiguration.QueueMapping>();
+ Collection<String> mappingsString =
+ getTrimmedStringCollection(QUEUE_MAPPING);
+ for (String mappingValue : mappingsString) {
+ String[] mapping =
+ getTrimmedStringCollection(mappingValue, ":")
+ .toArray(new String[] {});
+ if (mapping.length != 3 || mapping[1].length() == 0
+ || mapping[2].length() == 0) {
+ throw new IllegalArgumentException(
+ "Illegal queue mapping " + mappingValue);
+ }
+
+ QueueMapping m;
+ try {
+ QueueMapping.MappingType mappingType;
+ if (mapping[0].equals("u")) {
+ mappingType = QueueMapping.MappingType.USER;
+ } else if (mapping[0].equals("g")) {
+ mappingType = QueueMapping.MappingType.GROUP;
+ } else {
+ throw new IllegalArgumentException(
+ "unknown mapping prefix " + mapping[0]);
+ }
+ m = new QueueMapping(
+ mappingType,
+ mapping[1],
+ mapping[2]);
+ } catch (Throwable t) {
+ throw new IllegalArgumentException(
+ "Illegal queue mapping " + mappingValue);
+ }
+
+ if (m != null) {
+ mappings.add(m);
+ }
+ }
+
+ return mappings;
+ }
}