You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by cu...@apache.org on 2014/08/20 03:34:59 UTC

svn commit: r1619019 [6/10] - in /hadoop/common/branches/YARN-1051/hadoop-yarn-project: ./ hadoop-yarn/bin/ hadoop-yarn/conf/ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ hadoop-yarn/had...

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java Wed Aug 20 01:34:29 2014
@@ -32,7 +32,6 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -45,24 +44,21 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos;
-import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNewSavedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppUpdateSavedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
 import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
@@ -85,6 +81,8 @@ public abstract class RMStateStore exten
   protected static final String DELEGATION_TOKEN_PREFIX = "RMDelegationToken_";
   protected static final String DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX =
       "RMDTSequenceNumber_";
+  protected static final String AMRMTOKEN_SECRET_MANAGER_ROOT =
+      "AMRMTokenSecretManagerRoot";
   protected static final String VERSION_NODE = "RMVersionNode";
   protected static final String EPOCH_NODE = "EpochNode";
 
@@ -134,7 +132,8 @@ public abstract class RMStateStore exten
       LOG.info("Storing info for app: " + appId);
       try {
         store.storeApplicationStateInternal(appId, appStateData);
-        store.notifyDoneStoringApplication(appId, null);
+        store.notifyApplication(new RMAppEvent(appId,
+               RMAppEventType.APP_NEW_SAVED));
       } catch (Exception e) {
         LOG.error("Error storing app: " + appId, e);
         store.notifyStoreOperationFailed(e);
@@ -158,7 +157,8 @@ public abstract class RMStateStore exten
       LOG.info("Updating info for app: " + appId);
       try {
         store.updateApplicationStateInternal(appId, appStateData);
-        store.notifyDoneUpdatingApplication(appId, null);
+        store.notifyApplication(new RMAppEvent(appId,
+               RMAppEventType.APP_UPDATE_SAVED));
       } catch (Exception e) {
         LOG.error("Error updating app: " + appId, e);
         store.notifyStoreOperationFailed(e);
@@ -207,8 +207,9 @@ public abstract class RMStateStore exten
         }
         store.storeApplicationAttemptStateInternal(attemptState.getAttemptId(),
             attemptStateData);
-        store.notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(),
-            null);
+        store.notifyApplicationAttempt(new RMAppAttemptEvent
+               (attemptState.getAttemptId(),
+               RMAppAttemptEventType.ATTEMPT_NEW_SAVED));
       } catch (Exception e) {
         LOG.error("Error storing appAttempt: " + attemptState.getAttemptId(), e);
         store.notifyStoreOperationFailed(e);
@@ -235,8 +236,9 @@ public abstract class RMStateStore exten
         }
         store.updateApplicationAttemptStateInternal(attemptState.getAttemptId(),
             attemptStateData);
-        store.notifyDoneUpdatingApplicationAttempt(attemptState.getAttemptId(),
-            null);
+        store.notifyApplicationAttempt(new RMAppAttemptEvent
+               (attemptState.getAttemptId(),
+               RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED));
       } catch (Exception e) {
         LOG.error("Error updating appAttempt: " + attemptState.getAttemptId(), e);
         store.notifyStoreOperationFailed(e);
@@ -412,6 +414,8 @@ public abstract class RMStateStore exten
 
     RMDTSecretManagerState rmSecretManagerState = new RMDTSecretManagerState();
 
+    AMRMTokenSecretManagerState amrmTokenSecretManagerState = null;
+
     public Map<ApplicationId, ApplicationState> getApplicationState() {
       return appState;
     }
@@ -419,6 +423,10 @@ public abstract class RMStateStore exten
     public RMDTSecretManagerState getRMDTSecretManagerState() {
       return rmSecretManagerState;
     }
+
+    public AMRMTokenSecretManagerState getAMRMTokenSecretManagerState() {
+      return amrmTokenSecretManagerState;
+    }
   }
     
   private Dispatcher rmDispatcher;
