You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/09/15 04:44:20 UTC

[16/19] hadoop git commit: YARN-1651. CapacityScheduler side changes to support container resize. Contributed by Wangda Tan

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3441537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 0b6b8ef..ad28493 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -19,7 +19,16 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -37,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -51,6 +61,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@@ -58,13 +69,15 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecreaseContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -87,7 +100,7 @@ public abstract class AbstractYarnScheduler
   protected Resource clusterResource = Resource.newInstance(0, 0);
 
   protected Resource minimumAllocation;
-  private Resource maximumAllocation;
+  protected Resource maximumAllocation;
   private Resource configuredMaximumAllocation;
   private int maxNodeMemory = -1;
   private int maxNodeVCores = -1;
@@ -231,6 +244,55 @@ public abstract class AbstractYarnScheduler
 
     application.containerLaunchedOnNode(containerId, node.getNodeID());
   }
+  
+  protected synchronized void containerIncreasedOnNode(ContainerId containerId,
+      SchedulerNode node, Container increasedContainerReportedByNM) {
+    // Get the application for the finished container
+    SchedulerApplicationAttempt application =
+        getCurrentAttemptForContainer(containerId);
+    if (application == null) {
+      LOG.info("Unknown application "
+          + containerId.getApplicationAttemptId().getApplicationId()
+          + " increased container " + containerId + " on node: " + node);
+      this.rmContext.getDispatcher().getEventHandler()
+          .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
+      return;
+    }
+
+    RMContainer rmContainer = getRMContainer(containerId);
+    Resource rmContainerResource = rmContainer.getAllocatedResource();
+    Resource nmContainerResource = increasedContainerReportedByNM.getResource();
+    
+    
+    if (Resources.equals(nmContainerResource, rmContainerResource)){
+      // NM reported expected container size, tell RMContainer. Which will stop
+      // container expire monitor
+      rmContainer.handle(new RMContainerEvent(containerId,
+          RMContainerEventType.NM_DONE_CHANGE_RESOURCE));
+    } else if (Resources.fitsIn(getResourceCalculator(), clusterResource,
+        nmContainerResource, rmContainerResource)) {
+      // when rmContainerResource >= nmContainerResource, we won't do anything,
+      // it is possible a container increased is issued by RM, but AM hasn't
+      // told NM.
+    } else if (Resources.fitsIn(getResourceCalculator(), clusterResource,
+        rmContainerResource, nmContainerResource)) {
+      // When rmContainerResource <= nmContainerResource, it could happen when a
+      // container decreased by RM before it is increased in NM.
+      
+      // Tell NM to decrease the container
+      this.rmContext.getDispatcher().getEventHandler()
+          .handle(new RMNodeDecreaseContainerEvent(node.getNodeID(),
+              Arrays.asList(rmContainer.getContainer())));
+    } else {
+      // Something wrong happened, kill the container
+      LOG.warn("Something wrong happened, container size reported by NM"
+          + " is not expected, ContainerID=" + containerId
+          + " rm-size-resource:" + rmContainerResource + " nm-size-reosurce:"
+          + nmContainerResource);
+      this.rmContext.getDispatcher().getEventHandler()
+          .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
+    }
+  }
 
   public T getApplicationAttempt(ApplicationAttemptId applicationAttemptId) {
     SchedulerApplication<T> app =
@@ -511,6 +573,36 @@ public abstract class AbstractYarnScheduler
           SchedulerUtils.RELEASED_CONTAINER), RMContainerEventType.RELEASED);
     }
   }
+  
+  protected void decreaseContainers(
+      List<SchedContainerChangeRequest> decreaseRequests,
+      SchedulerApplicationAttempt attempt) {
+    for (SchedContainerChangeRequest request : decreaseRequests) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Processing decrease request:" + request);
+      }
+      
+      boolean hasIncreaseRequest =
+          attempt.removeIncreaseRequest(request.getNodeId(),
+              request.getPriority(), request.getContainerId());
+      
+      if (hasIncreaseRequest) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("While processing decrease request, found a increase request "
+              + "for the same container "
+              + request.getContainerId()
+              + ", removed the increase request");
+        }
+      }
+      
+      // handle decrease request
+      decreaseContainer(request, attempt);
+    }
+  }
+
+  protected abstract void decreaseContainer(
+      SchedContainerChangeRequest decreaseRequest,
+      SchedulerApplicationAttempt attempt);
 
   public SchedulerNode getSchedulerNode(NodeId nodeId) {
     return nodes.get(nodeId);
@@ -733,4 +825,56 @@ public abstract class AbstractYarnScheduler
     LOG.info("Updated the cluste max priority to maxClusterLevelAppPriority = "
         + maxClusterLevelAppPriority);
   }
