You are viewing a plain-text version of this content; the canonical link to the original was not preserved in this copy.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2017/02/28 18:41:58 UTC

[1/2] hadoop git commit: YARN-6216. Unify Container Resizing code paths with Container Updates making it scheduler agnostic. (Arun Suresh via wangda)

Repository: hadoop
Updated Branches:
  refs/heads/trunk 480b4dd57 -> eac6b4c35


http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index b65f16a..1b20556 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -1134,12 +1134,7 @@ public class LeafQueue extends AbstractCSQueue {
 
     if (targetLeafQueue == this) {
       // When trying to preempt containers from the same queue
-      if (rmContainer.hasIncreaseReservation()) {
-        // Increased container reservation
-        unreserveIncreasedContainer(clusterResource,
-            schedulerContainer.getSchedulerApplicationAttempt(),
-            schedulerContainer.getSchedulerNode(), rmContainer);
-      } else if (rmContainer.getState() == RMContainerState.RESERVED) {
+      if (rmContainer.getState() == RMContainerState.RESERVED) {
         // For other reserved containers
         // This is a reservation exchange, complete previous reserved container
         completedContainer(clusterResource,
@@ -1212,8 +1207,7 @@ public class LeafQueue extends AbstractCSQueue {
               schedulerContainer.getSchedulerApplicationAttempt(),
               allocation.getAllocatedOrReservedResource(),
               schedulerContainer.getNodePartition(),
-              schedulerContainer.getRmContainer(),
-              allocation.isIncreasedAllocation());
+              schedulerContainer.getRmContainer());
           orderingPolicy.containerAllocated(
               schedulerContainer.getSchedulerApplicationAttempt(),
               schedulerContainer.getRmContainer());
@@ -1446,40 +1440,6 @@ public class LeafQueue extends AbstractCSQueue {
       readLock.unlock();
     }
   }
-  
-  @Override
-  public void unreserveIncreasedContainer(Resource clusterResource,
-      FiCaSchedulerApp app, FiCaSchedulerNode node, RMContainer rmContainer) {
-    boolean removed = false;
-    Priority priority = null;
-
-    try {
-      writeLock.lock();
-      if (rmContainer.getContainer() != null) {
-        priority = rmContainer.getContainer().getPriority();
-      }
-
-      if (null != priority) {
-        removed = app.unreserve(rmContainer.getAllocatedSchedulerKey(), node,
-            rmContainer);
-      }
-
-      if (removed) {
-        // Inform the ordering policy
-        orderingPolicy.containerReleased(app, rmContainer);
-
-        releaseResource(clusterResource, app, rmContainer.getReservedResource(),
-            node.getPartition(), rmContainer, true);
-      }
-    } finally {
-      writeLock.unlock();
-    }
-
-    if (removed) {
-      getParent().unreserveIncreasedContainer(clusterResource, app, node,
-          rmContainer);
-    }
-  }
 
   private void updateSchedulerHealthForCompletedContainer(
       RMContainer rmContainer, ContainerStatus containerStatus) {
@@ -1538,16 +1498,6 @@ public class LeafQueue extends AbstractCSQueue {
     updateSchedulerHealthForCompletedContainer(rmContainer, containerStatus);
 
     if (application != null) {
-      // unreserve container increase request if it previously reserved.
-      if (rmContainer.hasIncreaseReservation()) {
-        unreserveIncreasedContainer(clusterResource, application, node,
-            rmContainer);
-      }
-      
-      // Remove container increase request if it exists
-      application.removeIncreaseRequest(node.getNodeID(),
-          rmContainer.getAllocatedSchedulerKey(), rmContainer.getContainerId());
-
       boolean removed = false;
 
       // Careful! Locking order is important!
@@ -1576,7 +1526,7 @@ public class LeafQueue extends AbstractCSQueue {
           orderingPolicy.containerReleased(application, rmContainer);
 
           releaseResource(clusterResource, application, container.getResource(),
-              node.getPartition(), rmContainer, false);
+              node.getPartition(), rmContainer);
         }
       } finally {
         writeLock.unlock();
@@ -1597,12 +1547,10 @@ public class LeafQueue extends AbstractCSQueue {
 
   void allocateResource(Resource clusterResource,
       SchedulerApplicationAttempt application, Resource resource,
-      String nodePartition, RMContainer rmContainer,
-      boolean isIncreasedAllocation) {
+      String nodePartition, RMContainer rmContainer) {
     try {
       writeLock.lock();
-      super.allocateResource(clusterResource, resource, nodePartition,
-          isIncreasedAllocation);
+      super.allocateResource(clusterResource, resource, nodePartition);
 
       // handle ignore exclusivity container
       if (null != rmContainer && rmContainer.getNodeLabelExpression().equals(
@@ -1643,11 +1591,10 @@ public class LeafQueue extends AbstractCSQueue {
 
   void releaseResource(Resource clusterResource,
       FiCaSchedulerApp application, Resource resource, String nodePartition,
-      RMContainer rmContainer, boolean isChangeResource) {
+      RMContainer rmContainer) {
     try {
       writeLock.lock();
-      super.releaseResource(clusterResource, resource, nodePartition,
-          isChangeResource);
+      super.releaseResource(clusterResource, resource, nodePartition);
 
       // handle ignore exclusivity container
       if (null != rmContainer && rmContainer.getNodeLabelExpression().equals(
@@ -1787,7 +1734,7 @@ public class LeafQueue extends AbstractCSQueue {
           rmContainer.getContainer().getNodeId());
       allocateResource(clusterResource, attempt,
           rmContainer.getContainer().getResource(), node.getPartition(),
-          rmContainer, false);
+          rmContainer);
     } finally {
       writeLock.unlock();
     }
@@ -1912,7 +1859,7 @@ public class LeafQueue extends AbstractCSQueue {
       FiCaSchedulerNode node =
           scheduler.getNode(rmContainer.getContainer().getNodeId());
       allocateResource(clusterResource, application, rmContainer.getContainer()
-          .getResource(), node.getPartition(), rmContainer, false);
+          .getResource(), node.getPartition(), rmContainer);
       LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
           + " resource=" + rmContainer.getContainer().getResource()
           + " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity()
@@ -1931,7 +1878,7 @@ public class LeafQueue extends AbstractCSQueue {
       FiCaSchedulerNode node =
           scheduler.getNode(rmContainer.getContainer().getNodeId());
       releaseResource(clusterResource, application, rmContainer.getContainer()
-          .getResource(), node.getPartition(), rmContainer, false);
+          .getResource(), node.getPartition(), rmContainer);
       LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
           + " resource=" + rmContainer.getContainer().getResource()
           + " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity()
@@ -2000,85 +1947,6 @@ public class LeafQueue extends AbstractCSQueue {
     return defaultAppPriorityPerQueue;
   }
 
-  /**
-   *
-   * @param clusterResource Total cluster resource
-   * @param decreaseRequest The decrease request
-   * @param app The application of interest
-   */
-  @Override
-  public void decreaseContainer(Resource clusterResource,
-      SchedContainerChangeRequest decreaseRequest,
-      FiCaSchedulerApp app) throws InvalidResourceRequestException {
-    // If the container being decreased is reserved, we need to unreserve it
-    // first.
-    RMContainer rmContainer = decreaseRequest.getRMContainer();
-    if (rmContainer.hasIncreaseReservation()) {
-      unreserveIncreasedContainer(clusterResource, app,
-          (FiCaSchedulerNode)decreaseRequest.getSchedulerNode(), rmContainer);
-    }
-    boolean resourceDecreased = false;
-    Resource resourceBeforeDecrease;
-    // Grab queue lock to avoid race condition when getting container resource
-
-    try {
-      writeLock.lock();
-      // Make sure the decrease request is valid in terms of current resource
-      // and target resource. This must be done under the leaf queue lock.
-      // Throws exception if the check fails.
-      RMServerUtils.checkSchedContainerChangeRequest(decreaseRequest, false);
-      // Save resource before decrease for debug log
-      resourceBeforeDecrease = Resources.clone(
-          rmContainer.getAllocatedResource());
-      // Do we have increase request for the same container? If so, remove it
-      boolean hasIncreaseRequest = app.removeIncreaseRequest(
-          decreaseRequest.getNodeId(),
-          decreaseRequest.getRMContainer().getAllocatedSchedulerKey(),
-          decreaseRequest.getContainerId());
-      if (hasIncreaseRequest) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("While processing decrease requests, found an increase"
-              + " request for the same container " + decreaseRequest
-              .getContainerId() + ", removed the increase request");
-        }
-      }
-      // Delta capacity is negative when it's a decrease request
-      Resource absDelta = Resources.negate(decreaseRequest.getDeltaCapacity());
-      if (Resources.equals(absDelta, Resources.none())) {
-        // If delta capacity of this decrease request is 0, this decrease
-        // request serves the purpose of cancelling an existing increase request
-        // if any
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Decrease target resource equals to existing resource for"
-              + " container:" + decreaseRequest.getContainerId()
-              + " ignore this decrease request.");
-        }
-      } else{
-        // Release the delta resource
-        releaseResource(clusterResource, app, absDelta,
-            decreaseRequest.getNodePartition(),
-            decreaseRequest.getRMContainer(), true);
-        // Notify application
-        app.decreaseContainer(decreaseRequest);
-        // Notify node
-        decreaseRequest.getSchedulerNode().decreaseContainer(
-            decreaseRequest.getContainerId(), absDelta);
-        resourceDecreased = true;
-      }
-    } finally {
-      writeLock.unlock();
-    }
-
-    if (resourceDecreased) {
-      // Notify parent queue outside of leaf queue lock
-      getParent().decreaseContainer(clusterResource, decreaseRequest, app);
-      LOG.info("Application attempt " + app.getApplicationAttemptId()
-          + " decreased container:" + decreaseRequest.getContainerId()
-          + " from " + resourceBeforeDecrease + " to "
-          + decreaseRequest.getTargetCapacity());
-    }
-  }
-
   public void updateApplicationPriority(SchedulerApplication<FiCaSchedulerApp> app,
       Priority newAppPriority) {
     try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index 9c42c61..6f82fcc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -773,12 +773,11 @@ public class ParentQueue extends AbstractCSQueue {
   }
 
   private void internalReleaseResource(Resource clusterResource,
-      FiCaSchedulerNode node, Resource releasedResource,
-      boolean changeResource) {
+      FiCaSchedulerNode node, Resource releasedResource) {
     try {
       writeLock.lock();
       super.releaseResource(clusterResource, releasedResource,
-          node.getPartition(), changeResource);
+          node.getPartition());
 
       if (LOG.isDebugEnabled()) {
         LOG.debug(
@@ -789,38 +788,6 @@ public class ParentQueue extends AbstractCSQueue {
       writeLock.unlock();
     }
   }
-  
-  @Override
-  public void decreaseContainer(Resource clusterResource,
-      SchedContainerChangeRequest decreaseRequest, FiCaSchedulerApp app)
-      throws InvalidResourceRequestException {
-    // delta capacity is negative when it's a decrease request
-    Resource absDeltaCapacity =
-        Resources.negate(decreaseRequest.getDeltaCapacity());
-
-    internalReleaseResource(clusterResource,
-        csContext.getNode(decreaseRequest.getNodeId()), absDeltaCapacity, false);
-
-    // Inform the parent
-    if (parent != null) {
-      parent.decreaseContainer(clusterResource, decreaseRequest, app);
-    }
-  }
-  
-  @Override
-  public void unreserveIncreasedContainer(Resource clusterResource,
-      FiCaSchedulerApp app, FiCaSchedulerNode node, RMContainer rmContainer) {
-    if (app != null) {
-      internalReleaseResource(clusterResource, node,
-          rmContainer.getReservedResource(), false);
-
-      // Inform the parent
-      if (parent != null) {
-        parent.unreserveIncreasedContainer(clusterResource, app, node,
-            rmContainer);
-      }    
-    }
-  }
 
   @Override
   public void completedContainer(Resource clusterResource,
@@ -830,7 +797,7 @@ public class ParentQueue extends AbstractCSQueue {
       boolean sortQueues) {
     if (application != null) {
       internalReleaseResource(clusterResource, node,
-          rmContainer.getContainer().getResource(), false);
+          rmContainer.getContainer().getResource());
 
       // Inform the parent
       if (parent != null) {
@@ -886,7 +853,7 @@ public class ParentQueue extends AbstractCSQueue {
       FiCaSchedulerNode node = scheduler.getNode(
           rmContainer.getContainer().getNodeId());
       allocateResource(clusterResource,
-          rmContainer.getContainer().getResource(), node.getPartition(), false);
+          rmContainer.getContainer().getResource(), node.getPartition());
     } finally {
       writeLock.unlock();
     }
@@ -923,7 +890,7 @@ public class ParentQueue extends AbstractCSQueue {
       FiCaSchedulerNode node =
           scheduler.getNode(rmContainer.getContainer().getNodeId());
       allocateResource(clusterResource, rmContainer.getContainer()
-          .getResource(), node.getPartition(), false);
+          .getResource(), node.getPartition());
       LOG.info("movedContainer" + " queueMoveIn=" + getQueueName()
           + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
           + getAbsoluteUsedCapacity() + " used=" + queueUsage.getUsed() + " cluster="
@@ -943,7 +910,7 @@ public class ParentQueue extends AbstractCSQueue {
           scheduler.getNode(rmContainer.getContainer().getNodeId());
       super.releaseResource(clusterResource,
           rmContainer.getContainer().getResource(),
-          node.getPartition(), false);
+          node.getPartition());
       LOG.info("movedContainer" + " queueMoveOut=" + getQueueName()
           + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
           + getAbsoluteUsedCapacity() + " used=" + queueUsage.getUsed() + " cluster="
@@ -960,11 +927,10 @@ public class ParentQueue extends AbstractCSQueue {
   }
 
   void allocateResource(Resource clusterResource,
-      Resource resource, String nodePartition, boolean changeContainerResource) {
+      Resource resource, String nodePartition) {
     try {
       writeLock.lock();
-      super.allocateResource(clusterResource, resource, nodePartition,
-          changeContainerResource);
+      super.allocateResource(clusterResource, resource, nodePartition);
 
       /**
        * check if we need to kill (killable) containers if maximum resource violated.
@@ -1054,8 +1020,7 @@ public class ParentQueue extends AbstractCSQueue {
           // Book-keeping
           // Note: Update headroom to account for current allocation too...
           allocateResource(cluster, allocation.getAllocatedOrReservedResource(),
-              schedulerContainer.getNodePartition(),
-              allocation.isIncreasedAllocation());
+              schedulerContainer.getNodePartition());
 
           LOG.info("assignedContainer" + " queue=" + getQueueName()
               + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java
index 57188d8..4879fae 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 public class ContainerAllocator extends AbstractContainerAllocator {
-  private AbstractContainerAllocator increaseContainerAllocator;
   private AbstractContainerAllocator regularContainerAllocator;
 
   public ContainerAllocator(FiCaSchedulerApp application,
@@ -45,8 +44,6 @@ public class ContainerAllocator extends AbstractContainerAllocator {
       RMContext rmContext, ActivitiesManager activitiesManager) {
     super(application, rc, rmContext);
 
-    increaseContainerAllocator =
-        new IncreaseContainerAllocator(application, rc, rmContext);
     regularContainerAllocator = new RegularContainerAllocator(application, rc,
         rmContext, activitiesManager);
   }
@@ -55,32 +52,8 @@ public class ContainerAllocator extends AbstractContainerAllocator {
   public CSAssignment assignContainers(Resource clusterResource,
       PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode,
       ResourceLimits resourceLimits, RMContainer reservedContainer) {
-    if (reservedContainer != null) {
-      if (reservedContainer.getState() == RMContainerState.RESERVED) {
-        // It's a regular container
-        return regularContainerAllocator.assignContainers(clusterResource,
-            ps, schedulingMode, resourceLimits, reservedContainer);
-      } else {
-        // It's a increase container
-        return increaseContainerAllocator.assignContainers(clusterResource,
-            ps, schedulingMode, resourceLimits, reservedContainer);
-      }
-    } else {
-      /*
-       * Try to allocate increase container first, and if we failed to allocate
-       * anything, we will try to allocate regular container
-       */
-      CSAssignment assign =
-          increaseContainerAllocator.assignContainers(clusterResource, ps,
-              schedulingMode, resourceLimits, null);
-      if (Resources.greaterThan(rc, clusterResource, assign.getResource(),
-          Resources.none())) {
-        return assign;
-      }
-
-      return regularContainerAllocator.assignContainers(clusterResource, ps,
-          schedulingMode, resourceLimits, null);
-    }
+    return regularContainerAllocator.assignContainers(clusterResource,
+        ps, schedulingMode, resourceLimits, reservedContainer);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java
deleted file mode 100644
index 0dc527f..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java
+++ /dev/null
@@ -1,337 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils;
-import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.Resources;
-
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-
-public class IncreaseContainerAllocator extends AbstractContainerAllocator {
-  private static final Log LOG =
-      LogFactory.getLog(IncreaseContainerAllocator.class);
-
-  public IncreaseContainerAllocator(FiCaSchedulerApp application,
-      ResourceCalculator rc, RMContext rmContext) {
-    super(application, rc, rmContext);
-  }
-  
-  /**
-   * Quick check if we can allocate anything here:
-   * We will not continue if: 
-   * - Headroom doesn't support allocate minimumAllocation
-   * - 
-   */
-  private boolean checkHeadroom(Resource clusterResource,
-      ResourceLimits currentResourceLimits, Resource required) {
-    return Resources.greaterThanOrEqual(rc, clusterResource,
-        currentResourceLimits.getHeadroom(), required);
-  }
-  
-  private CSAssignment createReservedIncreasedCSAssignment(
-      SchedContainerChangeRequest request) {
-    CSAssignment assignment =
-        new CSAssignment(request.getDeltaCapacity(), NodeType.NODE_LOCAL, null,
-            application, CSAssignment.SkippedType.NONE, false);
-    Resources.addTo(assignment.getAssignmentInformation().getReserved(),
-        request.getDeltaCapacity());
-    assignment.getAssignmentInformation().incrReservations();
-    assignment.getAssignmentInformation().addReservationDetails(
-        request.getRMContainer(), application.getCSLeafQueue().getQueuePath());
-    assignment.setIncreasedAllocation(true);
-    
-    LOG.info("Reserved increase container request:" + request.toString());
-    
-    return assignment;
-  }
-  
-  private CSAssignment createSuccessfullyIncreasedCSAssignment(
-      SchedContainerChangeRequest request, boolean fromReservation) {
-    CSAssignment assignment =
-        new CSAssignment(request.getDeltaCapacity(), NodeType.NODE_LOCAL, null,
-            application, CSAssignment.SkippedType.NONE, fromReservation);
-    Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
-        request.getDeltaCapacity());
-    assignment.getAssignmentInformation().incrAllocations();
-    assignment.getAssignmentInformation().addAllocationDetails(
-        request.getRMContainer(), application.getCSLeafQueue().getQueuePath());
-    assignment.setIncreasedAllocation(true);
-
-    if (fromReservation) {
-      assignment.setFulfilledReservedContainer(request.getRMContainer());
-    }
-    
-    // notify application
-    application
-        .getCSLeafQueue()
-        .getOrderingPolicy()
-        .containerAllocated(application,
-            application.getRMContainer(request.getContainerId()));
-
-    LOG.info("Approved increase container request:" + request.toString()
-        + " fromReservation=" + fromReservation);    
-    
-    return assignment;
-  }
-  
-  private CSAssignment allocateIncreaseRequestFromReservedContainer(
-      SchedulerNode node, Resource cluster,
-      SchedContainerChangeRequest increaseRequest) {
-    if (Resources.fitsIn(rc, cluster, increaseRequest.getDeltaCapacity(),
-        node.getUnallocatedResource())) {
-      return createSuccessfullyIncreasedCSAssignment(increaseRequest, true);
-    } else {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Failed to allocate reserved increase request:"
-            + increaseRequest.toString()
-            + ". There's no enough available resource");
-      }
-      
-      // We still cannot allocate this container, will wait for next turn
-      return CSAssignment.SKIP_ASSIGNMENT;
-    }
-  }
-  
-  private CSAssignment allocateIncreaseRequest(FiCaSchedulerNode node,
-      Resource cluster, SchedContainerChangeRequest increaseRequest) {
-    if (Resources.fitsIn(rc, cluster, increaseRequest.getDeltaCapacity(),
-        node.getUnallocatedResource())) {
-      return createSuccessfullyIncreasedCSAssignment(increaseRequest, false);
-    } else{
-      // We cannot allocate this container, but since queue capacity /
-      // user-limit matches, we can reserve this container on this node.
-      return createReservedIncreasedCSAssignment(increaseRequest);
-    }
-  }
-
-  @Override
-  public CSAssignment assignContainers(Resource clusterResource,
-      PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode,
-      ResourceLimits resourceLimits, RMContainer reservedContainer) {
-    AppSchedulingInfo sinfo = application.getAppSchedulingInfo();
-    FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps);
-
-    if (null == node) {
-      // This is global scheduling enabled
-      // FIXME, support container increase when global scheduling enabled
-      return CSAssignment.SKIP_ASSIGNMENT;
-    }
-    NodeId nodeId = node.getNodeID();
-
-    if (reservedContainer == null) {
-      // Do we have increase request on this node?
-      if (!sinfo.hasIncreaseRequest(nodeId)) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Skip allocating increase request since we don't have any"
-              + " increase request on this node=" + node.getNodeID());
-        }
-        
-        return CSAssignment.SKIP_ASSIGNMENT;
-      }
-      
-      // Check if we need to unreserve something, note that we don't support
-      // continuousReservationLooking now. TODO, need think more about how to
-      // support it.
-      boolean shouldUnreserve =
-          Resources.greaterThan(rc, clusterResource,
-              resourceLimits.getAmountNeededUnreserve(), Resources.none());
-      
-      // Check if we can allocate minimum resource according to headroom
-      boolean cannotAllocateAnything =
-          !checkHeadroom(clusterResource, resourceLimits, rmContext
-              .getScheduler().getMinimumResourceCapability());
-      
-      // Skip the app if we failed either of above check
-      if (cannotAllocateAnything || shouldUnreserve) {
-        if (LOG.isDebugEnabled()) {
-          if (shouldUnreserve) {
-            LOG.debug("Cannot continue since we have to unreserve some resource"
-                + ", now increase container allocation doesn't "
-                + "support continuous reservation looking..");
-          }
-          if (cannotAllocateAnything) {
-            LOG.debug("We cannot allocate anything because of low headroom, "
-                + "headroom=" + resourceLimits.getHeadroom());
-          }
-        }
-        
-        return CSAssignment.SKIP_ASSIGNMENT;
-      }
-      
-      CSAssignment assigned = null;
-
-      /*
-       * Loop each priority, and containerId. Container priority is not
-       * equivalent to request priority, application master can run an important
-       * task on a less prioritized container.
-       * 
-       * So behavior here is, we still try to increase container with higher
-       * priority, but will skip increase request and move to next increase
-       * request if queue-limit or user-limit aren't satisfied 
-       */
-      for (SchedulerRequestKey schedulerKey : application.getSchedulerKeys()) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Looking at increase request for application="
-              + application.getApplicationAttemptId() + " priority="
-              + schedulerKey.getPriority());
-        }
-
-        /*
-         * If we have multiple to-be-increased containers under same priority on
-         * a same host, we will try to increase earlier launched container
-         * first. And again - we will skip a request and move to next if it
-         * cannot be allocated.
-         */
-        Map<ContainerId, SchedContainerChangeRequest> increaseRequestMap =
-            sinfo.getIncreaseRequests(nodeId, schedulerKey);
-
-        // We don't have more increase request on this priority, skip..
-        if (null == increaseRequestMap) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("There's no increase request for "
-                + application.getApplicationAttemptId() + " priority="
-                + schedulerKey.getPriority());
-          }
-          continue;
-        }
-        Iterator<Entry<ContainerId, SchedContainerChangeRequest>> iter =
-            increaseRequestMap.entrySet().iterator();
-
-        while (iter.hasNext()) {
-          Entry<ContainerId, SchedContainerChangeRequest> entry =
-              iter.next();
-          SchedContainerChangeRequest increaseRequest =
-              entry.getValue();
-          
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(
-                "Looking at increase request=" + increaseRequest.toString());
-          }
-
-          boolean headroomSatisifed = checkHeadroom(clusterResource,
-              resourceLimits, increaseRequest.getDeltaCapacity());
-          if (!headroomSatisifed) {
-            // skip if doesn't satisfy headroom limit
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(" Headroom is not satisfied, skip..");
-            }
-            continue;
-          }
-
-          RMContainer rmContainer = increaseRequest.getRMContainer();
-          if (rmContainer.getContainerState() != ContainerState.RUNNING) {
-            // if the container is not running, we should remove the
-            // increaseRequest and continue;
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("  Container is not running any more, skip...");
-            }
-            application.addToBeRemovedIncreaseRequest(increaseRequest);
-            continue;
-          }
-
-          if (!Resources.fitsIn(rc, clusterResource,
-              increaseRequest.getTargetCapacity(), node.getTotalResource())) {
-            // if the target capacity is more than what the node can offer, we
-            // will simply remove and skip it.
-            // The reason of doing check here instead of adding increase request
-            // to scheduler because node's resource could be updated after
-            // request added.
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("  Target capacity is more than what node can offer,"
-                  + " node.resource=" + node.getTotalResource());
-            }
-            application.addToBeRemovedIncreaseRequest(increaseRequest);
-            continue;
-          }
-
-          // Try to allocate the increase request
-          assigned =
-              allocateIncreaseRequest(node, clusterResource, increaseRequest);
-          if (assigned.getSkippedType()
-              == CSAssignment.SkippedType.NONE) {
-            // When we don't skip this request, which means we either allocated
-            // OR reserved this request. We will break
-            break;
-          }
-        }
-
-        // We may have allocated something
-        if (assigned != null && assigned.getSkippedType()
-            == CSAssignment.SkippedType.NONE) {
-          break;
-        }
-      }
-      
-      return assigned == null ? CSAssignment.SKIP_ASSIGNMENT : assigned;
-    } else {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Trying to allocate reserved increase container request..");
-      }
-      
-      // We already reserved this increase container
-      SchedContainerChangeRequest request =
-          sinfo.getIncreaseRequest(nodeId,
-              reservedContainer.getAllocatedSchedulerKey(),
-              reservedContainer.getContainerId());
-      
-      // We will cancel the reservation any of following happens
-      // - Container finished
-      // - No increase request needed
-      // - Target resource updated
-      if (null == request
-          || reservedContainer.getContainerState() != ContainerState.RUNNING
-          || (!Resources.equals(reservedContainer.getReservedResource(),
-              request.getDeltaCapacity()))) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("We don't need reserved increase container request "
-              + "for container=" + reservedContainer.getContainerId()
-              + ". Unreserving and return...");
-        }
-        
-        // We don't need this container now, just return excessive reservation
-        return new CSAssignment(application, reservedContainer);
-      }
-      
-      return allocateIncreaseRequestFromReservedContainer(node, clusterResource,
-          request);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ContainerAllocationProposal.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ContainerAllocationProposal.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ContainerAllocationProposal.java
index ac83d6f..2921e7f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ContainerAllocationProposal.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ContainerAllocationProposal.java
@@ -43,8 +43,6 @@ public class ContainerAllocationProposal<A extends SchedulerApplicationAttempt,
   // not be included by toRelease list
   private SchedulerContainer<A, N> allocateFromReservedContainer;
 
-  private boolean isIncreasedAllocation;
-
   private NodeType allocationLocalityType;
 
   private NodeType requestLocalityType;
@@ -57,7 +55,7 @@ public class ContainerAllocationProposal<A extends SchedulerApplicationAttempt,
       SchedulerContainer<A, N> allocatedOrReservedContainer,
       List<SchedulerContainer<A, N>> toRelease,
       SchedulerContainer<A, N> allocateFromReservedContainer,
-      boolean isIncreasedAllocation, NodeType allocationLocalityType,
+      NodeType allocationLocalityType,
       NodeType requestLocalityType, SchedulingMode schedulingMode,
       Resource allocatedResource) {
     this.allocatedOrReservedContainer = allocatedOrReservedContainer;
@@ -65,7 +63,6 @@ public class ContainerAllocationProposal<A extends SchedulerApplicationAttempt,
       this.toRelease = toRelease;
     }
     this.allocateFromReservedContainer = allocateFromReservedContainer;