@@ -487,14 +495,14 @@ public abstract class RMStateStore exten
    *    upgrade RM state.
    */
   public void checkVersion() throws Exception {
-    RMStateVersion loadedVersion = loadVersion();
+    Version loadedVersion = loadVersion();
     LOG.info("Loaded RM state version info " + loadedVersion);
     if (loadedVersion != null && loadedVersion.equals(getCurrentVersion())) {
       return;
     }
     // if there is no version info, treat it as 1.0;
     if (loadedVersion == null) {
-      loadedVersion = RMStateVersion.newInstance(1, 0);
+      loadedVersion = Version.newInstance(1, 0);
     }
     if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
       LOG.info("Storing RM state version info " + getCurrentVersion());
@@ -510,7 +518,7 @@ public abstract class RMStateStore exten
    * Derived class use this method to load the version information from state
    * store.
    */
-  protected abstract RMStateVersion loadVersion() throws Exception;
+  protected abstract Version loadVersion() throws Exception;
 
   /**
    * Derived class use this method to store the version information.
@@ -520,7 +528,7 @@ public abstract class RMStateStore exten
   /**
    * Get the current version of the underlying state store.
    */
-  protected abstract RMStateVersion getCurrentVersion();
+  protected abstract Version getCurrentVersion();
 
 
   /**
@@ -714,6 +722,14 @@ public abstract class RMStateStore exten
       throws Exception;
 
   /**
+   * Blocking API Derived classes must implement this method to store or update
+   * the state of AMRMToken Master Key
+   */
+  public abstract void storeOrUpdateAMRMTokenSecretManagerState(
+      AMRMTokenSecretManagerState amrmTokenSecretManagerState,
+      boolean isUpdate);
+
+  /**
    * Non-blocking API
    * ResourceManager services call this to remove an application from the state
    * store
@@ -755,10 +771,7 @@ public abstract class RMStateStore exten
   
   public Credentials getCredentialsFromAppAttempt(RMAppAttempt appAttempt) {
     Credentials credentials = new Credentials();
-    Token<AMRMTokenIdentifier> appToken = appAttempt.getAMRMToken();
-    if(appToken != null){
-      credentials.addToken(AM_RM_TOKEN_SERVICE, appToken);
-    }
+
     SecretKey clientTokenMasterKey =
         appAttempt.getClientTokenMasterKey();
     if(clientTokenMasterKey != null){
@@ -792,47 +805,28 @@ public abstract class RMStateStore exten
     }
     rmDispatcher.getEventHandler().handle(new RMFatalEvent(type, failureCause));
   }
-
+ 
   @SuppressWarnings("unchecked")
   /**
-   * In (@link handleStoreEvent}, this method is called to notify the
-   * application that new application is stored in state store
-   * @param appId id of the application that has been saved
-   * @param storedException the exception that is thrown when storing the
-   * application
-   */
-  private void notifyDoneStoringApplication(ApplicationId appId,
-                                                  Exception storedException) {
-    rmDispatcher.getEventHandler().handle(
-        new RMAppNewSavedEvent(appId, storedException));
-  }
-
-  @SuppressWarnings("unchecked")
-  private void notifyDoneUpdatingApplication(ApplicationId appId,
-      Exception storedException) {
-    rmDispatcher.getEventHandler().handle(
-      new RMAppUpdateSavedEvent(appId, storedException));
+   * This method is called to notify the application that
+   * new application is stored or updated in state store
+   * @param event App event containing the app id and event type
+   */
+  private void notifyApplication(RMAppEvent event) {
+    rmDispatcher.getEventHandler().handle(event);
   }
-
+  
   @SuppressWarnings("unchecked")
   /**
-   * In (@link handleStoreEvent}, this method is called to notify the
-   * application attempt that new attempt is stored in state store
-   * @param appAttempt attempt that has been saved
-   */
-  private void notifyDoneStoringApplicationAttempt(ApplicationAttemptId attemptId,
-                                                  Exception storedException) {
-    rmDispatcher.getEventHandler().handle(
-        new RMAppAttemptNewSavedEvent(attemptId, storedException));
-  }
-
-  @SuppressWarnings("unchecked")
-  private void notifyDoneUpdatingApplicationAttempt(ApplicationAttemptId attemptId,
-      Exception updatedException) {
-    rmDispatcher.getEventHandler().handle(
-      new RMAppAttemptUpdateSavedEvent(attemptId, updatedException));
+   * This method is called to notify the application attempt
+   * that new attempt is stored or updated in state store
+   * @param event App attempt event containing the app attempt
+   * id and event type
+   */
+  private void notifyApplicationAttempt(RMAppAttemptEvent event) {
+    rmDispatcher.getEventHandler().handle(event);
   }
-
+  
   /**
    * EventHandler implementation which forward events to the FSRMStateStore
    * This hides the EventHandle methods of the store from its public interface 

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java Wed Aug 20 01:34:29 2014
@@ -44,22 +44,23 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRMTokenSecretManagerStateProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
-import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.EpochProto;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
+import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.hadoop.yarn.server.resourcemanager.RMZKUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
-
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Epoch;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.AMRMTokenSecretManagerStatePBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -77,6 +78,11 @@ import org.apache.zookeeper.server.auth.
 
 import com.google.common.annotations.VisibleForTesting;
 
+/**
+ * Changes from 1.1 to 1.2, AMRMTokenSecretManager state has been saved
+ * separately. The currentMasterkey and nextMasterkey have been stored.
+ * Also, AMRMToken has been removed from ApplicationAttemptState.
+ */
 @Private
 @Unstable
 public class ZKRMStateStore extends RMStateStore {
@@ -85,8 +91,8 @@ public class ZKRMStateStore extends RMSt
   private final SecureRandom random = new SecureRandom();
 
   protected static final String ROOT_ZNODE_NAME = "ZKRMStateRoot";
-  protected static final RMStateVersion CURRENT_VERSION_INFO = RMStateVersion
-      .newInstance(1, 1);
+  protected static final Version CURRENT_VERSION_INFO = Version
+      .newInstance(1, 2);
   private static final String RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME =
       "RMDelegationTokensRoot";
   private static final String RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME =
@@ -128,6 +134,9 @@ public class ZKRMStateStore extends RMSt
    *        |      |----- Key_1
    *        |      |----- Key_2
    *                ....
+   * |--- AMRMTOKEN_SECRET_MANAGER_ROOT
+   *        |----- currentMasterKey
+   *        |----- nextMasterKey
    *
    */
   private String zkRootNodePath;
@@ -136,6 +145,7 @@ public class ZKRMStateStore extends RMSt
   private String dtMasterKeysRootPath;
   private String delegationTokensRootPath;
   private String dtSequenceNumberPath;
+  private String amrmTokenSecretManagerRoot;
 
   @VisibleForTesting
   protected String znodeWorkingPath;
@@ -255,6 +265,8 @@ public class ZKRMStateStore extends RMSt
         RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME);
     dtSequenceNumberPath = getNodePath(rmDTSecretManagerRoot,
         RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME);
+    amrmTokenSecretManagerRoot =
+        getNodePath(zkRootNodePath, AMRMTOKEN_SECRET_MANAGER_ROOT);
   }
 
   @Override
@@ -275,6 +287,7 @@ public class ZKRMStateStore extends RMSt
     createRootDir(dtMasterKeysRootPath);
     createRootDir(delegationTokensRootPath);
     createRootDir(dtSequenceNumberPath);
+    createRootDir(amrmTokenSecretManagerRoot);
   }
 
   private void createRootDir(final String rootPath) throws Exception {
@@ -369,7 +382,7 @@ public class ZKRMStateStore extends RMSt
   }
 
   @Override
-  protected RMStateVersion getCurrentVersion() {
+  protected Version getCurrentVersion() {
     return CURRENT_VERSION_INFO;
   }
 
@@ -377,7 +390,7 @@ public class ZKRMStateStore extends RMSt
   protected synchronized void storeVersion() throws Exception {
     String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE);
     byte[] data =
-        ((RMStateVersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
+        ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
     if (existsWithRetries(versionNodePath, true) != null) {
       setDataWithRetries(versionNodePath, data, -1);
     } else {
@@ -386,13 +399,13 @@ public class ZKRMStateStore extends RMSt
   }
 
   @Override
-  protected synchronized RMStateVersion loadVersion() throws Exception {
+  protected synchronized Version loadVersion() throws Exception {
     String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE);
 
     if (existsWithRetries(versionNodePath, true) != null) {
       byte[] data = getDataWithRetries(versionNodePath, true);
-      RMStateVersion version =
-          new RMStateVersionPBImpl(RMStateVersionProto.parseFrom(data));
+      Version version =
+          new VersionPBImpl(VersionProto.parseFrom(data));
       return version;
     }
     return null;
@@ -427,9 +440,27 @@ public class ZKRMStateStore extends RMSt
     loadRMDTSecretManagerState(rmState);
     // recover RM applications
     loadRMAppState(rmState);
+    // recover AMRMTokenSecretManager
+    loadAMRMTokenSecretManagerState(rmState);
     return rmState;
   }
 
+  private void loadAMRMTokenSecretManagerState(RMState rmState)
+      throws Exception {
+    byte[] data = getDataWithRetries(amrmTokenSecretManagerRoot, true);
+    if (data == null) {
+      LOG.warn("There is no data saved");
+      return;
+    }
+    AMRMTokenSecretManagerStatePBImpl stateData =
+        new AMRMTokenSecretManagerStatePBImpl(
+          AMRMTokenSecretManagerStateProto.parseFrom(data));
+    rmState.amrmTokenSecretManagerState =
+        AMRMTokenSecretManagerState.newInstance(
+          stateData.getCurrentMasterKey(), stateData.getNextMasterKey());
+
+  }
+
   private synchronized void loadRMDTSecretManagerState(RMState rmState)
       throws Exception {
     loadRMDelegationKeyState(rmState);
@@ -1112,4 +1143,19 @@ public class ZKRMStateStore extends RMSt
     return zk;
   }
 
+  @Override
+  public synchronized void storeOrUpdateAMRMTokenSecretManagerState(
+      AMRMTokenSecretManagerState amrmTokenSecretManagerState,
+      boolean isUpdate) {
+    AMRMTokenSecretManagerState data =
+        AMRMTokenSecretManagerState.newInstance(amrmTokenSecretManagerState);
+    byte[] stateData = data.getProto().toByteArray();
+    try {
+      setDataWithRetries(amrmTokenSecretManagerRoot, stateData, -1);
+    } catch (Exception ex) {
+      LOG.info("Error storing info for AMRMTokenSecretManager", ex);
+      notifyStoreOperationFailed(ex);
+    }
+  }
+
 }

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Wed Aug 20 01:34:29 2014
@@ -166,6 +166,8 @@ public class RMAppImpl implements RMApp,
         RMAppEventType.APP_REJECTED,
           new FinalSavingTransition(new AppRejectedTransition(),
             RMAppState.FAILED))
+    .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
+        RMAppEventType.MOVE, new RMAppMoveTransition())
 
      // Transitions from SUBMITTED state
     .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
@@ -243,7 +245,7 @@ public class RMAppImpl implements RMApp,
     // ignorable transitions
     .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
         EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL,
-          RMAppEventType.APP_NEW_SAVED))
+          RMAppEventType.APP_NEW_SAVED, RMAppEventType.MOVE))
 
      // Transitions from FINISHING state
     .addTransition(RMAppState.FINISHING, RMAppState.FINISHED,
@@ -254,9 +256,9 @@ public class RMAppImpl implements RMApp,
     // ignorable transitions
     .addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
       EnumSet.of(RMAppEventType.NODE_UPDATE,
-        // ignore Kill as we have already saved the final Finished state in
-        // state store.
-        RMAppEventType.KILL))
+        // ignore Kill/Move as we have already saved the final Finished state
+        // in state store.
+        RMAppEventType.KILL, RMAppEventType.MOVE))
 
      // Transitions from KILLING state
     .addTransition(RMAppState.KILLING, RMAppState.KILLING, 
@@ -274,7 +276,7 @@ public class RMAppImpl implements RMApp,
             RMAppEventType.ATTEMPT_FINISHED,
             RMAppEventType.ATTEMPT_FAILED,
             RMAppEventType.APP_UPDATE_SAVED,
-            RMAppEventType.KILL))
+            RMAppEventType.KILL, RMAppEventType.MOVE))
 
      // Transitions from FINISHED state
      // ignorable transitions