+  
+  /**
+   * Normalize container increase/decrease request, and return
+   * SchedulerContainerResourceChangeRequest according to given
+   * ContainerResourceChangeRequest.
+   * 
+   * <pre>
+   * - Returns non-null value means validation succeeded
+   * - Throw exception when any other error happens
+   * </pre>
+   */
+  private SchedContainerChangeRequest
+      checkAndNormalizeContainerChangeRequest(
+          ContainerResourceChangeRequest request, boolean increase)
+          throws YarnException {
+    // We have done a check in ApplicationMasterService, but RMContainer status
+    // / Node resource could change since AMS won't acquire lock of scheduler.
+    RMServerUtils.checkAndNormalizeContainerChangeRequest(rmContext, request,
+        increase);
+    ContainerId containerId = request.getContainerId();
+    RMContainer rmContainer = getRMContainer(containerId);
+    SchedulerNode schedulerNode =
+        getSchedulerNode(rmContainer.getAllocatedNode());
+    
+    return new SchedContainerChangeRequest(schedulerNode, rmContainer,
+        request.getCapability());
+  }
+
+  protected List<SchedContainerChangeRequest>
+      checkAndNormalizeContainerChangeRequests(
+          List<ContainerResourceChangeRequest> changeRequests,
+          boolean increase) {
+    if (null == changeRequests || changeRequests.isEmpty()) {
+      return Collections.EMPTY_LIST;
+    }
+    
+    List<SchedContainerChangeRequest> schedulerChangeRequests =
+        new ArrayList<SchedContainerChangeRequest>();
+    for (ContainerResourceChangeRequest r : changeRequests) {
+      SchedContainerChangeRequest sr = null;
+      try {
+        sr = checkAndNormalizeContainerChangeRequest(r, increase);
+      } catch (YarnException e) {
+        LOG.warn("Error happens when checking increase request, Ignoring.."
+            + " exception=", e);
+        continue;
+      }
+      schedulerChangeRequests.add(sr);
+    }
+
+    return schedulerChangeRequests;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3441537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java
index 3f2d8af..af6caad 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java
@@ -34,6 +34,9 @@ public class Allocation {
   final Set<ContainerId> fungibleContainers;
   final List<ResourceRequest> fungibleResources;
   final List<NMToken> nmTokens;
+  final List<Container> increasedContainers;
+  final List<Container> decreasedContainers;
+
 
   public Allocation(List<Container> containers, Resource resourceLimit,
       Set<ContainerId> strictContainers, Set<ContainerId> fungibleContainers,
@@ -45,12 +48,22 @@ public class Allocation {
   public Allocation(List<Container> containers, Resource resourceLimit,
       Set<ContainerId> strictContainers, Set<ContainerId> fungibleContainers,
       List<ResourceRequest> fungibleResources, List<NMToken> nmTokens) {
+    this(containers,  resourceLimit,strictContainers,  fungibleContainers,
+      fungibleResources, nmTokens, null, null);
+  }
+  
+  public Allocation(List<Container> containers, Resource resourceLimit,
+      Set<ContainerId> strictContainers, Set<ContainerId> fungibleContainers,
+      List<ResourceRequest> fungibleResources, List<NMToken> nmTokens,
+      List<Container> increasedContainers, List<Container> decreasedContainer) {
     this.containers = containers;
     this.resourceLimit = resourceLimit;
     this.strictContainers = strictContainers;
     this.fungibleContainers = fungibleContainers;
     this.fungibleResources = fungibleResources;
     this.nmTokens = nmTokens;
+    this.increasedContainers = increasedContainers;
+    this.decreasedContainers = decreasedContainer;
   }
 
   public List<Container> getContainers() {
@@ -76,5 +89,12 @@ public class Allocation {
   public List<NMToken> getNMTokens() {
     return nmTokens;
   }
-
+  
+  public List<Container> getIncreasedContainers() {
+    return increasedContainers;
+  }
+  
+  public List<Container> getDecreasedContainers() {
+    return decreasedContainers;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3441537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index e318d47..7623da0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -20,10 +20,12 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
@@ -35,6 +37,8 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -63,8 +67,11 @@ public class AppSchedulingInfo {
 
   final Set<Priority> priorities = new TreeSet<Priority>(
       new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator());
-  final Map<Priority, Map<String, ResourceRequest>> requests =
-    new ConcurrentHashMap<Priority, Map<String, ResourceRequest>>();
+  final Map<Priority, Map<String, ResourceRequest>> resourceRequestMap =
+      new ConcurrentHashMap<Priority, Map<String, ResourceRequest>>();
+  final Map<NodeId, Map<Priority, Map<ContainerId, 
+      SchedContainerChangeRequest>>> increaseRequestMap =
+      new ConcurrentHashMap<>();
   private Set<String> userBlacklist = new HashSet<>();
   private Set<String> amBlacklist = new HashSet<>();
 
@@ -114,13 +121,177 @@ public class AppSchedulingInfo {
    */
   private synchronized void clearRequests() {
     priorities.clear();
-    requests.clear();
+    resourceRequestMap.clear();
     LOG.info("Application " + applicationId + " requests cleared");
   }
 
   public long getNewContainerId() {
     return this.containerIdCounter.incrementAndGet();
   }
+  
+  public boolean hasIncreaseRequest(NodeId nodeId) {
+    Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode =
+        increaseRequestMap.get(nodeId);
+    if (null == requestsOnNode) {
+      return false;
+    }
+    return requestsOnNode.size() > 0;
+  }
+  
+  public Map<ContainerId, SchedContainerChangeRequest>
+      getIncreaseRequests(NodeId nodeId, Priority priority) {
+    Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode =
+        increaseRequestMap.get(nodeId);
+    if (null == requestsOnNode) {
+      return null;
+    }
+
+    return requestsOnNode.get(priority);
+  }
+
+  public synchronized boolean updateIncreaseRequests(
+      List<SchedContainerChangeRequest> increaseRequests) {
+    boolean resourceUpdated = false;
+
+    for (SchedContainerChangeRequest r : increaseRequests) {
+      NodeId nodeId = r.getRMContainer().getAllocatedNode();
+
+      Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode =
+          increaseRequestMap.get(nodeId);
+      if (null == requestsOnNode) {
+        requestsOnNode = new TreeMap<>();
+        increaseRequestMap.put(nodeId, requestsOnNode);
+      }
+
+      SchedContainerChangeRequest prevChangeRequest =
+          getIncreaseRequest(nodeId, r.getPriority(), r.getContainerId());
+      if (null != prevChangeRequest) {
+        if (Resources.equals(prevChangeRequest.getTargetCapacity(),
+            r.getTargetCapacity())) {
+          // New target capacity is as same as what we have, just ignore the new
+          // one
+          continue;
+        }
+
+        // remove the old one
+        removeIncreaseRequest(nodeId, prevChangeRequest.getPriority(),
+            prevChangeRequest.getContainerId());
+      }
+
+      if (Resources.equals(r.getTargetCapacity(), r.getRMContainer().getAllocatedResource())) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Trying to increase/decrease container, "
+              + "target capacity = previous capacity = " + prevChangeRequest
+              + " for container=" + r.getContainerId()
+              + ". Will ignore this increase request");
+        }
+        continue;
+      }
+
+      // add the new one
+      resourceUpdated = true;
+      insertIncreaseRequest(r);
+    }
+    return resourceUpdated;
+  }
+
+  // insert increase request and add missing hierarchy if missing
+  private void insertIncreaseRequest(SchedContainerChangeRequest request) {
+    NodeId nodeId = request.getNodeId();
+    Priority priority = request.getPriority();
+    ContainerId containerId = request.getContainerId();
+    
+    Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode =
+        increaseRequestMap.get(nodeId);
+    if (null == requestsOnNode) {
+      requestsOnNode =
+          new HashMap<Priority, Map<ContainerId, SchedContainerChangeRequest>>();
+      increaseRequestMap.put(nodeId, requestsOnNode);
+    }
+
+    Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority =
+        requestsOnNode.get(priority);
+    if (null == requestsOnNodeWithPriority) {
+      requestsOnNodeWithPriority =
+          new TreeMap<ContainerId, SchedContainerChangeRequest>();
+      requestsOnNode.put(priority, requestsOnNodeWithPriority);
+    }
+
+    requestsOnNodeWithPriority.put(containerId, request);
+
+    // update resources
+    String partition = request.getRMContainer().getNodeLabelExpression();
+    Resource delta = request.getDeltaCapacity();
+    appResourceUsage.incPending(partition, delta);
+    queue.incPendingResource(partition, delta);
+    
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Added increase request:" + request.getContainerId()
+          + " delta=" + request.getDeltaCapacity());
+    }
+    
+    // update priorities
+    priorities.add(priority);
+  }
+  
+  public synchronized boolean removeIncreaseRequest(NodeId nodeId, Priority priority,
+      ContainerId containerId) {
+    Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode =
+        increaseRequestMap.get(nodeId);
+    if (null == requestsOnNode) {
+      return false;
+    }
+
+    Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority =
+        requestsOnNode.get(priority);
+    if (null == requestsOnNodeWithPriority) {
+      return false;
+    }
+
+    SchedContainerChangeRequest request =
+        requestsOnNodeWithPriority.remove(containerId);
+    
+    // remove hierarchies if it becomes empty
+    if (requestsOnNodeWithPriority.isEmpty()) {
+      requestsOnNode.remove(priority);
+    }
+    if (requestsOnNode.isEmpty()) {
+      increaseRequestMap.remove(nodeId);
+    }
+    
+    if (request == null) {
+      return false;
+    }
+
+    // update queue's pending resource if request exists
+    String partition = request.getRMContainer().getNodeLabelExpression();
+    Resource delta = request.getDeltaCapacity();
+    appResourceUsage.decPending(partition, delta);
+    queue.decPendingResource(partition, delta);
+    
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("remove increase request:" + request);
+    }
+    
+    return true;
+  }
+  
+  public SchedContainerChangeRequest getIncreaseRequest(NodeId nodeId,
+      Priority priority, ContainerId containerId) {
+    Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode =
+        increaseRequestMap.get(nodeId);
+    if (null == requestsOnNode) {
+      return null;
+    }
+
+    Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority =
+        requestsOnNode.get(priority);
+    if (null == requestsOnNodeWithPriority) {
+      return null;
+    }
+
+    return requestsOnNodeWithPriority.get(containerId);
+  }
 
   /**
    * The ApplicationMaster is updating resource requirements for the
@@ -163,11 +334,11 @@ public class AppSchedulingInfo {
         }
       }
 
-      Map<String, ResourceRequest> asks = this.requests.get(priority);
+      Map<String, ResourceRequest> asks = this.resourceRequestMap.get(priority);
 
       if (asks == null) {
         asks = new ConcurrentHashMap<String, ResourceRequest>();
-        this.requests.put(priority, asks);
+        this.resourceRequestMap.put(priority, asks);
         this.priorities.add(priority);
       }
       lastRequest = asks.get(resourceName);
@@ -260,12 +431,12 @@ public class AppSchedulingInfo {
 
   synchronized public Map<String, ResourceRequest> getResourceRequests(
       Priority priority) {
-    return requests.get(priority);
+    return resourceRequestMap.get(priority);
   }
 
   public List<ResourceRequest> getAllResourceRequests() {
     List<ResourceRequest> ret = new ArrayList<ResourceRequest>();
-    for (Map<String, ResourceRequest> r : requests.values()) {
+    for (Map<String, ResourceRequest> r : resourceRequestMap.values()) {
       ret.addAll(r.values());
     }
     return ret;
@@ -273,7 +444,7 @@ public class AppSchedulingInfo {
 
   synchronized public ResourceRequest getResourceRequest(Priority priority,
       String resourceName) {
-    Map<String, ResourceRequest> nodeRequests = requests.get(priority);
+    Map<String, ResourceRequest> nodeRequests = resourceRequestMap.get(priority);
     return (nodeRequests == null) ? null : nodeRequests.get(resourceName);
   }
 
@@ -301,6 +472,50 @@ public class AppSchedulingInfo {
     }
   }
   
+  public synchronized void increaseContainer(
+      SchedContainerChangeRequest increaseRequest) {
+    NodeId nodeId = increaseRequest.getNodeId();
+    Priority priority = increaseRequest.getPriority();
+    ContainerId containerId = increaseRequest.getContainerId();
+    
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("allocated increase request : applicationId=" + applicationId
+          + " container=" + containerId + " host="
+          + increaseRequest.getNodeId() + " user=" + user + " resource="
+          + increaseRequest.getDeltaCapacity());
+    }
+    
+    // Set queue metrics
+    queue.getMetrics().allocateResources(user, 0,
+        increaseRequest.getDeltaCapacity(), true);
+    
+    // remove the increase request from pending increase request map
+    removeIncreaseRequest(nodeId, priority, containerId);
+    
+    // update usage
+    appResourceUsage.incUsed(increaseRequest.getNodePartition(),
+        increaseRequest.getDeltaCapacity());
+  }
+  
+  public synchronized void decreaseContainer(
+      SchedContainerChangeRequest decreaseRequest) {
+    // Delta is negative when it's a decrease request
+    Resource absDelta = Resources.negate(decreaseRequest.getDeltaCapacity());
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Decrease container : applicationId=" + applicationId
+          + " container=" + decreaseRequest.getContainerId() + " host="
+          + decreaseRequest.getNodeId() + " user=" + user + " resource="
+          + absDelta);
+    }
+    
+    // Set queue metrics
+    queue.getMetrics().releaseResources(user, 0, absDelta);
+
+    // update usage
+    appResourceUsage.decUsed(decreaseRequest.getNodePartition(), absDelta);
+  }
+  
   /**
    * Resources have been allocated to this application by the resource
    * scheduler. Track them.
@@ -359,11 +574,11 @@ public class AppSchedulingInfo {
     // Update future requirements
     decResourceRequest(node.getNodeName(), priority, nodeLocalRequest);
 
-    ResourceRequest rackLocalRequest = requests.get(priority).get(
+    ResourceRequest rackLocalRequest = resourceRequestMap.get(priority).get(
         node.getRackName());
     decResourceRequest(node.getRackName(), priority, rackLocalRequest);
 
-    ResourceRequest offRackRequest = requests.get(priority).get(
+    ResourceRequest offRackRequest = resourceRequestMap.get(priority).get(
         ResourceRequest.ANY);
     decrementOutstanding(offRackRequest);
 
@@ -377,7 +592,7 @@ public class AppSchedulingInfo {
       ResourceRequest request) {
     request.setNumContainers(request.getNumContainers() - 1);
     if (request.getNumContainers() == 0) {
-      requests.get(priority).remove(resourceName);
+      resourceRequestMap.get(priority).remove(resourceName);
     }
   }
 
@@ -394,7 +609,7 @@ public class AppSchedulingInfo {
     // Update future requirements
     decResourceRequest(node.getRackName(), priority, rackLocalRequest);
     
-    ResourceRequest offRackRequest = requests.get(priority).get(
+    ResourceRequest offRackRequest = resourceRequestMap.get(priority).get(
         ResourceRequest.ANY);
     decrementOutstanding(offRackRequest);
 
@@ -449,6 +664,12 @@ public class AppSchedulingInfo {
         }
       }
     }
+    
+    // also we need to check increase request
+    if (!deactivate) {
+      deactivate = increaseRequestMap.isEmpty();
+    }
+
     if (deactivate) {
       activeUsersManager.deactivateApplication(user, applicationId);
     }
@@ -457,7 +678,7 @@ public class AppSchedulingInfo {
   synchronized public void move(Queue newQueue) {
     QueueMetrics oldMetrics = queue.getMetrics();
     QueueMetrics newMetrics = newQueue.getMetrics();
-    for (Map<String, ResourceRequest> asks : requests.values()) {
+    for (Map<String, ResourceRequest> asks : resourceRequestMap.values()) {
       ResourceRequest request = asks.get(ResourceRequest.ANY);
       if (request != null) {
         oldMetrics.decrPendingResources(user, request.getNumContainers(),
@@ -484,7 +705,7 @@ public class AppSchedulingInfo {
   synchronized public void stop(RMAppAttemptState rmAppAttemptFinalState) {
     // clear pending resources metrics for the application
     QueueMetrics metrics = queue.getMetrics();
-    for (Map<String, ResourceRequest> asks : requests.values()) {
+    for (Map<String, ResourceRequest> asks : resourceRequestMap.values()) {
       ResourceRequest request = asks.get(ResourceRequest.ANY);
       if (request != null) {
         metrics.decrPendingResources(user, request.getNumContainers(),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3441537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
index 09fd73e..d94b621 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
@@ -373,17 +373,20 @@ public class QueueMetrics implements MetricsSource {
   }
 
   private void _decrPendingResources(int containers, Resource res) {
+    // if #container = 0, means change container resource
     pendingContainers.decr(containers);
-    pendingMB.decr(res.getMemory() * containers);
-    pendingVCores.decr(res.getVirtualCores() * containers);
+    pendingMB.decr(res.getMemory() * Math.max(containers, 1));
+    pendingVCores.decr(res.getVirtualCores() * Math.max(containers, 1));
   }
 
   public void allocateResources(String user, int containers, Resource res,
       boolean decrPending) {
+    // if #containers = 0, means change container resource
     allocatedContainers.incr(containers);
     aggregateContainersAllocated.incr(containers);
-    allocatedMB.incr(res.getMemory() * containers);
-    allocatedVCores.incr(res.getVirtualCores() * containers);
+
+    allocatedMB.incr(res.getMemory() * Math.max(containers, 1));
+    allocatedVCores.incr(res.getVirtualCores() * Math.max(containers, 1));
     if (decrPending) {
       _decrPendingResources(containers, res);
     }
@@ -397,10 +400,11 @@ public class QueueMetrics implements MetricsSource {
   }
 
   public void releaseResources(String user, int containers, Resource res) {
+    // if #container = 0, means change container resource.
     allocatedContainers.decr(containers);
     aggregateContainersReleased.incr(containers);
-    allocatedMB.decr(res.getMemory() * containers);
-    allocatedVCores.decr(res.getVirtualCores() * containers);
+    allocatedMB.decr(res.getMemory() * Math.max(containers, 1));
+    allocatedVCores.decr(res.getVirtualCores() * Math.max(containers, 1));
     QueueMetrics userMetrics = getUserMetrics(user);
     if (userMetrics != null) {
       userMetrics.releaseResources(user, containers, res);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3441537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedContainerChangeRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedContainerChangeRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedContainerChangeRequest.java
new file mode 100644
index 0000000..ea109fd
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedContainerChangeRequest.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+/**
+ * This is ContainerResourceChangeRequest in scheduler side, it contains some
+ * pointers to runtime objects like RMContainer, SchedulerNode, etc. This will
+ * be easier for scheduler making decision.
+ */
+public class SchedContainerChangeRequest implements
+    Comparable<SchedContainerChangeRequest> {
+  RMContainer rmContainer;
+  Resource targetCapacity;
+  SchedulerNode schedulerNode;
+  Resource deltaCapacity;
+
+  public SchedContainerChangeRequest(SchedulerNode schedulerNode,
+      RMContainer rmContainer, Resource targetCapacity) {
+    this.rmContainer = rmContainer;
+    this.targetCapacity = targetCapacity;
+    this.schedulerNode = schedulerNode;
+    deltaCapacity = Resources.subtract(targetCapacity,
+        rmContainer.getAllocatedResource());
+  }
+  
+  public NodeId getNodeId() {
+    return this.rmContainer.getAllocatedNode();
+  }
+
+  public RMContainer getRMContainer() {
+    return this.rmContainer;
+  }
+
+  public Resource getTargetCapacity() {
+    return this.targetCapacity;
+  }
+
+  /**
+   * Delta capacity = before - target, so if it is a decrease request, delta
+   * capacity will be negative
+   */
+  public Resource getDeltaCapacity() {
+    return deltaCapacity;
+  }
+  
+  public Priority getPriority() {
+    return rmContainer.getContainer().getPriority();
+  }
+  
+  public ContainerId getContainerId() {
+    return rmContainer.getContainerId();
+  }
+  
+  public String getNodePartition() {
+    return schedulerNode.getPartition();
+  }
+  
+  public SchedulerNode getSchedulerNode() {
+    return schedulerNode;
+  }
+  
+  @Override
+  public int hashCode() {
+    return (getContainerId().hashCode() << 16) + targetCapacity.hashCode();
+  }
+  
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof SchedContainerChangeRequest)) {
+      return false;
+    }
+    return compareTo((SchedContainerChangeRequest)other) == 0;
+  }
+
+  @Override
+  public int compareTo(SchedContainerChangeRequest other) {
+    if (other == null) {
+      return -1;
+    }
+    
+    int rc = getPriority().compareTo(other.getPriority());
+    if (0 != rc) {
+      return rc;
+    }
+    
+    return getContainerId().compareTo(other.getContainerId());
+  }
+  
+  @Override
+  public String toString() {
+    return "<container=" + getContainerId() + ", targetCapacity="
+        + targetCapacity + ", delta=" + deltaCapacity + ", node="
+        + getNodeId().toString() + ">";
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3441537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java
index 519de98..96288f8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java
@@ -28,7 +28,7 @@ public class SchedulerApplication<T extends SchedulerApplicationAttempt> {
 
   private Queue queue;
   private final String user;
-  private T currentAttempt;
+  private volatile T currentAttempt;
   private volatile Priority priority;
 
   public SchedulerApplication(Queue queue, String user) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3441537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index b361d15..f064e97 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -19,11 +19,13 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -51,16 +53,19 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppR
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerChangeResourceEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerUpdatesAcquiredEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
+import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -104,8 +109,10 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
   private AtomicLong firstAllocationRequestSentTime = new AtomicLong(0);
   private AtomicLong firstContainerAllocatedTime = new AtomicLong(0);
 
-  protected List<RMContainer> newlyAllocatedContainers = 
-      new ArrayList<RMContainer>();
+  protected List<RMContainer> newlyAllocatedContainers = new ArrayList<>();
+  protected Map<ContainerId, RMContainer> newlyDecreasedContainers = new HashMap<>();
+  protected Map<ContainerId, RMContainer> newlyIncreasedContainers = new HashMap<>();
+  protected Set<NMToken> updatedNMTokens = new HashSet<>();
 
   // This pendingRelease is used in work-preserving recovery scenario to keep
   // track of the AM's outstanding release requests. RM on recovery could
@@ -219,7 +226,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
     return appSchedulingInfo.getPriorities();
   }
   
-  public synchronized ResourceRequest getResourceRequest(Priority priority, String resourceName) {
+  public synchronized ResourceRequest getResourceRequest(Priority priority,
+      String resourceName) {
     return this.appSchedulingInfo.getResourceRequest(priority, resourceName);
   }
 
@@ -324,24 +332,28 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
     return reservedContainers;
   }
   
-  public synchronized RMContainer reserve(SchedulerNode node, Priority priority,
-      RMContainer rmContainer, Container container) {
-    // Create RMContainer if necessary
-    if (rmContainer == null) {
-      rmContainer = 
-          new RMContainerImpl(container, getApplicationAttemptId(), 
-              node.getNodeID(), appSchedulingInfo.getUser(), rmContext);
+  public synchronized boolean reserveIncreasedContainer(SchedulerNode node,
+      Priority priority, RMContainer rmContainer, Resource reservedResource) {
+    if (commonReserve(node, priority, rmContainer, reservedResource)) {
       attemptResourceUsage.incReserved(node.getPartition(),
-          container.getResource());
-      
-      // Reset the re-reservation count
-      resetReReservations(priority);
-    } else {
-      // Note down the re-reservation
-      addReReservation(priority);
+          reservedResource);
+      // succeeded
+      return true;
+    }
+    
+    return false;
+  }
+  
+  private synchronized boolean commonReserve(SchedulerNode node,
+      Priority priority, RMContainer rmContainer, Resource reservedResource) {
+    try {
+      rmContainer.handle(new RMContainerReservedEvent(rmContainer
+          .getContainerId(), reservedResource, node.getNodeID(), priority));
+    } catch (InvalidStateTransitionException e) {
+      // We reach here could be caused by container already finished, return
+      // false indicate it fails
+      return false;
     }
-    rmContainer.handle(new RMContainerReservedEvent(container.getId(), 
-        container.getResource(), node.getNodeID(), priority));
     
     Map<NodeId, RMContainer> reservedContainers = 
         this.reservedContainers.get(priority);
@@ -356,8 +368,30 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
           + " reserved container " + rmContainer + " on node " + node
           + ". This attempt currently has " + reservedContainers.size()
           + " reserved containers at priority " + priority
-          + "; currentReservation " + container.getResource());
+          + "; currentReservation " + reservedResource);
     }
+    
+    return true;
+  }
+  
+  public synchronized RMContainer reserve(SchedulerNode node,
+      Priority priority, RMContainer rmContainer, Container container) {
+    // Create RMContainer if necessary
+    if (rmContainer == null) {
+      rmContainer =
+          new RMContainerImpl(container, getApplicationAttemptId(),
+              node.getNodeID(), appSchedulingInfo.getUser(), rmContext);
+      attemptResourceUsage.incReserved(node.getPartition(),
+          container.getResource());
+
+      // Reset the re-reservation count
+      resetReReservations(priority);
+    } else {
+      // Note down the re-reservation
+      addReReservation(priority);
+    }
+    
+    commonReserve(node, priority, rmContainer, container.getResource());
 
     return rmContainer;
   }