-    this.isIncreasedAllocation = isIncreasedAllocation;
     this.allocationLocalityType = allocationLocalityType;
     this.requestLocalityType = requestLocalityType;
     this.schedulingMode = schedulingMode;
@@ -84,10 +81,6 @@ public class ContainerAllocationProposal<A extends SchedulerApplicationAttempt,
     return allocationLocalityType;
   }
 
-  public boolean isIncreasedAllocation() {
-    return isIncreasedAllocation;
-  }
-
   public SchedulerContainer<A, N> getAllocateFromReservedContainer() {
     return allocateFromReservedContainer;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 30b7305..fea29bb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -312,54 +312,6 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
     return false;
   }
 
-  private SchedContainerChangeRequest getResourceChangeRequest(
-      SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> schedulerContainer) {
-    return appSchedulingInfo.getIncreaseRequest(
-        schedulerContainer.getSchedulerNode().getNodeID(),
-        schedulerContainer.getSchedulerRequestKey(),
-        schedulerContainer.getRmContainer().getContainerId());
-  }
-
-  private boolean checkIncreaseContainerAllocation(
-      ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode> allocation,
-      SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> schedulerContainer) {
-    // When increase a container
-    if (schedulerContainer.getRmContainer().getState()
-        != RMContainerState.RUNNING) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Trying to increase a container, but container="
-            + schedulerContainer.getRmContainer().getContainerId()
-            + " is not in running state.");
-      }
-      return false;
-    }
-
-    // Check if increase request is still valid
-    SchedContainerChangeRequest increaseRequest = getResourceChangeRequest(
-        schedulerContainer);
-
-    if (null == increaseRequest || !Resources.equals(
-        increaseRequest.getDeltaCapacity(),
-        allocation.getAllocatedOrReservedResource())) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Increase request has been changed, reject this proposal");
-      }
-      return false;
-    }
-
-    if (allocation.getAllocateFromReservedContainer() != null) {
-      // In addition, if allocation is from a reserved container, check
-      // if the reserved container has enough reserved space
-      if (!Resources.equals(
-          allocation.getAllocateFromReservedContainer().getRmContainer()
-              .getReservedResource(), increaseRequest.getDeltaCapacity())) {
-        return false;
-      }
-    }
-
-    return true;
-  }
-
   private boolean commonCheckContainerAllocation(
       Resource cluster,
       ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode> allocation,
@@ -445,30 +397,23 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
             schedulerContainer = allocation.getAllocatedOrReservedContainer();
 
         if (schedulerContainer.isAllocated()) {
-          if (!allocation.isIncreasedAllocation()) {
-            // When allocate a new container
-            resourceRequests =
-                schedulerContainer.getRmContainer().getResourceRequests();
-
-            // Check pending resource request
-            if (!appSchedulingInfo.checkAllocation(allocation.getAllocationLocalityType(),
-                schedulerContainer.getSchedulerNode(),
-                schedulerContainer.getSchedulerRequestKey())) {
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("No pending resource for: nodeType=" + allocation
-                    .getAllocationLocalityType() + ", node=" + schedulerContainer
-                    .getSchedulerNode() + ", requestKey=" + schedulerContainer
-                    .getSchedulerRequestKey() + ", application="
-                    + getApplicationAttemptId());
-              }
-
-              return false;
-            }
-          } else {
-            if (!checkIncreaseContainerAllocation(allocation,
-                schedulerContainer)) {
-              return false;
+          // When allocate a new container
+          resourceRequests =
+              schedulerContainer.getRmContainer().getResourceRequests();
+
+          // Check pending resource request
+          if (!appSchedulingInfo.checkAllocation(allocation.getAllocationLocalityType(),
+              schedulerContainer.getSchedulerNode(),
+              schedulerContainer.getSchedulerRequestKey())) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("No pending resource for: nodeType=" + allocation
+                  .getAllocationLocalityType() + ", node=" + schedulerContainer
+                  .getSchedulerNode() + ", requestKey=" + schedulerContainer
+                  .getSchedulerRequestKey() + ", application="
+                  + getApplicationAttemptId());
             }
+
+            return false;
           }
 
           // Common part of check container allocation regardless if it is a
@@ -541,12 +486,10 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
 
         // Generate new containerId if it is not an allocation for increasing
         // Or re-reservation
-        if (!allocation.isIncreasedAllocation()) {
-          if (rmContainer.getContainer().getId() == null) {
-            rmContainer.setContainerId(BuilderUtils
-                .newContainerId(getApplicationAttemptId(),
-                    getNewContainerId()));
-          }
+        if (rmContainer.getContainer().getId() == null) {
+          rmContainer.setContainerId(BuilderUtils
+              .newContainerId(getApplicationAttemptId(),
+                  getNewContainerId()));
         }
         ContainerId containerId = rmContainer.getContainerId();
 
@@ -562,77 +505,50 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
                 schedulerContainer.getSchedulerNode(), reservedContainer);
           }
 