@@ -286,7 +288,7 @@ public class RMAppImpl implements RMApp,
             RMAppEventType.NODE_UPDATE,
             RMAppEventType.ATTEMPT_UNREGISTERED,
             RMAppEventType.ATTEMPT_FINISHED,
-            RMAppEventType.KILL))
+            RMAppEventType.KILL, RMAppEventType.MOVE))
 
      // Transitions from FAILED state
      // ignorable transitions
@@ -294,7 +296,8 @@ public class RMAppImpl implements RMApp,
         RMAppEventType.APP_RUNNING_ON_NODE,
         new AppRunningOnNodeTransition())
     .addTransition(RMAppState.FAILED, RMAppState.FAILED,
-        EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE))
+        EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE,
+            RMAppEventType.MOVE))
 
      // Transitions from KILLED state
      // ignorable transitions
@@ -307,7 +310,7 @@ public class RMAppImpl implements RMApp,
         EnumSet.of(RMAppEventType.APP_ACCEPTED,
             RMAppEventType.APP_REJECTED, RMAppEventType.KILL,
             RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED,
-            RMAppEventType.NODE_UPDATE))
+            RMAppEventType.NODE_UPDATE, RMAppEventType.MOVE))
 
      .installTopology();
 
@@ -820,17 +823,6 @@ public class RMAppImpl implements RMApp,
       RMAppTransition {
     @Override
     public void transition(RMAppImpl app, RMAppEvent event) {
-      if (event instanceof RMAppNewSavedEvent) {
-        RMAppNewSavedEvent storeEvent = (RMAppNewSavedEvent) event;
-        // For HA this exception needs to be handled by giving up
-        // master status if we got fenced
-        if (((RMAppNewSavedEvent) event).getStoredException() != null) {
-          LOG.error(
-            "Failed to store application: " + storeEvent.getApplicationId(),
-            storeEvent.getStoredException());
-          ExitUtil.terminate(1, storeEvent.getStoredException());
-        }
-      }
       app.handler.handle(new AppAddedSchedulerEvent(app.applicationId,
         app.submissionContext.getQueue(), app.user));
     }
@@ -848,13 +840,6 @@ public class RMAppImpl implements RMApp,
 
     @Override
     public RMAppState transition(RMAppImpl app, RMAppEvent event) {
-      RMAppUpdateSavedEvent storeEvent = (RMAppUpdateSavedEvent) event;
-      if (storeEvent.getUpdatedException() != null) {
-        LOG.error("Failed to update the final state of application"
-              + storeEvent.getApplicationId(), storeEvent.getUpdatedException());
-        ExitUtil.terminate(1, storeEvent.getUpdatedException());
-      }
-
       if (app.transitionTodo instanceof SingleArcTransition) {
         ((SingleArcTransition) app.transitionTodo).transition(app,
           app.eventCausingFinalSaving);
@@ -1191,6 +1176,9 @@ public class RMAppImpl implements RMApp,
   
   public static boolean isAppInFinalState(RMApp rmApp) {
     RMAppState appState = ((RMAppImpl) rmApp).getRecoveredFinalState();
+    if (appState == null) {
+      appState = rmApp.getState();
+    }
     return appState == RMAppState.FAILED || appState == RMAppState.FINISHED
         || appState == RMAppState.KILLED;
   }

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Wed Aug 20 01:34:29 2014
@@ -37,6 +37,7 @@ import org.apache.commons.lang.StringUti
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -79,11 +80,9 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@@ -217,7 +216,13 @@ public class RMAppAttemptImpl implements
           RMAppAttemptEventType.KILL,
           new FinalSavingTransition(new BaseFinalTransition(
             RMAppAttemptState.KILLED), RMAppAttemptState.KILLED))
-          
+      .addTransition(RMAppAttemptState.SCHEDULED,
+          RMAppAttemptState.FINAL_SAVING,
+          RMAppAttemptEventType.CONTAINER_FINISHED,
+          new FinalSavingTransition(
+            new AMContainerCrashedBeforeRunningTransition(),
+            RMAppAttemptState.FAILED))
+
        // Transitions from ALLOCATED_SAVING State
       .addTransition(RMAppAttemptState.ALLOCATED_SAVING, 
           RMAppAttemptState.ALLOCATED,
@@ -391,7 +396,6 @@ public class RMAppAttemptImpl implements
           RMAppAttemptState.KILLED,
           RMAppAttemptState.KILLED,
           EnumSet.of(RMAppAttemptEventType.ATTEMPT_ADDED,
-              RMAppAttemptEventType.EXPIRE,
               RMAppAttemptEventType.LAUNCHED,
               RMAppAttemptEventType.LAUNCH_FAILED,
               RMAppAttemptEventType.EXPIRE,
@@ -553,7 +557,22 @@ public class RMAppAttemptImpl implements
 
   @Override
   public Token<AMRMTokenIdentifier> getAMRMToken() {
-    return this.amrmToken;
+    this.readLock.lock();
+    try {
+      return this.amrmToken;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  @Private
+  public void setAMRMToken(Token<AMRMTokenIdentifier> lastToken) {
+    this.writeLock.lock();
+    try {
+      this.amrmToken = lastToken;
+    } finally {
+      this.writeLock.unlock();
+    }
   }
 
   @Override
@@ -707,7 +726,8 @@ public class RMAppAttemptImpl implements
       this.attemptMetrics.setIsPreempted();
     }
     setMasterContainer(attemptState.getMasterContainer());
-    recoverAppAttemptCredentials(attemptState.getAppAttemptCredentials());
+    recoverAppAttemptCredentials(attemptState.getAppAttemptCredentials(),
+      attemptState.getState());
     this.recoveredFinalState = attemptState.getState();
     this.originalTrackingUrl = attemptState.getFinalTrackingUrl();
     this.proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl);
@@ -719,9 +739,11 @@ public class RMAppAttemptImpl implements
     this.justFinishedContainers = attempt.getJustFinishedContainers();
   }
 
-  private void recoverAppAttemptCredentials(Credentials appAttemptTokens)
-      throws IOException {
-    if (appAttemptTokens == null) {
+  private void recoverAppAttemptCredentials(Credentials appAttemptTokens,
+      RMAppAttemptState state) throws IOException {
+    if (appAttemptTokens == null || state == RMAppAttemptState.FAILED
+        || state == RMAppAttemptState.FINISHED
+        || state == RMAppAttemptState.KILLED) {
       return;
     }
 
@@ -732,12 +754,9 @@ public class RMAppAttemptImpl implements
           .registerMasterKey(applicationAttemptId, clientTokenMasterKeyBytes);
     }
 
-    // Only one AMRMToken is stored per-attempt, so this should be fine. Can't
-    // use TokenSelector as service may change - think fail-over.
     this.amrmToken =
-        (Token<AMRMTokenIdentifier>) appAttemptTokens
-          .getToken(RMStateStore.AM_RM_TOKEN_SERVICE);
-    rmContext.getAMRMTokenSecretManager().addPersistedPassword(this.amrmToken);
+        rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
+          applicationAttemptId);
   }
 
   private static class BaseTransition implements
@@ -773,11 +792,6 @@ public class RMAppAttemptImpl implements
               .createMasterKey(appAttempt.applicationAttemptId);
       }
 
-      // create AMRMToken
-      appAttempt.amrmToken =
-          appAttempt.rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
-            appAttempt.applicationAttemptId);
-
       // Add the applicationAttempt to the scheduler and inform the scheduler
       // whether to transfer the state from previous attempt.
       appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent(
@@ -889,7 +903,6 @@ public class RMAppAttemptImpl implements
     @Override
     public void transition(RMAppAttemptImpl appAttempt,
                                                     RMAppAttemptEvent event) {
-      appAttempt.checkAttemptStoreError(event);
       appAttempt.launchAttempt();
     }
   }
@@ -1041,14 +1054,6 @@ public class RMAppAttemptImpl implements
     @Override
     public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
         RMAppAttemptEvent event) {
-      RMAppAttemptUpdateSavedEvent storeEvent = (RMAppAttemptUpdateSavedEvent) event;
-      if (storeEvent.getUpdatedException() != null) {
-        LOG.error("Failed to update the final state of application attempt: "
-            + storeEvent.getApplicationAttemptId(),
-          storeEvent.getUpdatedException());
-        ExitUtil.terminate(1, storeEvent.getUpdatedException());
-      }
-
       RMAppAttemptEvent causeEvent = appAttempt.eventCausingFinalSaving;
 
       if (appAttempt.transitionTodo instanceof SingleArcTransition) {
@@ -1178,12 +1183,11 @@ public class RMAppAttemptImpl implements
     @Override
     public void transition(RMAppAttemptImpl appAttempt,
                             RMAppAttemptEvent event) {
-      appAttempt.checkAttemptStoreError(event);
-      // TODO Today unmanaged AM client is waiting for app state to be Accepted to
-      // launch the AM. This is broken since we changed to start the attempt
-      // after the application is Accepted. We may need to introduce an attempt
-      // report that client can rely on to query the attempt state and choose to
-      // launch the unmanaged AM.
+      // create AMRMToken
+      appAttempt.amrmToken =
+          appAttempt.rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
+            appAttempt.applicationAttemptId);
+
       super.transition(appAttempt, event);
     }    
   }
