You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:51:01 UTC

svn commit: r1619012 [10/26] - in /hadoop/common/branches/HADOOP-10388/hadoop-yarn-project: ./ hadoop-yarn/bin/ hadoop-yarn/conf/ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-api/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api...

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java Tue Aug 19 23:49:39 2014
@@ -39,6 +39,7 @@ import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.security.SaslRpcServer;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.StringUtils;
@@ -65,6 +66,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
 import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
 import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
 import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException;
@@ -79,6 +81,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
@@ -88,6 +91,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
+import org.apache.hadoop.yarn.server.security.MasterKeyData;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -107,12 +111,15 @@ public class ApplicationMasterService ex
       new ConcurrentHashMap<ApplicationAttemptId, AllocateResponseLock>();
   private final AllocateResponse resync =
       recordFactory.newRecordInstance(AllocateResponse.class);
+  private final AllocateResponse shutdown =
+      recordFactory.newRecordInstance(AllocateResponse.class);
   private final RMContext rmContext;
 
   public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) {
     super(ApplicationMasterService.class.getName());
     this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor();
     this.rScheduler = scheduler;
+    this.shutdown.setAMCommand(AMCommand.AM_SHUTDOWN);
     this.resync.setAMCommand(AMCommand.AM_RESYNC);
     this.rmContext = rmContext;
   }
@@ -123,6 +130,7 @@ public class ApplicationMasterService ex
     YarnRPC rpc = YarnRPC.create(conf);
 
     InetSocketAddress masterServiceAddress = conf.getSocketAddr(
+        YarnConfiguration.RM_BIND_HOST,
         YarnConfiguration.RM_SCHEDULER_ADDRESS,
         YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
         YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
@@ -155,7 +163,9 @@ public class ApplicationMasterService ex
     
     this.server.start();
     this.bindAddress =
-        conf.updateConnectAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS,
+        conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST,
+                               YarnConfiguration.RM_SCHEDULER_ADDRESS,
+                               YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
                                server.getListenerAddress());
     super.serviceStart();
   }
@@ -182,7 +192,7 @@ public class ApplicationMasterService ex
     return result;
   }
 
-  private ApplicationAttemptId authorizeRequest()
+  private AMRMTokenIdentifier authorizeRequest()
       throws YarnException {
 
     UserGroupInformation remoteUgi;
@@ -219,7 +229,7 @@ public class ApplicationMasterService ex
       throw RPCUtil.getRemoteException(message);
     }
 
-    return appTokenIdentifier.getApplicationAttemptId();
+    return appTokenIdentifier;
   }
 
   @Override
@@ -227,7 +237,9 @@ public class ApplicationMasterService ex
       RegisterApplicationMasterRequest request) throws YarnException,
       IOException {
 
-    ApplicationAttemptId applicationAttemptId = authorizeRequest();
+    AMRMTokenIdentifier amrmTokenIdentifier = authorizeRequest();
+    ApplicationAttemptId applicationAttemptId =
+        amrmTokenIdentifier.getApplicationAttemptId();
 
     ApplicationId appID = applicationAttemptId.getApplicationId();
     AllocateResponseLock lock = responseMap.get(applicationAttemptId);
@@ -298,9 +310,12 @@ public class ApplicationMasterService ex
         List<NMToken> nmTokens = new ArrayList<NMToken>();
         for (Container container : transferredContainers) {
           try {
-            nmTokens.add(rmContext.getNMTokenSecretManager()
-              .createAndGetNMToken(app.getUser(), applicationAttemptId,
-                container));
+            NMToken token = rmContext.getNMTokenSecretManager()
+                .createAndGetNMToken(app.getUser(), applicationAttemptId,
+                    container);
+            if (null != token) {
+              nmTokens.add(token);
+            }
           } catch (IllegalArgumentException e) {
             // if it's a DNS issue, throw UnknowHostException directly and that
             // will be automatically retried by RMProxy in RPC layer.
@@ -323,7 +338,8 @@ public class ApplicationMasterService ex
       FinishApplicationMasterRequest request) throws YarnException,
       IOException {
 
-    ApplicationAttemptId applicationAttemptId = authorizeRequest();
+    ApplicationAttemptId applicationAttemptId =
+        authorizeRequest().getApplicationAttemptId();
 
     AllocateResponseLock lock = responseMap.get(applicationAttemptId);
     if (lock == null) {
@@ -343,9 +359,9 @@ public class ApplicationMasterService ex
             AuditConstants.UNREGISTER_AM, "", "ApplicationMasterService",
             message, applicationAttemptId.getApplicationId(),
             applicationAttemptId);
-        throw new InvalidApplicationMasterRequestException(message);
+        throw new ApplicationMasterNotRegisteredException(message);
       }
-      
+
       this.amLivelinessMonitor.receivedPing(applicationAttemptId);
 
       RMApp rmApp =
@@ -398,7 +414,10 @@ public class ApplicationMasterService ex
   public AllocateResponse allocate(AllocateRequest request)
       throws YarnException, IOException {
 
-    ApplicationAttemptId appAttemptId = authorizeRequest();
+    AMRMTokenIdentifier amrmTokenIdentifier = authorizeRequest();
+
+    ApplicationAttemptId appAttemptId =
+        amrmTokenIdentifier.getApplicationAttemptId();
 
     this.amLivelinessMonitor.receivedPing(appAttemptId);
 
@@ -406,22 +425,23 @@ public class ApplicationMasterService ex
     AllocateResponseLock lock = responseMap.get(appAttemptId);
     if (lock == null) {
       LOG.error("AppAttemptId doesnt exist in cache " + appAttemptId);
-      return resync;
+      return shutdown;
     }
     synchronized (lock) {
       AllocateResponse lastResponse = lock.getAllocateResponse();
       if (!hasApplicationMasterRegistered(appAttemptId)) {
         String message =
-            "Application Master is trying to allocate before registering for: "
-                + appAttemptId.getApplicationId();
-        LOG.error(message);
+            "Application Master is not registered for known application: "
+                + appAttemptId.getApplicationId()
+                + ". Let AM resync.";
+        LOG.info(message);
         RMAuditLogger.logFailure(
             this.rmContext.getRMApps().get(appAttemptId.getApplicationId())
                 .getUser(), AuditConstants.REGISTER_AM, "",
             "ApplicationMasterService", message,
             appAttemptId.getApplicationId(),
             appAttemptId);
-        throw new InvalidApplicationMasterRequestException(message);
+        return resync;
       }
 
       if ((request.getResponseId() + 1) == lastResponse.getResponseId()) {
@@ -546,6 +566,23 @@ public class ApplicationMasterService ex
       allocateResponse
           .setPreemptionMessage(generatePreemptionMessage(allocation));
 
+      // update AMRMToken if the master key is being rolled over
+      MasterKeyData nextMasterKey =
+          this.rmContext.getAMRMTokenSecretManager().getNextMasterKeyData();
+
+      if (nextMasterKey != null
+          && nextMasterKey.getMasterKey().getKeyId() != amrmTokenIdentifier
+            .getKeyId()) {
+        Token<AMRMTokenIdentifier> amrmToken =
+            rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
+              appAttemptId);
+        ((RMAppAttemptImpl)appAttempt).setAMRMToken(amrmToken);
+        allocateResponse.setAMRMToken(org.apache.hadoop.yarn.api.records.Token
+          .newInstance(amrmToken.getIdentifier(), amrmToken.getKind()
+            .toString(), amrmToken.getPassword(), amrmToken.getService()
+            .toString()));
+      }
+
       /*
        * As we are updating the response inside the lock object so we don't
        * need to worry about unregister call occurring in between (which

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java Tue Aug 19 23:49:39 2014
@@ -199,7 +199,9 @@ public class ClientRMService extends Abs
     }
     
     this.server.start();
-    clientBindAddress = conf.updateConnectAddr(YarnConfiguration.RM_ADDRESS,
+    clientBindAddress = conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST,
+                                               YarnConfiguration.RM_ADDRESS,
+                                               YarnConfiguration.DEFAULT_RM_ADDRESS,
                                                server.getListenerAddress());
     super.serviceStart();
   }
@@ -213,7 +215,9 @@ public class ClientRMService extends Abs
   }
 
   InetSocketAddress getBindAddress(Configuration conf) {
-    return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
+    return conf.getSocketAddr(
+            YarnConfiguration.RM_BIND_HOST,
+            YarnConfiguration.RM_ADDRESS,
             YarnConfiguration.DEFAULT_RM_ADDRESS,
             YarnConfiguration.DEFAULT_RM_PORT);
   }
@@ -919,7 +923,7 @@ public class ClientRMService extends Abs
           protoToken.getIdentifier().array(), protoToken.getPassword().array(),
           new Text(protoToken.getKind()), new Text(protoToken.getService()));
 
-      String user = getRenewerForToken(token);
+      String user = UserGroupInformation.getCurrentUser().getUserName();
       rmDTSecretManager.cancelToken(token, user);
       return Records.newRecord(CancelDelegationTokenResponse.class);
     } catch (IOException e) {

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java Tue Aug 19 23:49:39 2014
@@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.ha.ActiveStandbyElector;
 import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.ha.ServiceFailedException;
@@ -60,7 +61,7 @@ public class EmbeddedElectorService exte
   }
 
   @Override
-  protected synchronized void serviceInit(Configuration conf)
+  protected void serviceInit(Configuration conf)
       throws Exception {
     conf = conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf);
 
@@ -85,8 +86,11 @@ public class EmbeddedElectorService exte
     List<ACL> zkAcls = RMZKUtils.getZKAcls(conf);
     List<ZKUtil.ZKAuthInfo> zkAuths = RMZKUtils.getZKAuths(conf);
 
+    int maxRetryNum = conf.getInt(
+        CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_KEY,
+        CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_DEFAULT);
     elector = new ActiveStandbyElector(zkQuorum, (int) zkSessionTimeout,
-        electionZNode, zkAcls, zkAuths, this);
+        electionZNode, zkAcls, zkAuths, this, maxRetryNum);
 
     elector.ensureParentZNode();
     if (!isParentZnodeSafe(clusterId)) {
@@ -98,20 +102,20 @@ public class EmbeddedElectorService exte
   }
 
   @Override
-  protected synchronized void serviceStart() throws Exception {
+  protected void serviceStart() throws Exception {
     elector.joinElection(localActiveNodeInfo);
     super.serviceStart();
   }
 
   @Override
-  protected synchronized void serviceStop() throws Exception {
+  protected void serviceStop() throws Exception {
     elector.quitElection(false);
     elector.terminateConnection();
     super.serviceStop();
   }
 
   @Override
-  public synchronized void becomeActive() throws ServiceFailedException {
+  public void becomeActive() throws ServiceFailedException {
     try {
       rmContext.getRMAdminService().transitionToActive(req);
     } catch (Exception e) {
@@ -120,7 +124,7 @@ public class EmbeddedElectorService exte
   }
 
   @Override
-  public synchronized void becomeStandby() {
+  public void becomeStandby() {
     try {
       rmContext.getRMAdminService().transitionToStandby(req);
     } catch (Exception e) {
@@ -139,13 +143,13 @@ public class EmbeddedElectorService exte
 
   @SuppressWarnings(value = "unchecked")
   @Override
-  public synchronized void notifyFatalError(String errorMessage) {
+  public void notifyFatalError(String errorMessage) {
     rmContext.getDispatcher().getEventHandler().handle(
         new RMFatalEvent(RMFatalEventType.EMBEDDED_ELECTOR_FAILED, errorMessage));
   }
 
   @Override
-  public synchronized void fenceOldActive(byte[] oldActiveData) {
+  public void fenceOldActive(byte[] oldActiveData) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Request to fence old active being ignored, " +
           "as embedded leader election doesn't support fencing");
@@ -162,7 +166,7 @@ public class EmbeddedElectorService exte
         .toByteArray();
   }
 
-  private synchronized boolean isParentZnodeSafe(String clusterId)
+  private boolean isParentZnodeSafe(String clusterId)
       throws InterruptedException, IOException, KeeperException {
     byte[] data;
     try {
@@ -190,4 +194,9 @@ public class EmbeddedElectorService exte
     }
     return true;
   }
+
+  public void resetLeaderElection() {
+    elector.quitElection(false);
+    elector.joinElection(localActiveNodeInfo);
+  }
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java Tue Aug 19 23:49:39 2014
@@ -99,4 +99,8 @@ public interface RMContext {
       RMApplicationHistoryWriter rmApplicationHistoryWriter);
 
   ConfigurationProvider getConfigurationProvider();
+
+  boolean isWorkPreservingRecoveryEnabled();
+  
+  int getEpoch();
 }
\ No newline at end of file

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java Tue Aug 19 23:49:39 2014
@@ -60,6 +60,7 @@ public class RMContextImpl implements RM
     = new ConcurrentHashMap<String, RMNode>();
 
   private boolean isHAEnabled;
+  private boolean isWorkPreservingRecoveryEnabled;
   private HAServiceState haServiceState =
       HAServiceProtocol.HAServiceState.INITIALIZING;
   
@@ -81,6 +82,7 @@ public class RMContextImpl implements RM
   private ApplicationMasterService applicationMasterService;
   private RMApplicationHistoryWriter rmApplicationHistoryWriter;
   private ConfigurationProvider configurationProvider;
+  private int epoch;
 
   /**
    * Default constructor. To be used in conjunction with setter methods for
@@ -329,6 +331,15 @@ public class RMContextImpl implements RM
     }
   }
 
+  public void setWorkPreservingRecoveryEnabled(boolean enabled) {
+    this.isWorkPreservingRecoveryEnabled = enabled;
+  }
+
+  @Override
+  public boolean isWorkPreservingRecoveryEnabled() {
+    return this.isWorkPreservingRecoveryEnabled;
+  }
+
   @Override
   public RMApplicationHistoryWriter getRMApplicationHistoryWriter() {
     return rmApplicationHistoryWriter;
@@ -349,4 +360,13 @@ public class RMContextImpl implements RM
       ConfigurationProvider configurationProvider) {
     this.configurationProvider = configurationProvider;
   }
+
+  @Override
+  public int getEpoch() {
+    return this.epoch;
+  }
+
+ void setEpoch(int epoch) {
+    this.epoch = epoch;
+  }
 }
\ No newline at end of file

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMSecretManagerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMSecretManagerService.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMSecretManagerService.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMSecretManagerService.java Tue Aug 19 23:49:39 2014
@@ -60,7 +60,7 @@ public class RMSecretManagerService exte
     clientToAMSecretManager = createClientToAMTokenSecretManager();
     rmContext.setClientToAMTokenSecretManager(clientToAMSecretManager);
 
-    amRmTokenSecretManager = createAMRMTokenSecretManager(conf);
+    amRmTokenSecretManager = createAMRMTokenSecretManager(conf, this.rmContext);
     rmContext.setAMRMTokenSecretManager(amRmTokenSecretManager);
 
     rmDTSecretManager =
@@ -115,8 +115,8 @@ public class RMSecretManagerService exte
   }
 
   protected AMRMTokenSecretManager createAMRMTokenSecretManager(
-      Configuration conf) {
-    return new AMRMTokenSecretManager(conf);
+      Configuration conf, RMContext rmContext) {
+    return new AMRMTokenSecretManager(conf, rmContext);
   }
 
   protected ClientToAMTokenSecretManagerInRM createClientToAMTokenSecretManager() {

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java Tue Aug 19 23:49:39 2014
@@ -28,6 +28,7 @@ import org.apache.hadoop.security.Access
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -43,6 +44,8 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.resource.Resources;
 
 /**
  * Utility methods to aid serving RM data through the REST and RPC APIs
@@ -225,4 +228,13 @@ public class RMServerUtils {
     }
   }
 
+  /**
+   * Statically defined dummy ApplicationResourceUsageReport.  Used as
+   * a return value when a valid report cannot be found.
+   */
+  public static final ApplicationResourceUsageReport
+    DUMMY_APPLICATION_RESOURCE_USAGE_REPORT =
+      BuilderUtils.newApplicationResourceUsageReport(-1, -1,
+          Resources.createResource(-1, -1), Resources.createResource(-1, -1),
+          Resources.createResource(-1, -1));
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Tue Aug 19 23:49:39 2014
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetSocketAddress;
 import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -32,11 +33,14 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.http.lib.StaticUserWebFilter;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.source.JvmMetrics;
+import org.apache.hadoop.security.AuthenticationFilterInitializer;
 import org.apache.hadoop.security.Groups;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
 import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.service.CompositeService;
@@ -88,8 +92,11 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
 import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMAuthenticationHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilter;
+import org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilterInitializer;
 import org.apache.hadoop.yarn.server.webproxy.AppReportFetcher;
 import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
 import org.apache.hadoop.yarn.server.webproxy.WebAppProxy;
@@ -150,7 +157,8 @@ public class ResourceManager extends Com
   private AppReportFetcher fetcher = null;
   protected ResourceTrackerService resourceTracker;
 
-  private String webAppAddress;
+  @VisibleForTesting
+  protected String webAppAddress;
   private ConfigurationProvider configurationProvider = null;
   /** End of Active services */
 
@@ -225,7 +233,9 @@ public class ResourceManager extends Com
     }
     createAndInitActiveServices();
 
-    webAppAddress = WebAppUtils.getRMWebAppURLWithoutScheme(this.conf);
+    webAppAddress = WebAppUtils.getWebAppBindURL(this.conf,
+                      YarnConfiguration.RM_BIND_HOST,
+                      WebAppUtils.getRMWebAppURLWithoutScheme(this.conf));
 
     this.rmLoginUGI = UserGroupInformation.getCurrentUser();
 
@@ -327,7 +337,7 @@ public class ResourceManager extends Com
    * RMActiveServices handles all the Active services in the RM.
    */
   @Private
-  class RMActiveServices extends CompositeService {
+  public class RMActiveServices extends CompositeService {
 
     private DelegationTokenRenewer delegationTokenRenewer;
     private EventHandler<SchedulerEvent> schedulerDispatcher;
@@ -364,9 +374,15 @@ public class ResourceManager extends Com
           YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);
 
       RMStateStore rmStore = null;
-      if(isRecoveryEnabled) {
+      if (isRecoveryEnabled) {
         recoveryEnabled = true;
-        rmStore =  RMStateStoreFactory.getStore(conf);
+        rmStore = RMStateStoreFactory.getStore(conf);
+        boolean isWorkPreservingRecoveryEnabled =
+            conf.getBoolean(
+              YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
+              YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED);
+        rmContext
+          .setWorkPreservingRecoveryEnabled(isWorkPreservingRecoveryEnabled);
       } else {
         recoveryEnabled = false;
         rmStore = new NullRMStateStore();
@@ -401,6 +417,8 @@ public class ResourceManager extends Com
 
       // Initialize the scheduler
       scheduler = createScheduler();
+      scheduler.setRMContext(rmContext);
+      addIfService(scheduler);
       rmContext.setScheduler(scheduler);
 
       schedulerDispatcher = createSchedulerEventDispatcher();
@@ -429,12 +447,6 @@ public class ResourceManager extends Com
       DefaultMetricsSystem.initialize("ResourceManager");
       JvmMetrics.initSingleton("ResourceManager", null);
 
-      try {
-        scheduler.reinitialize(conf, rmContext);
-      } catch (IOException ioe) {
-        throw new RuntimeException("Failed to initialize scheduler", ioe);
-      }
-
       // creating monitors that handle preemption
       createPolicyMonitors();
 
@@ -451,7 +463,6 @@ public class ResourceManager extends Com
       rmDispatcher.register(RMAppManagerEventType.class, rmAppManager);
 
       clientRM = createClientRMService();
-      rmContext.setClientRMService(clientRM);
       addService(clientRM);
       rmContext.setClientRMService(clientRM);
 
@@ -480,6 +491,9 @@ public class ResourceManager extends Com
       if(recoveryEnabled) {
         try {
           rmStore.checkVersion();
+          if (rmContext.isWorkPreservingRecoveryEnabled()) {
+            rmContext.setEpoch(rmStore.getAndIncrementEpoch());
+          }
           RMState state = rmStore.loadState();
           recover(state);
         } catch (Exception e) {
@@ -524,11 +538,9 @@ public class ResourceManager extends Com
                   (PreemptableResourceScheduler) scheduler));
           for (SchedulingEditPolicy policy : policies) {
             LOG.info("LOADING SchedulingEditPolicy:" + policy.getPolicyName());
-            policy.init(conf, rmContext.getDispatcher().getEventHandler(),
-                (PreemptableResourceScheduler) scheduler);
             // periodically check whether we need to take action to guarantee
             // constraints
-            SchedulingMonitor mon = new SchedulingMonitor(policy);
+            SchedulingMonitor mon = new SchedulingMonitor(rmContext, policy);
             addService(mon);
           }
         } else {
@@ -664,6 +676,7 @@ public class ResourceManager extends Com
             // Transition to standby and reinit active services
             LOG.info("Transitioning RM to Standby mode");
             rm.transitionToStandby(true);
+            rm.adminService.resetLeaderElection();
             return;
           } catch (Exception e) {
             LOG.fatal("Failed to transition RM to Standby mode.");
@@ -785,6 +798,88 @@ public class ResourceManager extends Com
   }
   
   protected void startWepApp() {
+
+    // Use the customized yarn filter instead of the standard kerberos filter to
+    // allow users to authenticate using delegation tokens
+    // 4 conditions need to be satisfied -
+    // 1. security is enabled
+    // 2. http auth type is set to kerberos
+    // 3. "yarn.resourcemanager.webapp.use-yarn-filter" override is set to true
+    // 4. hadoop.http.filter.initializers contains AuthenticationFilterInitializer
+
+    Configuration conf = getConfig();
+    boolean useYarnAuthenticationFilter =
+        conf.getBoolean(
+          YarnConfiguration.RM_WEBAPP_DELEGATION_TOKEN_AUTH_FILTER,
+          YarnConfiguration.DEFAULT_RM_WEBAPP_DELEGATION_TOKEN_AUTH_FILTER);
+    String authPrefix = "hadoop.http.authentication.";
+    String authTypeKey = authPrefix + "type";
+    String filterInitializerConfKey = "hadoop.http.filter.initializers";
+    String actualInitializers = "";
+    Class<?>[] initializersClasses =
+        conf.getClasses(filterInitializerConfKey);
+
+    boolean hasHadoopAuthFilterInitializer = false;
+    boolean hasRMAuthFilterInitializer = false;
+    if (initializersClasses != null) {
+      for (Class<?> initializer : initializersClasses) {
+        if (initializer.getName().equals(
+          AuthenticationFilterInitializer.class.getName())) {
+          hasHadoopAuthFilterInitializer = true;
+        }
+        if (initializer.getName().equals(
+          RMAuthenticationFilterInitializer.class.getName())) {
+          hasRMAuthFilterInitializer = true;
+        }
+      }
+      if (UserGroupInformation.isSecurityEnabled()
+          && useYarnAuthenticationFilter
+          && hasHadoopAuthFilterInitializer
+          && conf.get(authTypeKey, "").equals(
+            KerberosAuthenticationHandler.TYPE)) {
+        ArrayList<String> target = new ArrayList<String>();
+        for (Class<?> filterInitializer : initializersClasses) {
+          if (filterInitializer.getName().equals(
+            AuthenticationFilterInitializer.class.getName())) {
+            if (hasRMAuthFilterInitializer == false) {
+              target.add(RMAuthenticationFilterInitializer.class.getName());
+            }
+            continue;
+          }
+          target.add(filterInitializer.getName());
+        }
+        actualInitializers = StringUtils.join(",", target);
+
+        LOG.info("Using RM authentication filter(kerberos/delegation-token)"
+            + " for RM webapp authentication");
+        RMAuthenticationHandler
+          .setSecretManager(getClientRMService().rmDTSecretManager);
+        String yarnAuthKey =
+            authPrefix + RMAuthenticationFilter.AUTH_HANDLER_PROPERTY;
+        conf.setStrings(yarnAuthKey, RMAuthenticationHandler.class.getName());
+        conf.set(filterInitializerConfKey, actualInitializers);
+      }
+    }
+
+    // if security is not enabled and the default filter initializer has not 
+    // been set, set the initializer to include the
+    // RMAuthenticationFilterInitializer which in turn will set up the simple
+    // auth filter.
+
+    String initializers = conf.get(filterInitializerConfKey);
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      if (initializersClasses == null || initializersClasses.length == 0) {
+        conf.set(filterInitializerConfKey,
+          RMAuthenticationFilterInitializer.class.getName());
+        conf.set(authTypeKey, "simple");
+      } else if (initializers.equals(StaticUserWebFilter.class.getName())) {
+        conf.set(filterInitializerConfKey,
+          RMAuthenticationFilterInitializer.class.getName() + ","
+              + initializers);
+        conf.set(authTypeKey, "simple");
+      }
+    }
+
     Builder<ApplicationMasterService> builder = 
         WebApps
             .$for("cluster", ApplicationMasterService.class, masterService,
@@ -1022,6 +1117,9 @@ public class ResourceManager extends Com
     // recover RMdelegationTokenSecretManager
     rmContext.getRMDelegationTokenSecretManager().recover(state);
 
+    // recover AMRMTokenSecretManager
+    rmContext.getAMRMTokenSecretManager().recover(state);
+
     // recover applications
     rmAppManager.recover(state);
   }
@@ -1031,12 +1129,17 @@ public class ResourceManager extends Com
     StringUtils.startupShutdownMessage(ResourceManager.class, argv, LOG);
     try {
       Configuration conf = new YarnConfiguration();
-      ResourceManager resourceManager = new ResourceManager();
-      ShutdownHookManager.get().addShutdownHook(
-        new CompositeServiceShutdownHook(resourceManager),
-        SHUTDOWN_HOOK_PRIORITY);
-      resourceManager.init(conf);
-      resourceManager.start();
+      // If -format-state-store, then delete RMStateStore; else startup normally
+      if (argv.length == 1 && argv[0].equals("-format-state-store")) {
+        deleteRMStateStore(conf);
+      } else {
+        ResourceManager resourceManager = new ResourceManager();
+        ShutdownHookManager.get().addShutdownHook(
+          new CompositeServiceShutdownHook(resourceManager),
+          SHUTDOWN_HOOK_PRIORITY);
+        resourceManager.init(conf);
+        resourceManager.start();
+      }
     } catch (Throwable t) {
       LOG.fatal("Error starting ResourceManager", t);
       System.exit(-1);
@@ -1058,6 +1161,9 @@ public class ResourceManager extends Com
     ((Service)dispatcher).init(this.conf);
     ((Service)dispatcher).start();
     removeService((Service)rmDispatcher);
+    // Need to stop previous rmDispatcher before assigning new dispatcher
+    // otherwise causes "AsyncDispatcher event handler" thread leak
+    ((Service) rmDispatcher).stop();
     rmDispatcher = dispatcher;
     addIfService(rmDispatcher);
     rmContext.setDispatcher(rmDispatcher);
@@ -1073,4 +1179,23 @@ public class ResourceManager extends Com
     return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
       YarnConfiguration.DEFAULT_RM_ADDRESS, YarnConfiguration.DEFAULT_RM_PORT);
   }
+
+  /**
+   * Deletes the RMStateStore
+   *
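+   * @param conf configuration used to create and initialize the state store
+   * @throws Exception if creating, starting or deleting the state store fails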
+   */
+  private static void deleteRMStateStore(Configuration conf) throws Exception {
+    RMStateStore rmStore = RMStateStoreFactory.getStore(conf);
+    rmStore.init(conf);
+    rmStore.start();
+    try {
+      LOG.info("Deleting ResourceManager state store...");
+      rmStore.deleteStore();
+      LOG.info("State store deleted");
+    } finally {
+      rmStore.stop();
+    }
+  }
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java Tue Aug 19 23:49:39 2014
@@ -32,7 +32,6 @@ import org.apache.hadoop.service.Abstrac
 import org.apache.hadoop.util.VersionUtil;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -45,6 +44,7 @@ import org.apache.hadoop.yarn.factories.
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.server.api.ResourceTracker;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
@@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@@ -120,6 +121,7 @@ public class ResourceTrackerService exte
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
     resourceTrackerAddress = conf.getSocketAddr(
+        YarnConfiguration.RM_BIND_HOST,
         YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
         YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
         YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
@@ -174,9 +176,11 @@ public class ResourceTrackerService exte
       }
       refreshServiceAcls(conf, RMPolicyProvider.getInstance());
     }
-
+ 
     this.server.start();
-    conf.updateConnectAddr(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
+    conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST,
+			   YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
+			   YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
                            server.getListenerAddress());
   }
 
@@ -195,7 +199,7 @@ public class ResourceTrackerService exte
    */
   @SuppressWarnings("unchecked")
   @VisibleForTesting
-  void handleContainerStatus(ContainerStatus containerStatus) {
+  void handleNMContainerStatus(NMContainerStatus containerStatus) {
     ApplicationAttemptId appAttemptId =
         containerStatus.getContainerId().getApplicationAttemptId();
     RMApp rmApp =
@@ -219,11 +223,14 @@ public class ResourceTrackerService exte
     RMAppAttempt rmAppAttempt = rmApp.getRMAppAttempt(appAttemptId);
     Container masterContainer = rmAppAttempt.getMasterContainer();
     if (masterContainer.getId().equals(containerStatus.getContainerId())
-        && containerStatus.getState() == ContainerState.COMPLETE) {
+        && containerStatus.getContainerState() == ContainerState.COMPLETE) {
+      ContainerStatus status =
+          ContainerStatus.newInstance(containerStatus.getContainerId(),
+            containerStatus.getContainerState(), containerStatus.getDiagnostics(),
+            containerStatus.getContainerExitStatus());
       // sending master container finished event.
       RMAppAttemptContainerFinishedEvent evt =
-          new RMAppAttemptContainerFinishedEvent(appAttemptId,
-              containerStatus);
+          new RMAppAttemptContainerFinishedEvent(appAttemptId, status);
       rmContext.getDispatcher().getEventHandler().handle(evt);
     }
   }
@@ -240,13 +247,6 @@ public class ResourceTrackerService exte
     Resource capability = request.getResource();
     String nodeManagerVersion = request.getNMVersion();
 
-    if (!request.getContainerStatuses().isEmpty()) {
-      LOG.info("received container statuses on node manager register :"
-          + request.getContainerStatuses());
-      for (ContainerStatus containerStatus : request.getContainerStatuses()) {
-        handleContainerStatus(containerStatus);
-      }
-    }
     RegisterNodeManagerResponse response = recordFactory
         .newRecordInstance(RegisterNodeManagerResponse.class);
 
@@ -305,17 +305,31 @@ public class ResourceTrackerService exte
     RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode);
     if (oldNode == null) {
       this.rmContext.getDispatcher().getEventHandler().handle(
-          new RMNodeEvent(nodeId, RMNodeEventType.STARTED));
+              new RMNodeStartedEvent(nodeId, request.getNMContainerStatuses(),
+                  request.getRunningApplications()));
     } else {
       LOG.info("Reconnect from the node at: " + host);
       this.nmLivelinessMonitor.unregister(nodeId);
       this.rmContext.getDispatcher().getEventHandler().handle(
-          new RMNodeReconnectEvent(nodeId, rmNode));
+          new RMNodeReconnectEvent(nodeId, rmNode,
+              request.getRunningApplications()));
     }
     // On every node manager register we will be clearing NMToken keys if
     // present for any running application.
     this.nmTokenSecretManager.removeNodeKey(nodeId);
     this.nmLivelinessMonitor.register(nodeId);
+    
+    // Handle received container status, this should be processed after new
+    // RMNode inserted
+    if (!rmContext.isWorkPreservingRecoveryEnabled()) {
+      if (!request.getNMContainerStatuses().isEmpty()) {
+        LOG.info("received container statuses on node manager register :"
+            + request.getNMContainerStatuses());
+        for (NMContainerStatus status : request.getNMContainerStatuses()) {
+          handleNMContainerStatus(status);
+        }
+      }
+    }
 
     String message =
         "NodeManager from node " + host + "(cmPort: " + cmPort + " httpPort: "

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/RMApplicationHistoryWriter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/RMApplicationHistoryWriter.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/RMApplicationHistoryWriter.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/RMApplicationHistoryWriter.java Tue Aug 19 23:49:39 2014
@@ -269,7 +269,7 @@ public class RMApplicationHistoryWriter 
         new WritingContainerStartEvent(container.getContainerId(),
           ContainerStartData.newInstance(container.getContainerId(),
             container.getAllocatedResource(), container.getAllocatedNode(),
-            container.getAllocatedPriority(), container.getStartTime())));
+            container.getAllocatedPriority(), container.getCreationTime())));
     }
   }
 

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java Tue Aug 19 23:49:39 2014
@@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 
@@ -226,7 +227,7 @@ public class AMLauncher implements Runna
     }
 
     // Add AMRMToken
-    Token<AMRMTokenIdentifier> amrmToken = getAMRMToken();
+    Token<AMRMTokenIdentifier> amrmToken = createAndSetAMRMToken();
     if (amrmToken != null) {
       credentials.addToken(amrmToken.getService(), amrmToken);
     }
@@ -236,8 +237,12 @@ public class AMLauncher implements Runna
   }
 
   @VisibleForTesting
-  protected Token<AMRMTokenIdentifier> getAMRMToken() {
-    return application.getAMRMToken();
+  protected Token<AMRMTokenIdentifier> createAndSetAMRMToken() {
+    Token<AMRMTokenIdentifier> amrmToken =
+        this.rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
+          application.getAppAttemptId());
+    ((RMAppAttemptImpl)application).setAMRMToken(amrmToken);
+    return amrmToken;
   }
   
   @SuppressWarnings("unchecked")

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java Tue Aug 19 23:49:39 2014
@@ -21,6 +21,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -34,18 +36,29 @@ public class SchedulingMonitor extends A
   private Thread checkerThread;
   private volatile boolean stopped;
   private long monitorInterval;
+  private RMContext rmContext;
 
-  public SchedulingMonitor(SchedulingEditPolicy scheduleEditPolicy) {
+  public SchedulingMonitor(RMContext rmContext,
+      SchedulingEditPolicy scheduleEditPolicy) {
     super("SchedulingMonitor (" + scheduleEditPolicy.getPolicyName() + ")");
     this.scheduleEditPolicy = scheduleEditPolicy;
-    this.monitorInterval = scheduleEditPolicy.getMonitoringInterval();
+    this.rmContext = rmContext;
   }
 
   public long getMonitorInterval() {
     return monitorInterval;
   }
+  
+  @VisibleForTesting
+  public synchronized SchedulingEditPolicy getSchedulingEditPolicy() {
+    return scheduleEditPolicy;
+  }
 
+  @SuppressWarnings("unchecked")
   public void serviceInit(Configuration conf) throws Exception {
+    scheduleEditPolicy.init(conf, rmContext.getDispatcher().getEventHandler(),
+        (PreemptableResourceScheduler) rmContext.getScheduler());
+    this.monitorInterval = scheduleEditPolicy.getMonitoringInterval();
     super.serviceInit(conf);
   }
 

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java Tue Aug 19 23:49:39 2014
@@ -18,6 +18,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -110,7 +111,7 @@ public class ProportionalCapacityPreempt
   public static final String NATURAL_TERMINATION_FACTOR =
       "yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor";
 
-  //the dispatcher to send preempt and kill events
+  // the dispatcher to send preempt and kill events
   public EventHandler<ContainerPreemptEvent> dispatcher;
 
   private final Clock clock;
@@ -164,12 +165,17 @@ public class ProportionalCapacityPreempt
     observeOnly = config.getBoolean(OBSERVE_ONLY, false);
     rc = scheduler.getResourceCalculator();
   }
+  
+  @VisibleForTesting
+  public ResourceCalculator getResourceCalculator() {
+    return rc;
+  }
 
   @Override
   public void editSchedule(){
     CSQueue root = scheduler.getRootQueue();
     Resource clusterResources =
-      Resources.clone(scheduler.getClusterResources());
+      Resources.clone(scheduler.getClusterResource());
     containerBasedPreemptOrKill(root, clusterResources);
   }
 
@@ -202,7 +208,9 @@ public class ProportionalCapacityPreempt
     Map<ApplicationAttemptId,Set<RMContainer>> toPreempt =
         getContainersToPreempt(queues, clusterResources);
 
-    logToCSV(queues);
+    if (LOG.isDebugEnabled()) {
+      logToCSV(queues);
+    }
 
     // if we are in observeOnly mode return before any action is taken
     if (observeOnly) {
@@ -293,34 +301,31 @@ public class ProportionalCapacityPreempt
     // with the total capacity for this set of queues
     Resource unassigned = Resources.clone(tot_guarant);
 
-    //assign all cluster resources until no more demand, or no resources are left
-    while (!qAlloc.isEmpty() && Resources.greaterThan(rc, tot_guarant,
-          unassigned, Resources.none())) {
-      Resource wQassigned = Resource.newInstance(0, 0);
-
-      // we compute normalizedGuarantees capacity based on currently active
-      // queues
-      resetCapacity(rc, unassigned, qAlloc);
-
-      // offer for each queue their capacity first and in following invocations
-      // their share of over-capacity
-      for (Iterator<TempQueue> i = qAlloc.iterator(); i.hasNext();) {
-        TempQueue sub = i.next();
-        Resource wQavail =
-          Resources.multiply(unassigned, sub.normalizedGuarantee);
-        Resource wQidle = sub.offer(wQavail, rc, tot_guarant);
-        Resource wQdone = Resources.subtract(wQavail, wQidle);
-        // if the queue returned a value > 0 it means it is fully satisfied
-        // and it is removed from the list of active queues qAlloc
-        if (!Resources.greaterThan(rc, tot_guarant,
-              wQdone, Resources.none())) {
-          i.remove();
-        }
-        Resources.addTo(wQassigned, wQdone);
+    // group queues based on whether they have non-zero guaranteed capacity
+    Set<TempQueue> nonZeroGuarQueues = new HashSet<TempQueue>();
+    Set<TempQueue> zeroGuarQueues = new HashSet<TempQueue>();
+
+    for (TempQueue q : qAlloc) {
+      if (Resources
+          .greaterThan(rc, tot_guarant, q.guaranteed, Resources.none())) {
+        nonZeroGuarQueues.add(q);
+      } else {
+        zeroGuarQueues.add(q);
       }
-      Resources.subtractFrom(unassigned, wQassigned);
     }
 
+    // first compute the allocation as a fixpoint based on guaranteed capacity
+    computeFixpointAllocation(rc, tot_guarant, nonZeroGuarQueues, unassigned,
+        false);
+
+    // if any capacity is left unassigned, distribute it among zero-guarantee
+    // queues uniformly (i.e., not based on guaranteed capacity, as this is zero)
+    if (!zeroGuarQueues.isEmpty()
+        && Resources.greaterThan(rc, tot_guarant, unassigned, Resources.none())) {
+      computeFixpointAllocation(rc, tot_guarant, zeroGuarQueues, unassigned,
+          true);
+    }
+    
     // based on ideal assignment computed above and current assignment we derive
     // how much preemption is required overall
     Resource totPreemptionNeeded = Resource.newInstance(0, 0);
@@ -353,6 +358,46 @@ public class ProportionalCapacityPreempt
     }
 
   }
+  
+  /**
+   * Given a set of queues, compute the fix-point distribution of unassigned
+   * resources among them. As the pending requests of a queue are exhausted,
+   * the queue is removed from the set and the remaining capacity is
+   * redistributed among the remaining queues. The distribution is weighted by
+   * guaranteed capacity unless ignoreGuarantee is set, in which case
+   * resources are distributed uniformly.
+   */
+  private void computeFixpointAllocation(ResourceCalculator rc,
+      Resource tot_guarant, Collection<TempQueue> qAlloc, Resource unassigned, 
+      boolean ignoreGuarantee) {
+    //assign all cluster resources until no more demand, or no resources are left
+    while (!qAlloc.isEmpty() && Resources.greaterThan(rc, tot_guarant,
+          unassigned, Resources.none())) {
+      Resource wQassigned = Resource.newInstance(0, 0);
+
+      // we compute normalizedGuarantees capacity based on currently active
+      // queues
+      resetCapacity(rc, unassigned, qAlloc, ignoreGuarantee);
+      
+      // offer for each queue their capacity first and in following invocations
+      // their share of over-capacity
+      for (Iterator<TempQueue> i = qAlloc.iterator(); i.hasNext();) {
+        TempQueue sub = i.next();
+        Resource wQavail =
+          Resources.multiply(unassigned, sub.normalizedGuarantee);
+        Resource wQidle = sub.offer(wQavail, rc, tot_guarant);
+        Resource wQdone = Resources.subtract(wQavail, wQidle);
+        // if the queue accepted nothing of the offer (wQdone is not > 0) it is
+        // fully satisfied and is removed from the list of active queues qAlloc
+        if (!Resources.greaterThan(rc, tot_guarant,
+              wQdone, Resources.none())) {
+          i.remove();
+        }
+        Resources.addTo(wQassigned, wQdone);
+      }
+      Resources.subtractFrom(unassigned, wQassigned);
+    }
+  }
 
   /**
    * Computes a normalizedGuaranteed capacity based on active queues
@@ -361,14 +406,21 @@ public class ProportionalCapacityPreempt
    * @param queues the list of queues to consider
    */
   private void resetCapacity(ResourceCalculator rc, Resource clusterResource,
-      List<TempQueue> queues) {
+      Collection<TempQueue> queues, boolean ignoreGuar) {
     Resource activeCap = Resource.newInstance(0, 0);
-    for (TempQueue q : queues) {
-      Resources.addTo(activeCap, q.guaranteed);
-    }
-    for (TempQueue q : queues) {
-      q.normalizedGuarantee = Resources.divide(rc, clusterResource,
-          q.guaranteed, activeCap);
+    
+    if (ignoreGuar) {
+      for (TempQueue q : queues) {
+        q.normalizedGuarantee = (float)  1.0f / ((float) queues.size());
+      }
+    } else {
+      for (TempQueue q : queues) {
+        Resources.addTo(activeCap, q.guaranteed);
+      }
+      for (TempQueue q : queues) {
+        q.normalizedGuarantee = Resources.divide(rc, clusterResource,
+            q.guaranteed, activeCap);
+      }
     }
   }
 
@@ -385,8 +437,9 @@ public class ProportionalCapacityPreempt
   private Map<ApplicationAttemptId,Set<RMContainer>> getContainersToPreempt(
       List<TempQueue> queues, Resource clusterResource) {
 
-    Map<ApplicationAttemptId,Set<RMContainer>> list =
+    Map<ApplicationAttemptId,Set<RMContainer>> preemptMap =
         new HashMap<ApplicationAttemptId,Set<RMContainer>>();
+    List<RMContainer> skippedAMContainerlist = new ArrayList<RMContainer>();
 
     for (TempQueue qT : queues) {
       // we act only if we are violating balance by more than
@@ -397,26 +450,83 @@ public class ProportionalCapacityPreempt
         // accounts for natural termination of containers
         Resource resToObtain =
           Resources.multiply(qT.toBePreempted, naturalTerminationFactor);
+        Resource skippedAMSize = Resource.newInstance(0, 0);
 
         // lock the leafqueue while we scan applications and unreserve
-        synchronized(qT.leafQueue) {
-          NavigableSet<FiCaSchedulerApp> ns =
-            (NavigableSet<FiCaSchedulerApp>) qT.leafQueue.getApplications();
+        synchronized (qT.leafQueue) {
+          NavigableSet<FiCaSchedulerApp> ns = 
+              (NavigableSet<FiCaSchedulerApp>) qT.leafQueue.getApplications();
           Iterator<FiCaSchedulerApp> desc = ns.descendingIterator();
           qT.actuallyPreempted = Resources.clone(resToObtain);
           while (desc.hasNext()) {
             FiCaSchedulerApp fc = desc.next();
-            if (Resources.lessThanOrEqual(rc, clusterResource,
-                resToObtain, Resources.none())) {
+            if (Resources.lessThanOrEqual(rc, clusterResource, resToObtain,
+                Resources.none())) {
               break;
             }
-            list.put(fc.getApplicationAttemptId(),
-            preemptFrom(fc, clusterResource, resToObtain));
+            preemptMap.put(
+                fc.getApplicationAttemptId(),
+                preemptFrom(fc, clusterResource, resToObtain,
+                    skippedAMContainerlist, skippedAMSize));
           }
+          Resource maxAMCapacityForThisQueue = Resources.multiply(
+              Resources.multiply(clusterResource,
+                  qT.leafQueue.getAbsoluteCapacity()),
+              qT.leafQueue.getMaxAMResourcePerQueuePercent());
+
+          // Try preempting AM containers (while still retaining at most
+          // maxAMCapacityForThisQueue of AM resources) if more resources
+          // still need to be preempted from this queue.
+          preemptAMContainers(clusterResource, preemptMap,
+              skippedAMContainerlist, resToObtain, skippedAMSize,
+              maxAMCapacityForThisQueue);
         }
       }
     }
-    return list;
+    return preemptMap;
+  }
+
+  /**
+   * Rescans the AM containers that were skipped earlier and selects them for
+   * preemption while resToObtain is still positive, stopping once the skipped
+   * AM total is no larger than maxAMCapacityForThisQueue. Clears the list.
+   *
+   * @param clusterResource cluster resource passed to the comparisons
+   * @param preemptMap containers to preempt per attempt; selections added here
+   * @param skippedAMContainerlist skipped AM containers; emptied on return
+   * @param resToObtain resources still to obtain; reduced per selection
+   * @param skippedAMSize total of skipped AM containers; reduced per selection
+   * @param maxAMCapacityForThisQueue skippedAMSize level that stops selection
+   */
+  private void preemptAMContainers(Resource clusterResource,
+      Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
+      List<RMContainer> skippedAMContainerlist, Resource resToObtain,
+      Resource skippedAMSize, Resource maxAMCapacityForThisQueue) {
+    for (RMContainer c : skippedAMContainerlist) {
+      // Required amount of resources already obtained: stop selecting.
+      if (Resources.lessThanOrEqual(rc, clusterResource, resToObtain,
+          Resources.none())) {
+        break;
+      }
+      // Stop as soon as the remaining skipped AM total has dropped to
+      // maxAMCapacityForThisQueue or below; the rest stay unpreempted.
+      if (Resources.lessThanOrEqual(rc, clusterResource, skippedAMSize,
+          maxAMCapacityForThisQueue)) {
+        break;
+      }
+      Set<RMContainer> contToPrempt = preemptMap.get(c
+          .getApplicationAttemptId());
+      if (null == contToPrempt) {
+        contToPrempt = new HashSet<RMContainer>();
+        preemptMap.put(c.getApplicationAttemptId(), contToPrempt);
+      }
+      contToPrempt.add(c);
+      
+      Resources.subtractFrom(resToObtain, c.getContainer().getResource());
+      Resources.subtractFrom(skippedAMSize, c.getContainer()
+          .getResource());
+    }
+    skippedAMContainerlist.clear();
+  }
 
   /**
@@ -428,8 +538,9 @@ public class ProportionalCapacityPreempt
    * @param rsrcPreempt
    * @return
    */
-  private Set<RMContainer> preemptFrom(
-      FiCaSchedulerApp app, Resource clusterResource, Resource rsrcPreempt) {
+  private Set<RMContainer> preemptFrom(FiCaSchedulerApp app,
+      Resource clusterResource, Resource rsrcPreempt,
+      List<RMContainer> skippedAMContainerlist, Resource skippedAMSize) {
     Set<RMContainer> ret = new HashSet<RMContainer>();
     ApplicationAttemptId appId = app.getApplicationAttemptId();
 
@@ -461,6 +572,12 @@ public class ProportionalCapacityPreempt
             rsrcPreempt, Resources.none())) {
         return ret;
       }
+      // Skip AM Container from preemption for now.
+      if (c.isAMContainer()) {
+        skippedAMContainerlist.add(c);
+        Resources.addTo(skippedAMSize, c.getContainer().getResource());
+        continue;
+      }
       ret.add(c);
       Resources.subtractFrom(rsrcPreempt, c.getContainer().getResource());
     }
@@ -515,18 +632,25 @@ public class ProportionalCapacityPreempt
   private TempQueue cloneQueues(CSQueue root, Resource clusterResources) {
     TempQueue ret;
     synchronized (root) {
-    float absUsed = root.getAbsoluteUsedCapacity();
+      String queueName = root.getQueueName();
+      float absUsed = root.getAbsoluteUsedCapacity();
+      float absCap = root.getAbsoluteCapacity();
+      float absMaxCap = root.getAbsoluteMaximumCapacity();
+
       Resource current = Resources.multiply(clusterResources, absUsed);
-      Resource guaranteed =
-        Resources.multiply(clusterResources, root.getAbsoluteCapacity());
+      Resource guaranteed = Resources.multiply(clusterResources, absCap);
+      Resource maxCapacity = Resources.multiply(clusterResources, absMaxCap);
       if (root instanceof LeafQueue) {
         LeafQueue l = (LeafQueue) root;
         Resource pending = l.getTotalResourcePending();
-        ret = new TempQueue(root.getQueueName(), current, pending, guaranteed);
+        ret = new TempQueue(queueName, current, pending, guaranteed,
+            maxCapacity);
+
         ret.setLeafQueue(l);
       } else {
         Resource pending = Resource.newInstance(0, 0);
-        ret = new TempQueue(root.getQueueName(), current, pending, guaranteed);
+        ret = new TempQueue(root.getQueueName(), current, pending, guaranteed,
+            maxCapacity);
         for (CSQueue c : root.getChildQueues()) {
           ret.addChild(cloneQueues(c, clusterResources));
         }
@@ -551,7 +675,7 @@ public class ProportionalCapacityPreempt
       sb.append(", ");
       tq.appendLogString(sb);
     }
-    LOG.info(sb.toString());
+    LOG.debug(sb.toString());
   }
 
   /**
@@ -563,6 +687,7 @@ public class ProportionalCapacityPreempt
     final Resource current;
     final Resource pending;
     final Resource guaranteed;
+    final Resource maxCapacity;
     Resource idealAssigned;
     Resource toBePreempted;
     Resource actuallyPreempted;
@@ -573,11 +698,12 @@ public class ProportionalCapacityPreempt
     LeafQueue leafQueue;
 
     TempQueue(String queueName, Resource current, Resource pending,
-        Resource guaranteed) {
+        Resource guaranteed, Resource maxCapacity) {
       this.queueName = queueName;
       this.current = current;
       this.pending = pending;
       this.guaranteed = guaranteed;
+      this.maxCapacity = maxCapacity;
       this.idealAssigned = Resource.newInstance(0, 0);
       this.actuallyPreempted = Resource.newInstance(0, 0);
       this.toBePreempted = Resource.newInstance(0, 0);
@@ -614,12 +740,12 @@ public class ProportionalCapacityPreempt
     // the unused ones
     Resource offer(Resource avail, ResourceCalculator rc,
         Resource clusterResource) {
-      // remain = avail - min(avail, current + pending - assigned)
-      Resource accepted = Resources.min(rc, clusterResource,
-          avail,
-          Resources.subtract(
-              Resources.add(current, pending),
-              idealAssigned));
+      // remain = avail - min(avail, (max - assigned), (current + pending - assigned))
+      Resource accepted = 
+          Resources.min(rc, clusterResource, 
+              Resources.subtract(maxCapacity, idealAssigned),
+          Resources.min(rc, clusterResource, avail, Resources.subtract(
+              Resources.add(current, pending), idealAssigned)));
       Resource remain = Resources.subtract(avail, accepted);
       Resources.addTo(idealAssigned, accepted);
       return remain;
@@ -628,13 +754,15 @@ public class ProportionalCapacityPreempt
     @Override
     public String toString() {
       StringBuilder sb = new StringBuilder();
-      sb.append("CUR: ").append(current)
+      sb.append(" NAME: " + queueName)
+        .append(" CUR: ").append(current)
         .append(" PEN: ").append(pending)
         .append(" GAR: ").append(guaranteed)
         .append(" NORM: ").append(normalizedGuarantee)
         .append(" IDEAL_ASSIGNED: ").append(idealAssigned)
         .append(" IDEAL_PREEMPT: ").append(toBePreempted)
-        .append(" ACTUAL_PREEMPT: ").append(actuallyPreempted);
+        .append(" ACTUAL_PREEMPT: ").append(actuallyPreempted)
+        .append("\n");
 
       return sb.toString();
     }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java Tue Aug 19 23:49:39 2014
@@ -22,6 +22,7 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -43,14 +44,22 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRMTokenSecretManagerStateProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.EpochProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
-import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Epoch;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.AMRMTokenSecretManagerStatePBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -62,14 +71,20 @@ import com.google.common.annotations.Vis
  * FileSystem interface. Does not use directories so that simple key-value
  * stores can be used. The retry policy for the real filesystem client must be
  * configured separately to enable retry of filesystem operations when needed.
+ *
+ * Changes from 1.1 to 1.2: AMRMTokenSecretManager state is now saved
+ * separately, storing the currentMasterKey and nextMasterKey.
+ * Also, AMRMToken has been removed from ApplicationAttemptState.
  */
 public class FileSystemRMStateStore extends RMStateStore {
 
   public static final Log LOG = LogFactory.getLog(FileSystemRMStateStore.class);
 
   protected static final String ROOT_DIR_NAME = "FSRMStateRoot";
-  protected static final RMStateVersion CURRENT_VERSION_INFO = RMStateVersion
-    .newInstance(1, 0);
+  protected static final Version CURRENT_VERSION_INFO = Version
+    .newInstance(1, 2);
+  protected static final String AMRMTOKEN_SECRET_MANAGER_NODE =
+      "AMRMTokenSecretManagerNode";
 
   protected FileSystem fs;
 
@@ -83,6 +98,7 @@ public class FileSystemRMStateStore exte
   @VisibleForTesting
   Path fsWorkingPath;
 
+  Path amrmTokenSecretManagerRoot;
   @Override
   public synchronized void initInternal(Configuration conf)
       throws Exception{
@@ -90,6 +106,8 @@ public class FileSystemRMStateStore exte
     rootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME);
     rmDTSecretManagerRoot = new Path(rootDirPath, RM_DT_SECRET_MANAGER_ROOT);
     rmAppRoot = new Path(rootDirPath, RM_APP_ROOT);
+    amrmTokenSecretManagerRoot =
+        new Path(rootDirPath, AMRMTOKEN_SECRET_MANAGER_ROOT);
   }
 
   @Override
@@ -107,6 +125,7 @@ public class FileSystemRMStateStore exte
     fs = fsWorkingPath.getFileSystem(conf);
     fs.mkdirs(rmDTSecretManagerRoot);
     fs.mkdirs(rmAppRoot);
+    fs.mkdirs(amrmTokenSecretManagerRoot);
   }
 
   @Override
@@ -115,18 +134,18 @@ public class FileSystemRMStateStore exte
   }
 
   @Override
-  protected RMStateVersion getCurrentVersion() {
+  protected Version getCurrentVersion() {
     return CURRENT_VERSION_INFO;
   }
 
   @Override
-  protected synchronized RMStateVersion loadVersion() throws Exception {
+  protected synchronized Version loadVersion() throws Exception {
     Path versionNodePath = getNodePath(rootDirPath, VERSION_NODE);
     if (fs.exists(versionNodePath)) {
       FileStatus status = fs.getFileStatus(versionNodePath);
       byte[] data = readFile(versionNodePath, status.getLen());
-      RMStateVersion version =
-          new RMStateVersionPBImpl(RMStateVersionProto.parseFrom(data));
+      Version version =
+          new VersionPBImpl(VersionProto.parseFrom(data));
       return version;
     }
     return null;
@@ -136,14 +155,37 @@ public class FileSystemRMStateStore exte
   protected synchronized void storeVersion() throws Exception {
     Path versionNodePath = getNodePath(rootDirPath, VERSION_NODE);
     byte[] data =
-        ((RMStateVersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
+        ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
     if (fs.exists(versionNodePath)) {
       updateFile(versionNodePath, data);
     } else {
       writeFile(versionNodePath, data);
     }
   }
-
+  
+  @Override
+  public synchronized int getAndIncrementEpoch() throws Exception {
+    Path epochNodePath = getNodePath(rootDirPath, EPOCH_NODE);
+    int currentEpoch = 0;
+    if (fs.exists(epochNodePath)) {
+      // epoch file exists: read the stored value, which is what we return
+      FileStatus status = fs.getFileStatus(epochNodePath);
+      byte[] data = readFile(epochNodePath, status.getLen());
+      Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(data));
+      currentEpoch = epoch.getEpoch();
+      // persist the stored value + 1 so the next call sees the next epoch
+      byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
+          .toByteArray();
+      updateFile(epochNodePath, storeData);
+    } else {
+      // no epoch file yet: return 0 and persist 1 for the next call
+      byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
+          .toByteArray();
+      writeFile(epochNodePath, storeData);
+    }
+    return currentEpoch;
+  }
+  
   @Override
   public synchronized RMState loadState() throws Exception {
     RMState rmState = new RMState();
@@ -151,9 +193,32 @@ public class FileSystemRMStateStore exte
     loadRMDTSecretManagerState(rmState);
     // recover RM applications
     loadRMAppState(rmState);
+    // recover AMRMTokenSecretManager
+    loadAMRMTokenSecretManagerState(rmState);
     return rmState;
   }
 
+  private void loadAMRMTokenSecretManagerState(RMState rmState)
+      throws Exception {
+    checkAndResumeUpdateOperation(amrmTokenSecretManagerRoot); // presumably finishes an interrupted update - TODO confirm
+    Path amrmTokenSecretManagerStateDataDir =
+        new Path(amrmTokenSecretManagerRoot, AMRMTOKEN_SECRET_MANAGER_NODE);
+    FileStatus status;
+    try {
+      status = fs.getFileStatus(amrmTokenSecretManagerStateDataDir);
+      assert status.isFile();
+    } catch (FileNotFoundException ex) { // no stored state file
+      return; // rmState.amrmTokenSecretManagerState is left unset
+    }
+    byte[] data = readFile(amrmTokenSecretManagerStateDataDir, status.getLen());
+    AMRMTokenSecretManagerStatePBImpl stateData =
+        new AMRMTokenSecretManagerStatePBImpl(
+          AMRMTokenSecretManagerStateProto.parseFrom(data));
+    rmState.amrmTokenSecretManagerState = // rebuilt from the two parsed master keys
+        AMRMTokenSecretManagerState.newInstance(
+          stateData.getCurrentMasterKey(), stateData.getNextMasterKey());
+  }
+
   private void loadRMAppState(RMState rmState) throws Exception {
     try {
       List<ApplicationAttemptState> attempts =
@@ -214,7 +279,8 @@ public class FileSystemRMStateStore exte
                   attemptStateData.getState(),
                   attemptStateData.getFinalTrackingUrl(),
                   attemptStateData.getDiagnostics(),
-                  attemptStateData.getFinalApplicationStatus());
+                  attemptStateData.getFinalApplicationStatus(),
+                  attemptStateData.getAMContainerExitStatus());
 
             // assert child node name is same as application attempt id
             assert attemptId.equals(attemptState.getAttemptId());
@@ -314,7 +380,7 @@ public class FileSystemRMStateStore exte
 
   @Override
   public synchronized void storeApplicationStateInternal(ApplicationId appId,
-      ApplicationStateDataPBImpl appStateDataPB) throws Exception {
+      ApplicationStateData appStateDataPB) throws Exception {
     String appIdStr = appId.toString();
     Path appDirPath = getAppDir(rmAppRoot, appIdStr);
     fs.mkdirs(appDirPath);
@@ -334,7 +400,7 @@ public class FileSystemRMStateStore exte
 
   @Override
   public synchronized void updateApplicationStateInternal(ApplicationId appId,
-      ApplicationStateDataPBImpl appStateDataPB) throws Exception {
+      ApplicationStateData appStateDataPB) throws Exception {
     String appIdStr = appId.toString();
     Path appDirPath = getAppDir(rmAppRoot, appIdStr);
     Path nodeCreatePath = getNodePath(appDirPath, appIdStr);
@@ -354,7 +420,7 @@ public class FileSystemRMStateStore exte
   @Override
   public synchronized void storeApplicationAttemptStateInternal(
       ApplicationAttemptId appAttemptId,
-      ApplicationAttemptStateDataPBImpl attemptStateDataPB)
+      ApplicationAttemptStateData attemptStateDataPB)
       throws Exception {
     Path appDirPath =
         getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString());
@@ -375,7 +441,7 @@ public class FileSystemRMStateStore exte
   @Override
   public synchronized void updateApplicationAttemptStateInternal(
       ApplicationAttemptId appAttemptId,
-      ApplicationAttemptStateDataPBImpl attemptStateDataPB)
+      ApplicationAttemptStateData attemptStateDataPB)
       throws Exception {
     Path appDirPath =
         getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString());
@@ -485,6 +551,13 @@ public class FileSystemRMStateStore exte
     deleteFile(nodeCreatePath);
   }
 
+  @Override
+  public synchronized void deleteStore() throws IOException {
+    if (fs.exists(rootDirPath)) { // nothing to do when the root is absent
+      fs.delete(rootDirPath, true); // recursive delete of everything under rootDirPath
+    }
+  }
+
   private Path getAppDir(Path root, String appId) {
     return getNodePath(root, appId);
   }
@@ -560,4 +633,25 @@ public class FileSystemRMStateStore exte
     return new Path(root, nodeName);
   }
 
+  @Override
+  public synchronized void storeOrUpdateAMRMTokenSecretManagerState(
+      AMRMTokenSecretManagerState amrmTokenSecretManagerState,
+      boolean isUpdate){
+    Path nodeCreatePath =
+        getNodePath(amrmTokenSecretManagerRoot, AMRMTOKEN_SECRET_MANAGER_NODE);
+    AMRMTokenSecretManagerState data =
+        AMRMTokenSecretManagerState.newInstance(amrmTokenSecretManagerState);
+    byte[] stateData = data.getProto().toByteArray();
+    try {
+      if (isUpdate) { // caller chooses: updateFile when true, writeFile otherwise
+        updateFile(nodeCreatePath, stateData);
+      } else {
+        writeFile(nodeCreatePath, stateData);
+      }
+    } catch (Exception ex) { // failure is reported below, not rethrown
+      LOG.info("Error storing info for AMRMTokenSecretManager", ex); // NOTE(review): failure path logged at info level - consider error
+      notifyStoreOperationFailed(ex);
+    }
+  }
+
 }