-          // Update this application for the allocated container
-          if (!allocation.isIncreasedAllocation()) {
-            // Allocate a new container
-            addToNewlyAllocatedContainers(
-                schedulerContainer.getSchedulerNode(), rmContainer);
-            liveContainers.put(containerId, rmContainer);
+          // Allocate a new container
+          addToNewlyAllocatedContainers(
+              schedulerContainer.getSchedulerNode(), rmContainer);
+          liveContainers.put(containerId, rmContainer);
 
-            // Deduct pending resource requests
-            List<ResourceRequest> requests = appSchedulingInfo.allocate(
-                allocation.getAllocationLocalityType(),
-                schedulerContainer.getSchedulerNode(),
-                schedulerContainer.getSchedulerRequestKey(),
-                schedulerContainer.getRmContainer().getContainer());
-            ((RMContainerImpl) rmContainer).setResourceRequests(requests);
+          // Deduct pending resource requests
+          List<ResourceRequest> requests = appSchedulingInfo.allocate(
+              allocation.getAllocationLocalityType(),
+              schedulerContainer.getSchedulerNode(),
+              schedulerContainer.getSchedulerRequestKey(),
+              schedulerContainer.getRmContainer().getContainer());
+          ((RMContainerImpl) rmContainer).setResourceRequests(requests);
 
-            attemptResourceUsage.incUsed(schedulerContainer.getNodePartition(),
-                allocation.getAllocatedOrReservedResource());
+          attemptResourceUsage.incUsed(schedulerContainer.getNodePartition(),
+              allocation.getAllocatedOrReservedResource());
 
-            rmContainer.handle(
-                new RMContainerEvent(containerId, RMContainerEventType.START));
+          rmContainer.handle(
+              new RMContainerEvent(containerId, RMContainerEventType.START));
 
-            // Inform the node
-            schedulerContainer.getSchedulerNode().allocateContainer(
-                rmContainer);
+          // Inform the node
+          schedulerContainer.getSchedulerNode().allocateContainer(
+              rmContainer);
 
-            // update locality statistics,
-            incNumAllocatedContainers(allocation.getAllocationLocalityType(),
-                allocation.getRequestLocalityType());
+          // update locality statistics,
+          incNumAllocatedContainers(allocation.getAllocationLocalityType(),
+              allocation.getRequestLocalityType());
 
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("allocate: applicationAttemptId=" + containerId
-                  .getApplicationAttemptId() + " container=" + containerId
-                  + " host=" + rmContainer.getAllocatedNode().getHost()
-                  + " type=" + allocation.getAllocationLocalityType());
-            }
-            RMAuditLogger.logSuccess(getUser(), AuditConstants.ALLOC_CONTAINER,
-                "SchedulerApp", getApplicationId(), containerId,
-                allocation.getAllocatedOrReservedResource());
-          } else{
-            SchedContainerChangeRequest increaseRequest =
-                getResourceChangeRequest(schedulerContainer);
-
-            // allocate resource for an increase request
-            // Notify node
-            schedulerContainer.getSchedulerNode().increaseContainer(
-                increaseRequest.getContainerId(),
-                increaseRequest.getDeltaCapacity());
-
-            // OK, we can allocate this increase request
-            // Notify application
-            increaseContainer(increaseRequest);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("allocate: applicationAttemptId=" + containerId
+                .getApplicationAttemptId() + " container=" + containerId
+                + " host=" + rmContainer.getAllocatedNode().getHost()
+                + " type=" + allocation.getAllocationLocalityType());
           }