@@ -1671,18 +1675,6 @@ public class RMAppAttemptImpl implements
     rmContext.getAMLivelinessMonitor().register(getAppAttemptId());
   }
   
-  private void checkAttemptStoreError(RMAppAttemptEvent event) {
-    RMAppAttemptNewSavedEvent storeEvent = (RMAppAttemptNewSavedEvent) event;
-    if(storeEvent.getStoredException() != null)
-    {
-      // This needs to be handled for HA and give up master status if we got
-      // fenced
-      LOG.error("Failed to store attempt: " + getAppAttemptId(),
-                storeEvent.getStoredException());
-      ExitUtil.terminate(1, storeEvent.getStoredException());
-    }
-  }
-
   private void storeAttempt() {
     // store attempt data in a non-blocking manner to prevent dispatcher
     // thread starvation and wait for state to be saved

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java Wed Aug 20 01:34:29 2014
@@ -93,9 +93,9 @@ public class RMNodeImpl implements RMNod
   private final RMContext context;
   private final String hostName;
   private final int commandPort;
-  private final int httpPort;
+  private int httpPort;
   private final String nodeAddress; // The containerManager address
-  private final String httpAddress;
+  private String httpAddress;
   private volatile ResourceOption resourceOption;
   private final Node node;
 
@@ -456,6 +456,24 @@ public class RMNodeImpl implements RMNod
     }
   }
 
+  private static void handleRunningAppOnNode(RMNodeImpl rmNode,
+      RMContext context, ApplicationId appId, NodeId nodeId) {
+    RMApp app = context.getRMApps().get(appId);
+
+    // if we failed getting app by appId, maybe something wrong happened, just
+    // add the app to the finishedApplications list so that the app can be
+    // cleaned up on the NM
+    if (null == app) {
+      LOG.warn("Cannot get RMApp by appId=" + appId
+          + ", just added it to finishedApplications list for cleanup");
+      rmNode.finishedApplications.add(appId);
+      return;
+    }
+
+    context.getDispatcher().getEventHandler()
+        .handle(new RMAppRunningOnNodeEvent(appId, nodeId));
+  }
+
   public static class AddNodeTransition implements
       SingleArcTransition<RMNodeImpl, RMNodeEvent> {
 
@@ -496,24 +514,6 @@ public class RMNodeImpl implements RMNod
         new NodesListManagerEvent(
             NodesListManagerEventType.NODE_USABLE, rmNode));
     }
-
-    void handleRunningAppOnNode(RMNodeImpl rmNode, RMContext context,
-        ApplicationId appId, NodeId nodeId) {
-      RMApp app = context.getRMApps().get(appId);
-      
-      // if we failed getting app by appId, maybe something wrong happened, just
-      // add the app to the finishedApplications list so that the app can be
-      // cleaned up on the NM
-      if (null == app) {
-        LOG.warn("Cannot get RMApp by appId=" + appId
-            + ", just added it to finishedApplications list for cleanup");
-        rmNode.finishedApplications.add(appId);
-        return;
-      }
-
-      context.getDispatcher().getEventHandler()
-          .handle(new RMAppRunningOnNodeEvent(appId, nodeId));
-    }
   }
 
   public static class ReconnectNodeTransition implements
@@ -521,36 +521,22 @@ public class RMNodeImpl implements RMNod
 
     @Override
     public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
-      // Kill containers since node is rejoining.
-      rmNode.nodeUpdateQueue.clear();
-      rmNode.context.getDispatcher().getEventHandler().handle(
-          new NodeRemovedSchedulerEvent(rmNode));
-
-      RMNode newNode = ((RMNodeReconnectEvent)event).getReconnectedNode();
+      RMNodeReconnectEvent reconnectEvent = (RMNodeReconnectEvent) event;
+      RMNode newNode = reconnectEvent.getReconnectedNode();
       rmNode.nodeManagerVersion = newNode.getNodeManagerVersion();
