You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2015/09/16 19:56:40 UTC

[21/23] hadoop git commit: YARN-1651. CapacityScheduler side changes to support container resize. Contributed by Wangda Tan

YARN-1651. CapacityScheduler side changes to support container resize. Contributed by Wangda Tan


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/733b0f68
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/733b0f68
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/733b0f68

Branch: refs/heads/YARN-1197
Commit: 733b0f68061558dc32eddb1f112447f5f4be02d0
Parents: b7c4cd5
Author: Jian He <ji...@apache.org>
Authored: Tue Sep 15 10:21:39 2015 +0800
Committer: Wangda Tan <wa...@apache.org>
Committed: Wed Sep 16 10:55:49 2015 -0700

----------------------------------------------------------------------
 .../v2/app/rm/TestRMContainerAllocator.java     |  11 +-
 .../hadoop/yarn/sls/nodemanager/NodeInfo.java   |  14 +
 .../yarn/sls/scheduler/RMNodeWrapper.java       |  13 +
 .../sls/scheduler/ResourceSchedulerWrapper.java |  21 +-
 .../sls/scheduler/SLSCapacityScheduler.java     |  19 +-
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../api/impl/TestAMRMClientOnRMRestart.java     |   8 +-
 .../resource/DefaultResourceCalculator.java     |   5 +
 .../resource/DominantResourceCalculator.java    |   6 +
 .../yarn/util/resource/ResourceCalculator.java  |   5 +
 .../hadoop/yarn/util/resource/Resources.java    |   5 +
 .../util/resource/TestResourceCalculator.java   |  30 +-
 .../protocolrecords/NodeHeartbeatResponse.java  |   5 +-
 .../impl/pb/NodeHeartbeatResponsePBImpl.java    |   5 +-
 .../ApplicationMasterService.java               |  22 +-
 .../server/resourcemanager/RMAuditLogger.java   |   2 +
 .../server/resourcemanager/RMServerUtils.java   | 164 ++++
 .../resourcemanager/ResourceTrackerService.java |   7 +-
 .../rmapp/attempt/RMAppAttemptImpl.java         |   4 +-
 .../rmcontainer/RMContainer.java                |   4 +
 .../RMContainerChangeResourceEvent.java         |  44 +
 .../rmcontainer/RMContainerEventType.java       |  13 +-
 .../rmcontainer/RMContainerImpl.java            | 121 ++-
 .../RMContainerUpdatesAcquiredEvent.java        |  35 +
 .../server/resourcemanager/rmnode/RMNode.java   |   9 +
 .../rmnode/RMNodeDecreaseContainerEvent.java    |  39 +
 .../resourcemanager/rmnode/RMNodeEventType.java |   1 +
 .../resourcemanager/rmnode/RMNodeImpl.java      |  93 ++
 .../rmnode/RMNodeStatusEvent.java               |  32 +-
 .../scheduler/AbstractYarnScheduler.java        | 150 ++-
 .../resourcemanager/scheduler/Allocation.java   |  22 +-
 .../scheduler/AppSchedulingInfo.java            | 249 ++++-
 .../resourcemanager/scheduler/QueueMetrics.java |  16 +-
 .../scheduler/SchedContainerChangeRequest.java  | 118 +++
 .../scheduler/SchedulerApplication.java         |   2 +-
 .../scheduler/SchedulerApplicationAttempt.java  | 253 +++--
 .../scheduler/SchedulerNode.java                |  31 +
 .../scheduler/SchedulerUtils.java               |  11 +-
 .../scheduler/YarnScheduler.java                |  14 +-
 .../scheduler/capacity/AbstractCSQueue.java     |  23 +-
 .../scheduler/capacity/CSAssignment.java        |   9 +
 .../scheduler/capacity/CSQueue.java             |  16 +
 .../scheduler/capacity/CapacityScheduler.java   |  83 +-
 .../scheduler/capacity/LeafQueue.java           | 127 ++-
 .../scheduler/capacity/ParentQueue.java         | 115 ++-
 .../allocator/AbstractContainerAllocator.java   | 131 +++
 .../capacity/allocator/ContainerAllocator.java  | 149 +--
 .../allocator/IncreaseContainerAllocator.java   | 365 +++++++
 .../allocator/RegularContainerAllocator.java    |  30 +-
 .../scheduler/common/fica/FiCaSchedulerApp.java |  68 +-
 .../scheduler/fair/FairScheduler.java           |  35 +-
 .../scheduler/fifo/FifoScheduler.java           |  25 +-
 .../server/resourcemanager/Application.java     |   2 +-
 .../yarn/server/resourcemanager/MockAM.java     |   9 +
 .../yarn/server/resourcemanager/MockNodes.java  |  13 +
 .../yarn/server/resourcemanager/MockRM.java     |  13 +
 .../TestApplicationMasterService.java           | 144 ++-
 .../applicationsmanager/TestAMRestart.java      |  15 +-
 .../TestRMAppLogAggregationStatus.java          |  10 +-
 .../attempt/TestRMAppAttemptTransitions.java    |  32 +-
 .../rmcontainer/TestRMContainerImpl.java        | 117 ++-
 .../capacity/TestCapacityScheduler.java         | 128 ++-
 .../scheduler/capacity/TestChildQueueOrder.java |   4 +-
 .../capacity/TestContainerAllocation.java       |  50 +-
 .../capacity/TestContainerResizing.java         | 963 +++++++++++++++++++
 .../scheduler/capacity/TestLeafQueue.java       |   4 +-
 .../scheduler/capacity/TestParentQueue.java     |   4 +-
 .../scheduler/capacity/TestReservations.java    |   9 +-
 .../scheduler/fair/FairSchedulerTestBase.java   |   6 +-
 .../fair/TestContinuousScheduling.java          |   2 +-
 .../scheduler/fair/TestFairScheduler.java       |  30 +-
 .../scheduler/fifo/TestFifoScheduler.java       |  28 +-
 72 files changed, 3856 insertions(+), 509 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