@@ -437,69 +471,100 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
   public Resource getCurrentConsumption() {
     return attemptResourceUsage.getUsed();
   }
-
-  public static class ContainersAndNMTokensAllocation {
-    List<Container> containerList;
-    List<NMToken> nmTokenList;
-
-    public ContainersAndNMTokensAllocation(List<Container> containerList,
-        List<NMToken> nmTokenList) {
-      this.containerList = containerList;
-      this.nmTokenList = nmTokenList;
+  
+  private Container updateContainerAndNMToken(RMContainer rmContainer,
+      boolean newContainer, boolean increasedContainer) {
+    Container container = rmContainer.getContainer();
+    ContainerType containerType = ContainerType.TASK;
+    // The working knowledge is that masterContainer for AM is null as it
+    // itself is the master container.
+    RMAppAttempt appAttempt = rmContext.getRMApps()
+        .get(container.getId().getApplicationAttemptId().getApplicationId())
+        .getCurrentAppAttempt();
+    if (isWaitingForAMContainer(getApplicationId())) {
+      containerType = ContainerType.APPLICATION_MASTER;
     }
-
-    public List<Container> getContainerList() {
-      return containerList;
+    try {
+      // create container token and NMToken altogether.
+      container.setContainerToken(rmContext.getContainerTokenSecretManager()
+          .createContainerToken(container.getId(), container.getNodeId(),
+              getUser(), container.getResource(), container.getPriority(),
+              rmContainer.getCreationTime(), this.logAggregationContext,
+              rmContainer.getNodeLabelExpression(), containerType));
+      NMToken nmToken =
+          rmContext.getNMTokenSecretManager().createAndGetNMToken(getUser(),
+              getApplicationAttemptId(), container);
+      if (nmToken != null) {
+        updatedNMTokens.add(nmToken);
+      }
+    } catch (IllegalArgumentException e) {
+      // DNS might be down, skip returning this container.
+      LOG.error("Error trying to assign container token and NM token to"
+          + " an updated container " + container.getId(), e);
+      return null;
     }
-
-    public List<NMToken> getNMTokenList() {
-      return nmTokenList;
+    
+    if (newContainer) {
+      rmContainer.handle(new RMContainerEvent(
+          rmContainer.getContainerId(), RMContainerEventType.ACQUIRED));
+    } else {
+      rmContainer.handle(new RMContainerUpdatesAcquiredEvent(
+          rmContainer.getContainerId(), increasedContainer));
     }
+    return container;
   }
 
-  // Create container token and NMToken altogether, if either of them fails for
+  // Create container token and update NMToken altogether, if either of them fails for
   // some reason like DNS unavailable, do not return this container and keep it
   // in the newlyAllocatedContainers waiting to be refetched.
-  public synchronized ContainersAndNMTokensAllocation
-      pullNewlyAllocatedContainersAndNMTokens() {
+  public synchronized List<Container> pullNewlyAllocatedContainers() {
     List<Container> returnContainerList =
         new ArrayList<Container>(newlyAllocatedContainers.size());
-    List<NMToken> nmTokens = new ArrayList<NMToken>();
     for (Iterator<RMContainer> i = newlyAllocatedContainers.iterator(); i
-      .hasNext();) {
+        .hasNext();) {
       RMContainer rmContainer = i.next();
-      Container container = rmContainer.getContainer();
-      ContainerType containerType = ContainerType.TASK;
-      boolean isWaitingForAMContainer = isWaitingForAMContainer(
-          container.getId().getApplicationAttemptId().getApplicationId());
-      if (isWaitingForAMContainer) {
-        containerType = ContainerType.APPLICATION_MASTER;
+      Container updatedContainer =
+          updateContainerAndNMToken(rmContainer, true, false);
+      // Only add container to return list when it's not null. updatedContainer
+      // could be null when generate token failed, it can be caused by DNS
+      // resolving failed.
+      if (updatedContainer != null) {
+        returnContainerList.add(updatedContainer);
+        i.remove();
       }
-      try {
-        // create container token and NMToken altogether.
-        container.setContainerToken(rmContext.getContainerTokenSecretManager()
-            .createContainerToken(container.getId(), container.getNodeId(),
-                getUser(), container.getResource(), container.getPriority(),
-                rmContainer.getCreationTime(), this.logAggregationContext,
-                rmContainer.getNodeLabelExpression(), containerType));
-        NMToken nmToken =
-            rmContext.getNMTokenSecretManager().createAndGetNMToken(getUser(),
-              getApplicationAttemptId(), container);
-        if (nmToken != null) {
-          nmTokens.add(nmToken);
-        }
-      } catch (IllegalArgumentException e) {
-        // DNS might be down, skip returning this container.
-        LOG.error("Error trying to assign container token and NM token to" +
-            " an allocated container " + container.getId(), e);
-        continue;
+    }
+    return returnContainerList;
+  }
+  
+  private synchronized List<Container> pullNewlyUpdatedContainers(
+      Map<ContainerId, RMContainer> updatedContainerMap, boolean increase) {
+    List<Container> returnContainerList =
+        new ArrayList<Container>(updatedContainerMap.size());
+    for (Iterator<Entry<ContainerId, RMContainer>> i =
+        updatedContainerMap.entrySet().iterator(); i.hasNext();) {
+      RMContainer rmContainer = i.next().getValue();
+      Container updatedContainer =
+          updateContainerAndNMToken(rmContainer, false, increase);
+      if (updatedContainer != null) {
+        returnContainerList.add(updatedContainer);
+        i.remove();
       }
-      returnContainerList.add(container);
-      i.remove();
-      rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(),
-        RMContainerEventType.ACQUIRED));
     }
-    return new ContainersAndNMTokensAllocation(returnContainerList, nmTokens);
+    return returnContainerList;
+  }
+
+  public synchronized List<Container> pullNewlyIncreasedContainers() {
+    return pullNewlyUpdatedContainers(newlyIncreasedContainers, true);
+  }
+  
+  public synchronized List<Container> pullNewlyDecreasedContainers() {
+    return pullNewlyUpdatedContainers(newlyDecreasedContainers, false);
+  }
+  
+  public synchronized List<NMToken> pullUpdatedNMTokens() {
+    List<NMToken> returnList = new ArrayList<NMToken>(updatedNMTokens);
+    updatedNMTokens.clear();
+    return returnList;
   }
 
   public boolean isWaitingForAMContainer(ApplicationId applicationId) {
@@ -770,4 +835,50 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
     return attemptResourceUsage;
   }
   
-}
+  public synchronized boolean removeIncreaseRequest(NodeId nodeId,
+      Priority priority, ContainerId containerId) {
+    return appSchedulingInfo.removeIncreaseRequest(nodeId, priority,
+        containerId);
+  }
+  
+  public synchronized boolean updateIncreaseRequests(
+      List<SchedContainerChangeRequest> increaseRequests) {
+    return appSchedulingInfo.updateIncreaseRequests(increaseRequests);
+  }
+  
+  private synchronized void changeContainerResource(
+      SchedContainerChangeRequest changeRequest, boolean increase) {
+    if (increase) {
+      appSchedulingInfo.increaseContainer(changeRequest);
+    } else {
+      appSchedulingInfo.decreaseContainer(changeRequest);
+    }
+
+    RMContainer changedRMContainer = changeRequest.getRMContainer(); 
+    changedRMContainer.handle(
+        new RMContainerChangeResourceEvent(changeRequest.getContainerId(),
+            changeRequest.getTargetCapacity(), increase));
+
+    // remove pending and not pulled by AM newly-increased/decreased-containers
+    // and add the new one
+    if (increase) {
+      newlyDecreasedContainers.remove(changeRequest.getContainerId());
+      newlyIncreasedContainers.put(changeRequest.getContainerId(),
+          changedRMContainer);
+    } else {
+      newlyIncreasedContainers.remove(changeRequest.getContainerId());
+      newlyDecreasedContainers.put(changeRequest.getContainerId(),
+          changedRMContainer);
+    }
+  }
+  
+  public synchronized void decreaseContainer(
+      SchedContainerChangeRequest decreaseRequest) {
+    changeContainerResource(decreaseRequest, false);
+  }
+  
+  public synchronized void increaseContainer(
+      SchedContainerChangeRequest increaseRequest) {
+    changeContainerResource(increaseRequest, true);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3441537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
index f03663a..f3d3906 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
@@ -157,6 +157,37 @@ public abstract class SchedulerNode {
         + getUsedResource() + " used and " + getAvailableResource()
         + " available after allocation");
   }
+  
+  private synchronized void changeContainerResource(ContainerId containerId,
+      Resource deltaResource, boolean increase) {
+    if (increase) {
+      deductAvailableResource(deltaResource);
+    } else {
+      addAvailableResource(deltaResource);
+    }
+
+    LOG.info((increase ? "Increased" : "Decreased") + " container "
+        + containerId + " of capacity " + deltaResource + " on host "
+        + rmNode.getNodeAddress() + ", which has " + numContainers
+        + " containers, " + getUsedResource() + " used and "
+        + getAvailableResource() + " available after allocation");
+  }
+  
+  /**
+   * The Scheduler increased container
+   */
+  public synchronized void increaseContainer(ContainerId containerId,
+      Resource deltaResource) {
+    changeContainerResource(containerId, deltaResource, true);
+  }
+  
+  /**
+   * The Scheduler decreased container
+   */
+  public synchronized void decreaseContainer(ContainerId containerId,
+      Resource deltaResource) {
+    changeContainerResource(containerId, deltaResource, false);
+  }
 
   /**
    * Get available resources on the node.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3441537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
index 8047d0b..abefee8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
@@ -361,7 +361,7 @@ public class SchedulerUtils {
   }
   
   public static boolean checkResourceRequestMatchingNodePartition(
-      ResourceRequest offswitchResourceRequest, String nodePartition,
+      String requestedPartition, String nodePartition,
       SchedulingMode schedulingMode) {
     // We will only look at node label = nodeLabelToLookAt according to
     // schedulingMode and partition of node.
@@ -371,12 +371,11 @@ public class SchedulerUtils {
     } else {
       nodePartitionToLookAt = RMNodeLabelsManager.NO_LABEL;
     }
-    
-    String askedNodePartition = offswitchResourceRequest.getNodeLabelExpression();
-    if (null == askedNodePartition) {
-      askedNodePartition = RMNodeLabelsManager.NO_LABEL;
+
+    if (null == requestedPartition) {
+      requestedPartition = RMNodeLabelsManager.NO_LABEL;
     }
-    return askedNodePartition.equals(nodePartitionToLookAt);
+    return requestedPartition.equals(nodePartitionToLookAt);
   }
   
   private static boolean hasPendingResourceRequest(ResourceCalculator rc,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3441537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
index f2753e6..099db69 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
@@ -133,16 +134,17 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> {
    * @param release
    * @param blacklistAdditions 
    * @param blacklistRemovals 
+   * @param increaseRequests
+   * @param decreaseRequests
    * @return the {@link Allocation} for the application
    */
   @Public
   @Stable
-  Allocation 
-  allocate(ApplicationAttemptId appAttemptId, 
-      List<ResourceRequest> ask,
-      List<ContainerId> release, 
-      List<String> blacklistAdditions, 
-      List<String> blacklistRemovals);
+  Allocation allocate(ApplicationAttemptId appAttemptId,
+      List<ResourceRequest> ask, List<ContainerId> release,
+      List<String> blacklistAdditions, List<String> blacklistRemovals,
+      List<ContainerResourceChangeRequest> increaseRequests,
+      List<ContainerResourceChangeRequest> decreaseRequests);
 
   /**
    * Get node resource usage report.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3441537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 0ae4d1a..9f61b11 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -43,10 +43,10 @@ import org.apache.hadoop.yarn.security.PrivilegedEntity;
 import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType;
 import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -76,7 +76,7 @@ public abstract class AbstractCSQueue implements CSQueue {
   private boolean preemptionDisabled;
 
   // Track resource usage-by-label like used-resource/pending-resource, etc.
-  ResourceUsage queueUsage;
+  volatile ResourceUsage queueUsage;
   
   // Track capacities like used-capcity/abs-used-capacity/capacity/abs-capacity,
   // etc.
@@ -340,22 +340,27 @@ public abstract class AbstractCSQueue implements CSQueue {
     return minimumAllocation;
   }
   
-  synchronized void allocateResource(Resource clusterResource, 
-      Resource resource, String nodePartition) {
+  synchronized void allocateResource(Resource clusterResource,
+      Resource resource, String nodePartition, boolean changeContainerResource) {
     queueUsage.incUsed(nodePartition, resource);
 
-    ++numContainers;
+    if (!changeContainerResource) {
+      ++numContainers;
+    }
     CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
         minimumAllocation, this, labelManager, nodePartition);
   }
   
   protected synchronized void releaseResource(Resource clusterResource,
-      Resource resource, String nodePartition) {
+      Resource resource, String nodePartition, boolean changeContainerResource) {
     queueUsage.decUsed(nodePartition, resource);
 
     CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
         minimumAllocation, this, labelManager, nodePartition);
-    --numContainers;
+
+    if (!changeContainerResource) {
+      --numContainers;
+    }
   }
   
   @Private
@@ -446,8 +451,8 @@ public abstract class AbstractCSQueue implements CSQueue {
   }
   
   synchronized boolean canAssignToThisQueue(Resource clusterResource,
-      String nodePartition, ResourceLimits currentResourceLimits, Resource resourceCouldBeUnreserved,
-      SchedulingMode schedulingMode) {
+      String nodePartition, ResourceLimits currentResourceLimits,
+      Resource resourceCouldBeUnreserved, SchedulingMode schedulingMode) {
     // Get current limited resource: 
     // - When doing RESPECT_PARTITION_EXCLUSIVITY allocation, we will respect
     // queues' max capacity.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3441537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java
index 928437f..68f6f12 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java
@@ -41,6 +41,7 @@ public class CSAssignment {
   private final boolean skipped;
   private boolean fulfilledReservation;
   private final AssignmentInformation assignmentInformation;
+  private boolean increaseAllocation;
 
   public CSAssignment(Resource resource, NodeType type) {
     this(resource, type, null, null, false, false);
@@ -138,4 +139,12 @@ public class CSAssignment {
   public AssignmentInformation getAssignmentInformation() {
     return this.assignmentInformation;
   }
+  
+  public boolean isIncreasedAllocation() {
+    return increaseAllocation;
+  }
+
+  public void setIncreasedAllocation(boolean flag) {
+    increaseAllocation = flag;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3441537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index 9855dd4..e90deeb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManage
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 
@@ -219,6 +220,14 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
       boolean sortQueues);
 
   /**
+   * We have a reserved increased container in the queue, we need to unreserve
+   * it. Since we just want to cancel the reserved increase request instead of
+   * stop the container, we shouldn't call completedContainer for such purpose.
+   */
+  public void unreserveIncreasedContainer(Resource clusterResource,
+      FiCaSchedulerApp app, FiCaSchedulerNode node, RMContainer rmContainer);
+
+  /**
    * Get the number of applications in the queue.
    * @return number of applications
    */
@@ -313,4 +322,11 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
    *          new resource asked
    */
   public void decPendingResource(String nodeLabel, Resource resourceToDec);
+  
+  /**
+   * Decrease container resource in the queue
+   */
+  public void decreaseContainer(Resource clusterResource,
+      SchedContainerChangeRequest decreaseRequest,
+      FiCaSchedulerApp app);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3441537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index dbaccaf..cf7f6c0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.EnumSet;
@@ -52,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
@@ -84,6 +86,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecreaseContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
@@ -95,6 +98,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueNotFoundException;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
@@ -884,10 +889,14 @@ public class CapacityScheduler extends
   }
 
   @Override
+  // Note: when AM asks to decrease container or release container, we will
+  // acquire scheduler lock
   @Lock(Lock.NoLock.class)
   public Allocation allocate(ApplicationAttemptId applicationAttemptId,
-      List<ResourceRequest> ask, List<ContainerId> release, 
-      List<String> blacklistAdditions, List<String> blacklistRemovals) {
+      List<ResourceRequest> ask, List<ContainerId> release,
+      List<String> blacklistAdditions, List<String> blacklistRemovals,
+      List<ContainerResourceChangeRequest> increaseRequests,
+      List<ContainerResourceChangeRequest> decreaseRequests) {
 
     FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId);
     if (application == null) {
@@ -898,6 +907,14 @@ public class CapacityScheduler extends
     SchedulerUtils.normalizeRequests(
         ask, getResourceCalculator(), getClusterResource(),
         getMinimumResourceCapability(), getMaximumResourceCapability());
+    
+    // Pre-process increase requests
+    List<SchedContainerChangeRequest> normalizedIncreaseRequests =
+        checkAndNormalizeContainerChangeRequests(increaseRequests, true);
+    
+    // Pre-process decrease requests
+    List<SchedContainerChangeRequest> normalizedDecreaseRequests =
+        checkAndNormalizeContainerChangeRequests(decreaseRequests, false);
 
     // Release containers
     releaseContainers(release, application);
@@ -914,8 +931,8 @@ public class CapacityScheduler extends
         return EMPTY_ALLOCATION;
       }
 
+      // Process resource requests
       if (!ask.isEmpty()) {
-
         if(LOG.isDebugEnabled()) {
           LOG.debug("allocate: pre-update " + applicationAttemptId +
               " ask size =" + ask.size());
@@ -932,6 +949,12 @@ public class CapacityScheduler extends
           application.showRequests();
         }
       }
+      
+      // Process increase resource requests
+      if (application.updateIncreaseRequests(normalizedIncreaseRequests)
+          && (updateDemandForQueue == null)) {
+        updateDemandForQueue = (LeafQueue) application.getQueue();
+      }
 
       if (application.isWaitingForAMContainer(application.getApplicationId())) {
         // Allocate is for AM and update AM blacklist for this
@@ -940,6 +963,9 @@ public class CapacityScheduler extends
       } else {
         application.updateBlacklist(blacklistAdditions, blacklistRemovals);
       }
+      
+      // Decrease containers
+      decreaseContainers(normalizedDecreaseRequests, application);
 
       allocation = application.getAllocation(getResourceCalculator(),
                    clusterResource, getMinimumResourceCapability());
@@ -1001,6 +1027,13 @@ public class CapacityScheduler extends
     for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
       containerLaunchedOnNode(launchedContainer.getContainerId(), node);
     }
+    
+    // Processing the newly increased containers
+    List<Container> newlyIncreasedContainers =
+        nm.pullNewlyIncreasedContainers();
+    for (Container container : newlyIncreasedContainers) {
+      containerIncreasedOnNode(container.getId(), node, container);
+    }
 
     // Process completed containers
     int releasedContainers = 0;
@@ -1486,6 +1519,50 @@ public class CapacityScheduler extends
         container.getId(), queue.getQueuePath());
     }
   }