-      if (rmNode.getTotalCapability().equals(newNode.getTotalCapability())
-          && rmNode.getHttpPort() == newNode.getHttpPort()) {
-        // Reset heartbeat ID since node just restarted.
-        rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
-        if (rmNode.getState() != NodeState.UNHEALTHY) {
-          // Only add new node if old state is not UNHEALTHY
-          rmNode.context.getDispatcher().getEventHandler().handle(
-              new NodeAddedSchedulerEvent(rmNode));
-        }
-      } else {
-        // Reconnected node differs, so replace old node and start new node
-        switch (rmNode.getState()) {
-        case RUNNING:
-          ClusterMetrics.getMetrics().decrNumActiveNodes();
-          break;
-        case UNHEALTHY:
-          ClusterMetrics.getMetrics().decrNumUnhealthyNMs();
-          break;
+      rmNode.httpPort = newNode.getHttpPort();
+      rmNode.httpAddress = newNode.getHttpAddress();
+      rmNode.resourceOption = newNode.getResourceOption();
+
+      // Reset heartbeat ID since node just restarted.
+      rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
+
+      if (null != reconnectEvent.getRunningApplications()) {
+        for (ApplicationId appId : reconnectEvent.getRunningApplications()) {
+          handleRunningAppOnNode(rmNode, rmNode.context, appId, rmNode.nodeId);
         }
-        rmNode.context.getRMNodes().put(newNode.getNodeID(), newNode);
-        rmNode.context.getDispatcher().getEventHandler().handle(
-            new RMNodeStartedEvent(newNode.getNodeID(), null, null));
       }
+
       rmNode.context.getDispatcher().getEventHandler().handle(
           new NodesListManagerEvent(
               NodesListManagerEventType.NODE_USABLE, rmNode));

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java Wed Aug 20 01:34:29 2014
@@ -18,17 +18,27 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
 
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 
 public class RMNodeReconnectEvent extends RMNodeEvent {
   private RMNode reconnectedNode;
+  private List<ApplicationId> runningApplications;
 
-  public RMNodeReconnectEvent(NodeId nodeId, RMNode newNode) {
+  public RMNodeReconnectEvent(NodeId nodeId, RMNode newNode,
+      List<ApplicationId> runningApps) {
     super(nodeId, RMNodeEventType.RECONNECTED);
     reconnectedNode = newNode;
+    runningApplications = runningApps;
   }
 
   public RMNode getReconnectedNode() {
     return reconnectedNode;
   }
+
+  public List<ApplicationId> getRunningApplications() {
+    return runningApplications;
+  }
 }

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java Wed Aug 20 01:34:29 2014
@@ -18,14 +18,19 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -33,21 +38,34 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
+import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
+import com.google.common.util.concurrent.SettableFuture;
+
+
 @SuppressWarnings("unchecked")
 public abstract class AbstractYarnScheduler
     <T extends SchedulerApplicationAttempt, N extends SchedulerNode>
@@ -66,6 +84,7 @@ public abstract class AbstractYarnSchedu
 
   protected RMContext rmContext;
   protected Map<ApplicationId, SchedulerApplication<T>> applications;
+  protected int nmExpireInterval;
 
   protected final static List<Container> EMPTY_CONTAINER_LIST =
       new ArrayList<Container>();
@@ -81,6 +100,15 @@ public abstract class AbstractYarnSchedu
     super(name);
   }
 
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    nmExpireInterval =
+        conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
+          YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
+    createReleaseCache();
+    super.serviceInit(conf);
+  }
+
   public synchronized List<Container> getTransferredContainers(
       ApplicationAttemptId currentAttempt) {
     ApplicationId appId = currentAttempt.getApplicationId();
@@ -275,6 +303,19 @@ public abstract class AbstractYarnSchedu
           ((RMContainerImpl)rmContainer).setAMContainer(true);
         }
       }
+
+      synchronized (schedulerAttempt) {
+        Set<ContainerId> releases = schedulerAttempt.getPendingRelease();
+        if (releases.contains(container.getContainerId())) {
+          // release the container
+          rmContainer.handle(new RMContainerFinishedEvent(container
+            .getContainerId(), SchedulerUtils.createAbnormalContainerStatus(
+            container.getContainerId(), SchedulerUtils.RELEASED_CONTAINER),
+            RMContainerEventType.RELEASED));
+          releases.remove(container.getContainerId());
+          LOG.info(container.getContainerId() + " is released by application.");
+        }
+      }
     }
   }
 
@@ -314,7 +355,109 @@ public abstract class AbstractYarnSchedu
     }
   }
 
+  protected void createReleaseCache() {
+    // Cleanup the cache after nm expire interval.
+    new Timer().schedule(new TimerTask() {
+      @Override
+      public void run() {
+        for (SchedulerApplication<T> app : applications.values()) {
+
+          T attempt = app.getCurrentAppAttempt();
+          synchronized (attempt) {
+            for (ContainerId containerId : attempt.getPendingRelease()) {
+              RMAuditLogger.logFailure(
+                app.getUser(),
+                AuditConstants.RELEASE_CONTAINER,
+                "Unauthorized access or invalid container",
+                "Scheduler",
+                "Trying to release container not owned by app or with invalid id.",
+                attempt.getApplicationId(), containerId);
+            }
+            attempt.getPendingRelease().clear();
+          }
+        }
+        LOG.info("Release request cache is cleaned up");
+      }
+    }, nmExpireInterval);
+  }
+
+  // clean up a completed container
+  protected abstract void completedContainer(RMContainer rmContainer,
+      ContainerStatus containerStatus, RMContainerEventType event);
+
+  protected void releaseContainers(List<ContainerId> containers,
+      SchedulerApplicationAttempt attempt) {
+    for (ContainerId containerId : containers) {
+      RMContainer rmContainer = getRMContainer(containerId);
+      if (rmContainer == null) {
+        if (System.currentTimeMillis() - ResourceManager.getClusterTimeStamp()
+            < nmExpireInterval) {
+          LOG.info(containerId + " doesn't exist. Add the container"
+              + " to the release request cache as it maybe on recovery.");
+          synchronized (attempt) {
+            attempt.getPendingRelease().add(containerId);
+          }
+        } else {
+          RMAuditLogger.logFailure(attempt.getUser(),
+            AuditConstants.RELEASE_CONTAINER,
+            "Unauthorized access or invalid container", "Scheduler",
+            "Trying to release container not owned by app or with invalid id.",
+            attempt.getApplicationId(), containerId);
+        }
+      }
+      completedContainer(rmContainer,
+        SchedulerUtils.createAbnormalContainerStatus(containerId,
+          SchedulerUtils.RELEASED_CONTAINER), RMContainerEventType.RELEASED);
+    }
+  }
+
   public SchedulerNode getSchedulerNode(NodeId nodeId) {
     return nodes.get(nodeId);
   }