index e148c32..2bb7e27 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
@@ -98,6 +98,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NMToken;
@@ -1575,8 +1576,10 @@ public class TestRMContainerAllocator {
     @Override
     public synchronized Allocation allocate(
         ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
-        List<ContainerId> release, 
-        List<String> blacklistAdditions, List<String> blacklistRemovals) {
+        List<ContainerId> release, List<String> blacklistAdditions,
+        List<String> blacklistRemovals,
+        List<ContainerResourceChangeRequest> increaseRequests,
+        List<ContainerResourceChangeRequest> decreaseRequests) {
       List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>();
       for (ResourceRequest req : ask) {
         ResourceRequest reqCopy = ResourceRequest.newInstance(req
@@ -1590,8 +1593,8 @@ public class TestRMContainerAllocator {
       lastBlacklistAdditions = blacklistAdditions;
       lastBlacklistRemovals = blacklistRemovals;
       return super.allocate(
-          applicationAttemptId, askCopy, release, 
-          blacklistAdditions, blacklistRemovals);
+          applicationAttemptId, askCopy, release, blacklistAdditions,
+          blacklistRemovals, increaseRequests, decreaseRequests);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
index 2d2c3e0..dae2ce7 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -174,6 +175,19 @@ public class NodeInfo {
     public Set<String> getNodeLabels() {
       return RMNodeLabelsManager.EMPTY_STRING_SET;
     }
+
+    @Override
+    public void updateNodeHeartbeatResponseForContainersDecreasing(
+        NodeHeartbeatResponse response) {
+      // TODO Auto-generated method stub
+      
+    }
+
+    @Override
+    public List<Container> pullNewlyIncreasedContainers() {
+      // TODO Auto-generated method stub
+      return null;
+    }
   }
 
   public static RMNode newNodeInfo(String rackName, String hostName,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
index ecc4734..8c65ccc 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
@@ -163,4 +164,16 @@ public class RMNodeWrapper implements RMNode {
   public Set<String> getNodeLabels() {
     return RMNodeLabelsManager.EMPTY_STRING_SET;
   }
+
+  @Override
+  public void updateNodeHeartbeatResponseForContainersDecreasing(
+      NodeHeartbeatResponse response) {
+    // TODO Auto-generated method stub
+  }
+
+  @Override
+  public List<Container> pullNewlyIncreasedContainers() {
+    // TODO Auto-generated method stub
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
index 14e2645..310b3b5 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
@@ -72,6 +73,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
@@ -202,15 +204,16 @@ final public class ResourceSchedulerWrapper
 
   @Override
   public Allocation allocate(ApplicationAttemptId attemptId,
-                             List<ResourceRequest> resourceRequests,
-                             List<ContainerId> containerIds,
-                             List<String> strings, List<String> strings2) {
+      List<ResourceRequest> resourceRequests, List<ContainerId> containerIds,
+      List<String> strings, List<String> strings2,
+      List<ContainerResourceChangeRequest> increaseRequests,
+      List<ContainerResourceChangeRequest> decreaseRequests) {
     if (metricsON) {
       final Timer.Context context = schedulerAllocateTimer.time();
       Allocation allocation = null;
       try {
         allocation = scheduler.allocate(attemptId, resourceRequests,
-                containerIds, strings, strings2);
+                containerIds, strings, strings2, null, null);
         return allocation;
       } finally {
         context.stop();
@@ -224,7 +227,7 @@ final public class ResourceSchedulerWrapper
       }
     } else {
       return scheduler.allocate(attemptId,
-              resourceRequests, containerIds, strings, strings2);
+              resourceRequests, containerIds, strings, strings2, null, null);
     }
   }
 
@@ -959,4 +962,12 @@ final public class ResourceSchedulerWrapper
     return Priority.newInstance(0);
   }
 
+  @Override
+  protected void decreaseContainer(
+      SchedContainerChangeRequest decreaseRequest,
+      SchedulerApplicationAttempt attempt) {
+    // TODO Auto-generated method stub
+    
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
index a4416db..3626027 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -176,15 +177,17 @@ public class SLSCapacityScheduler extends CapacityScheduler implements
 
   @Override
   public Allocation allocate(ApplicationAttemptId attemptId,
-                             List<ResourceRequest> resourceRequests,
-                             List<ContainerId> containerIds,
-                             List<String> strings, List<String> strings2) {
+      List<ResourceRequest> resourceRequests, List<ContainerId> containerIds,
+      List<String> strings, List<String> strings2,
+      List<ContainerResourceChangeRequest> increaseRequests,
+      List<ContainerResourceChangeRequest> decreaseRequests) {
     if (metricsON) {
       final Timer.Context context = schedulerAllocateTimer.time();
       Allocation allocation = null;
       try {
-        allocation = super.allocate(attemptId, resourceRequests,
-                containerIds, strings, strings2);
+        allocation = super
+            .allocate(attemptId, resourceRequests, containerIds, strings,
+                strings2, increaseRequests, decreaseRequests);
         return allocation;
       } finally {
         context.stop();
@@ -197,8 +200,8 @@ public class SLSCapacityScheduler extends CapacityScheduler implements
         }
       }
     } else {
-      return super.allocate(attemptId,
-              resourceRequests, containerIds, strings, strings2);
+      return super.allocate(attemptId, resourceRequests, containerIds, strings,
+          strings2, increaseRequests, decreaseRequests);
     }
   }
 
@@ -426,7 +429,7 @@ public class SLSCapacityScheduler extends CapacityScheduler implements
     if (pool != null)  pool.shutdown();
   }
 
-  @SuppressWarnings("unchecked")
+  @SuppressWarnings({ "unchecked", "rawtypes" })
   private void initMetrics() throws Exception {
     metrics = new MetricRegistry();
     // configuration

http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index d7ff457..5c0f849 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -214,6 +214,9 @@ Release 2.8.0 - UNRELEASED
 
     YARN-3868. Recovery support for container resizing. (Meng Ding via jianhe)
 
+    YARN-1651. CapacityScheduler side changes to support container resize.
+    (Wangda Tan via jianhe)
+ 
   IMPROVEMENTS
 
     YARN-644. Basic null check is not performed on passed in arguments before

http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
index 108ad37..2394747 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.Priority;
@@ -525,7 +526,9 @@ public class TestAMRMClientOnRMRestart {
     public synchronized Allocation allocate(
         ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
         List<ContainerId> release, List<String> blacklistAdditions,
-        List<String> blacklistRemovals) {
+        List<String> blacklistRemovals,
+        List<ContainerResourceChangeRequest> increaseRequests,
+        List<ContainerResourceChangeRequest> decreaseRequests) {
       List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>();
       for (ResourceRequest req : ask) {
         ResourceRequest reqCopy =
@@ -539,7 +542,8 @@ public class TestAMRMClientOnRMRestart {
       lastBlacklistAdditions = blacklistAdditions;
       lastBlacklistRemovals = blacklistRemovals;
       return super.allocate(applicationAttemptId, askCopy, release,
-          blacklistAdditions, blacklistRemovals);
+          blacklistAdditions, blacklistRemovals, increaseRequests,
+          decreaseRequests);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java
index c2fc1f0..2fdf214 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java
@@ -110,4 +110,9 @@ public class DefaultResourceCalculator extends ResourceCalculator {
         );
   }
 
+  @Override
+  public boolean fitsIn(Resource cluster,
+      Resource smaller, Resource bigger) {
+    return smaller.getMemory() <= bigger.getMemory();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
index 2ee95ce..b5c9967 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
@@ -209,4 +209,10 @@ public class DominantResourceCalculator extends ResourceCalculator {
         );
   }
 
+  @Override
+  public boolean fitsIn(Resource cluster,
+      Resource smaller, Resource bigger) {
+    return smaller.getMemory() <= bigger.getMemory()
+        && smaller.getVirtualCores() <= bigger.getVirtualCores();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
index 442196c..3a31225 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
@@ -171,4 +171,9 @@ public abstract class ResourceCalculator {
    */
   public abstract Resource divideAndCeil(Resource numerator, int denominator);
   
+  /**
+   * Check if a smaller resource can be contained by bigger resource.
+   */
+  public abstract boolean fitsIn(Resource cluster,
+      Resource smaller, Resource bigger);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
index 503d456..b05d021 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
@@ -267,6 +267,11 @@ public class Resources {
     return smaller.getMemory() <= bigger.getMemory() &&
         smaller.getVirtualCores() <= bigger.getVirtualCores();
   }
+
+  public static boolean fitsIn(ResourceCalculator rc, Resource cluster,
+      Resource smaller, Resource bigger) {
+    return rc.fitsIn(cluster, smaller, bigger);
+  }
   
   public static Resource componentwiseMin(Resource lhs, Resource rhs) {
     return createResource(Math.min(lhs.getMemory(), rhs.getMemory()),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceCalculator.java
index 6a0b62e..0654891 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceCalculator.java
@@ -41,6 +41,35 @@ public class TestResourceCalculator {
   public TestResourceCalculator(ResourceCalculator rs) {
     this.resourceCalculator = rs;
   }
+  
+  @Test(timeout = 10000)
+  public void testFitsIn() {
+    Resource cluster = Resource.newInstance(1024, 1);
+
+    if (resourceCalculator instanceof DefaultResourceCalculator) {
+      Assert.assertTrue(resourceCalculator.fitsIn(cluster,
+          Resource.newInstance(1, 2), Resource.newInstance(2, 1)));
+      Assert.assertTrue(resourceCalculator.fitsIn(cluster,
+          Resource.newInstance(1, 2), Resource.newInstance(2, 2)));
+      Assert.assertTrue(resourceCalculator.fitsIn(cluster,
+          Resource.newInstance(1, 2), Resource.newInstance(1, 2)));
+      Assert.assertTrue(resourceCalculator.fitsIn(cluster,
+          Resource.newInstance(1, 2), Resource.newInstance(1, 1)));
+      Assert.assertFalse(resourceCalculator.fitsIn(cluster,
+          Resource.newInstance(2, 1), Resource.newInstance(1, 2)));
+    } else if (resourceCalculator instanceof DominantResourceCalculator) {
+      Assert.assertFalse(resourceCalculator.fitsIn(cluster,
+          Resource.newInstance(1, 2), Resource.newInstance(2, 1)));
+      Assert.assertTrue(resourceCalculator.fitsIn(cluster,
+          Resource.newInstance(1, 2), Resource.newInstance(2, 2)));
+      Assert.assertTrue(resourceCalculator.fitsIn(cluster,
+          Resource.newInstance(1, 2), Resource.newInstance(1, 2)));
+      Assert.assertFalse(resourceCalculator.fitsIn(cluster,
+          Resource.newInstance(1, 2), Resource.newInstance(1, 1)));
+      Assert.assertFalse(resourceCalculator.fitsIn(cluster,
+          Resource.newInstance(2, 1), Resource.newInstance(1, 2)));
+    }
+  }
 
   @Test(timeout = 10000)
   public void testResourceCalculatorCompareMethod() {
@@ -92,7 +121,6 @@ public class TestResourceCalculator {
 
   }
 
-
   private void assertResourcesOperations(Resource clusterResource,
       Resource lhs, Resource rhs, boolean lessThan, boolean lessThanOrEqual,
       boolean greaterThan, boolean greaterThanOrEqual, Resource max,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
index 38fbc82..c0ccf57 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
@@ -19,12 +19,13 @@
 package org.apache.hadoop.yarn.server.api.protocolrecords;
 
 import java.nio.ByteBuffer;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
 
@@ -73,5 +74,5 @@ public interface NodeHeartbeatResponse {
   void setAreNodeLabelsAcceptedByRM(boolean areNodeLabelsAcceptedByRM);
 
   List<Container> getContainersToDecrease();
-  void addAllContainersToDecrease(List<Container> containersToDecrease);
+  void addAllContainersToDecrease(Collection<Container> containersToDecrease);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
index 12c5230..dc65141 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
@@ -20,14 +20,15 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
@@ -437,7 +438,7 @@ public class NodeHeartbeatResponsePBImpl extends
 
   @Override
   public void addAllContainersToDecrease(
-      final List<Container> containersToDecrease) {
+      final Collection<Container> containersToDecrease) {
     if (containersToDecrease == null) {
       return;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index 14142de..87c7bfa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -451,11 +451,13 @@ public class ApplicationMasterService extends AbstractService implements
           req.setNodeLabelExpression(asc.getNodeLabelExpression());
         }
       }
+      
+      Resource maximumCapacity = rScheduler.getMaximumResourceCapability();
               
       // sanity check
       try {
         RMServerUtils.normalizeAndValidateRequests(ask,
-            rScheduler.getMaximumResourceCapability(), app.getQueue(),
+            maximumCapacity, app.getQueue(),
             rScheduler, rmContext);
       } catch (InvalidResourceRequestException e) {
         LOG.warn("Invalid resource ask by application " + appAttemptId, e);
@@ -469,6 +471,15 @@ public class ApplicationMasterService extends AbstractService implements
         throw e;
       }
 
+      try {
+        RMServerUtils.increaseDecreaseRequestSanityCheck(rmContext,
+            request.getIncreaseRequests(), request.getDecreaseRequests(),
+            maximumCapacity);
+      } catch (InvalidResourceRequestException e) {
+        LOG.warn(e);
+        throw e;
+      }
+
       // In the case of work-preserving AM restart, it's possible for the
       // AM to release containers from the earlier attempt.
       if (!app.getApplicationSubmissionContext()
@@ -493,8 +504,9 @@ public class ApplicationMasterService extends AbstractService implements
         allocation = EMPTY_ALLOCATION;
       } else {
         allocation =
-          this.rScheduler.allocate(appAttemptId, ask, release,
-              blacklistAdditions, blacklistRemovals);
+            this.rScheduler.allocate(appAttemptId, ask, release,
+                blacklistAdditions, blacklistRemovals,
+                request.getIncreaseRequests(), request.getDecreaseRequests());
       }
 
       if (!blacklistAdditions.isEmpty() || !blacklistRemovals.isEmpty()) {
@@ -540,6 +552,10 @@ public class ApplicationMasterService extends AbstractService implements
           .pullJustFinishedContainers());
       allocateResponse.setResponseId(lastResponse.getResponseId() + 1);
       allocateResponse.setAvailableResources(allocation.getResourceLimit());
+      
+      // Handling increased/decreased containers
+      allocateResponse.setIncreasedContainers(allocation.getIncreasedContainers());
+      allocateResponse.setDecreasedContainers(allocation.getDecreasedContainers());
 
       allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java
index f049d97..cd9a61d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java
@@ -56,6 +56,8 @@ public class RMAuditLogger {
     public static final String RELEASE_CONTAINER = "AM Released Container";
     public static final String UPDATE_APP_PRIORITY =
         "Update Application Priority Request";
+    public static final String CHANGE_CONTAINER_RESOURCE =
+        "AM Changed Container Resource";
 
     // Some commonly used descriptions
     public static final String UNAUTHORIZED_USER = "Unauthorized user";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
index 4d2e41c..cc30593 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
@@ -22,8 +22,10 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.conf.Configuration;
@@ -34,6 +36,7 @@ import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -49,10 +52,14 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 /**
@@ -107,6 +114,89 @@ public class RMServerUtils {
           queueName, scheduler, rmContext, queueInfo);
     }
   }
+  
+  /**
+   * Normalize a container increase/decrease request: the requested capability
+   * is normalized and written back to the ContainerResourceChangeRequest.
+   * 
+   * <pre>
+   * - Throw exception when any other error happens
+   * </pre>
+   */
+  public static void checkAndNormalizeContainerChangeRequest(
+      RMContext rmContext, ContainerResourceChangeRequest request,
+      boolean increase) throws InvalidResourceRequestException {
+    ContainerId containerId = request.getContainerId();
+    ResourceScheduler scheduler = rmContext.getScheduler();
+    RMContainer rmContainer = scheduler.getRMContainer(containerId);
+    ResourceCalculator rc = scheduler.getResourceCalculator();
+    
+    if (null == rmContainer) {
+      String msg =
+          "Failed to get rmContainer for "
+              + (increase ? "increase" : "decrease")
+              + " request, with container-id=" + containerId;
+      throw new InvalidResourceRequestException(msg);
+    }
+
+    if (rmContainer.getState() != RMContainerState.RUNNING) {
+      String msg =
+          "rmContainer's state is not RUNNING, for "
+              + (increase ? "increase" : "decrease")
+              + " request, with container-id=" + containerId;
+      throw new InvalidResourceRequestException(msg);
+    }
+
+    Resource targetResource = Resources.normalize(rc, request.getCapability(),
+        scheduler.getMinimumResourceCapability(),
+        scheduler.getMaximumResourceCapability(),
+        scheduler.getMinimumResourceCapability());
+
+    // Compare targetResource and original resource
+    Resource originalResource = rmContainer.getAllocatedResource();
+
+    // Resource comparison should be >= (or <=) for all resource vectors, for
+    // example, you cannot request target resource of a <10G, 10> container to
+    // <20G, 8>
+    if (increase) {
+      if (originalResource.getMemory() > targetResource.getMemory()
+          || originalResource.getVirtualCores() > targetResource
+              .getVirtualCores()) {
+        String msg =
+            "Trying to increase a container, but target resource has some"
+                + " resource < original resource, target=" + targetResource
+                + " original=" + originalResource + " containerId="
+                + containerId;
+        throw new InvalidResourceRequestException(msg);
+      }
+    } else {
+      if (originalResource.getMemory() < targetResource.getMemory()
+          || originalResource.getVirtualCores() < targetResource
+              .getVirtualCores()) {
+        String msg =
+            "Trying to decrease a container, but target resource has "
+                + "some resource > original resource, target=" + targetResource
+                + " original=" + originalResource + " containerId="
+                + containerId;
+        throw new InvalidResourceRequestException(msg);
+      }
+    }
+    
+    RMNode rmNode = rmContext.getRMNodes().get(rmContainer.getAllocatedNode());
+    
+    // Target resource of the increase request is more than NM can offer
+    if (!Resources.fitsIn(scheduler.getResourceCalculator(),
+        scheduler.getClusterResource(), targetResource,
+        rmNode.getTotalCapability())) {
+      String msg = "Target resource=" + targetResource + " of containerId="
+          + containerId + " is more than node's total resource="
+          + rmNode.getTotalCapability();
+      throw new InvalidResourceRequestException(msg);
+    }
+
+    // Update normalized target resource
+    request.setCapability(targetResource);
+  }
 
   /*
    * @throw <code>InvalidResourceBlacklistRequestException </code> if the
@@ -123,6 +213,80 @@ public class RMServerUtils {
       }
     }
   }
+  
+  /**
+   * Check if we have:
+   * - Request for same containerId and different target resource
+   * - If targetResources violates maximum/minimumAllocation
+   */
+  public static void increaseDecreaseRequestSanityCheck(RMContext rmContext,
+      List<ContainerResourceChangeRequest> incRequests,
+      List<ContainerResourceChangeRequest> decRequests,
+      Resource maximumAllocation) throws InvalidResourceRequestException {
+    checkDuplicatedIncreaseDecreaseRequest(incRequests, decRequests);
+    validateIncreaseDecreaseRequest(rmContext, incRequests, maximumAllocation,
+        true);
+    validateIncreaseDecreaseRequest(rmContext, decRequests, maximumAllocation,
+        false);
+  }
+  
+  private static void checkDuplicatedIncreaseDecreaseRequest(
+      List<ContainerResourceChangeRequest> incRequests,
+      List<ContainerResourceChangeRequest> decRequests)
+          throws InvalidResourceRequestException {
+    String msg = "There're multiple increase or decrease container requests "
+        + "for same containerId=";
+    Set<ContainerId> existedContainerIds = new HashSet<ContainerId>();
+    if (incRequests != null) {
+      for (ContainerResourceChangeRequest r : incRequests) {
+        if (!existedContainerIds.add(r.getContainerId())) {
+          throw new InvalidResourceRequestException(msg + r.getContainerId());
+        }
+      }
+    }
+    
+    if (decRequests != null) {
+      for (ContainerResourceChangeRequest r : decRequests) {
+        if (!existedContainerIds.add(r.getContainerId())) {
+          throw new InvalidResourceRequestException(msg + r.getContainerId());
+        }
+      }
+    }
+  }
+  
+  private static void validateIncreaseDecreaseRequest(RMContext rmContext,
+      List<ContainerResourceChangeRequest> requests, Resource maximumAllocation,
+      boolean increase)
+      throws InvalidResourceRequestException {
+    if (requests == null) {
+      return;
+    }
+    for (ContainerResourceChangeRequest request : requests) {
+      if (request.getCapability().getMemory() < 0
+          || request.getCapability().getMemory() > maximumAllocation
+              .getMemory()) {
+        throw new InvalidResourceRequestException("Invalid "
+            + (increase ? "increase" : "decrease") + " request"
+            + ", requested memory < 0"
+            + ", or requested memory > max configured" + ", requestedMemory="
+            + request.getCapability().getMemory() + ", maxMemory="
+            + maximumAllocation.getMemory());
+      }
+      if (request.getCapability().getVirtualCores() < 0
+          || request.getCapability().getVirtualCores() > maximumAllocation
+              .getVirtualCores()) {
+        throw new InvalidResourceRequestException("Invalid "
+            + (increase ? "increase" : "decrease") + " request"
+            + ", requested virtual cores < 0"
+            + ", or requested virtual cores > max configured"
+            + ", requestedVirtualCores="
+            + request.getCapability().getVirtualCores() + ", maxVirtualCores="
+            + maximumAllocation.getVirtualCores());
+      }
+      
+      checkAndNormalizeContainerChangeRequest(rmContext, request, increase);
+    }
+  }
 
   /**
    * It will validate to make sure all the containers belong to correct

http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index 100e991..557f6d4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -449,6 +449,8 @@ public class ResourceTrackerService extends AbstractService implements
             getResponseId() + 1, NodeAction.NORMAL, null, null, null, null,
             nextHeartBeatInterval);
     rmNode.updateNodeHeartbeatResponseForCleanup(nodeHeartBeatResponse);
+    rmNode.updateNodeHeartbeatResponseForContainersDecreasing(
+        nodeHeartBeatResponse);
 
     populateKeys(request, nodeHeartBeatResponse);
 
@@ -461,8 +463,9 @@ public class ResourceTrackerService extends AbstractService implements
     // 4. Send status to RMNode, saving the latest response.
     RMNodeStatusEvent nodeStatusEvent =
         new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(),
-          remoteNodeStatus.getContainersStatuses(),
-          remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse);
+            remoteNodeStatus.getContainersStatuses(),
+            remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse,
+            remoteNodeStatus.getIncreasedContainers());
     if (request.getLogAggregationReportsForApps() != null
         && !request.getLogAggregationReportsForApps().isEmpty()) {
       nodeStatusEvent.setLogAggregationReportsForApps(request

http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index 629b2a3..43de3ac 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -971,7 +971,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
                 Collections.singletonList(appAttempt.amReq),
                 EMPTY_CONTAINER_RELEASE_LIST,
                 amBlacklist.getAdditions(),
-                amBlacklist.getRemovals());
+                amBlacklist.getRemovals(), null, null);
         if (amContainerAllocation != null
             && amContainerAllocation.getContainers() != null) {
           assert (amContainerAllocation.getContainers().size() == 0);
@@ -995,7 +995,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
       Allocation amContainerAllocation =
           appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,
             EMPTY_CONTAINER_REQUEST_LIST, EMPTY_CONTAINER_RELEASE_LIST, null,
-            null);
+            null, null, null);
       // There must be at least one container allocated, because a
       // CONTAINER_ALLOCATED is emitted after an RMContainer is constructed,
       // and is put in SchedulerApplication#newlyAllocatedContainers.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
index 21d79ee..dc0d9ba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
@@ -82,4 +82,8 @@ public interface RMContainer extends EventHandler<RMContainerEvent> {
   String getNodeHttpAddress();
   
   String getNodeLabelExpression();
+  
+  boolean hasIncreaseReservation();
+  
+  void cancelIncreaseReservation();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerChangeResourceEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerChangeResourceEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerChangeResourceEvent.java
new file mode 100644
index 0000000..920cfdb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerChangeResourceEvent.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+public class RMContainerChangeResourceEvent extends RMContainerEvent {
+  
+  final Resource targetResource;
+  final boolean increase;
+
+  public RMContainerChangeResourceEvent(ContainerId containerId,
+      Resource targetResource, boolean increase) {
+    super(containerId, RMContainerEventType.CHANGE_RESOURCE);
+
+    this.targetResource = targetResource;
+    this.increase = increase;
+  }
+  
+  public Resource getTargetResource() {
+    return targetResource;
+  }
+  
+  public boolean isIncrease() {
+    return increase;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java
index 259d68b3..a3b4b76 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java
@@ -25,6 +25,10 @@ public enum RMContainerEventType {
   ACQUIRED,
   KILL, // Also from Node on NodeRemoval
   RESERVED,
+  
+  // when a container is acquired by the AM after
+  // it has been increased/decreased
+  ACQUIRE_UPDATED_CONTAINER, 
 
   LAUNCHED,
   FINISHED,
@@ -35,5 +39,12 @@ public enum RMContainerEventType {
   // Source: ContainerAllocationExpirer  
   EXPIRE,
 
-  RECOVER
+  RECOVER,
+  
+  // Source: Scheduler
+  // Resource change approved by scheduler
+  CHANGE_RESOURCE,
+  
+  // NM reported resource change is done
+  NM_DONE_CHANGE_RESOURCE 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index a3d8bee..8133657 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -118,7 +118,18 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
     .addTransition(RMContainerState.RUNNING, RMContainerState.RELEASED,
         RMContainerEventType.RELEASED, new KillTransition())
     .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING,
-        RMContainerEventType.EXPIRE)
+        RMContainerEventType.RESERVED, new ContainerReservedTransition())
+    .addTransition(RMContainerState.RUNNING, RMContainerState.EXPIRED,
+        RMContainerEventType.EXPIRE,
+        new ContainerExpiredWhileRunningTransition())
+    .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING,
+        RMContainerEventType.CHANGE_RESOURCE, new ChangeResourceTransition())
+    .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING,
+        RMContainerEventType.ACQUIRE_UPDATED_CONTAINER, 
+        new ContainerAcquiredWhileRunningTransition())
+    .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING,
+        RMContainerEventType.NM_DONE_CHANGE_RESOURCE, 
+        new NMReportedContainerChangeIsDoneTransition())
 
     // Transitions from COMPLETED state
     .addTransition(RMContainerState.COMPLETED, RMContainerState.COMPLETED,
@@ -140,9 +151,7 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
             RMContainerEventType.KILL, RMContainerEventType.FINISHED))
 
     // create the topology tables
-    .installTopology(); 
-                        
-                        
+    .installTopology();
 
   private final StateMachine<RMContainerState, RMContainerEventType,
                                                  RMContainerEvent> stateMachine;
@@ -166,6 +175,8 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
   private ContainerStatus finishedStatus;
   private boolean isAMContainer;
   private List<ResourceRequest> resourceRequests;
+
+  private volatile boolean hasIncreaseReservation = false;
   
   public RMContainerImpl(Container container,
       ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
@@ -264,7 +275,12 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
 
   @Override
   public Resource getAllocatedResource() {
-    return container.getResource();
+    try {
+      readLock.lock();
+      return container.getResource();
+    } finally {
+      readLock.unlock();
+    }
   }
 
   @Override
@@ -471,8 +487,8 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
     }
   }
 
-  private static final class ContainerReservedTransition extends
-  BaseTransition {
+  private static final class ContainerReservedTransition
+      extends BaseTransition {
 
     @Override
     public void transition(RMContainerImpl container, RMContainerEvent event) {
@@ -480,6 +496,12 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
       container.reservedResource = e.getReservedResource();
       container.reservedNode = e.getReservedNode();
       container.reservedPriority = e.getReservedPriority();
+      
+      if (!EnumSet.of(RMContainerState.NEW, RMContainerState.RESERVED)
+          .contains(container.getState())) {
+        // When container's state != NEW/RESERVED, it is an increase reservation
+        container.hasIncreaseReservation = true;
+      }
     }
   }
 
@@ -509,6 +531,70 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
           .getApplicationAttemptId().getApplicationId(), container.nodeId));
     }
   }
+  
+  private static final class ContainerAcquiredWhileRunningTransition extends
+      BaseTransition {
+
+    @Override
+    public void transition(RMContainerImpl container, RMContainerEvent event) {
+      RMContainerUpdatesAcquiredEvent acquiredEvent =
+          (RMContainerUpdatesAcquiredEvent) event;
+      if (acquiredEvent.isIncreasedContainer()) {
+        // If container is increased but not acquired by AM, we will start
+        // containerAllocationExpirer for this container in this transition. 
+        container.containerAllocationExpirer.register(event.getContainerId());
+      }
+    }
+  }
+  
+  private static final class NMReportedContainerChangeIsDoneTransition
+      extends BaseTransition {
+
+    @Override
+    public void transition(RMContainerImpl container, RMContainerEvent event) {
+      // Unregister the allocation expirer; the container is already increased.
+      container.containerAllocationExpirer.unregister(event.getContainerId());
+    }
+  }
+  
+  private static final class ContainerExpiredWhileRunningTransition extends
+      BaseTransition {
+
+    @Override
+    public void transition(RMContainerImpl container, RMContainerEvent event) {
+      // When the container expired, and it has a pending increased request, we
+      // will kill the container.
+      // TODO, we can do better for this: roll back container resource to the
+      // resource before increase, and notify scheduler about this decrease as
+      // well. Will do that in a separate JIRA.
+      new KillTransition().transition(container, event);
+    }
+  }
+  
+  private static final class ChangeResourceTransition extends BaseTransition {
+
+    @Override
+    public void transition(RMContainerImpl container, RMContainerEvent event) {
+      RMContainerChangeResourceEvent changeEvent = (RMContainerChangeResourceEvent)event;
+      
+      // Register with containerAllocationExpirer.
+      // For now, we assume timeout for increase is the same as container
+      // allocation.
+      if (!changeEvent.isIncrease()) {
+        // If this is a decrease request and the container was increased but
+        // the NM was not yet told, we can consider the previous increase
+        // cancelled and unregister from the containerAllocationExpirer.
+        container.containerAllocationExpirer.unregister(container
+            .getContainerId());
+      }
+      
+      container.container.setResource(changeEvent.getTargetResource());
+      
+      // Reaching here means we either allocated an increase reservation OR
+      // decreased the container; the reservation is cancelled either way.
+      container.hasIncreaseReservation = false;
+    }
+  }
 
   private static final class ContainerRescheduledTransition extends
       FinishedTransition {
@@ -561,13 +647,14 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
       RMAppAttempt rmAttempt = container.rmContext.getRMApps()
           .get(container.getApplicationAttemptId().getApplicationId())
           .getCurrentAppAttempt();
-      if (ContainerExitStatus.PREEMPTED == container.finishedStatus
-        .getExitStatus()) {
-        rmAttempt.getRMAppAttemptMetrics().updatePreemptionInfo(resource,
-          container);
-      }
 
       if (rmAttempt != null) {
+        if (ContainerExitStatus.PREEMPTED == container.finishedStatus
+            .getExitStatus()) {
+            rmAttempt.getRMAppAttemptMetrics().updatePreemptionInfo(resource,
+              container);
+          }
+        
         long usedMillis = container.finishTime - container.creationTime;
         long memorySeconds = resource.getMemory()
                               * usedMillis / DateUtils.MILLIS_PER_SECOND;
@@ -665,4 +752,14 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
     }
     return -1;
   }
+
+  @Override
+  public boolean hasIncreaseReservation() {
+    return hasIncreaseReservation;
+  }
+
+  @Override
+  public void cancelIncreaseReservation() {
+    hasIncreaseReservation = false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerUpdatesAcquiredEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerUpdatesAcquiredEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerUpdatesAcquiredEvent.java
new file mode 100644
index 0000000..0dccc5f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerUpdatesAcquiredEvent.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+public class RMContainerUpdatesAcquiredEvent extends RMContainerEvent  {
+  private final boolean increasedContainer;
+  
+  public RMContainerUpdatesAcquiredEvent(ContainerId containerId,
+      boolean increasedContainer) {
+    super(containerId, RMContainerEventType.ACQUIRE_UPDATED_CONTAINER);
+    this.increasedContainer = increasedContainer; 
+  }
+  
+  public boolean isIncreasedContainer() {
+    return increasedContainer;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
index 6bb0971..f28422a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
@@ -24,6 +24,7 @@ import java.util.Set;
 
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
@@ -146,4 +147,12 @@ public interface RMNode {
    * @return labels in this node
    */
   public Set<String> getNodeLabels();
+  
+  /**
+   * Update containers to be decreased
+   */
+  public void updateNodeHeartbeatResponseForContainersDecreasing(
+      NodeHeartbeatResponse response);
+  
+  public List<Container> pullNewlyIncreasedContainers();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeDecreaseContainerEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeDecreaseContainerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeDecreaseContainerEvent.java
new file mode 100644
index 0000000..62925ad
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeDecreaseContainerEvent.java
@@ -0,0 +1,39 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
+
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+public class RMNodeDecreaseContainerEvent extends RMNodeEvent {
+  // Containers to be decreased, exactly as passed in by the event's creator.
+  private final List<Container> toBeDecreasedContainers;
+
+  public RMNodeDecreaseContainerEvent(NodeId nodeId,
+      List<Container> toBeDecreasedContainers) {
+    super(nodeId, RMNodeEventType.DECREASE_CONTAINER);
+    this.toBeDecreasedContainers = toBeDecreasedContainers;
+  }
+  
+  public List<Container> getToBeDecreasedContainers() {
+    return toBeDecreasedContainers;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java
index 27ba1c0..a68c894 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java
@@ -42,6 +42,7 @@ public enum RMNodeEventType {
   // Source: Container
   CONTAINER_ALLOCATED,
   CLEANUP_CONTAINER,
+  DECREASE_CONTAINER,
 
   // Source: RMAppAttempt
   FINISHED_CONTAINERS_PULLED_BY_AM,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 7a1ba74..7a43598 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -19,9 +19,13 @@
 package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
 
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -36,6 +40,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -131,6 +136,12 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
   /* the list of applications that are running on this node */
   private final List<ApplicationId> runningApplications =
       new ArrayList<ApplicationId>();
+  
+  private final Map<ContainerId, Container> toBeDecreasedContainers =
+      new HashMap<>();
+  
+  private final Map<ContainerId, Container> nmReportedIncreasedContainers =
+      new HashMap<>();
 
   private NodeHeartbeatResponse latestNodeHeartBeatResponse = recordFactory
       .newRecordInstance(NodeHeartbeatResponse.class);
@@ -178,6 +189,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
      .addTransition(NodeState.RUNNING, NodeState.SHUTDOWN,
          RMNodeEventType.SHUTDOWN,
          new DeactivateNodeTransition(NodeState.SHUTDOWN))
+     .addTransition(NodeState.RUNNING, NodeState.RUNNING,
+         RMNodeEventType.DECREASE_CONTAINER,
+         new DecreaseContainersTransition())
 
      //Transitions from REBOOTED state
      .addTransition(NodeState.REBOOTED, NodeState.REBOOTED,
@@ -430,6 +444,24 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       this.writeLock.unlock();
     }
   };
+  
+  @VisibleForTesting
+  public Collection<Container> getToBeDecreasedContainers() {
+    return toBeDecreasedContainers.values(); 
+  }
+  
+  @Override
+  public void updateNodeHeartbeatResponseForContainersDecreasing(
+      NodeHeartbeatResponse response) {
+    this.writeLock.lock();
+    
+    try {
+      response.addAllContainersToDecrease(toBeDecreasedContainers.values());
+      toBeDecreasedContainers.clear();
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
 
   @Override
   public NodeHeartbeatResponse getLastNodeHeartBeatResponse() {
@@ -759,6 +791,19 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
           RMNodeFinishedContainersPulledByAMEvent) event).getContainers());
     }
   }
+  
+  public static class DecreaseContainersTransition
+      implements SingleArcTransition<RMNodeImpl, RMNodeEvent> {
+ 
+    @Override
+    public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
+      RMNodeDecreaseContainerEvent de = (RMNodeDecreaseContainerEvent) event;
+
+      for (Container c : de.getToBeDecreasedContainers()) {
+        rmNode.toBeDecreasedContainers.put(c.getId(), c);
+      }
+    }
+  }
 
   public static class DeactivateNodeTransition
     implements SingleArcTransition<RMNodeImpl, RMNodeEvent> {
@@ -827,6 +872,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       }
 
       rmNode.handleContainerStatus(statusEvent.getContainers());
+      rmNode.handleReportedIncreasedContainers(
+          statusEvent.getNMReportedIncreasedContainers());
 
       List<LogAggregationReport> logAggregationReportsForApps =
           statusEvent.getLogAggregationReportsForApps();
@@ -919,6 +966,34 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
     }
     return nlm.getLabelsOnNode(nodeId);
   }
+  
+  private void handleReportedIncreasedContainers(
+      List<Container> reportedIncreasedContainers) {
+    for (Container container : reportedIncreasedContainers) {
+      ContainerId containerId = container.getId();
+
+      // Don't bother with containers already scheduled for cleanup, or for
+      // applications already killed. The scheduler doesn't need to know any
+      // more about this container.
+      if (containersToClean.contains(containerId)) {
+        LOG.info("Container " + containerId + " already scheduled for "
+            + "cleanup, no further processing");
+        continue;
+      }
+
+      ApplicationId containerAppId =
+          containerId.getApplicationAttemptId().getApplicationId();
+
+      if (finishedApplications.contains(containerAppId)) {
+        LOG.info("Container " + containerId
+            + " belongs to an application that is already killed,"
+            + " no further processing");
+        continue;
+      }
+      
+      this.nmReportedIncreasedContainers.put(containerId, container);
+    }
+  }
 
   private void handleContainerStatus(List<ContainerStatus> containerStatuses) {
     // Filter the map to only obtain just launched containers and finished
@@ -989,4 +1064,22 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
     }
   }
 
+  @Override
+  public List<Container> pullNewlyIncreasedContainers() {
+    // Take the lock before entering try, so that finally never attempts to
+    // unlock a lock this thread failed to acquire.
+    writeLock.lock();
+    try {
+      if (nmReportedIncreasedContainers.isEmpty()) {
+        return Collections.<Container>emptyList();
+      }
+      // Return a snapshot and reset, so each reported increase is pulled once.
+      List<Container> pulled =
+          new ArrayList<Container>(nmReportedIncreasedContainers.values());
+      nmReportedIncreasedContainers.clear();
+      return pulled;
+    } finally {
+      writeLock.unlock();
+    }
+  }
  }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
index b95d7d3..8323f3c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
@@ -18,8 +18,11 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
 
+import java.util.Collections;
 import java.util.List;
+
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
@@ -33,28 +36,36 @@ public class RMNodeStatusEvent extends RMNodeEvent {
   private final NodeHeartbeatResponse latestResponse;
   private final List<ApplicationId> keepAliveAppIds;
   private List<LogAggregationReport> logAggregationReportsForApps;
-
+  private final List<Container> nmReportedIncreasedContainers;
+  
+  // Used by tests
   public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus,
       List<ContainerStatus> collection, List<ApplicationId> keepAliveAppIds,
       NodeHeartbeatResponse latestResponse) {
-    super(nodeId, RMNodeEventType.STATUS_UPDATE);
-    this.nodeHealthStatus = nodeHealthStatus;
-    this.containersCollection = collection;
-    this.keepAliveAppIds = keepAliveAppIds;
-    this.latestResponse = latestResponse;
-    this.logAggregationReportsForApps = null;
+    this(nodeId, nodeHealthStatus, collection, keepAliveAppIds,
+        latestResponse, null);
   }
 
   public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus,
       List<ContainerStatus> collection, List<ApplicationId> keepAliveAppIds,
       NodeHeartbeatResponse latestResponse,
-      List<LogAggregationReport> logAggregationReportsForApps) {
+      List<Container> nmReportedIncreasedContainers) {
+    this(nodeId, nodeHealthStatus, collection, keepAliveAppIds, latestResponse,
+        null, nmReportedIncreasedContainers);
+  }
+
+  public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus,
+      List<ContainerStatus> collection, List<ApplicationId> keepAliveAppIds,
+      NodeHeartbeatResponse latestResponse,
+      List<LogAggregationReport> logAggregationReportsForApps,
+      List<Container> nmReportedIncreasedContainers) {
     super(nodeId, RMNodeEventType.STATUS_UPDATE);
     this.nodeHealthStatus = nodeHealthStatus;
     this.containersCollection = collection;
     this.keepAliveAppIds = keepAliveAppIds;
     this.latestResponse = latestResponse;
     this.logAggregationReportsForApps = logAggregationReportsForApps;
+    this.nmReportedIncreasedContainers = nmReportedIncreasedContainers;
   }
 
   public NodeHealthStatus getNodeHealthStatus() {
@@ -81,4 +92,9 @@ public class RMNodeStatusEvent extends RMNodeEvent {
       List<LogAggregationReport> logAggregationReportsForApps) {
     this.logAggregationReportsForApps = logAggregationReportsForApps;
   }
+  
+  public List<Container> getNMReportedIncreasedContainers() {
+    return nmReportedIncreasedContainers == null
+        ? Collections.<Container>emptyList() : nmReportedIncreasedContainers;
+  }
 }
\ No newline at end of file