+          RMAuditLogger.logSuccess(getUser(), AuditConstants.ALLOC_CONTAINER,
+              "SchedulerApp", getApplicationId(), containerId,
+              allocation.getAllocatedOrReservedResource());
         } else {
-          if (!allocation.isIncreasedAllocation()) {
-            // If the rmContainer's state is already updated to RESERVED, this is
-            // a reReservation
-            reserve(schedulerContainer.getSchedulerRequestKey(),
-                schedulerContainer.getSchedulerNode(),
-                schedulerContainer.getRmContainer(),
-                schedulerContainer.getRmContainer().getContainer(),
-                reReservation);
-          } else{
-            SchedContainerChangeRequest increaseRequest =
-                getResourceChangeRequest(schedulerContainer);
-
-            reserveIncreasedContainer(
-                schedulerContainer.getSchedulerRequestKey(),
-                schedulerContainer.getSchedulerNode(),
-                increaseRequest.getRMContainer(),
-                increaseRequest.getDeltaCapacity());
-          }
+          // If the rmContainer's state is already updated to RESERVED, this is
+          // a reReservation
+          reserve(schedulerContainer.getSchedulerRequestKey(),
+              schedulerContainer.getSchedulerNode(),
+              schedulerContainer.getRmContainer(),
+              schedulerContainer.getRmContainer().getContainer(),
+              reReservation);
         }
       }
     } finally {
@@ -649,9 +565,6 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
       FiCaSchedulerNode node, RMContainer rmContainer) {
     try {
       writeLock.lock();
-      // Cancel increase request (if it has reserved increase request
-      rmContainer.cancelIncreaseReservation();
-
       // Done with the reservation?
       if (internalUnreserve(node, schedulerKey)) {
         node.unreserveResource(this);
@@ -807,13 +720,6 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
           .entrySet()) {
         NodeId nodeId = entry.getKey();
         RMContainer reservedContainer = entry.getValue();
-        if (reservedContainer.hasIncreaseReservation()) {
-          // Currently, only regular container allocation supports continuous
-          // reservation looking, we don't support canceling increase request
-          // reservation when allocating regular container.
-          continue;
-        }
-
         Resource reservedResource = reservedContainer.getReservedResource();
 
         // make sure we unreserve one with at least the same amount of
@@ -869,25 +775,6 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
     }
   }
 
-  public boolean reserveIncreasedContainer(SchedulerRequestKey schedulerKey,
-      FiCaSchedulerNode node,
-      RMContainer rmContainer, Resource reservedResource) {
-    // Inform the application
-    if (super.reserveIncreasedContainer(node, schedulerKey, rmContainer,
-        reservedResource)) {
-
-      queue.getMetrics().reserveResource(getUser(), reservedResource);
-
-      // Update the node
-      node.reserveResource(this, schedulerKey, rmContainer);
-
-      // Succeeded
-      return true;
-    }
-
-    return false;
-  }
-
   public void reserve(SchedulerRequestKey schedulerKey, FiCaSchedulerNode node,
       RMContainer rmContainer, Container container, boolean reReservation) {
     // Update reserved metrics if this is the first reservation
@@ -1114,26 +1001,6 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
     toBeRemovedIncRequests.put(request.getContainerId(), request);
   }
 
-  public void removedToBeRemovedIncreaseRequests() {
-    // Remove invalid in request requests
-    if (!toBeRemovedIncRequests.isEmpty()) {
-      try {
-        writeLock.lock();
-        Iterator<Map.Entry<ContainerId, SchedContainerChangeRequest>> iter =
-            toBeRemovedIncRequests.entrySet().iterator();
-        while (iter.hasNext()) {
-          SchedContainerChangeRequest req = iter.next().getValue();
-          appSchedulingInfo.removeIncreaseRequest(req.getNodeId(),
-              req.getRMContainer().getAllocatedSchedulerKey(),
-              req.getContainerId());
-          iter.remove();
-        }
-      } finally {
-        writeLock.unlock();
-      }
-    }
-  }
-
   /*
    * Overriding to appease findbugs
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
index 8eac929..c26a11b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
@@ -153,20 +153,6 @@ public class FiCaSchedulerNode extends SchedulerNode {
     }
   }
 
-  @Override
-  protected synchronized void changeContainerResource(ContainerId containerId,
-      Resource deltaResource, boolean increase) {
-    super.changeContainerResource(containerId, deltaResource, increase);
-
-    if (killableContainers.containsKey(containerId)) {
-      if (increase) {
-        Resources.addTo(totalKillableResources, deltaResource);
-      } else {
-        Resources.subtractFrom(totalKillableResources, deltaResource);
-      }
-    }
-  }
-
   public synchronized Resource getTotalKillableResources() {
     return totalKillableResources;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 3246778..0599414 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -819,9 +819,7 @@ public class FairScheduler extends
     }
 
     // Handle promotions and demotions
-    handleExecutionTypeUpdates(
-        application, updateRequests.getPromotionRequests(),
-        updateRequests.getDemotionRequests());
+    handleContainerUpdates(application, updateRequests);
 
     // Sanity check
     normalizeRequests(ask);
@@ -1769,13 +1767,6 @@ public class FairScheduler extends
     return targetQueueName;
   }
 
-  @Override
-  protected void decreaseContainer(
-      SchedContainerChangeRequest decreaseRequest,
-      SchedulerApplicationAttempt attempt) {
-    // TODO Auto-generated method stub    
-  }
-
   public float getReservableNodesRatio() {
     return reservableNodesRatio;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index 64dbc7d..a8d4f48 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -937,14 +937,6 @@ public class FifoScheduler extends
   }
 
   @Override
-  protected void decreaseContainer(
-      SchedContainerChangeRequest decreaseRequest,
-      SchedulerApplicationAttempt attempt) {
-    // TODO Auto-generated method stub
-    
-  }
-
-  @Override
   protected synchronized void nodeUpdate(RMNode nm) {
     super.nodeUpdate(nm);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
index 899523c..e34665d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
@@ -137,11 +137,11 @@ public class TestChildQueueOrder {
         final Resource allocatedResource = Resources.createResource(allocation);
         if (queue instanceof ParentQueue) {
           ((ParentQueue)queue).allocateResource(clusterResource, 
-              allocatedResource, RMNodeLabelsManager.NO_LABEL, false);
+              allocatedResource, RMNodeLabelsManager.NO_LABEL);
         } else {
           FiCaSchedulerApp app1 = getMockApplication(0, "");
           ((LeafQueue)queue).allocateResource(clusterResource, app1, 
-              allocatedResource, null, null, false);
+              allocatedResource, null, null);
         }
 
         // Next call - nothing

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
index 0696f57..b4b05ed 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
@@ -88,18 +88,6 @@ public class TestContainerResizing {
     }
 
     @Override
-    protected void decreaseContainers(
-        List<UpdateContainerRequest> decreaseRequests,
-        SchedulerApplicationAttempt attempt) {
-      try {
-        Thread.sleep(1000);
-      } catch(InterruptedException e) {
-        LOG.debug("Thread interrupted.");
-      }
-      super.decreaseContainers(decreaseRequests, attempt);
-    }
-
-    @Override
     public CSAssignment allocateContainersToNode(
         PlacementSet<FiCaSchedulerNode> ps, boolean withNodeHeartbeat) {
       try {
@@ -288,13 +276,9 @@ public class TestContainerResizing {
     CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
     RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
     cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
-
-    RMContainer rmContainer1 = app.getLiveContainersMap().get(containerId1);
     
     /* Check reservation statuses */
     // Increase request should be reserved
-    Assert.assertTrue(rmContainer1.hasIncreaseReservation());
-    Assert.assertEquals(6 * GB, rmContainer1.getReservedResource().getMemorySize());
     Assert.assertFalse(app.getReservedContainers().isEmpty());
     Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
     // Pending resource will not be changed since it's not satisfied
@@ -319,7 +303,6 @@ public class TestContainerResizing {
     
     /* Check statuses after reservation satisfied */
     // Increase request should be unreserved
-    Assert.assertFalse(rmContainer1.hasIncreaseReservation());
     Assert.assertTrue(app.getReservedContainers().isEmpty());
     Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
     // Pending resource will be changed since it's satisfied
@@ -391,11 +374,8 @@ public class TestContainerResizing {
     RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
     cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
 
-    RMContainer rmContainer1 = app.getLiveContainersMap().get(containerId2);
-
     /* Check reservation statuses */
     // Increase request should *NOT* be reserved as it exceeds user limit
-    Assert.assertFalse(rmContainer1.hasIncreaseReservation());
     Assert.assertTrue(app.getReservedContainers().isEmpty());
     Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
     // Pending resource will not be changed since it's not satisfied
@@ -471,13 +451,9 @@ public class TestContainerResizing {
     CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
     RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
     cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
-
-    RMContainer rmContainer1 = app.getLiveContainersMap().get(containerId1);
     
     /* Check reservation statuses */
     // Increase request should be reserved
-    Assert.assertTrue(rmContainer1.hasIncreaseReservation());
-    Assert.assertEquals(6 * GB, rmContainer1.getReservedResource().getMemorySize());
     Assert.assertFalse(app.getReservedContainers().isEmpty());
     Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
     // Pending resource will not be changed since it's not satisfied
@@ -510,7 +486,6 @@ public class TestContainerResizing {
     // Increase request should be unreserved
     Assert.assertTrue(app.getReservedContainers().isEmpty());
     Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
-    Assert.assertFalse(rmContainer1.hasIncreaseReservation());
     // Pending resource will be changed since it's satisfied
     checkPendingResource(rm1, "default", 0 * GB, null);
     Assert.assertEquals(0 * GB,
@@ -585,13 +560,9 @@ public class TestContainerResizing {
     CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
     RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
     cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
-
-    RMContainer rmContainer1 = app.getLiveContainersMap().get(containerId1);
     
     /* Check reservation statuses */
     // Increase request should be reserved
-    Assert.assertTrue(rmContainer1.hasIncreaseReservation());
-    Assert.assertEquals(6 * GB, rmContainer1.getReservedResource().getMemorySize());
     Assert.assertFalse(app.getReservedContainers().isEmpty());
     Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
     // Pending resource will not be changed since it's not satisfied
@@ -614,7 +585,7 @@ public class TestContainerResizing {
     am1.sendContainerResizingRequest(Arrays.asList(
         UpdateContainerRequest
             .newInstance(0, containerId1,
-                ContainerUpdateType.INCREASE_RESOURCE,
+                ContainerUpdateType.DECREASE_RESOURCE,
                 Resources.createResource(1 * GB), null)));
     // Trigger a node heartbeat..
     cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
@@ -623,7 +594,6 @@ public class TestContainerResizing {
     // Increase request should be unreserved
     Assert.assertTrue(app.getReservedContainers().isEmpty());
     Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
-    Assert.assertFalse(rmContainer1.hasIncreaseReservation());
     // Pending resource will be changed since it's satisfied
     checkPendingResource(rm1, "default", 0 * GB, null);
     Assert.assertEquals(0 * GB,
@@ -698,12 +668,8 @@ public class TestContainerResizing {
     RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
     cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
 
-    RMContainer rmContainer2 = app.getLiveContainersMap().get(containerId2);
-    
     /* Check reservation statuses */
     // Increase request should be reserved
-    Assert.assertTrue(rmContainer2.hasIncreaseReservation());
-    Assert.assertEquals(6 * GB, rmContainer2.getReservedResource().getMemorySize());
     Assert.assertFalse(app.getReservedContainers().isEmpty());
     Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
     // Pending resource will not be changed since it's not satisfied
@@ -721,12 +687,13 @@ public class TestContainerResizing {
 
     // Complete container2, container will be unreserved and completed
     am1.allocate(null, Arrays.asList(containerId2));
-    
+
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    am1.allocate(null, null);
     /* Check statuses after reservation satisfied */
     // Increase request should be unreserved
     Assert.assertTrue(app.getReservedContainers().isEmpty());
     Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
-    Assert.assertFalse(rmContainer2.hasIncreaseReservation());
     // Pending resource will be changed since it's satisfied
     checkPendingResource(rm1, "default", 0 * GB, null);
     Assert.assertEquals(0 * GB,
@@ -796,12 +763,8 @@ public class TestContainerResizing {
     RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
     cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
 
-    RMContainer rmContainer2 = app.getLiveContainersMap().get(containerId2);
-    
     /* Check reservation statuses */
     // Increase request should be reserved
-    Assert.assertTrue(rmContainer2.hasIncreaseReservation());
-    Assert.assertEquals(6 * GB, rmContainer2.getReservedResource().getMemorySize());
     Assert.assertFalse(app.getReservedContainers().isEmpty());
     Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
     // Pending resource will not be changed since it's not satisfied
@@ -825,11 +788,11 @@ public class TestContainerResizing {
     // Increase request should be unreserved
     Assert.assertTrue(app.getReservedContainers().isEmpty());
     Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
-    Assert.assertFalse(rmContainer2.hasIncreaseReservation());
     // Pending resource will be changed since it's satisfied
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
     checkPendingResource(rm1, "default", 0 * GB, null);
-    Assert.assertEquals(0 * GB,
-        app.getAppAttemptResourceUsage().getPending().getMemorySize());
+
     // Queue/user/application's usage will be updated
     checkUsedResource(rm1, "default", 0 * GB, null);
     // User will be removed
@@ -949,89 +912,6 @@ public class TestContainerResizing {
     rm1.close();
   }
 
-  @Test
-  public void testIncreaseContainerRequestGetPreferrence()
-      throws Exception {
-    /**
-     * There're multiple containers need to be increased, and there're several
-     * container allocation request, scheduler will try to increase container
-     * before allocate new containers
-     */
-    MockRM rm1 = new MockRM() {
-      @Override
-      public RMNodeLabelsManager createNodeLabelManager() {
-        return mgr;
-      }
-    };
-    rm1.start();
-    MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB);
-
-    // app1 -> a1
-    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
-    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
-    FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp(
-        rm1, app1.getApplicationId());
-    ApplicationAttemptId attemptId = am1.getApplicationAttemptId();
-
-    // Container 2, 3 (priority=3)
-    allocateAndLaunchContainers(am1, nm1, rm1, 2, 1 * GB, 3, 2);
-
-    // Container 4, 5 (priority=2)
-    allocateAndLaunchContainers(am1, nm1, rm1, 2, 1 * GB, 2, 4);
-
-    // Container 6, 7 (priority=4)
-    allocateAndLaunchContainers(am1, nm1, rm1, 2, 1 * GB, 4, 6);
-
-    // am1 asks to change its container[2-7] from 1G to 2G
-    List<UpdateContainerRequest> increaseRequests = new ArrayList<>();
-    for (int cId = 2; cId <= 7; cId++) {
-      ContainerId containerId =
-          ContainerId.newContainerId(am1.getApplicationAttemptId(), cId);
-      increaseRequests.add(UpdateContainerRequest
-          .newInstance(0, containerId,
-              ContainerUpdateType.INCREASE_RESOURCE,
-              Resources.createResource(2 * GB), null));
-    }
-    am1.sendContainerResizingRequest(increaseRequests);
-
-    checkPendingResource(rm1, "default", 6 * GB, null);
-    Assert.assertEquals(6 * GB,
-        app.getAppAttemptResourceUsage().getPending().getMemorySize());
-
-    // Get rmNode1
-    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
-    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
-
-    // assignContainer, container-4/5/2 increased (which has highest priority OR
-    // earlier allocated)
-    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
-    AllocateResponse allocateResponse = am1.allocate(null, null);
-    Assert.assertEquals(3, allocateResponse.getUpdatedContainers().size());
-    verifyContainerIncreased(allocateResponse,
-        ContainerId.newContainerId(attemptId, 4), 2 * GB);
-    verifyContainerIncreased(allocateResponse,
-        ContainerId.newContainerId(attemptId, 5), 2 * GB);
-    verifyContainerIncreased(allocateResponse,
-        ContainerId.newContainerId(attemptId, 2), 2 * GB);
-
-    /* Check statuses after allocation */
-    // There're still 3 pending increase requests
-    checkPendingResource(rm1, "default", 3 * GB, null);
-    Assert.assertEquals(3 * GB,
-        app.getAppAttemptResourceUsage().getPending().getMemorySize());
-    // Queue/user/application's usage will be updated
-    checkUsedResource(rm1, "default", 10 * GB, null);
-    Assert.assertEquals(10 * GB, ((LeafQueue) cs.getQueue("default"))
-        .getUser("user").getUsed().getMemorySize());
-    Assert.assertEquals(0 * GB,
-        app.getAppAttemptResourceUsage().getReserved().getMemorySize());
-    Assert.assertEquals(10 * GB,
-        app.getAppAttemptResourceUsage().getUsed().getMemorySize());
-
-    rm1.close();
-  }
-
   @Test (timeout = 60000)
   public void testDecreaseContainerWillNotDeadlockContainerAllocation()
       throws Exception {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java
index 74cecf2..184e854 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java
@@ -200,6 +200,7 @@ public class TestIncreaseAllocationExpirer {
     // back action to complete
     Thread.sleep(10000);
     // Verify container size is 1G
+    am1.allocate(null, null);
     Assert.assertEquals(
         1 * GB, rm1.getResourceScheduler().getRMContainer(containerId2)
             .getAllocatedResource().getMemorySize());
@@ -304,6 +305,8 @@ public class TestIncreaseAllocationExpirer {
     // Wait long enough for the second token (5G) to expire, and verify that
     // the roll back action is completed as expected
     Thread.sleep(10000);
+    am1.allocate(null, null);
+    Thread.sleep(2000);
     // Verify container size is rolled back to 3G
     Assert.assertEquals(
         3 * GB, rm1.getResourceScheduler().getRMContainer(containerId2)
@@ -400,13 +403,13 @@ public class TestIncreaseAllocationExpirer {
     // Decrease containers
     List<UpdateContainerRequest> decreaseRequests = new ArrayList<>();
     decreaseRequests.add(UpdateContainerRequest.newInstance(1, containerId2,
-        ContainerUpdateType.INCREASE_RESOURCE,
+        ContainerUpdateType.DECREASE_RESOURCE,
         Resources.createResource(2 * GB), null));
     decreaseRequests.add(UpdateContainerRequest.newInstance(1, containerId3,
-        ContainerUpdateType.INCREASE_RESOURCE,
+        ContainerUpdateType.DECREASE_RESOURCE,
         Resources.createResource(4 * GB), null));
     decreaseRequests.add(UpdateContainerRequest.newInstance(1, containerId4,
-        ContainerUpdateType.INCREASE_RESOURCE,
+        ContainerUpdateType.DECREASE_RESOURCE,
         Resources.createResource(4 * GB), null));
     AllocateResponse response =
         am1.sendContainerResizingRequest(decreaseRequests);
@@ -418,6 +421,9 @@ public class TestIncreaseAllocationExpirer {
         rm1, containerId4, Resources.createResource(6 * GB)));
     // Wait for containerId3 token to expire,
     Thread.sleep(10000);
+
+    am1.allocate(null, null);
+
     Assert.assertEquals(
         2 * GB, rm1.getResourceScheduler().getRMContainer(containerId2)
             .getAllocatedResource().getMemorySize());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index ec1b84d..3fbbae3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -1061,11 +1061,11 @@ public class TestLeafQueue {
     qb.releaseResource(clusterResource, app_0,
         app_0.getAppSchedulingInfo().getPendingAsk(u0SchedKey)
             .getPerAllocationResource(),
-        null, null, false);
+        null, null);
     qb.releaseResource(clusterResource, app_2,
         app_2.getAppSchedulingInfo().getPendingAsk(u1SchedKey)
             .getPerAllocationResource(),
-        null, null, false);
+        null, null);
 
     qb.setUserLimit(50);
     qb.setUserLimitFactor(1);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
index 11fea82..c4b7a0d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
@@ -171,11 +171,11 @@ public class TestParentQueue {
         final Resource allocatedResource = Resources.createResource(allocation);
         if (queue instanceof ParentQueue) {
           ((ParentQueue)queue).allocateResource(clusterResource, 
-              allocatedResource, RMNodeLabelsManager.NO_LABEL, false);
+              allocatedResource, RMNodeLabelsManager.NO_LABEL);
         } else {
           FiCaSchedulerApp app1 = getMockApplication(0, "");
           ((LeafQueue)queue).allocateResource(clusterResource, app1, 
-              allocatedResource, null, null, false);
+              allocatedResource, null, null);
         }
         
         // Next call - nothing


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[2/2] hadoop git commit: YARN-6216. Unify Container Resizing code paths with Container Updates making it scheduler agnostic. (Arun Suresh via wangda)

Posted by wa...@apache.org.
YARN-6216. Unify Container Resizing code paths with Container Updates making it scheduler agnostic. (Arun Suresh via wangda)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/eac6b4c3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/eac6b4c3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/eac6b4c3

Branch: refs/heads/trunk
Commit: eac6b4c35c50e555c2f1b5f913bb2c4d839f1ff4
Parents: 480b4dd
Author: Wangda Tan <wa...@apache.org>
Authored: Tue Feb 28 10:35:50 2017 -0800
Committer: Wangda Tan <wa...@apache.org>
Committed: Tue Feb 28 10:35:50 2017 -0800

----------------------------------------------------------------------
 .../sls/scheduler/ResourceSchedulerWrapper.java |   8 -
 .../server/scheduler/SchedulerRequestKey.java   |  12 +-
 .../server/resourcemanager/RMServerUtils.java   |  27 +-
 .../rmcontainer/RMContainer.java                |   4 -
 .../RMContainerChangeResourceEvent.java         |  44 ---
 .../rmcontainer/RMContainerImpl.java            |  46 ---
 .../scheduler/AbstractYarnScheduler.java        | 171 +++++++---
 .../scheduler/AppSchedulingInfo.java            | 283 +---------------
 .../scheduler/ContainerUpdateContext.java       | 193 ++++++++---
 .../scheduler/SchedulerApplicationAttempt.java  | 212 ++++--------
 .../scheduler/SchedulerNode.java                |  44 ---
 .../scheduler/capacity/AbstractCSQueue.java     |  13 +-
 .../scheduler/capacity/CSQueue.java             |  15 -
 .../scheduler/capacity/CapacityScheduler.java   | 121 +------
 .../scheduler/capacity/LeafQueue.java           | 152 +--------
 .../scheduler/capacity/ParentQueue.java         |  53 +--
 .../capacity/allocator/ContainerAllocator.java  |  31 +-
 .../allocator/IncreaseContainerAllocator.java   | 337 -------------------
 .../common/ContainerAllocationProposal.java     |   9 +-
 .../scheduler/common/fica/FiCaSchedulerApp.java | 245 +++-----------
 .../common/fica/FiCaSchedulerNode.java          |  14 -
 .../scheduler/fair/FairScheduler.java           |  11 +-
 .../scheduler/fifo/FifoScheduler.java           |   8 -
 .../scheduler/capacity/TestChildQueueOrder.java |   4 +-
 .../capacity/TestContainerResizing.java         | 134 +-------
 .../capacity/TestIncreaseAllocationExpirer.java |  12 +-
 .../scheduler/capacity/TestLeafQueue.java       |   4 +-
 .../scheduler/capacity/TestParentQueue.java     |   4 +-
 28 files changed, 482 insertions(+), 1729 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
index 5517362..df8323a 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
@@ -969,12 +969,4 @@ final public class ResourceSchedulerWrapper
     return Priority.newInstance(0);
   }
 
-  @Override
-  protected void decreaseContainer(
-      SchedContainerChangeRequest decreaseRequest,
-      SchedulerApplicationAttempt attempt) {
-    // TODO Auto-generated method stub
-    
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java
index 02539ba..c4f37f6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java
@@ -116,7 +116,17 @@ public final class SchedulerRequestKey implements
     if (priorityCompare != 0) {
       return priorityCompare;
     }
-    return Long.compare(allocationRequestId, o.getAllocationRequestId());
+    int allocReqCompare = Long.compare(
+        allocationRequestId, o.getAllocationRequestId());
+
+    if (allocReqCompare != 0) {
+      return allocReqCompare;
+    }
+
+    if (this.containerToUpdate != null && o.containerToUpdate != null) {
+      return (this.containerToUpdate.compareTo(o.containerToUpdate));
+    }
+    return 0;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
index e98141b..0aa7a2c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
@@ -152,26 +152,16 @@ public class RMServerUtils {
       if (msg == null) {
         if ((updateType != ContainerUpdateType.PROMOTE_EXECUTION_TYPE) &&
             (updateType !=ContainerUpdateType.DEMOTE_EXECUTION_TYPE)) {
-          Resource original = rmContainer.getContainer().getResource();
-          Resource target = updateReq.getCapability();
-          if (Resources.fitsIn(target, original)) {
-            // This is a decrease request
-            if (validateIncreaseDecreaseRequest(rmContext, updateReq,
-                maximumAllocation, false)) {
-              updateRequests.getDecreaseRequests().add(updateReq);
-              outstandingUpdate.add(updateReq.getContainerId());
-            } else {
-              msg = RESOURCE_OUTSIDE_ALLOWED_RANGE;
-            }
-          } else {
-            // This is an increase request
-            if (validateIncreaseDecreaseRequest(rmContext, updateReq,
-                maximumAllocation, true)) {
+          if (validateIncreaseDecreaseRequest(
+              rmContext, updateReq, maximumAllocation)) {
+            if (ContainerUpdateType.INCREASE_RESOURCE == updateType) {
               updateRequests.getIncreaseRequests().add(updateReq);
-              outstandingUpdate.add(updateReq.getContainerId());
             } else {
-              msg = RESOURCE_OUTSIDE_ALLOWED_RANGE;
+              updateRequests.getDecreaseRequests().add(updateReq);
             }
+            outstandingUpdate.add(updateReq.getContainerId());
+          } else {
+            msg = RESOURCE_OUTSIDE_ALLOWED_RANGE;
           }
         } else {
           ExecutionType original = rmContainer.getExecutionType();
@@ -329,8 +319,7 @@ public class RMServerUtils {
 
   // Sanity check and normalize target resource
   private static boolean validateIncreaseDecreaseRequest(RMContext rmContext,
-      UpdateContainerRequest request, Resource maximumAllocation,
-      boolean increase) {
+      UpdateContainerRequest request, Resource maximumAllocation) {
     if (request.getCapability().getMemorySize() < 0
         || request.getCapability().getMemorySize() > maximumAllocation
         .getMemorySize()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
index 020764b..7ad381e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
@@ -91,10 +91,6 @@ public interface RMContainer extends EventHandler<RMContainerEvent> {
   String getNodeHttpAddress();
   
   String getNodeLabelExpression();
-  
-  boolean hasIncreaseReservation();
-  
-  void cancelIncreaseReservation();
 
   String getQueueName();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerChangeResourceEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerChangeResourceEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerChangeResourceEvent.java
deleted file mode 100644
index 920cfdb..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerChangeResourceEvent.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
-
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.Resource;
-
-public class RMContainerChangeResourceEvent extends RMContainerEvent {
-  
-  final Resource targetResource;
-  final boolean increase;
-
-  public RMContainerChangeResourceEvent(ContainerId containerId,
-      Resource targetResource, boolean increase) {
-    super(containerId, RMContainerEventType.CHANGE_RESOURCE);
-
-    this.targetResource = targetResource;
-    this.increase = increase;
-  }
-  
-  public Resource getTargetResource() {
-    return targetResource;
-  }
-  
-  public boolean isIncrease() {
-    return increase;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index 72ce1a0..12fbbea 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -131,8 +131,6 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
     .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING,
         RMContainerEventType.RESERVED, new ContainerReservedTransition())
     .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING,
-        RMContainerEventType.CHANGE_RESOURCE, new ChangeResourceTransition())
-    .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING,
         RMContainerEventType.ACQUIRE_UPDATED_CONTAINER, 
         new ContainerAcquiredWhileRunningTransition())
     .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING,
@@ -183,7 +181,6 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
   private boolean isAMContainer;
   private List<ResourceRequest> resourceRequests;
 
-  private volatile boolean hasIncreaseReservation = false;
   // Only used for container resource increase and decrease. This is the
   // resource to rollback to should container resource increase token expires.
   private Resource lastConfirmedResource;
@@ -561,12 +558,6 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
       if (c != null) {
         c.setNodeId(container.reservedNode);
       }
-
-      if (!EnumSet.of(RMContainerState.NEW, RMContainerState.RESERVED)
-          .contains(container.getState())) {
-        // When container's state != NEW/RESERVED, it is an increase reservation
-        container.hasIncreaseReservation = true;
-      }
     }
   }
 
@@ -681,33 +672,6 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
       }
     }
   }
-  
-  private static final class ChangeResourceTransition extends BaseTransition {
-
-    @Override
-    public void transition(RMContainerImpl container, RMContainerEvent event) {
-      RMContainerChangeResourceEvent changeEvent = (RMContainerChangeResourceEvent)event;
-
-      Resource targetResource = changeEvent.getTargetResource();
-      Resource lastConfirmedResource = container.lastConfirmedResource;
-
-      if (!changeEvent.isIncrease()) {
-        // Only unregister from the containerAllocationExpirer when target
-        // resource is less than or equal to the last confirmed resource.
-        if (Resources.fitsIn(targetResource, lastConfirmedResource)) {
-          container.lastConfirmedResource = targetResource;
-          container.containerAllocationExpirer.unregister(
-              new AllocationExpirationInfo(event.getContainerId()));
-        }
-      }
-
-      container.container.setResource(targetResource);
-
-      // We reach here means we either allocated increase reservation OR
-      // decreased container, reservation will be cancelled anyway. 
-      container.hasIncreaseReservation = false;
-    }
-  }
 
   private static class FinishedTransition extends BaseTransition {
 
@@ -857,16 +821,6 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
     return -1;
   }
 
-  @Override
-  public boolean hasIncreaseReservation() {
-    return hasIncreaseReservation;
-  }
-
-  @Override
-  public void cancelIncreaseReservation() {
-    hasIncreaseReservation = false;
-  }
-
   public void setQueueName(String queueName) {
     this.queueName = queueName;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index ce6d2a2..213839d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.List;
@@ -44,6 +45,7 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
@@ -85,6 +87,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.Activi
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
 
 
+
 import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -597,6 +600,8 @@ public abstract class AbstractYarnScheduler
 
     if (rmContainer.getExecutionType() == ExecutionType.GUARANTEED) {
       completedContainerInternal(rmContainer, containerStatus, event);
+      completeOustandingUpdatesWhichAreReserved(
+          rmContainer, containerStatus, event);
     } else {
       ContainerId containerId = rmContainer.getContainerId();
       // Inform the container
@@ -622,6 +627,33 @@ public abstract class AbstractYarnScheduler
     recoverResourceRequestForContainer(rmContainer);
   }
 
+  // Optimization:
+  // If the completed container's node holds a reservation made for an update
+  // of this same container, complete that reserved (temp) container as well.
+  // These are removed when the app completes, but removing them now would
+  // allow the scheduler to reallocate those resources sooner.
+  private void completeOustandingUpdatesWhichAreReserved(
+      RMContainer rmContainer, ContainerStatus containerStatus,
+      RMContainerEventType event) {
+    N schedulerNode = getSchedulerNode(rmContainer.getNodeId());
+    if (schedulerNode != null &&
+        schedulerNode.getReservedContainer() != null) {
+      RMContainer resContainer = schedulerNode.getReservedContainer();
+      if (resContainer.getReservedSchedulerKey() != null) {
+        ContainerId containerToUpdate = resContainer
+            .getReservedSchedulerKey().getContainerToUpdate();
+        if (containerToUpdate != null &&
+            containerToUpdate.equals(containerStatus.getContainerId())) {
+          completedContainerInternal(resContainer,
+              ContainerStatus.newInstance(resContainer.getContainerId(),
+                  containerStatus.getState(), containerStatus
+                      .getDiagnostics(),
+                  containerStatus.getExitStatus()), event);
+        }
+      }
+    }
+  }
+
   // clean up a completed container
   protected abstract void completedContainerInternal(RMContainer rmContainer,
       ContainerStatus containerStatus, RMContainerEventType event);
@@ -650,28 +682,6 @@ public abstract class AbstractYarnScheduler
     }
   }
 
-  protected void decreaseContainers(
-      List<UpdateContainerRequest> decreaseRequests,
-      SchedulerApplicationAttempt attempt) {
-    if (null == decreaseRequests || decreaseRequests.isEmpty()) {
-      return;
-    }
-    // Pre-process decrease requests
-    List<SchedContainerChangeRequest> schedDecreaseRequests =
-        createSchedContainerChangeRequests(decreaseRequests, false);
-    for (SchedContainerChangeRequest request : schedDecreaseRequests) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Processing decrease request:" + request);
-      }
-      // handle decrease request
-      decreaseContainer(request, attempt);
-    }
-  }
-
-  protected abstract void decreaseContainer(
-      SchedContainerChangeRequest decreaseRequest,
-      SchedulerApplicationAttempt attempt);
-
   @Override
   public N getSchedulerNode(NodeId nodeId) {
     return nodeTracker.getNode(nodeId);
@@ -1074,21 +1084,39 @@ public abstract class AbstractYarnScheduler
     }
   }
 
-  protected void handleExecutionTypeUpdates(
-      SchedulerApplicationAttempt appAttempt,
-      List<UpdateContainerRequest> promotionRequests,
-      List<UpdateContainerRequest> demotionRequests) {
+  protected void handleContainerUpdates(
+      SchedulerApplicationAttempt appAttempt, ContainerUpdates updates) {
+    List<UpdateContainerRequest> promotionRequests =
+        updates.getPromotionRequests();
     if (promotionRequests != null && !promotionRequests.isEmpty()) {
       LOG.info("Promotion Update requests : " + promotionRequests);
-      handlePromotionRequests(appAttempt, promotionRequests);
+      // Promotion is technically an increase request from
+      // 0 resources to target resources.
+      handleIncreaseRequests(appAttempt, promotionRequests);
     }
+    List<UpdateContainerRequest> increaseRequests =
+        updates.getIncreaseRequests();
+    if (increaseRequests != null && !increaseRequests.isEmpty()) {
+      LOG.info("Resource increase requests : " + increaseRequests);
+      handleIncreaseRequests(appAttempt, increaseRequests);
+    }
+    List<UpdateContainerRequest> demotionRequests =
+        updates.getDemotionRequests();
     if (demotionRequests != null && !demotionRequests.isEmpty()) {
       LOG.info("Demotion Update requests : " + demotionRequests);
-      handleDemotionRequests(appAttempt, demotionRequests);
+      // Demotion is technically a decrease request from initial
+      // to 0 resources
+      handleDecreaseRequests(appAttempt, demotionRequests);
+    }
+    List<UpdateContainerRequest> decreaseRequests =
+        updates.getDecreaseRequests();
+    if (decreaseRequests != null && !decreaseRequests.isEmpty()) {
+      LOG.info("Resource decrease requests : " + decreaseRequests);
+      handleDecreaseRequests(appAttempt, decreaseRequests);
     }
   }
 
-  private void handlePromotionRequests(
+  private void handleIncreaseRequests(
       SchedulerApplicationAttempt applicationAttempt,
       List<UpdateContainerRequest> updateContainerRequests) {
     for (UpdateContainerRequest uReq : updateContainerRequests) {
@@ -1118,7 +1146,7 @@ public abstract class AbstractYarnScheduler
     }
   }
 
-  private void handleDemotionRequests(SchedulerApplicationAttempt appAttempt,
+  private void handleDecreaseRequests(SchedulerApplicationAttempt appAttempt,
       List<UpdateContainerRequest> demotionRequests) {
     OpportunisticContainerContext oppCntxt =
         appAttempt.getOpportunisticContainerContext();
@@ -1126,24 +1154,59 @@ public abstract class AbstractYarnScheduler
       RMContainer rmContainer =
           rmContext.getScheduler().getRMContainer(uReq.getContainerId());
       if (rmContainer != null) {
-        if (appAttempt.getUpdateContext().checkAndAddToOutstandingDecreases(
-            rmContainer.getContainer())) {
-          RMContainer demotedRMContainer =
-              createDemotedRMContainer(appAttempt, oppCntxt, rmContainer);
-          appAttempt.addToNewlyDemotedContainers(
-              uReq.getContainerId(), demotedRMContainer);
+        SchedulerNode schedulerNode = rmContext.getScheduler()
+            .getSchedulerNode(rmContainer.getContainer().getNodeId());
+        if (appAttempt.getUpdateContext()
+            .checkAndAddToOutstandingDecreases(uReq, schedulerNode,
+                rmContainer.getContainer())) {
+          if (ContainerUpdateType.DEMOTE_EXECUTION_TYPE ==
+              uReq.getContainerUpdateType()) {
+            RMContainer demotedRMContainer =
+                createDemotedRMContainer(appAttempt, oppCntxt, rmContainer);
+            appAttempt.addToNewlyDemotedContainers(
+                uReq.getContainerId(), demotedRMContainer);
+          } else {
+            RMContainer demotedRMContainer = createDecreasedRMContainer(
+                appAttempt, uReq, rmContainer);
+            appAttempt.addToNewlyDecreasedContainers(
+                uReq.getContainerId(), demotedRMContainer);
+          }
         } else {
           appAttempt.addToUpdateContainerErrors(
               UpdateContainerError.newInstance(
               RMServerUtils.UPDATE_OUTSTANDING_ERROR, uReq));
         }
       } else {
-        LOG.warn("Cannot demote non-existent (or completed) Container ["
-            + uReq.getContainerId() + "]");
+        LOG.warn("Cannot demote/decrease non-existent (or completed) " +
+            "Container [" + uReq.getContainerId() + "]");
       }
     }
   }
 
+  private RMContainer createDecreasedRMContainer(
+      SchedulerApplicationAttempt appAttempt, UpdateContainerRequest uReq,
+      RMContainer rmContainer) {
+    SchedulerRequestKey sk =
+        SchedulerRequestKey.extractFrom(rmContainer.getContainer());
+    Container decreasedContainer = BuilderUtils.newContainer(
+        ContainerId.newContainerId(appAttempt.getApplicationAttemptId(),
+            appAttempt.getNewContainerId()),
+        rmContainer.getContainer().getNodeId(),
+        rmContainer.getContainer().getNodeHttpAddress(),
+        Resources.none(),
+        sk.getPriority(), null, rmContainer.getExecutionType(),
+        sk.getAllocationRequestId());
+    decreasedContainer.setVersion(rmContainer.getContainer().getVersion());
+    RMContainer newRmContainer = new RMContainerImpl(decreasedContainer,
+        sk, appAttempt.getApplicationAttemptId(),
+        decreasedContainer.getNodeId(), appAttempt.getUser(), rmContext,
+        rmContainer.isRemotelyAllocated());
+    appAttempt.addRMContainer(decreasedContainer.getId(), rmContainer);
+    ((AbstractYarnScheduler) rmContext.getScheduler()).getNode(
+        decreasedContainer.getNodeId()).allocateContainer(newRmContainer);
+    return newRmContainer;
+  }
+
   private RMContainer createDemotedRMContainer(
       SchedulerApplicationAttempt appAttempt,
       OpportunisticContainerContext oppCntxt,
@@ -1162,4 +1225,36 @@ public abstract class AbstractYarnScheduler
     return SchedulerUtils.createOpportunisticRmContainer(
         rmContext, demotedContainer, false);
   }
+
+  /**
+   * Rollback container update after expiry, to the last confirmed resource.
+   * @param containerId ContainerId.
+   */
+  protected void rollbackContainerUpdate(
+      ContainerId containerId) {
+    RMContainer rmContainer = getRMContainer(containerId);
+    if (rmContainer == null) {
+      LOG.info("Cannot rollback resource for container " + containerId
+          + ". The container does not exist.");
+      return;
+    }
+    T app = getCurrentAttemptForContainer(containerId);
+    if (app == null) {
+      LOG.info("Cannot rollback resource for container " + containerId
+          + ". The application that the container "
+          + "belongs to does not exist.");
+      return;
+    }
+
+    if (Resources.fitsIn(rmContainer.getLastConfirmedResource(),
+        rmContainer.getContainer().getResource())) {
+      LOG.info("Roll back resource for container " + containerId);
+      handleDecreaseRequests(app, Arrays.asList(
+          UpdateContainerRequest.newInstance(
+              rmContainer.getContainer().getVersion(),
+              rmContainer.getContainerId(),
+              ContainerUpdateType.DECREASE_RESOURCE,
+              rmContainer.getLastConfirmedResource(), null)));
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index 48ecd2e..bff9c41 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -90,9 +90,7 @@ public class AppSchedulingInfo {
       schedulerKeys = new ConcurrentSkipListMap<>();
   final Map<SchedulerRequestKey, SchedulingPlacementSet<SchedulerNode>>
       schedulerKeyToPlacementSets = new ConcurrentHashMap<>();
-  final Map<NodeId, Map<SchedulerRequestKey, Map<ContainerId,
-      SchedContainerChangeRequest>>> containerIncreaseRequestMap =
-      new ConcurrentHashMap<>();
+
   private final ReentrantReadWriteLock.ReadLock readLock;
   private final ReentrantReadWriteLock.WriteLock writeLock;
 
@@ -158,137 +156,6 @@ public class AppSchedulingInfo {
     LOG.info("Application " + applicationId + " requests cleared");
   }
 
-  public boolean hasIncreaseRequest(NodeId nodeId) {
-    try {
-      this.readLock.lock();
-      Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
-          requestsOnNode = containerIncreaseRequestMap.get(nodeId);
-      return requestsOnNode == null ? false : requestsOnNode.size() > 0;
-    } finally {
-      this.readLock.unlock();
-    }
-  }
-
-  public Map<ContainerId, SchedContainerChangeRequest>
-      getIncreaseRequests(NodeId nodeId, SchedulerRequestKey schedulerKey) {
-    try {
-      this.readLock.lock();
-      Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
-          requestsOnNode = containerIncreaseRequestMap.get(nodeId);
-      return requestsOnNode == null ? null : requestsOnNode.get(
-          schedulerKey);
-    } finally {
-      this.readLock.unlock();
-    }
-  }
-
-  /**
-   * return true if any of the existing increase requests are updated,
-   *        false if none of them are updated
-   */
-  public boolean updateIncreaseRequests(
-      List<SchedContainerChangeRequest> increaseRequests) {
-    boolean resourceUpdated = false;
-
-    try {
-      this.writeLock.lock();
-      for (SchedContainerChangeRequest r : increaseRequests) {
-        if (r.getRMContainer().getState() != RMContainerState.RUNNING) {
-          LOG.warn("rmContainer's state is not RUNNING, for increase request"
-              + " with container-id=" + r.getContainerId());
-          continue;
-        }
-        try {
-          RMServerUtils.checkSchedContainerChangeRequest(r, true);
-        } catch (YarnException e) {
-          LOG.warn("Error happens when checking increase request, Ignoring.."
-              + " exception=", e);
-          continue;
-        }
-        NodeId nodeId = r.getRMContainer().getAllocatedNode();
-
-        Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
-            requestsOnNode = containerIncreaseRequestMap.get(nodeId);
-        if (null == requestsOnNode) {
-          requestsOnNode = new TreeMap<>();
-          containerIncreaseRequestMap.put(nodeId, requestsOnNode);
-        }
-
-        SchedContainerChangeRequest prevChangeRequest =
-            getIncreaseRequest(nodeId,
-                r.getRMContainer().getAllocatedSchedulerKey(),
-                r.getContainerId());
-        if (null != prevChangeRequest) {
-          if (Resources.equals(prevChangeRequest.getTargetCapacity(),
-              r.getTargetCapacity())) {
-            // increase request hasn't changed
-            continue;
-          }
-
-          // remove the old one, as we will use the new one going forward
-          removeIncreaseRequest(nodeId,
-              prevChangeRequest.getRMContainer().getAllocatedSchedulerKey(),
-              prevChangeRequest.getContainerId());
-        }
-
-        if (Resources.equals(r.getTargetCapacity(),
-            r.getRMContainer().getAllocatedResource())) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Trying to increase container " + r.getContainerId()
-                + ", target capacity = previous capacity = " + prevChangeRequest
-                + ". Will ignore this increase request.");
-          }
-          continue;
-        }
-
-        // add the new one
-        resourceUpdated = true;
-        insertIncreaseRequest(r);
-      }
-      return resourceUpdated;
-    } finally {
-      this.writeLock.unlock();
-    }
-  }
-
-  /**
-   * Insert increase request, adding any missing items in the data-structure
-   * hierarchy.
-   */
-  private void insertIncreaseRequest(SchedContainerChangeRequest request) {
-    NodeId nodeId = request.getNodeId();
-    SchedulerRequestKey schedulerKey =
-        request.getRMContainer().getAllocatedSchedulerKey();
-    ContainerId containerId = request.getContainerId();
-    
-    Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
-        requestsOnNode = containerIncreaseRequestMap.get(nodeId);
-    if (null == requestsOnNode) {
-      requestsOnNode = new HashMap<>();
-      containerIncreaseRequestMap.put(nodeId, requestsOnNode);
-    }
-
-    Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority =
-        requestsOnNode.get(schedulerKey);
-    if (null == requestsOnNodeWithPriority) {
-      requestsOnNodeWithPriority = new TreeMap<>();
-      requestsOnNode.put(schedulerKey, requestsOnNodeWithPriority);
-      incrementSchedulerKeyReference(schedulerKey);
-    }
-
-    requestsOnNodeWithPriority.put(containerId, request);
-
-    // update resources
-    String partition = request.getRMContainer().getNodeLabelExpression();
-    Resource delta = request.getDeltaCapacity();
-    appResourceUsage.incPending(partition, delta);
-    queue.incPendingResource(partition, delta);
-    
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Added increase request:" + request.getContainerId()
-          + " delta=" + delta);
-    }
-  }
 
   private void incrementSchedulerKeyReference(
       SchedulerRequestKey schedulerKey) {
@@ -312,73 +179,6 @@ public class AppSchedulingInfo {
     }
   }
 
-  public boolean removeIncreaseRequest(NodeId nodeId,
-      SchedulerRequestKey schedulerKey, ContainerId containerId) {
-    try {
-      this.writeLock.lock();
-      Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
-          requestsOnNode = containerIncreaseRequestMap.get(nodeId);
-      if (null == requestsOnNode) {
-        return false;
-      }
-
-      Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority =
-          requestsOnNode.get(schedulerKey);
-      if (null == requestsOnNodeWithPriority) {
-        return false;
-      }
-
-      SchedContainerChangeRequest request =
-          requestsOnNodeWithPriority.remove(containerId);
-    
-      // remove hierarchies if it becomes empty
-      if (requestsOnNodeWithPriority.isEmpty()) {
-        requestsOnNode.remove(schedulerKey);
-        decrementSchedulerKeyReference(schedulerKey);
-      }
-      if (requestsOnNode.isEmpty()) {
-        containerIncreaseRequestMap.remove(nodeId);
-      }
-
-      if (request == null) {
-        return false;
-      }
-
-      // update queue's pending resource if request exists
-      String partition = request.getRMContainer().getNodeLabelExpression();
-      Resource delta = request.getDeltaCapacity();
-      appResourceUsage.decPending(partition, delta);
-      queue.decPendingResource(partition, delta);
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("remove increase request:" + request);
-      }
-
-      return true;
-    } finally {
-      this.writeLock.unlock();
-    }
-  }
-  
-  public SchedContainerChangeRequest getIncreaseRequest(NodeId nodeId,
-      SchedulerRequestKey schedulerKey, ContainerId containerId) {
-    try {
-      this.readLock.lock();
-      Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
-          requestsOnNode = containerIncreaseRequestMap.get(nodeId);
-      if (null == requestsOnNode) {
-        return null;
-      }
-
-      Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority =
-          requestsOnNode.get(schedulerKey);
-      return requestsOnNodeWithPriority == null ? null
-          : requestsOnNodeWithPriority.get(containerId);
-    } finally {
-      this.readLock.unlock();
-    }
-  }
-
   public ContainerUpdateContext getUpdateContext() {
     return updateContext;
   }
@@ -514,21 +314,6 @@ public class AppSchedulingInfo {
     appResourceUsage.decPending(partition, toDecrease);
   }
 
-  private boolean hasRequestLabelChanged(ResourceRequest requestOne,
-      ResourceRequest requestTwo) {
-    String requestOneLabelExp = requestOne.getNodeLabelExpression();
-    String requestTwoLabelExp = requestTwo.getNodeLabelExpression();
-    // First request label expression can be null and second request
-    // is not null then we have to consider it as changed.
-    if ((null == requestOneLabelExp) && (null != requestTwoLabelExp)) {
-      return true;
-    }
-    // If the label is not matching between both request when
-    // requestOneLabelExp is not null.
-    return ((null != requestOneLabelExp) && !(requestOneLabelExp
-        .equals(requestTwoLabelExp)));
-  }
-
   /**
    * The ApplicationMaster is updating the placesBlacklistedByApp used for
    * containers other than AMs.
@@ -601,22 +386,6 @@ public class AppSchedulingInfo {
     return ret;
   }
 
-  public SchedulingPlacementSet getFirstSchedulingPlacementSet() {
-    try {
-      readLock.lock();
-      for (SchedulerRequestKey key : schedulerKeys.keySet()) {
-        SchedulingPlacementSet ps = schedulerKeyToPlacementSets.get(key);
-        if (null != ps) {
-          return ps;
-        }
-      }
-      return null;
-    } finally {
-      readLock.unlock();
-    }
-
-  }
-
   public PendingAsk getNextPendingAsk() {
     try {
       readLock.lock();
@@ -666,56 +435,6 @@ public class AppSchedulingInfo {
     }
   }
 
-  public void increaseContainer(SchedContainerChangeRequest increaseRequest) {
-    NodeId nodeId = increaseRequest.getNodeId();
-    SchedulerRequestKey schedulerKey =
-        increaseRequest.getRMContainer().getAllocatedSchedulerKey();
-    ContainerId containerId = increaseRequest.getContainerId();
-    Resource deltaCapacity = increaseRequest.getDeltaCapacity();
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("allocated increase request : applicationId=" + applicationId
-          + " container=" + containerId + " host="
-          + increaseRequest.getNodeId() + " user=" + user + " resource="
-          + deltaCapacity);
-    }
-    try {
-      this.writeLock.lock();
-      // Set queue metrics
-      queue.getMetrics().allocateResources(user, deltaCapacity);
-      // remove the increase request from pending increase request map
-      removeIncreaseRequest(nodeId, schedulerKey, containerId);
-      // update usage
-      appResourceUsage.incUsed(increaseRequest.getNodePartition(),
-          deltaCapacity);
-    } finally {
-      this.writeLock.unlock();
-    }
-  }
-  
-  public void decreaseContainer(SchedContainerChangeRequest decreaseRequest) {
-    // Delta is negative when it's a decrease request
-    Resource absDelta = Resources.negate(decreaseRequest.getDeltaCapacity());
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Decrease container : applicationId=" + applicationId
-          + " container=" + decreaseRequest.getContainerId() + " host="
-          + decreaseRequest.getNodeId() + " user=" + user + " resource="
-          + absDelta);
-    }
-
-    try {
-      this.writeLock.lock();
-      // Set queue metrics
-      queue.getMetrics().releaseResources(user, absDelta);
-
-      // update usage
-      appResourceUsage.decUsed(decreaseRequest.getNodePartition(), absDelta);
-    } finally {
-      this.writeLock.unlock();
-    }
-  }
-
   public List<ResourceRequest> allocate(NodeType type,
       SchedulerNode node, SchedulerRequestKey schedulerKey,
       Container containerAllocated) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java
index 7381250..5ac2ac5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java
@@ -28,17 +28,19 @@ import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.api.records.UpdateContainerError;
 import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer
+    .RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement
+    .SchedulingPlacementSet;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.util.resource.Resources;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -58,43 +60,37 @@ public class ContainerUpdateContext {
   private final Map<SchedulerRequestKey, Map<Resource,
       Map<NodeId, Set<ContainerId>>>> outstandingIncreases = new HashMap<>();
 
-  private final Set<ContainerId> outstandingDecreases = new HashSet<>();
+  private final Map<ContainerId, Resource> outstandingDecreases =
+      new HashMap<>();
   private final AppSchedulingInfo appSchedulingInfo;
 
   ContainerUpdateContext(AppSchedulingInfo appSchedulingInfo) {
     this.appSchedulingInfo = appSchedulingInfo;
   }
 
-  private synchronized boolean isBeingIncreased(Container container) {
-    Map<Resource, Map<NodeId, Set<ContainerId>>> resourceMap =
-        outstandingIncreases.get(
-            new SchedulerRequestKey(container.getPriority(),
-                container.getAllocationRequestId(), container.getId()));
-    if (resourceMap != null) {
-      Map<NodeId, Set<ContainerId>> locationMap =
-          resourceMap.get(container.getResource());
-      if (locationMap != null) {
-        Set<ContainerId> containerIds = locationMap.get(container.getNodeId());
-        if (containerIds != null && !containerIds.isEmpty()) {
-          return containerIds.contains(container.getId());
-        }
-      }
-    }
-    return false;
-  }
-
   /**
    * Add the container to outstanding decreases.
+   * @param updateReq UpdateContainerRequest.
+   * @param schedulerNode SchedulerNode.
    * @param container Container.
-   * @return true if updated to outstanding decreases was successful.
+   * @return If it was possible to decrease the container.
    */
   public synchronized boolean checkAndAddToOutstandingDecreases(
+      UpdateContainerRequest updateReq, SchedulerNode schedulerNode,
       Container container) {
-    if (isBeingIncreased(container)
-        || outstandingDecreases.contains(container.getId())) {
+    if (outstandingDecreases.containsKey(container.getId())) {
       return false;
     }
-    outstandingDecreases.add(container.getId());
+    if (ContainerUpdateType.DECREASE_RESOURCE ==
+        updateReq.getContainerUpdateType()) {
+      SchedulerRequestKey updateKey = new SchedulerRequestKey
+          (container.getPriority(),
+              container.getAllocationRequestId(), container.getId());
+      cancelPreviousRequest(schedulerNode, updateKey);
+      outstandingDecreases.put(container.getId(), updateReq.getCapability());
+    } else {
+      outstandingDecreases.put(container.getId(), container.getResource());
+    }
     return true;
   }
 
@@ -117,35 +113,63 @@ public class ContainerUpdateContext {
     if (resourceMap == null) {
       resourceMap = new HashMap<>();
       outstandingIncreases.put(schedulerKey, resourceMap);
+    } else {
+      // Updating Resource for an existing increase container
+      if (ContainerUpdateType.INCREASE_RESOURCE ==
+          updateRequest.getContainerUpdateType()) {
+        cancelPreviousRequest(schedulerNode, schedulerKey);
+      } else {
+        return false;
+      }
     }
+    Resource resToIncrease = getResourceToIncrease(updateRequest, rmContainer);
     Map<NodeId, Set<ContainerId>> locationMap =
-        resourceMap.get(container.getResource());
+        resourceMap.get(resToIncrease);
     if (locationMap == null) {
       locationMap = new HashMap<>();
-      resourceMap.put(container.getResource(), locationMap);
+      resourceMap.put(resToIncrease, locationMap);
     }
     Set<ContainerId> containerIds = locationMap.get(container.getNodeId());
     if (containerIds == null) {
       containerIds = new HashSet<>();
       locationMap.put(container.getNodeId(), containerIds);
     }
-    if (containerIds.contains(container.getId())
-        || outstandingDecreases.contains(container.getId())) {
+    if (outstandingDecreases.containsKey(container.getId())) {
       return false;
     }
-    containerIds.add(container.getId());
 
-    Map<SchedulerRequestKey, Map<String, ResourceRequest>> updateResReqs =
-        new HashMap<>();
-    Resource resToIncrease = getResourceToIncrease(updateRequest, rmContainer);
-    Map<String, ResourceRequest> resMap =
-        createResourceRequests(rmContainer, schedulerNode,
-            schedulerKey, resToIncrease);
-    updateResReqs.put(schedulerKey, resMap);
-    appSchedulingInfo.addToPlacementSets(false, updateResReqs);
+    containerIds.add(container.getId());
+    if (!Resources.isNone(resToIncrease)) {
+      Map<SchedulerRequestKey, Map<String, ResourceRequest>> updateResReqs =
+          new HashMap<>();
+      Map<String, ResourceRequest> resMap =
+          createResourceRequests(rmContainer, schedulerNode,
+              schedulerKey, resToIncrease);
+      updateResReqs.put(schedulerKey, resMap);
+      appSchedulingInfo.addToPlacementSets(false, updateResReqs);
+    }
     return true;
   }
 
+  private void cancelPreviousRequest(SchedulerNode schedulerNode,
+      SchedulerRequestKey schedulerKey) {
+    SchedulingPlacementSet<SchedulerNode> schedulingPlacementSet =
+        appSchedulingInfo.getSchedulingPlacementSet(schedulerKey);
+    if (schedulingPlacementSet != null) {
+      Map<String, ResourceRequest> resourceRequests = schedulingPlacementSet
+          .getResourceRequests();
+      ResourceRequest prevReq = resourceRequests.get(ResourceRequest.ANY);
+      // Decrement the pending using a dummy RR with
+      // resource = prev update req capability
+      if (prevReq != null) {
+        appSchedulingInfo.allocate(NodeType.OFF_SWITCH, schedulerNode,
+            schedulerKey, Container.newInstance(UNDEFINED,
+                schedulerNode.getNodeID(), "host:port",
+                prevReq.getCapability(), schedulerKey.getPriority(), null));
+      }
+    }
+  }
+
   private Map<String, ResourceRequest> createResourceRequests(
       RMContainer rmContainer, SchedulerNode schedulerNode,
       SchedulerRequestKey schedulerKey, Resource resToIncrease) {
@@ -171,10 +195,16 @@ public class ContainerUpdateContext {
         ContainerUpdateType.PROMOTE_EXECUTION_TYPE) {
       return rmContainer.getContainer().getResource();
     }
-    // TODO: Fix this for container increase..
-    //       This has to equal the Resources in excess of fitsIn()
-    //       for container increase and is equal to the container total
-    //       resource for Promotion.
+    if (updateReq.getContainerUpdateType() ==
+        ContainerUpdateType.INCREASE_RESOURCE) {
+      //       This has to equal the Resources in excess of fitsIn()
+      //       for container increase and is equal to the container total
+      //       resource for Promotion.
+      Resource maxCap = Resources.componentwiseMax(updateReq.getCapability(),
+          rmContainer.getContainer().getResource());
+      return Resources.add(maxCap,
+          Resources.negate(rmContainer.getContainer().getResource()));
+    }
     return null;
   }
 
@@ -228,6 +258,7 @@ public class ContainerUpdateContext {
   /**
    * Check if a new container is to be matched up against an outstanding
    * Container increase request.
+   * @param node SchedulerNode.
    * @param schedulerKey SchedulerRequestKey.
    * @param rmContainer RMContainer.
    * @return ContainerId.
@@ -264,4 +295,80 @@ public class ContainerUpdateContext {
     }
     return retVal;
   }
+
+  /**
+   * Swaps the existing RMContainer's and the temp RMContainer's internal
+   * container references after adjusting the resources in each.
+   * @param tempRMContainer Temp RMContainer.
+   * @param existingRMContainer Existing RMContainer.
+   * @param updateType Update Type.
+   * @return Existing RMContainer after swapping the container references.
+   */
+  public RMContainer swapContainer(RMContainer tempRMContainer,
+      RMContainer existingRMContainer, ContainerUpdateType updateType) {
+    ContainerId matchedContainerId = existingRMContainer.getContainerId();
+    // Swap updated container with the existing container
+    Container tempContainer = tempRMContainer.getContainer();
+
+    Resource updatedResource = createUpdatedResource(
+        tempContainer, existingRMContainer.getContainer(), updateType);
+    Resource resourceToRelease = createResourceToRelease(
+        existingRMContainer.getContainer(), updateType);
+    Container newContainer = Container.newInstance(matchedContainerId,
+        existingRMContainer.getContainer().getNodeId(),
+        existingRMContainer.getContainer().getNodeHttpAddress(),
+        updatedResource,
+        existingRMContainer.getContainer().getPriority(), null,
+        tempContainer.getExecutionType());
+    newContainer.setAllocationRequestId(
+        existingRMContainer.getContainer().getAllocationRequestId());
+    newContainer.setVersion(existingRMContainer.getContainer().getVersion());
+
+    tempRMContainer.getContainer().setResource(resourceToRelease);
+    tempRMContainer.getContainer().setExecutionType(
+        existingRMContainer.getContainer().getExecutionType());
+
+    ((RMContainerImpl)existingRMContainer).setContainer(newContainer);
+    return existingRMContainer;
+  }
+
+  /**
+   * Returns the resource that the container will finally be assigned with
+   * at the end of the update operation.
+   * @param tempContainer Temporary Container created for the operation.
+   * @param existingContainer Existing Container.
+   * @param updateType Update Type.
+   * @return Final Resource.
+   */
+  private Resource createUpdatedResource(Container tempContainer,
+      Container existingContainer, ContainerUpdateType updateType) {
+    if (ContainerUpdateType.INCREASE_RESOURCE == updateType) {
+      return Resources.add(existingContainer.getResource(),
+          tempContainer.getResource());
+    } else if (ContainerUpdateType.DECREASE_RESOURCE == updateType) {
+      return outstandingDecreases.get(existingContainer.getId());
+    } else {
+      return existingContainer.getResource();
+    }
+  }
+
+  /**
+   * Returns the resources that need to be released at the end of the update
+   * operation.
+   * @param existingContainer Existing Container.
+   * @param updateType Update Type.
+   * @return Resources to be released.
+   */
+  private Resource createResourceToRelease(Container existingContainer,
+      ContainerUpdateType updateType) {
+    if (ContainerUpdateType.INCREASE_RESOURCE == updateType) {
+      return Resources.none();
+    } else if (ContainerUpdateType.DECREASE_RESOURCE == updateType){
+      return Resources.add(existingContainer.getResource(),
+          Resources.negate(
+              outstandingDecreases.get(existingContainer.getId())));
+    } else {
+      return existingContainer.getResource();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index 0e79838..f894a40 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -65,7 +66,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppR
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerChangeResourceEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
@@ -73,6 +73,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRese
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerUpdatesAcquiredEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
+
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode
+    .RMNodeDecreaseContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
@@ -136,9 +139,9 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
   private AtomicLong firstContainerAllocatedTime = new AtomicLong(0);
 
   protected List<RMContainer> newlyAllocatedContainers = new ArrayList<>();
+  protected List<RMContainer> tempContainerToKill = new ArrayList<>();
   protected Map<ContainerId, RMContainer> newlyPromotedContainers = new HashMap<>();
   protected Map<ContainerId, RMContainer> newlyDemotedContainers = new HashMap<>();
-  protected List<RMContainer> tempContainerToKill = new ArrayList<>();
   protected Map<ContainerId, RMContainer> newlyDecreasedContainers = new HashMap<>();
   protected Map<ContainerId, RMContainer> newlyIncreasedContainers = new HashMap<>();
   protected Set<NMToken> updatedNMTokens = new HashSet<>();
@@ -670,6 +673,11 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
       rmContainer.handle(new RMContainerUpdatesAcquiredEvent(
           rmContainer.getContainerId(),
           ContainerUpdateType.INCREASE_RESOURCE == updateType));
+      if (ContainerUpdateType.DECREASE_RESOURCE == updateType) {
+        this.rmContext.getDispatcher().getEventHandler().handle(
+            new RMNodeDecreaseContainerEvent(rmContainer.getNodeId(),
+                Collections.singletonList(rmContainer.getContainer())));
+      }
     }
     return container;
   }
@@ -717,11 +725,16 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
     }
   }
 
-  public void addToNewlyDemotedContainers(ContainerId containerId,
+  public synchronized void addToNewlyDemotedContainers(ContainerId containerId,
       RMContainer rmContainer) {
     newlyDemotedContainers.put(containerId, rmContainer);
   }
 
+  public synchronized void addToNewlyDecreasedContainers(
+      ContainerId containerId, RMContainer rmContainer) {
+    newlyDecreasedContainers.put(containerId, rmContainer);
+  }
+
   protected synchronized void addToUpdateContainerErrors(
       UpdateContainerError error) {
     updateContainerErrors.add(error);
@@ -729,10 +742,6 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
 
   protected synchronized void addToNewlyAllocatedContainers(
       SchedulerNode node, RMContainer rmContainer) {
-    if (oppContainerContext == null) {
-      newlyAllocatedContainers.add(rmContainer);
-      return;
-    }
     ContainerId matchedContainerId =
         getUpdateContext().matchContainerToOutstandingIncreaseReq(
             node, rmContainer.getAllocatedSchedulerKey(), rmContainer);
@@ -745,7 +754,21 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
         // occurs when using MiniYARNCluster to test).
         tempContainerToKill.add(rmContainer);
       } else {
-        newlyPromotedContainers.put(matchedContainerId, rmContainer);
+        RMContainer existingContainer = getRMContainer(matchedContainerId);
+        // If this container was already GUARANTEED, then it is an
+        // increase, else it's a promotion
+        if (existingContainer == null ||
+            EnumSet.of(RMContainerState.COMPLETED, RMContainerState.KILLED,
+                RMContainerState.EXPIRED, RMContainerState.RELEASED).contains(
+                    existingContainer.getState())) {
+          tempContainerToKill.add(rmContainer);
+        } else {
+          if (ExecutionType.GUARANTEED == existingContainer.getExecutionType()) {
+            newlyIncreasedContainers.put(matchedContainerId, rmContainer);
+          } else {
+            newlyPromotedContainers.put(matchedContainerId, rmContainer);
+          }
+        }
       }
     } else {
       newlyAllocatedContainers.add(rmContainer);
@@ -753,15 +776,25 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
   }
 
   public List<Container> pullNewlyPromotedContainers() {
-    return pullContainersWithUpdatedExecType(newlyPromotedContainers,
+    return pullNewlyUpdatedContainers(newlyPromotedContainers,
         ContainerUpdateType.PROMOTE_EXECUTION_TYPE);
   }
 
   public List<Container> pullNewlyDemotedContainers() {
-    return pullContainersWithUpdatedExecType(newlyDemotedContainers,
+    return pullNewlyUpdatedContainers(newlyDemotedContainers,
         ContainerUpdateType.DEMOTE_EXECUTION_TYPE);
   }
 
+  public List<Container> pullNewlyIncreasedContainers() {
+    return pullNewlyUpdatedContainers(newlyIncreasedContainers,
+        ContainerUpdateType.INCREASE_RESOURCE);
+  }
+
+  public List<Container> pullNewlyDecreasedContainers() {
+    return pullNewlyUpdatedContainers(newlyDecreasedContainers,
+        ContainerUpdateType.DECREASE_RESOURCE);
+  }
+
   public List<UpdateContainerError> pullUpdateContainerErrors() {
     List<UpdateContainerError> errors =
         new ArrayList<>(updateContainerErrors);
@@ -775,11 +808,13 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
    * GUARANTEED to OPPORTUNISTIC.
    * @return Newly Promoted and Demoted containers
    */
-  private List<Container> pullContainersWithUpdatedExecType(
+  private List<Container> pullNewlyUpdatedContainers(
       Map<ContainerId, RMContainer> newlyUpdatedContainers,
       ContainerUpdateType updateTpe) {
     List<Container> updatedContainers = new ArrayList<>();
-    if (oppContainerContext == null) {
+    if (oppContainerContext == null &&
+        (ContainerUpdateType.DEMOTE_EXECUTION_TYPE == updateTpe
+            || ContainerUpdateType.PROMOTE_EXECUTION_TYPE == updateTpe)) {
       return updatedContainers;
     }
     try {
@@ -789,19 +824,22 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
       while (i.hasNext()) {
         Map.Entry<ContainerId, RMContainer> entry = i.next();
         ContainerId matchedContainerId = entry.getKey();
-        RMContainer rmContainer = entry.getValue();
-
-        // swap containers
-        RMContainer existingRMContainer = swapContainer(
-            rmContainer, matchedContainerId);
-        getUpdateContext().removeFromOutstandingUpdate(
-            rmContainer.getAllocatedSchedulerKey(),
-            existingRMContainer.getContainer());
-        Container updatedContainer = updateContainerAndNMToken(
-            existingRMContainer, updateTpe);
-        updatedContainers.add(updatedContainer);
-
-        tempContainerToKill.add(rmContainer);
+        RMContainer tempRMContainer = entry.getValue();
+
+        RMContainer existingRMContainer =
+            getRMContainer(matchedContainerId);
+        if (existingRMContainer != null) {
+          // swap containers
+          existingRMContainer = getUpdateContext().swapContainer(
+              tempRMContainer, existingRMContainer, updateTpe);
+          getUpdateContext().removeFromOutstandingUpdate(
+              tempRMContainer.getAllocatedSchedulerKey(),
+              existingRMContainer.getContainer());
+          Container updatedContainer = updateContainerAndNMToken(
+              existingRMContainer, updateTpe);
+          updatedContainers.add(updatedContainer);
+        }
+        tempContainerToKill.add(tempRMContainer);
         i.remove();
       }
       // Release all temporary containers
@@ -823,68 +861,6 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
     }
   }
 
-  private RMContainer swapContainer(RMContainer rmContainer, ContainerId
-      matchedContainerId) {
-    RMContainer existingRMContainer =
-        getRMContainer(matchedContainerId);
-    if (existingRMContainer != null) {
-      // Swap updated container with the existing container
-      Container updatedContainer = rmContainer.getContainer();
-
-      Container newContainer = Container.newInstance(matchedContainerId,
-          existingRMContainer.getContainer().getNodeId(),
-          existingRMContainer.getContainer().getNodeHttpAddress(),
-          updatedContainer.getResource(),
-          existingRMContainer.getContainer().getPriority(), null,
-          updatedContainer.getExecutionType());
-      newContainer.setAllocationRequestId(
-          existingRMContainer.getContainer().getAllocationRequestId());
-      newContainer.setVersion(existingRMContainer.getContainer().getVersion());
-
-      rmContainer.getContainer().setResource(
-          existingRMContainer.getContainer().getResource());
-      rmContainer.getContainer().setExecutionType(
-          existingRMContainer.getContainer().getExecutionType());
-
-      ((RMContainerImpl)existingRMContainer).setContainer(newContainer);
-    }
-    return existingRMContainer;
-  }
-
-  private List<Container> pullNewlyUpdatedContainers(
-      Map<ContainerId, RMContainer> updatedContainerMap, boolean increase) {
-    try {
-      writeLock.lock();
-      List <Container> returnContainerList = new ArrayList <Container>(
-          updatedContainerMap.size());
-
-      Iterator<Entry<ContainerId, RMContainer>> i =
-          updatedContainerMap.entrySet().iterator();
-      while (i.hasNext()) {
-        RMContainer rmContainer = i.next().getValue();
-        Container updatedContainer = updateContainerAndNMToken(rmContainer,
-            increase ? ContainerUpdateType.INCREASE_RESOURCE :
-                ContainerUpdateType.DECREASE_RESOURCE);
-        if (updatedContainer != null) {
-          returnContainerList.add(updatedContainer);
-          i.remove();
-        }
-      }
-      return returnContainerList;
-    } finally {
-      writeLock.unlock();
-    }
-
-  }
-
-  public List<Container> pullNewlyIncreasedContainers() {
-    return pullNewlyUpdatedContainers(newlyIncreasedContainers, true);
-  }
-  
-  public List<Container> pullNewlyDecreasedContainers() {
-    return pullNewlyUpdatedContainers(newlyDecreasedContainers, false);
-  }
-  
   public List<NMToken> pullUpdatedNMTokens() {
     try {
       writeLock.lock();
@@ -1252,68 +1228,6 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
   public ResourceUsage getSchedulingResourceUsage() {
     return attemptResourceUsage;
   }
-  
-  public boolean removeIncreaseRequest(NodeId nodeId,
-      SchedulerRequestKey schedulerKey, ContainerId containerId) {
-    try {
-      writeLock.lock();
-      return appSchedulingInfo.removeIncreaseRequest(nodeId, schedulerKey,
-          containerId);
-    } finally {
-      writeLock.unlock();
-    }
-  }
-  
-  public boolean updateIncreaseRequests(
-      List<SchedContainerChangeRequest> increaseRequests) {
-    try {
-      writeLock.lock();
-      return appSchedulingInfo.updateIncreaseRequests(increaseRequests);
-    } finally {
-      writeLock.unlock();
-    }
-  }
-  
-  private void changeContainerResource(
-      SchedContainerChangeRequest changeRequest, boolean increase) {
-    try {
-      writeLock.lock();
-      if (increase) {
-        appSchedulingInfo.increaseContainer(changeRequest);
-      } else{
-        appSchedulingInfo.decreaseContainer(changeRequest);
-      }
-
-      RMContainer changedRMContainer = changeRequest.getRMContainer();
-      changedRMContainer.handle(
-          new RMContainerChangeResourceEvent(changeRequest.getContainerId(),
-              changeRequest.getTargetCapacity(), increase));
-
-      // remove pending and not pulled by AM newly-increased or
-      // decreased-containers and add the new one
-      if (increase) {
-        newlyDecreasedContainers.remove(changeRequest.getContainerId());
-        newlyIncreasedContainers.put(changeRequest.getContainerId(),
-            changedRMContainer);
-      } else{
-        newlyIncreasedContainers.remove(changeRequest.getContainerId());
-        newlyDecreasedContainers.put(changeRequest.getContainerId(),
-            changedRMContainer);
-      }
-    } finally {
-      writeLock.unlock();
-    }
-  }
-  
-  public void decreaseContainer(
-      SchedContainerChangeRequest decreaseRequest) {
-    changeContainerResource(decreaseRequest, false);
-  }
-  
-  public void increaseContainer(
-      SchedContainerChangeRequest increaseRequest) {
-    changeContainerResource(increaseRequest, true);
-  }
 
   public void setAppAMNodePartitionName(String partitionName) {
     this.appAMNodePartitionName = partitionName;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
index 9c2dff3..db17b42 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
@@ -180,49 +180,6 @@ public abstract class SchedulerNode {
   }
 
   /**
-   * Change the resources allocated for a container.
-   * @param containerId Identifier of the container to change.
-   * @param deltaResource Change in the resource allocation.
-   * @param increase True if the change is an increase of allocation.
-   */
-  protected synchronized void changeContainerResource(ContainerId containerId,
-      Resource deltaResource, boolean increase) {
-    if (increase) {
-      deductUnallocatedResource(deltaResource);
-    } else {
-      addUnallocatedResource(deltaResource);
-    }
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug((increase ? "Increased" : "Decreased") + " container "
-              + containerId + " of capacity " + deltaResource + " on host "
-              + rmNode.getNodeAddress() + ", which has " + numContainers
-              + " containers, " + getAllocatedResource() + " used and "
-              + getUnallocatedResource() + " available after allocation");
-    }
-  }
-
-  /**
-   * Increase the resources allocated to a container.
-   * @param containerId Identifier of the container to change.
-   * @param deltaResource Increase of resource allocation.
-   */
-  public synchronized void increaseContainer(ContainerId containerId,
-      Resource deltaResource) {
-    changeContainerResource(containerId, deltaResource, true);
-  }
-
-  /**
-   * Decrease the resources allocated to a container.
-   * @param containerId Identifier of the container to change.
-   * @param deltaResource Decrease of resource allocation.
-   */
-  public synchronized void decreaseContainer(ContainerId containerId,
-      Resource deltaResource) {
-    changeContainerResource(containerId, deltaResource, false);
-  }
-
-  /**
    * Get unallocated resources on the node.
    * @return Unallocated resources on the node
    */
@@ -280,7 +237,6 @@ public abstract class SchedulerNode {
     if (info == null) {
       return;
     }
-
     if (!releasedByNode && info.launchedOnNode) {
       // wait until node reports container has completed
       return;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index e9ef319..aa60c9c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -453,14 +453,13 @@ public abstract class AbstractCSQueue implements CSQueue {
   }
   
   void allocateResource(Resource clusterResource,
-      Resource resource, String nodePartition, boolean changeContainerResource) {
+      Resource resource, String nodePartition) {
     try {
       writeLock.lock();
       queueUsage.incUsed(nodePartition, resource);
 
-      if (!changeContainerResource) {
-        ++numContainers;
-      }
+      ++numContainers;
+
       CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
           minimumAllocation, this, labelManager, nodePartition);
     } finally {
@@ -469,7 +468,7 @@ public abstract class AbstractCSQueue implements CSQueue {
   }
   
   protected void releaseResource(Resource clusterResource,
-      Resource resource, String nodePartition, boolean changeContainerResource) {
+      Resource resource, String nodePartition) {
     try {
       writeLock.lock();
       queueUsage.decUsed(nodePartition, resource);
@@ -477,9 +476,7 @@ public abstract class AbstractCSQueue implements CSQueue {
       CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
           minimumAllocation, this, labelManager, nodePartition);
 
-      if (!changeContainerResource) {
-        --numContainers;
-      }
+      --numContainers;
     } finally {
       writeLock.unlock();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index a65b3d2..6d30386 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -231,14 +231,6 @@ public interface CSQueue extends SchedulerQueue<CSQueue> {
       boolean sortQueues);
 
   /**
-   * We have a reserved increased container in the queue, we need to unreserve
-   * it. Since we just want to cancel the reserved increase request instead of
-   * stop the container, we shouldn't call completedContainer for such purpose.
-   */
-  public void unreserveIncreasedContainer(Resource clusterResource,
-      FiCaSchedulerApp app, FiCaSchedulerNode node, RMContainer rmContainer);
-
-  /**
    * Get the number of applications in the queue.
    * @return number of applications
    */
@@ -333,13 +325,6 @@ public interface CSQueue extends SchedulerQueue<CSQueue> {
    *          new resource asked
    */
   public void decPendingResource(String nodeLabel, Resource resourceToDec);
-  
-  /**
-   * Decrease container resource in the queue
-   */
-  public void decreaseContainer(Resource clusterResource,
-      SchedContainerChangeRequest decreaseRequest,
-      FiCaSchedulerApp app) throws InvalidResourceRequestException;
 
   /**
    * Get valid Node Labels for this queue

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 3517764..20ea607 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
@@ -60,9 +59,7 @@ import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
@@ -85,7 +82,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecreaseContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
@@ -99,7 +95,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
@@ -872,43 +868,6 @@ public class CapacityScheduler extends
     }
   }
 
-  private LeafQueue updateIncreaseRequests(
-      List<UpdateContainerRequest> increaseRequests, FiCaSchedulerApp app) {
-    // When application has some pending to-be-removed resource requests,
-    app.removedToBeRemovedIncreaseRequests();
-
-    if (null == increaseRequests || increaseRequests.isEmpty()) {
-      return null;
-    }
-
-    // Pre-process increase requests
-    List<SchedContainerChangeRequest> schedIncreaseRequests =
-        createSchedContainerChangeRequests(increaseRequests, true);
-    LeafQueue leafQueue = (LeafQueue) app.getQueue();
-
-    try {
-      /*
-       * Acquire application's lock here to make sure application won't
-       * finish when updateIncreaseRequest is called.
-       */
-      app.getWriteLock().lock();
-      // make sure we aren't stopping/removing the application
-      // when the allocate comes in
-      if (app.isStopped()) {
-        return null;
-      }
-      // Process increase resource requests
-      if (app.updateIncreaseRequests(schedIncreaseRequests)) {
-        return leafQueue;
-      }
-    } finally {
-      app.getWriteLock().unlock();
-    }
-
-
-    return null;
-  }
-
   @Override
   @Lock(Lock.NoLock.class)
   public Allocation allocate(ApplicationAttemptId applicationAttemptId,
@@ -920,21 +879,13 @@ public class CapacityScheduler extends
       return EMPTY_ALLOCATION;
     }
 
-    // Handle promotions and demotions
-    handleExecutionTypeUpdates(
-        application, updateRequests.getPromotionRequests(),
-        updateRequests.getDemotionRequests());
+    // Handle all container updates
+    handleContainerUpdates(application, updateRequests);
 
     // Release containers
     releaseContainers(release, application);
 
-    // update increase requests
-    LeafQueue updateDemandForQueue =
-        updateIncreaseRequests(updateRequests.getIncreaseRequests(),
-        application);
-
-    // Decrease containers
-    decreaseContainers(updateRequests.getDecreaseRequests(), application);
+    LeafQueue updateDemandForQueue = null;
 
     // Sanity check for new allocation requests
     normalizeRequests(ask);
@@ -959,8 +910,7 @@ public class CapacityScheduler extends
         }
 
         // Update application requests
-        if (application.updateResourceRequests(ask) && (updateDemandForQueue
-            == null)) {
+        if (application.updateResourceRequests(ask)) {
           updateDemandForQueue = (LeafQueue) application.getQueue();
         }
 
@@ -1466,7 +1416,7 @@ public class CapacityScheduler extends
           (ContainerExpiredSchedulerEvent) event;
       ContainerId containerId = containerExpiredEvent.getContainerId();
       if (containerExpiredEvent.isIncrease()) {
-        rollbackContainerResource(containerId);
+        rollbackContainerUpdate(containerId);
       } else {
         completedContainer(getRMContainer(containerId),
             SchedulerUtils.createAbnormalContainerStatus(
@@ -1618,31 +1568,6 @@ public class CapacityScheduler extends
     }
   }
 
-  private void rollbackContainerResource(
-      ContainerId containerId) {
-    RMContainer rmContainer = getRMContainer(containerId);
-    if (rmContainer == null) {
-      LOG.info("Cannot rollback resource for container " + containerId
-          + ". The container does not exist.");
-      return;
-    }
-    FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId);
-    if (application == null) {
-      LOG.info("Cannot rollback resource for container " + containerId
-          + ". The application that the container "
-          + "belongs to does not exist.");
-      return;
-    }
-    LOG.info("Roll back resource for container " + containerId);
-
-    SchedulerNode schedulerNode = getSchedulerNode(
-        rmContainer.getAllocatedNode());
-    SchedContainerChangeRequest decreaseRequest =
-        new SchedContainerChangeRequest(this.rmContext, schedulerNode,
-            rmContainer, rmContainer.getLastConfirmedResource());
-    decreaseContainer(decreaseRequest, application);
-  }
-
   @Override
   protected void completedContainerInternal(
       RMContainer rmContainer, ContainerStatus containerStatus,
@@ -1676,32 +1601,6 @@ public class CapacityScheduler extends
         rmContainer, containerStatus, event, null, true);
   }
 
-  @Override
-  protected void decreaseContainer(SchedContainerChangeRequest decreaseRequest,
-      SchedulerApplicationAttempt attempt) {
-    RMContainer rmContainer = decreaseRequest.getRMContainer();
-    // Check container status before doing decrease
-    if (rmContainer.getState() != RMContainerState.RUNNING) {
-      LOG.info(
-          "Trying to decrease a container not in RUNNING state, container="
-              + rmContainer + " state=" + rmContainer.getState().name());
-      return;
-    }
-    FiCaSchedulerApp app = (FiCaSchedulerApp) attempt;
-    LeafQueue queue = (LeafQueue) attempt.getQueue();
-    try {
-      queue.decreaseContainer(getClusterResource(), decreaseRequest, app);
-      // Notify RMNode that the container can be pulled by NodeManager in the
-      // next heartbeat
-      this.rmContext.getDispatcher().getEventHandler().handle(
-          new RMNodeDecreaseContainerEvent(decreaseRequest.getNodeId(),
-              Collections.singletonList(rmContainer.getContainer())));
-    } catch (InvalidResourceRequestException e) {
-      LOG.warn("Error happens when checking decrease request, Ignoring.."
-          + " exception=", e);
-    }
-  }
-
   @Lock(Lock.NoLock.class)
   @VisibleForTesting
   @Override
@@ -2386,8 +2285,8 @@ public class CapacityScheduler extends
             getSchedulerContainer(rmContainer, true),
             getSchedulerContainersToRelease(csAssignment),
             getSchedulerContainer(csAssignment.getFulfilledReservedContainer(),
-                false), csAssignment.isIncreasedAllocation(),
-            csAssignment.getType(), csAssignment.getRequestLocalityType(),
+                false), csAssignment.getType(),
+            csAssignment.getRequestLocalityType(),
             csAssignment.getSchedulingMode() != null ?
                 csAssignment.getSchedulingMode() :
                 SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY,
@@ -2403,8 +2302,8 @@ public class CapacityScheduler extends
             getSchedulerContainer(rmContainer, false),
             getSchedulerContainersToRelease(csAssignment),
             getSchedulerContainer(csAssignment.getFulfilledReservedContainer(),
-                false), csAssignment.isIncreasedAllocation(),
-            csAssignment.getType(), csAssignment.getRequestLocalityType(),
+                false), csAssignment.getType(),
+            csAssignment.getRequestLocalityType(),
             csAssignment.getSchedulingMode() != null ?
                 csAssignment.getSchedulingMode() :
                 SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY,


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org