+
+  @Override
+  public synchronized void moveAllApps(String sourceQueue, String destQueue)
+      throws YarnException {
+    // check if destination queue is a valid leaf queue
+    try {
+      getQueueInfo(destQueue, false, false);
+    } catch (IOException e) {
+      LOG.warn(e);
+      throw new YarnException(e);
+    }
+    // check if source queue is a valid
+    List<ApplicationAttemptId> apps = getAppsInQueue(sourceQueue);
+    if (apps == null) {
+      String errMsg = "The specified Queue: " + sourceQueue + " doesn't exist";
+      LOG.warn(errMsg);
+      throw new YarnException(errMsg);
+    }
+    // generate move events for each pending/running app
+    for (ApplicationAttemptId app : apps) {
+      SettableFuture<Object> future = SettableFuture.create();
+      this.rmContext
+          .getDispatcher()
+          .getEventHandler()
+          .handle(new RMAppMoveEvent(app.getApplicationId(), destQueue, future));
+    }
+  }
+
+  @Override
+  public synchronized void killAllAppsInQueue(String queueName)
+      throws YarnException {
+    // check if queue is a valid
+    List<ApplicationAttemptId> apps = getAppsInQueue(queueName);
+    if (apps == null) {
+      String errMsg = "The specified Queue: " + queueName + " doesn't exist";
+      LOG.warn(errMsg);
+      throw new YarnException(errMsg);
+    }
+    // generate kill events for each pending/running app
+    for (ApplicationAttemptId app : apps) {
+      this.rmContext
+          .getDispatcher()
+          .getEventHandler()
+          .handle(new RMAppEvent(app.getApplicationId(), RMAppEventType.KILL));
+    }
+  }
 }

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java Wed Aug 20 01:34:29 2014
@@ -54,7 +54,7 @@ public class AppSchedulingInfo {
   private static final Log LOG = LogFactory.getLog(AppSchedulingInfo.class);
   private final ApplicationAttemptId applicationAttemptId;
   final ApplicationId applicationId;
-  private final String queueName;
+  private String queueName;
   Queue queue;
   final String user;
   // TODO making containerIdCounter long
@@ -360,7 +360,7 @@ public class AppSchedulingInfo {
       List<ResourceRequest> resourceRequests) {
     // Update future requirements
     decrementOutstanding(offSwitchRequest);
-    // Update cloned RackLocal and OffRack requests for recovery
+    // Update cloned OffRack requests for recovery
     resourceRequests.add(cloneResourceRequest(offSwitchRequest));
   }
 
@@ -410,6 +410,7 @@ public class AppSchedulingInfo {
     activeUsersManager = newQueue.getActiveUsersManager();
     activeUsersManager.activateApplication(user, applicationId);
     this.queue = newQueue;
+    this.queueName = newQueue.getQueueName();
   }
 
   synchronized public void stop(RMAppAttemptState rmAppAttemptFinalState) {

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java Wed Aug 20 01:34:29 2014
@@ -17,13 +17,14 @@
 */
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
-import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -41,7 +42,6 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.HashMultiset;
 import com.google.common.collect.Multiset;
 
@@ -87,6 +88,13 @@ public class SchedulerApplicationAttempt
   protected List<RMContainer> newlyAllocatedContainers = 
       new ArrayList<RMContainer>();
 
+  // This pendingRelease is used in work-preserving recovery scenario to keep
+  // track of the AM's outstanding release requests. RM on recovery could
+  // receive the release request form AM before it receives the container status
+  // from NM for recovery. In this case, the to-be-recovered containers reported
+  // by NM should not be recovered.
+  private Set<ContainerId> pendingRelease = null;
+
   /**
    * Count how many times the application has been given an opportunity
    * to schedule a task at each priority. Each time the scheduler
@@ -114,7 +122,7 @@ public class SchedulerApplicationAttempt
         new AppSchedulingInfo(applicationAttemptId, user, queue,  
             activeUsersManager, rmContext.getEpoch());
     this.queue = queue;
-    
+    this.pendingRelease = new HashSet<ContainerId>();
     if (rmContext.getRMApps() != null &&
         rmContext.getRMApps()
             .containsKey(applicationAttemptId.getApplicationId())) {
@@ -163,6 +171,10 @@ public class SchedulerApplicationAttempt
     return appSchedulingInfo.getResourceRequests(priority);
   }
 
+  public Set<ContainerId> getPendingRelease() {
+    return this.pendingRelease;
+  }
+
   public int getNewContainerId() {
     return appSchedulingInfo.getNewContainerId();
   }

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java Wed Aug 20 01:34:29 2014
@@ -153,14 +153,17 @@ public class SchedulerUtils {
    * @param rmNode RMNode with new resource view
    * @param clusterResource the cluster's resource that need to update
    * @param log Scheduler's log for resource change
+   * @return true if the resources have changed
    */
-  public static void updateResourceIfChanged(SchedulerNode node, 
+  public static boolean updateResourceIfChanged(SchedulerNode node,
       RMNode rmNode, Resource clusterResource, Log log) {
+    boolean result = false;
     Resource oldAvailableResource = node.getAvailableResource();
     Resource newAvailableResource = Resources.subtract(
         rmNode.getTotalCapability(), node.getUsedResource());
     
     if (!newAvailableResource.equals(oldAvailableResource)) {
+      result = true;
       Resource deltaResource = Resources.subtract(newAvailableResource,
           oldAvailableResource);
       // Reflect resource change to scheduler node.
@@ -176,6 +179,8 @@ public class SchedulerUtils {
           + " with delta: CPU: " + deltaResource.getMemory() + "core, Memory: "
           + deltaResource.getMemory() +"MB");
     }
+
+    return result;
   }
 
   /**

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java Wed Aug 20 01:34:29 2014
@@ -202,4 +202,22 @@ public interface YarnScheduler extends E
   @Evolving
   public String moveApplication(ApplicationId appId, String newQueue)
       throws YarnException;
+
+  /**
+   * Completely drain sourceQueue of applications, by moving all of them to
+   * destQueue.
+   *
+   * @param sourceQueue
+   * @param destQueue
+   * @throws YarnException
+   */
+  void moveAllApps(String sourceQueue, String destQueue) throws YarnException;
+
+  /**
+   * Terminate all applications in the specified queue.
+   *
+   * @param queueName the name of queue to be drained
+   * @throws YarnException
+   */
+  void killAllAppsInQueue(String queueName) throws YarnException;
 }

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java Wed Aug 20 01:34:29 2014
@@ -238,4 +238,22 @@ extends org.apache.hadoop.yarn.server.re
    * @param apps the collection to add the applications to
    */
   public void collectSchedulerApplications(Collection<ApplicationAttemptId> apps);
+
+  /**
+  * Detach a container from this queue
+  * @param clusterResource the current cluster resource
+  * @param application application to which the container was assigned
+  * @param container the container to detach
+  */
+  public void detachContainer(Resource clusterResource,
+               FiCaSchedulerApp application, RMContainer container);
+
+  /**
+   * Attach a container to this queue
+   * @param clusterResource the current cluster resource
+   * @param application application to which the container was assigned
+   * @param container the container to attach
+   */
+  public void attachContainer(Resource clusterResource,
+               FiCaSchedulerApp application, RMContainer container);
 }

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java Wed Aug 20 01:34:29 2014
@@ -17,6 +17,9 @@
 */
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.utils.Lock;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -24,6 +27,8 @@ import org.apache.hadoop.yarn.util.resou
 
 class CSQueueUtils {
   
+  private static final Log LOG = LogFactory.getLog(CSQueueUtils.class);
+
   final static float EPSILON = 0.0001f;
   
   public static void checkMaxCapacity(String queueName, 
@@ -113,4 +118,52 @@ class CSQueueUtils {
             )
         );
    }
+
+   public static float getAbsoluteMaxAvailCapacity(
+      ResourceCalculator resourceCalculator, Resource clusterResource, CSQueue queue) {
+      CSQueue parent = queue.getParent();
+      if (parent == null) {
+        return queue.getAbsoluteMaximumCapacity();
+      }
+
+      //Get my parent's max avail, needed to determine my own
+      float parentMaxAvail = getAbsoluteMaxAvailCapacity(
+        resourceCalculator, clusterResource, parent);
+      //...and as a resource
+      Resource parentResource = Resources.multiply(clusterResource, parentMaxAvail);
+
+      //check for no resources parent before dividing, if so, max avail is none
+      if (Resources.isInvalidDivisor(resourceCalculator, parentResource)) {
+        return 0.0f;
+      }
+      //sibling used is parent used - my used...
+      float siblingUsedCapacity = Resources.ratio(
+                 resourceCalculator,
+                 Resources.subtract(parent.getUsedResources(), queue.getUsedResources()),
+                 parentResource);
+      //my max avail is the lesser of my max capacity and what is unused from my parent
+      //by my siblings (if they are beyond their base capacity)
+      float maxAvail = Math.min(
+        queue.getMaximumCapacity(),
+        1.0f - siblingUsedCapacity);
+      //and, mutiply by parent to get absolute (cluster relative) value
+      float absoluteMaxAvail = maxAvail * parentMaxAvail;
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("qpath " + queue.getQueuePath());
+        LOG.debug("parentMaxAvail " + parentMaxAvail);
+        LOG.debug("siblingUsedCapacity " + siblingUsedCapacity);
+        LOG.debug("getAbsoluteMaximumCapacity " + queue.getAbsoluteMaximumCapacity());
+        LOG.debug("maxAvail " + maxAvail);
+        LOG.debug("absoluteMaxAvail " + absoluteMaxAvail);
+      }
+
+      if (absoluteMaxAvail < 0.0f) {
+        absoluteMaxAvail = 0.0f;
+      } else if (absoluteMaxAvail > 1.0f) {
+        absoluteMaxAvail = 1.0f;
+      }
+
+      return absoluteMaxAvail;
+   }
 }

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Wed Aug 20 01:34:29 2014
@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
-import com.google.common.base.Preconditions;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
@@ -41,6 +39,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.Groups;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -53,15 +52,11 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
-import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.*;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -76,6 +71,8 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping.MappingType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@@ -93,6 +90,7 @@ import org.apache.hadoop.yarn.util.resou
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 
 @LimitedPrivate("yarn")
 @Evolving