+  
+  @Lock(CapacityScheduler.class)
+  @Override
+  protected synchronized void decreaseContainer(
+      SchedContainerChangeRequest decreaseRequest,
+      SchedulerApplicationAttempt attempt) {
+    RMContainer rmContainer = decreaseRequest.getRMContainer();
+
+    // Check container status before doing decrease
+    if (rmContainer.getState() != RMContainerState.RUNNING) {
+      LOG.info("Trying to decrease a container not in RUNNING state, container="
+          + rmContainer + " state=" + rmContainer.getState().name());
+      return;
+    }
+    
+    // Delta capacity of this decrease request is 0, this decrease request may
+    // just to cancel increase request
+    if (Resources.equals(decreaseRequest.getDeltaCapacity(), Resources.none())) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Decrease target resource equals to existing resource for container:"
+            + decreaseRequest.getContainerId()
+            + " ignore this decrease request.");
+      }
+      return;
+    }
+
+    // Save resource before decrease
+    Resource resourceBeforeDecrease =
+        Resources.clone(rmContainer.getContainer().getResource());
+
+    FiCaSchedulerApp app = (FiCaSchedulerApp)attempt;
+    LeafQueue queue = (LeafQueue) attempt.getQueue();
+    queue.decreaseContainer(clusterResource, decreaseRequest, app);
+    
+    // Notify RMNode the container will be decreased
+    this.rmContext.getDispatcher().getEventHandler()
+        .handle(new RMNodeDecreaseContainerEvent(decreaseRequest.getNodeId(),
+            Arrays.asList(rmContainer.getContainer())));
+    
+    LOG.info("Application attempt " + app.getApplicationAttemptId()
+        + " decreased container:" + decreaseRequest.getContainerId() + " from "
+        + resourceBeforeDecrease + " to "
+        + decreaseRequest.getTargetCapacity());
+  }
 
   @Lock(Lock.NoLock.class)
   @VisibleForTesting