@@ -198,6 +196,16 @@ public class CapacityScheduler extends
           + ".scheduling-interval-ms";
   private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5;
   
+  private boolean overrideWithQueueMappings = false;
+  private List<QueueMapping> mappings = null;
+  private Groups groups;
+
+  @VisibleForTesting
+  public synchronized String getMappedQueueForTest(String user)
+      throws IOException {
+    return getMappedQueue(user);
+  }
+
   public CapacityScheduler() {
     super(CapacityScheduler.class.getName());
   }
@@ -262,7 +270,6 @@ public class CapacityScheduler extends
     this.applications =
         new ConcurrentHashMap<ApplicationId,
             SchedulerApplication<FiCaSchedulerApp>>();
-
     initializeQueues(this.conf);
 
     scheduleAsynchronously = this.conf.getScheduleAynschronously();
@@ -401,7 +408,32 @@ public class CapacityScheduler extends
     }
   }
   private static final QueueHook noop = new QueueHook();
-  
+
+  private void initializeQueueMappings() throws IOException {
+    overrideWithQueueMappings = conf.getOverrideWithQueueMappings();
+    LOG.info("Initialized queue mappings, override: "
+        + overrideWithQueueMappings);
+    // Get new user/group mappings
+    List<QueueMapping> newMappings = conf.getQueueMappings();
+    //check if mappings refer to valid queues
+    for (QueueMapping mapping : newMappings) {
+      if (!mapping.queue.equals(CURRENT_USER_MAPPING) &&
+          !mapping.queue.equals(PRIMARY_GROUP_MAPPING)) {
+        CSQueue queue = queues.get(mapping.queue);
+        if (queue == null || !(queue instanceof LeafQueue)) {
+          throw new IOException(
+              "mapping contains invalid or non-leaf queue " + mapping.queue);
+        }
+      }
+    }
+    //apply the new mappings since they are valid
+    mappings = newMappings;
+    // initialize groups if mappings are present
+    if (mappings.size() > 0) {
+      groups = new Groups(conf);
+    }
+  }
+
   @Lock(CapacityScheduler.class)
   private void initializeQueues(CapacitySchedulerConfiguration conf)
     throws IOException {
@@ -409,7 +441,9 @@ public class CapacityScheduler extends
     root = 
         parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT, 
             queues, queues, noop);
+
     LOG.info("Initialized root queue " + root);
+    initializeQueueMappings();
   }
 
   @Lock(CapacityScheduler.class)
@@ -429,6 +463,7 @@ public class CapacityScheduler extends
     
     // Re-configure queues
     root.reinitialize(newRoot, clusterResource);
+    initializeQueueMappings();
   }
 
   /**
@@ -516,12 +551,73 @@ public class CapacityScheduler extends
   }
 
   synchronized CSQueue getQueue(String queueName) {
+    if (queueName == null) {
+      return null;
+    }
     return queues.get(queueName);
   }
 
+  private static final String CURRENT_USER_MAPPING = "%user";
+
+  private static final String PRIMARY_GROUP_MAPPING = "%primary_group";
+
+  private String getMappedQueue(String user) throws IOException {
+    for (QueueMapping mapping : mappings) {
+      if (mapping.type == MappingType.USER) {
+        if (mapping.source.equals(CURRENT_USER_MAPPING)) {
+          if (mapping.queue.equals(CURRENT_USER_MAPPING)) {
+            return user;
+          }
+          else if (mapping.queue.equals(PRIMARY_GROUP_MAPPING)) {
+            return groups.getGroups(user).get(0);
+          }
+          else {
+            return mapping.queue;
+          }
+        }
+        if (user.equals(mapping.source)) {
+          return mapping.queue;
+        }
+      }
+      if (mapping.type == MappingType.GROUP) {
+        for (String userGroups : groups.getGroups(user)) {
+          if (userGroups.equals(mapping.source)) {
+            return mapping.queue;
+          }
+        }
+      }
+    }
+    return null;
+  }
+
   private synchronized void addApplication(ApplicationId applicationId,
-      String queueName, String user, boolean isAppRecovering) {
-    // santiy checks.
+    String queueName, String user, boolean isAppRecovering) {
+
+    if (mappings != null && mappings.size() > 0) {
+      try {
+        String mappedQueue = getMappedQueue(user);
+        if (mappedQueue != null) {
+          // We have a mapping, should we use it?
+          if (queueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME)
+              || overrideWithQueueMappings) {
+            LOG.info("Application " + applicationId + " user " + user
+                + " mapping [" + queueName + "] to [" + mappedQueue
+                + "] override " + overrideWithQueueMappings);
+            queueName = mappedQueue;
+            RMApp rmApp = rmContext.getRMApps().get(applicationId);
+            rmApp.setQueue(queueName);
+          }
+        }
+      } catch (IOException ioex) {
+        String message = "Failed to submit application " + applicationId +
+            " submitted by user " + user + " reason: " + ioex.getMessage();
+        this.rmContext.getDispatcher().getEventHandler()
+            .handle(new RMAppRejectedEvent(applicationId, message));
+        return;
+      }
+    }
+
+    // sanity checks.
     CSQueue queue = getQueue(queueName);
     if (queue == null) {
       String message = "Application " + applicationId + 
@@ -547,6 +643,8 @@ public class CapacityScheduler extends
           .handle(new RMAppRejectedEvent(applicationId, ace.toString()));
       return;
     }
+    // update the metrics
+    queue.getMetrics().submitApp(user);
     SchedulerApplication<FiCaSchedulerApp> application =
         new SchedulerApplication<FiCaSchedulerApp>(queue, user);
     applications.put(applicationId, application);
@@ -689,21 +787,7 @@ public class CapacityScheduler extends
         getMinimumResourceCapability(), maximumAllocation);
 
     // Release containers
-    for (ContainerId releasedContainerId : release) {
-      RMContainer rmContainer = getRMContainer(releasedContainerId);
-      if (rmContainer == null) {
-         RMAuditLogger.logFailure(application.getUser(),
-             AuditConstants.RELEASE_CONTAINER, 
-             "Unauthorized access or invalid container", "CapacityScheduler",
-             "Trying to release container not owned by app or with invalid id",
-             application.getApplicationId(), releasedContainerId);
-      }
-      completedContainer(rmContainer,
-          SchedulerUtils.createAbnormalContainerStatus(
-              releasedContainerId, 
-              SchedulerUtils.RELEASED_CONTAINER),
-          RMContainerEventType.RELEASED);
-    }
+    releaseContainers(release, application);
 
     synchronized (application) {
 
@@ -783,7 +867,10 @@ public class CapacityScheduler extends
     FiCaSchedulerNode node = getNode(nm.getNodeID());
     
     // Update resource if any change
-    SchedulerUtils.updateResourceIfChanged(node, nm, clusterResource, LOG);
+    if (SchedulerUtils.updateResourceIfChanged(node, nm, clusterResource,
+        LOG)) {
+      root.updateClusterResource(clusterResource);
+    }
     
     List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
     List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
@@ -896,8 +983,8 @@ public class CapacityScheduler extends
     {
       AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
       addApplication(appAddedEvent.getApplicationId(),
-        appAddedEvent.getQueue(), appAddedEvent.getUser(),
-        appAddedEvent.getIsAppRecovering());
+        appAddedEvent.getQueue(),
+        appAddedEvent.getUser(), appAddedEvent.getIsAppRecovering());
     }
     break;
     case APP_REMOVED:
@@ -995,7 +1082,8 @@ public class CapacityScheduler extends
   }
   
   @Lock(CapacityScheduler.class)
-  private synchronized void completedContainer(RMContainer rmContainer,
+  @Override
+  protected synchronized void completedContainer(RMContainer rmContainer,
       ContainerStatus containerStatus, RMContainerEventType event) {
     if (rmContainer == null) {
       LOG.info("Null container completed...");
@@ -1128,4 +1216,59 @@ public class CapacityScheduler extends
       throw new IOException(e);
     }
   }
+
+  @Override
+  public synchronized String moveApplication(ApplicationId appId,
+      String targetQueueName) throws YarnException {
+    FiCaSchedulerApp app =
+        getApplicationAttempt(ApplicationAttemptId.newInstance(appId, 0));
+    String sourceQueueName = app.getQueue().getQueueName();
+    LeafQueue source = getAndCheckLeafQueue(sourceQueueName);
+    LeafQueue dest = getAndCheckLeafQueue(targetQueueName);
+    // Validation check - ACLs, submission limits for user & queue
+    String user = app.getUser();
+    try {
+      dest.submitApplication(appId, user, targetQueueName);
+    } catch (AccessControlException e) {
+      throw new YarnException(e);
+    }
+    // Move all live containers
+    for (RMContainer rmContainer : app.getLiveContainers()) {
+      source.detachContainer(clusterResource, app, rmContainer);
+      // attach the Container to another queue
+      dest.attachContainer(clusterResource, app, rmContainer);
+    }
+    // Detach the application..
+    source.finishApplicationAttempt(app, sourceQueueName);
+    source.getParent().finishApplication(appId, app.getUser());
+    // Finish app & update metrics
+    app.move(dest);
+    // Submit to a new queue
+    dest.submitApplicationAttempt(app, user);
+    applications.get(appId).setQueue(dest);
+    LOG.info("App: " + app.getApplicationId() + " successfully moved from "
+        + sourceQueueName + " to: " + targetQueueName);
+    return targetQueueName;
+  }
+
+  /**
+   * Check that the String provided in input is the name of an existing,
+   * LeafQueue, if successful returns the queue.
+   *
+   * @param queue
+   * @return the LeafQueue
+   * @throws YarnException
+   */
+  private LeafQueue getAndCheckLeafQueue(String queue) throws YarnException {
+    CSQueue ret = this.getQueue(queue);
+    if (ret == null) {
+      throw new YarnException("The specified Queue: " + queue
+          + " doesn't exist");
+    }
+    if (!(ret instanceof LeafQueue)) {
+      throw new YarnException("The specified Queue: " + queue
+          + " is not a Leaf Queue. Move is supported only for Leaf Queues.");
+    }
+    return (LeafQueue) ret;
+  }
 }

Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java Wed Aug 20 01:34:29 2014
@@ -18,8 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -145,6 +144,44 @@ public class CapacitySchedulerConfigurat
 
   @Private
   public static final boolean DEFAULT_SCHEDULE_ASYNCHRONOUSLY_ENABLE = false;
+
+  @Private
+  public static final String QUEUE_MAPPING = PREFIX + "queue-mappings";
+
+  @Private
+  public static final String ENABLE_QUEUE_MAPPING_OVERRIDE = QUEUE_MAPPING + "-override.enable";
+
+  @Private
+  public static final boolean DEFAULT_ENABLE_QUEUE_MAPPING_OVERRIDE = false;
+
+  @Private
+  public static class QueueMapping {
+
+    public enum MappingType {
+
+      USER("u"),
+      GROUP("g");
+      private final String type;
+      private MappingType(String type) {
+        this.type = type;
+      }
+
+      public String toString() {
+        return type;
+      }
+
+    };
+
+    MappingType type;
+    String source;
+    String queue;
+
+    public QueueMapping(MappingType type, String source, String queue) {
+      this.type = type;
+      this.source = source;
+      this.queue = queue;
+    }
+  }
   
   public CapacitySchedulerConfiguration() {
     this(new Configuration());
@@ -378,4 +415,82 @@ public class CapacitySchedulerConfigurat
     setBoolean(SCHEDULE_ASYNCHRONOUSLY_ENABLE, async);
   }
 
+  public boolean getOverrideWithQueueMappings() {
+    return getBoolean(ENABLE_QUEUE_MAPPING_OVERRIDE,
+        DEFAULT_ENABLE_QUEUE_MAPPING_OVERRIDE);
+  }
+
+  /**
+   * Returns a collection of strings, trimming leading and trailing whitespeace
+   * on each value
+   *
+   * @param str
+   *          String to parse
+   * @param delim
+   *          delimiter to separate the values
+   * @return Collection of parsed elements.
+   */
+  private static Collection<String> getTrimmedStringCollection(String str,
+      String delim) {
+    List<String> values = new ArrayList<String>();
+    if (str == null)
+      return values;
+    StringTokenizer tokenizer = new StringTokenizer(str, delim);
+    while (tokenizer.hasMoreTokens()) {
+      String next = tokenizer.nextToken();
+      if (next == null || next.trim().isEmpty()) {
+        continue;
+      }
+      values.add(next.trim());
+    }
+    return values;
+  }
+
+  /**
+   * Get user/group mappings to queues.
+   *
+   * @return user/groups mappings or null on illegal configs
+   */
+  public List<QueueMapping> getQueueMappings() {
+    List<QueueMapping> mappings =
+        new ArrayList<CapacitySchedulerConfiguration.QueueMapping>();
+    Collection<String> mappingsString =
+        getTrimmedStringCollection(QUEUE_MAPPING);
+    for (String mappingValue : mappingsString) {
+      String[] mapping =
+          getTrimmedStringCollection(mappingValue, ":")
+              .toArray(new String[] {});
+      if (mapping.length != 3 || mapping[1].length() == 0
+          || mapping[2].length() == 0) {
+        throw new IllegalArgumentException(
+            "Illegal queue mapping " + mappingValue);
+      }
+
+      QueueMapping m;
+      try {
+        QueueMapping.MappingType mappingType;
+        if (mapping[0].equals("u")) {
+          mappingType = QueueMapping.MappingType.USER;
+        } else if (mapping[0].equals("g")) {
+          mappingType = QueueMapping.MappingType.GROUP;
+        } else {
+          throw new IllegalArgumentException(
+              "unknown mapping prefix " + mapping[0]);
+        }
+        m = new QueueMapping(
+                mappingType,
+                mapping[1],
+                mapping[2]);
+      } catch (Throwable t) {
+        throw new IllegalArgumentException(
+            "Illegal queue mapping " + mappingValue);
+      }
+
+      if (m != null) {
+        mappings.add(m);
+      }
+    }
+
+    return mappings;
+  }
 }