You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2016/02/11 09:00:19 UTC

[45/50] hadoop git commit: YARN-4138. Roll back container resource allocation after resource increase token expires. Contributed by Meng Ding

YARN-4138. Roll back container resource allocation after resource increase token expires. Contributed by Meng Ding


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d16b17b4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d16b17b4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d16b17b4

Branch: refs/heads/yarn-2877
Commit: d16b17b4d299b4d58f879a2a15708bacd0938685
Parents: aeb13ef
Author: Jian He <ji...@apache.org>
Authored: Thu Feb 11 10:06:27 2016 +0800
Committer: Jian He <ji...@apache.org>
Committed: Thu Feb 11 10:06:27 2016 +0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   4 +
 .../rmcontainer/AllocationExpirationInfo.java   |  75 ++++
 .../rmcontainer/ContainerAllocationExpirer.java |  11 +-
 .../rmcontainer/RMContainer.java                |   2 +
 .../rmcontainer/RMContainerImpl.java            | 132 ++++--
 .../RMContainerNMDoneChangeResourceEvent.java   |  37 ++
 .../resourcemanager/rmnode/RMNodeImpl.java      |   7 +-
 .../scheduler/AbstractYarnScheduler.java        |  56 +--
 .../scheduler/capacity/CapacityScheduler.java   |  43 +-
 .../event/ContainerExpiredSchedulerEvent.java   |  12 +-
 .../yarn/server/resourcemanager/MockNM.java     |  20 +
 .../resourcemanager/TestRMNodeTransitions.java  |  18 +-
 .../rmcontainer/TestRMContainerImpl.java        | 112 +----
 .../capacity/TestContainerResizing.java         |  44 +-
 .../capacity/TestIncreaseAllocationExpirer.java | 443 +++++++++++++++++++
 .../scheduler/capacity/TestUtils.java           |  15 +-
 16 files changed, 804 insertions(+), 227 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d16b17b4/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index fffa83f..5c819f5 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -804,6 +804,10 @@ Release 2.8.0 - UNRELEASED
 
     YARN-4420. Add REST API for List Reservations. (Sean Po via curino)
 
+    YARN-4138. Roll back container resource allocation after resource
+    increase token expires. (Meng Ding via jianhe)
+
+
   OPTIMIZATIONS
 
     YARN-3339. TestDockerContainerExecutor should pull a single image and not

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d16b17b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/AllocationExpirationInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/AllocationExpirationInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/AllocationExpirationInfo.java
new file mode 100644
index 0000000..f4fc72a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/AllocationExpirationInfo.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
+
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+public class AllocationExpirationInfo implements
+    Comparable<AllocationExpirationInfo> {
+
+  private final ContainerId containerId;
+  private final boolean increase;
+
+  public AllocationExpirationInfo(ContainerId containerId) {
+    this(containerId, false);
+  }
+
+  public AllocationExpirationInfo(
+      ContainerId containerId, boolean increase) {
+    this.containerId = containerId;
+    this.increase = increase;
+  }
+
+  public ContainerId getContainerId() {
+    return this.containerId;
+  }
+
+  public boolean isIncrease() {
+    return this.increase;
+  }
+
+  @Override
+  public int hashCode() {
+    return (getContainerId().hashCode() << 16);
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof AllocationExpirationInfo)) {
+      return false;
+    }
+    return compareTo((AllocationExpirationInfo)other) == 0;
+  }
+
+  @Override
+  public int compareTo(AllocationExpirationInfo other) {
+    if (other == null) {
+      return -1;
+    }
+    // Only need to compare containerId.
+    return getContainerId().compareTo(other.getContainerId());
+  }
+
+  @Override
+  public String toString() {
+    return "<container=" + getContainerId() + ", increase="
+        + isIncrease() + ">";
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d16b17b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/ContainerAllocationExpirer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/ContainerAllocationExpirer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/ContainerAllocationExpirer.java
index c393f4e..d8198f4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/ContainerAllocationExpirer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/ContainerAllocationExpirer.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -28,7 +27,7 @@ import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor;
 
 @SuppressWarnings({"unchecked", "rawtypes"})
 public class ContainerAllocationExpirer extends
-    AbstractLivelinessMonitor<ContainerId> {
+    AbstractLivelinessMonitor<AllocationExpirationInfo> {
 
   private EventHandler dispatcher;
 
@@ -47,7 +46,9 @@ public class ContainerAllocationExpirer extends
   }
 
   @Override
-  protected void expire(ContainerId containerId) {
-    dispatcher.handle(new ContainerExpiredSchedulerEvent(containerId));
+  protected void expire(AllocationExpirationInfo allocationExpirationInfo) {
+    dispatcher.handle(new ContainerExpiredSchedulerEvent(
+        allocationExpirationInfo.getContainerId(),
+            allocationExpirationInfo.isIncrease()));
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d16b17b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
index dc0d9ba..5d26931 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
@@ -57,6 +57,8 @@ public interface RMContainer extends EventHandler<RMContainerEvent> {
 
   Resource getAllocatedResource();
 
+  Resource getLastConfirmedResource();
+
   NodeId getAllocatedNode();
 
   Priority getAllocatedPriority();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d16b17b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index 83876d0..16ab55d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
 
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -49,12 +50,15 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptE
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode
+    .RMNodeDecreaseContainerEvent;
 import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
 import org.apache.hadoop.yarn.state.MultipleArcTransition;
 import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
 
 @SuppressWarnings({"unchecked", "rawtypes"})
@@ -119,9 +123,6 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
         RMContainerEventType.RELEASED, new KillTransition())
     .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING,
         RMContainerEventType.RESERVED, new ContainerReservedTransition())
-    .addTransition(RMContainerState.RUNNING, RMContainerState.EXPIRED,
-        RMContainerEventType.EXPIRE,
-        new ContainerExpiredWhileRunningTransition())
     .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING,
         RMContainerEventType.CHANGE_RESOURCE, new ChangeResourceTransition())
     .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING,
@@ -177,7 +178,10 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
   private List<ResourceRequest> resourceRequests;
 
   private volatile boolean hasIncreaseReservation = false;
-  
+  // Only used for container resource increase and decrease. This is the
+  // resource to roll back to if the container resource increase token expires.
+  private Resource lastConfirmedResource;
+
   public RMContainerImpl(Container container,
       ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
       RMContext rmContext) {
@@ -210,6 +214,7 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
     this.isAMContainer = false;
     this.resourceRequests = null;
     this.nodeLabelExpression = nodeLabelExpression;
+    this.lastConfirmedResource = container.getResource();
 
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     this.readLock = lock.readLock();
@@ -284,6 +289,16 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
   }
 
   @Override
+  public Resource getLastConfirmedResource() {
+    try {
+      readLock.lock();
+      return this.lastConfirmedResource;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
   public NodeId getAllocatedNode() {
     return container.getNodeId();
   }
@@ -525,7 +540,8 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
       container.setResourceRequests(null);
       
       // Register with containerAllocationExpirer.
-      container.containerAllocationExpirer.register(container.getContainerId());
+      container.containerAllocationExpirer.register(
+          new AllocationExpirationInfo(container.getContainerId()));
 
       // Tell the app
       container.eventHandler.handle(new RMAppRunningOnNodeEvent(container
@@ -543,7 +559,8 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
       if (acquiredEvent.isIncreasedContainer()) {
         // If container is increased but not acquired by AM, we will start
         // containerAllocationExpirer for this container in this transition. 
-        container.containerAllocationExpirer.register(event.getContainerId());
+        container.containerAllocationExpirer.register(
+            new AllocationExpirationInfo(event.getContainerId(), true));
       }
     }
   }
@@ -553,22 +570,65 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
 
     @Override
     public void transition(RMContainerImpl container, RMContainerEvent event) {
-      // Unregister the allocation expirer, it is already increased..
-      container.containerAllocationExpirer.unregister(event.getContainerId());
-    }
-  }
-  
-  private static final class ContainerExpiredWhileRunningTransition extends
-      BaseTransition {
+      RMContainerNMDoneChangeResourceEvent nmDoneChangeResourceEvent =
+          (RMContainerNMDoneChangeResourceEvent)event;
+      Resource rmContainerResource = container.getAllocatedResource();
+      Resource nmContainerResource =
+          nmDoneChangeResourceEvent.getNMContainerResource();
+
+      if (Resources.equals(rmContainerResource, nmContainerResource)) {
+        // If rmContainerResource == nmContainerResource, the resource
+        // increase is confirmed.
+        // In this case:
+        //    - Set the lastConfirmedResource as nmContainerResource
+        //    - Unregister the allocation expirer
+        container.lastConfirmedResource = nmContainerResource;
+        container.containerAllocationExpirer.unregister(
+            new AllocationExpirationInfo(event.getContainerId()));
+      } else if (Resources.fitsIn(rmContainerResource, nmContainerResource)) {
+        // If rmContainerResource < nmContainerResource, this is caused by the
+        // following sequence:
+        //   1. AM asks for increase from 1G to 5G, and RM approves it
+        //   2. AM acquires the increase token and increases on NM
+        //   3. Before NM reports 5G to RM to confirm the increase, AM sends
+        //      a decrease request to 4G, and RM approves it
+        //   4. When NM reports 5G to RM, RM now sees its own allocation as 4G
+        // In this case:
+        //    - Set the lastConfirmedResource as rmContainerResource
+        //    - Unregister the allocation expirer
+        //    - Notify NM to reduce its resource to rmContainerResource
+        container.lastConfirmedResource = rmContainerResource;
+        container.containerAllocationExpirer.unregister(
+            new AllocationExpirationInfo(event.getContainerId()));
+        container.eventHandler.handle(new RMNodeDecreaseContainerEvent(
+            container.nodeId,
+            Collections.singletonList(container.getContainer())));
+      } else if (Resources.fitsIn(nmContainerResource, rmContainerResource)) {
+        // If nmContainerResource < rmContainerResource, this is caused by the
+        // following sequence:
+        //    1. AM asks for increase from 1G to 2G, and RM approves it
+        //    2. AM asks for increase from 2G to 4G, and RM approves it
+        //    3. AM only uses the 2G token to increase on NM, but never uses the
+        //       4G token
+        //    4. NM reports 2G to RM, but RM sees its own allocation as 4G
+        // In this case:
+        //    - Set the lastConfirmedResource as the maximum of
+        //      nmContainerResource and lastConfirmedResource
+        //    - Do NOT unregister the allocation expirer
+        // When the increase allocation expires, resource will be rolled back to
+        // the last confirmed resource.
+        container.lastConfirmedResource = Resources.componentwiseMax(
+            nmContainerResource, container.lastConfirmedResource);
+      } else {
+        // Something wrong happened, kill the container
+        LOG.warn("Something wrong happened, container size reported by NM"
+            + " is not expected, ContainerID=" + container.containerId
+            + " rm-size-resource:" + rmContainerResource + " nm-size-reosurce:"
+            + nmContainerResource);
+        container.eventHandler.handle(new RMNodeCleanContainerEvent(
+            container.nodeId, container.containerId));
 
-    @Override
-    public void transition(RMContainerImpl container, RMContainerEvent event) {
-      // When the container expired, and it has a pending increased request, we
-      // will kill the container.
-      // TODO, we can do better for this: roll back container resource to the
-      // resource before increase, and notify scheduler about this decrease as
-      // well. Will do that in a separated JIRA.
-      new KillTransition().transition(container, event);
+      }
     }
   }
   
@@ -577,20 +637,22 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
     @Override
     public void transition(RMContainerImpl container, RMContainerEvent event) {
       RMContainerChangeResourceEvent changeEvent = (RMContainerChangeResourceEvent)event;
-      
-      // Register with containerAllocationExpirer.
-      // For now, we assume timeout for increase is as same as container
-      // allocation.
+
+      Resource targetResource = changeEvent.getTargetResource();
+      Resource lastConfirmedResource = container.lastConfirmedResource;
+
       if (!changeEvent.isIncrease()) {
-        // if this is a decrease request, if container was increased but not
-        // told to NM, we can consider previous increase is cancelled,
-        // unregister from the containerAllocationExpirer
-        container.containerAllocationExpirer.unregister(container
-            .getContainerId());
+        // Only unregister from the containerAllocationExpirer when target
+        // resource is less than or equal to the last confirmed resource.
+        if (Resources.fitsIn(targetResource, lastConfirmedResource)) {
+          container.lastConfirmedResource = targetResource;
+          container.containerAllocationExpirer.unregister(
+              new AllocationExpirationInfo(event.getContainerId()));
+        }
       }
-      
-      container.container.setResource(changeEvent.getTargetResource());
-      
+
+      container.container.setResource(targetResource);
+
       // We reach here means we either allocated increase reservation OR
       // decreased container, reservation will be cancelled anyway. 
       container.hasIncreaseReservation = false;
@@ -662,8 +724,8 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
     public void transition(RMContainerImpl container, RMContainerEvent event) {
 
       // Unregister from containerAllocationExpirer.
-      container.containerAllocationExpirer.unregister(container
-          .getContainerId());
+      container.containerAllocationExpirer.unregister(
+          new AllocationExpirationInfo(container.getContainerId()));
 
       // Inform node
       container.eventHandler.handle(new RMNodeCleanContainerEvent(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d16b17b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerNMDoneChangeResourceEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerNMDoneChangeResourceEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerNMDoneChangeResourceEvent.java
new file mode 100644
index 0000000..362c8ea
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerNMDoneChangeResourceEvent.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+public class RMContainerNMDoneChangeResourceEvent extends RMContainerEvent {
+
+  private final Resource nmContainerResource;
+
+  public RMContainerNMDoneChangeResourceEvent(
+      ContainerId containerId, Resource nmContainerResource) {
+    super(containerId, RMContainerEventType.NM_DONE_CHANGE_RESOURCE);
+    this.nmContainerResource = nmContainerResource;
+  }
+
+  public Resource getNMContainerResource() {
+    return nmContainerResource;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d16b17b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 433e189..f4e483b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.AllocationExpirationInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
@@ -1306,14 +1307,16 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
           launchedContainers.add(containerId);
           newlyLaunchedContainers.add(remoteContainer);
           // Unregister from containerAllocationExpirer.
-          containerAllocationExpirer.unregister(containerId);
+          containerAllocationExpirer.unregister(
+              new AllocationExpirationInfo(containerId));
         }
       } else {
         // A finished container
         launchedContainers.remove(containerId);
         completedContainers.add(remoteContainer);
         // Unregister from containerAllocationExpirer.
-        containerAllocationExpirer.unregister(containerId);
+        containerAllocationExpirer.unregister(
+            new AllocationExpirationInfo(containerId));
       }
     }
     if (newlyLaunchedContainers.size() != 0 || completedContainers.size() != 0) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d16b17b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 27d4f91..7ca8671 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.List;
@@ -68,18 +67,18 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer
+    .RMContainerNMDoneChangeResourceEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent;
-
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecreaseContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
+    .LeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
 import org.apache.hadoop.yarn.util.resource.Resources;
-
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.SettableFuture;
 
@@ -260,7 +259,7 @@ public abstract class AbstractYarnScheduler
     application.containerLaunchedOnNode(containerId, node.getNodeID());
   }
   
-  protected synchronized void containerIncreasedOnNode(ContainerId containerId,
+  protected void containerIncreasedOnNode(ContainerId containerId,
       SchedulerNode node, Container increasedContainerReportedByNM) {
     // Get the application for the finished container
     SchedulerApplicationAttempt application =
@@ -273,39 +272,18 @@ public abstract class AbstractYarnScheduler
           .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
       return;
     }
-
-    RMContainer rmContainer = getRMContainer(containerId);
-    Resource rmContainerResource = rmContainer.getAllocatedResource();
-    Resource nmContainerResource = increasedContainerReportedByNM.getResource();
-    
-    
-    if (Resources.equals(nmContainerResource, rmContainerResource)){
-      // NM reported expected container size, tell RMContainer. Which will stop
-      // container expire monitor
-      rmContainer.handle(new RMContainerEvent(containerId,
-          RMContainerEventType.NM_DONE_CHANGE_RESOURCE));
-    } else if (Resources.fitsIn(getResourceCalculator(), clusterResource,
-        nmContainerResource, rmContainerResource)) {
-      // when rmContainerResource >= nmContainerResource, we won't do anything,
-      // it is possible a container increased is issued by RM, but AM hasn't
-      // told NM.
-    } else if (Resources.fitsIn(getResourceCalculator(), clusterResource,
-        rmContainerResource, nmContainerResource)) {
-      // When rmContainerResource <= nmContainerResource, it could happen when a
-      // container decreased by RM before it is increased in NM.
-      
-      // Tell NM to decrease the container
-      this.rmContext.getDispatcher().getEventHandler()
-          .handle(new RMNodeDecreaseContainerEvent(node.getNodeID(),
-              Arrays.asList(rmContainer.getContainer())));
-    } else {
-      // Something wrong happened, kill the container
-      LOG.warn("Something wrong happened, container size reported by NM"
-          + " is not expected, ContainerID=" + containerId
-          + " rm-size-resource:" + rmContainerResource + " nm-size-reosurce:"
-          + nmContainerResource);
-      this.rmContext.getDispatcher().getEventHandler()
-          .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
+    LeafQueue leafQueue = (LeafQueue) application.getQueue();
+    synchronized (leafQueue) {
+      RMContainer rmContainer = getRMContainer(containerId);
+      if (rmContainer == null) {
+        // Some unknown container sneaked into the system. Kill it.
+        this.rmContext.getDispatcher().getEventHandler()
+            .handle(new RMNodeCleanContainerEvent(
+                node.getNodeID(), containerId));
+        return;
+      }
+      rmContainer.handle(new RMContainerNMDoneChangeResourceEvent(
+          containerId, increasedContainerReportedByNM.getResource()));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d16b17b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index dcb60fc..ee3a3f9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -105,6 +105,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicat
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
@@ -1392,11 +1393,15 @@ public class CapacityScheduler extends
       ContainerExpiredSchedulerEvent containerExpiredEvent = 
           (ContainerExpiredSchedulerEvent) event;
       ContainerId containerId = containerExpiredEvent.getContainerId();
-      super.completedContainer(getRMContainer(containerId),
-          SchedulerUtils.createAbnormalContainerStatus(
-              containerId, 
-              SchedulerUtils.EXPIRED_CONTAINER), 
-          RMContainerEventType.EXPIRE);
+      if (containerExpiredEvent.isIncrease()) {
+        rollbackContainerResource(containerId);
+      } else {
+        completedContainer(getRMContainer(containerId),
+            SchedulerUtils.createAbnormalContainerStatus(
+                containerId,
+                SchedulerUtils.EXPIRED_CONTAINER),
+            RMContainerEventType.EXPIRE);
+      }
     }
     break;
     case KILL_RESERVED_CONTAINER:
@@ -1498,7 +1503,33 @@ public class CapacityScheduler extends
     LOG.info("Removed node " + nodeInfo.getNodeAddress() + 
         " clusterResource: " + clusterResource);
   }
-  
+
+  private void rollbackContainerResource(
+      ContainerId containerId) {
+    RMContainer rmContainer = getRMContainer(containerId);
+    if (rmContainer == null) {
+      LOG.info("Cannot rollback resource for container " + containerId +
+          ". The container does not exist.");
+      return;
+    }
+    FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId);
+    if (application == null) {
+      LOG.info("Cannot rollback resource for container " + containerId +
+          ". The application that the container belongs to does not exist.");
+      return;
+    }
+    LOG.info("Roll back resource for container " + containerId);
+    LeafQueue leafQueue = (LeafQueue) application.getQueue();
+    synchronized(leafQueue) {
+      SchedulerNode schedulerNode =
+          getSchedulerNode(rmContainer.getAllocatedNode());
+      SchedContainerChangeRequest decreaseRequest =
+          new SchedContainerChangeRequest(this.rmContext, schedulerNode,
+              rmContainer, rmContainer.getLastConfirmedResource());
+      decreaseContainer(decreaseRequest, application);
+    }
+  }
+
   @Lock(CapacityScheduler.class)
   @Override
   protected synchronized void completedContainerInternal(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d16b17b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerExpiredSchedulerEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerExpiredSchedulerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerExpiredSchedulerEvent.java
index 4a999c8..c80fc4f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerExpiredSchedulerEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerExpiredSchedulerEvent.java
@@ -29,14 +29,24 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAlloca
 public class ContainerExpiredSchedulerEvent extends SchedulerEvent {
 
   private final ContainerId containerId;
-  
+  private final boolean increase;
+
   public ContainerExpiredSchedulerEvent(ContainerId containerId) {
+    this(containerId, false);
+  }
+
+  public ContainerExpiredSchedulerEvent(
+      ContainerId containerId, boolean increase) {
     super(SchedulerEventType.CONTAINER_EXPIRED);
     this.containerId = containerId;
+    this.increase = increase;
   }
 
   public ContainerId getContainerId() {
     return containerId;
   }
 
+  public boolean isIncrease() {
+    return increase;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d16b17b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
index 4233cd4..4407fe9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
@@ -20,12 +20,14 @@ package org.apache.hadoop.yarn.server.resourcemanager;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -103,6 +105,17 @@ public class MockNM {
     nodeHeartbeat(conts, true);
   }
 
+  public void containerIncreaseStatus(Container container) throws Exception {
+    Map<ApplicationId, List<ContainerStatus>> conts = new HashMap<>();
+    ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
+        container.getId(), ContainerState.RUNNING, "Success", 0,
+            container.getResource());
+    conts.put(container.getId().getApplicationAttemptId().getApplicationId(),
+        Collections.singletonList(containerStatus));
+    List<Container> increasedConts = Collections.singletonList(container);
+    nodeHeartbeat(conts, increasedConts, true, ++responseId);
+  }
+
   public RegisterNodeManagerResponse registerNode() throws Exception {
     return registerNode(null, null);
   }
@@ -159,6 +172,12 @@ public class MockNM {
 
   public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId,
       List<ContainerStatus>> conts, boolean isHealthy, int resId) throws Exception {
+    return nodeHeartbeat(conts, new ArrayList<Container>(), isHealthy, resId);
+  }
+
+  public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId,
+      List<ContainerStatus>> conts, List<Container> increasedConts,
+          boolean isHealthy, int resId) throws Exception {
     NodeHeartbeatRequest req = Records.newRecord(NodeHeartbeatRequest.class);
     NodeStatus status = Records.newRecord(NodeStatus.class);
     status.setResponseId(resId);
@@ -167,6 +186,7 @@ public class MockNM {
       Log.info("entry.getValue() " + entry.getValue());
       status.setContainersStatuses(entry.getValue());
     }
+    status.setIncreasedContainers(increasedConts);
     NodeHealthStatus healthStatus = Records.newRecord(NodeHealthStatus.class);
     healthStatus.setHealthReport("");
     healthStatus.setIsNodeHealthy(isHealthy);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d16b17b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
index d1c9f6e..78aa139 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
@@ -47,6 +47,8 @@ import org.apache.hadoop.yarn.event.InlineDispatcher;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer
+    .AllocationExpirationInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent;
@@ -949,10 +951,14 @@ public class TestRMNodeTransitions {
         ApplicationAttemptId.newInstance(appId, 1);
     ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1L);
     ContainerId containerId2 = ContainerId.newContainerId(appAttemptId, 2L);
-    mockExpirer.register(containerId1);
-    mockExpirer.register(containerId2);
-    verify(mockExpirer).register(containerId1);
-    verify(mockExpirer).register(containerId2);
+    AllocationExpirationInfo expirationInfo1 =
+        new AllocationExpirationInfo(containerId1);
+    AllocationExpirationInfo expirationInfo2 =
+        new AllocationExpirationInfo(containerId2);
+    mockExpirer.register(expirationInfo1);
+    mockExpirer.register(expirationInfo2);
+    verify(mockExpirer).register(expirationInfo1);
+    verify(mockExpirer).register(expirationInfo2);
     ((RMContextImpl) rmContext).setContainerAllocationExpirer(mockExpirer);
     RMNodeImpl rmNode = getRunningNode();
     ContainerStatus status1 =
@@ -966,7 +972,7 @@ public class TestRMNodeTransitions {
     statusList.add(status2);
     RMNodeStatusEvent statusEvent = getMockRMNodeStatusEvent(statusList);
     rmNode.handle(statusEvent);
-    verify(mockExpirer).unregister(containerId1);
-    verify(mockExpirer).unregister(containerId2);
+    verify(mockExpirer).unregister(expirationInfo1);
+    verify(mockExpirer).unregister(expirationInfo2);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d16b17b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
index fa0e2ed..ed8d56f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
@@ -249,120 +250,13 @@ public class TestRMContainerImpl {
     rmContainer.handle(new RMContainerFinishedEvent(containerId,
         containerStatus, RMContainerEventType.EXPIRE));
     drainDispatcher.await();
-    assertEquals(RMContainerState.EXPIRED, rmContainer.getState());
-    verify(writer, times(1)).containerFinished(any(RMContainer.class));
-    verify(publisher, times(1)).containerFinished(any(RMContainer.class),
-        anyLong());
-  }
-  
-  private void testExpireAfterIncreased(boolean acquired) {
-    /*
-     * Similar to previous test, a container is increased but not acquired by
-     * AM. In this case, if a container is expired, the container should be
-     * finished.
-     */
-    DrainDispatcher drainDispatcher = new DrainDispatcher();
-    EventHandler<RMAppAttemptEvent> appAttemptEventHandler =
-        mock(EventHandler.class);
-    EventHandler generic = mock(EventHandler.class);
-    drainDispatcher.register(RMAppAttemptEventType.class,
-        appAttemptEventHandler);
-    drainDispatcher.register(RMNodeEventType.class, generic);
-    drainDispatcher.init(new YarnConfiguration());
-    drainDispatcher.start();
-    NodeId nodeId = BuilderUtils.newNodeId("host", 3425);
-    ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
-    ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
-        appId, 1);
-    ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
-    ContainerAllocationExpirer expirer = mock(ContainerAllocationExpirer.class);
-
-    Resource resource = BuilderUtils.newResource(512, 1);
-    Priority priority = BuilderUtils.newPriority(5);
-
-    Container container = BuilderUtils.newContainer(containerId, nodeId,
-        "host:3465", resource, priority, null);
-
-    RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
-    SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class);
-    RMContext rmContext = mock(RMContext.class);
-    when(rmContext.getDispatcher()).thenReturn(drainDispatcher);
-    when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer);
-    when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);
-    when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher);
-    when(rmContext.getYarnConfiguration()).thenReturn(new YarnConfiguration());
-    ConcurrentMap<ApplicationId, RMApp> apps =
-        new ConcurrentHashMap<ApplicationId, RMApp>();
-    apps.put(appId, mock(RMApp.class));
-    when(rmContext.getRMApps()).thenReturn(apps);
-    RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
-        nodeId, "user", rmContext);
-
-    assertEquals(RMContainerState.NEW, rmContainer.getState());
-    assertEquals(resource, rmContainer.getAllocatedResource());
-    assertEquals(nodeId, rmContainer.getAllocatedNode());
-    assertEquals(priority, rmContainer.getAllocatedPriority());
-    verify(writer).containerStarted(any(RMContainer.class));
-    verify(publisher).containerCreated(any(RMContainer.class), anyLong());
-
-    rmContainer.handle(new RMContainerEvent(containerId,
-        RMContainerEventType.START));
-    drainDispatcher.await();
-    assertEquals(RMContainerState.ALLOCATED, rmContainer.getState());
-
-    rmContainer.handle(new RMContainerEvent(containerId,
-        RMContainerEventType.ACQUIRED));
-    drainDispatcher.await();
-    assertEquals(RMContainerState.ACQUIRED, rmContainer.getState());
-
-    rmContainer.handle(new RMContainerEvent(containerId,
-        RMContainerEventType.LAUNCHED));
-    drainDispatcher.await();
     assertEquals(RMContainerState.RUNNING, rmContainer.getState());
-    assertEquals(
-        "http://host:3465/node/containerlogs/container_1_0001_01_000001/user",
-        rmContainer.getLogURL());
-    
-    // newResource is more than the old resource
-    Resource newResource = BuilderUtils.newResource(1024, 2);
-    rmContainer.handle(new RMContainerChangeResourceEvent(containerId,
-        newResource, true));
-
-    if (acquired) {
-      rmContainer
-          .handle(new RMContainerUpdatesAcquiredEvent(containerId, true));
-      drainDispatcher.await();
-      // status is still RUNNING since this is a increased container acquired by
-      // AM 
-      assertEquals(RMContainerState.RUNNING, rmContainer.getState());
-    }
-
-    // In RUNNING state. Verify EXPIRE and associated actions.
-    reset(appAttemptEventHandler);
-    ContainerStatus containerStatus = SchedulerUtils
-        .createAbnormalContainerStatus(containerId,
-            SchedulerUtils.EXPIRED_CONTAINER);
-    rmContainer.handle(new RMContainerFinishedEvent(containerId,
-        containerStatus, RMContainerEventType.EXPIRE));
-    drainDispatcher.await();
-    assertEquals(RMContainerState.EXPIRED, rmContainer.getState());
-    
-    // Container will be finished only when it is acquired by AM after increase,
-    // we will only notify expirer when it is acquired by AM.
-    verify(writer, times(1)).containerFinished(any(RMContainer.class));
-    verify(publisher, times(1)).containerFinished(any(RMContainer.class),
+    verify(writer, never()).containerFinished(any(RMContainer.class));
+    verify(publisher, never()).containerFinished(any(RMContainer.class),
         anyLong());
   }
 
   @Test
-  public void testExpireAfterContainerResourceIncreased() throws Exception {
-    // expire after increased and acquired by AM
-    testExpireAfterIncreased(true);
-    // expire after increased but not acquired by AM
-    testExpireAfterIncreased(false);
-  }
-  
-  @Test
   public void testExistenceOfResourceRequestInRMContainer() throws Exception {
     Configuration conf = new Configuration();
     MockRM rm1 = new MockRM(conf);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d16b17b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
index c08af9d..9e29842 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
@@ -28,7 +28,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
@@ -143,7 +142,8 @@ public class TestContainerResizing {
                 .newInstance(containerId1, Resources.createResource(3 * GB))),
         null);
 
-    FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId());
+    FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp(
+        rm1, app1.getApplicationId());
 
     checkPendingResource(rm1, "default", 2 * GB, null);
     Assert.assertEquals(2 * GB,
@@ -183,7 +183,8 @@ public class TestContainerResizing {
     // app1 -> a1
     RMApp app1 = rm1.submitApp(3 * GB, "app", "user", null, "default");
     MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-    FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId());
+    FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp(
+        rm1, app1.getApplicationId());
 
     checkUsedResource(rm1, "default", 3 * GB, null);
     Assert.assertEquals(3 * GB,
@@ -242,7 +243,8 @@ public class TestContainerResizing {
     RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
     MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
 
-    FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId());
+    FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp(
+        rm1, app1.getApplicationId());
 
     // Allocate two more containers
     am1.allocate(
@@ -346,7 +348,8 @@ public class TestContainerResizing {
     RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
     MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
 
-    FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId());
+    FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp(
+        rm1, app1.getApplicationId());
 
     // Allocate 1 container
     am1.allocate(
@@ -422,7 +425,8 @@ public class TestContainerResizing {
     RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
     MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
 
-    FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId());
+    FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp(
+        rm1, app1.getApplicationId());
 
     // Allocate two more containers
     am1.allocate(
@@ -532,7 +536,8 @@ public class TestContainerResizing {
     RMApp app1 = rm1.submitApp(2 * GB, "app", "user", null, "default");
     MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
 
-    FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId());
+    FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp(
+        rm1, app1.getApplicationId());
 
     // Allocate two more containers
     am1.allocate(
@@ -593,8 +598,8 @@ public class TestContainerResizing {
     am1.allocate(null, Arrays.asList(containerId2));
     // am1 asks to change its AM container from 2G to 1G (decrease)
     am1.sendContainerResizingRequest(null, Arrays.asList(
-            ContainerResourceChangeRequest
-                .newInstance(containerId1, Resources.createResource(1 * GB))));
+        ContainerResourceChangeRequest
+            .newInstance(containerId1, Resources.createResource(1 * GB))));
     // Trigger a node heartbeat..
     cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
     
@@ -643,7 +648,8 @@ public class TestContainerResizing {
     RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
     MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
 
-    FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId());
+    FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp(
+        rm1, app1.getApplicationId());
 
     // Allocate two more containers
     am1.allocate(
@@ -740,7 +746,8 @@ public class TestContainerResizing {
     RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
     MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
 
-    FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId());
+    FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp(
+        rm1, app1.getApplicationId());
 
     // Allocate two more containers
     am1.allocate(
@@ -862,7 +869,8 @@ public class TestContainerResizing {
     RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
     MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
 
-    FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId());
+    FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp(
+        rm1, app1.getApplicationId());
     ApplicationAttemptId attemptId = am1.getApplicationAttemptId();
 
     // Container 2, 3 (priority=3)
@@ -942,7 +950,8 @@ public class TestContainerResizing {
     RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
     MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
 
-    FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId());
+    FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp(
+        rm1, app1.getApplicationId());
     ApplicationAttemptId attemptId = am1.getApplicationAttemptId();
 
     // Container 2, 3 (priority=3)
@@ -1021,7 +1030,8 @@ public class TestContainerResizing {
     MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm);
     // making sure resource is allocated
     checkUsedResource(rm, "default", 3 * GB, null);
-    FiCaSchedulerApp app = getFiCaSchedulerApp(rm, app1.getApplicationId());
+    FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp(
+        rm, app1.getApplicationId());
     Assert.assertEquals(3 * GB,
         app.getAppAttemptResourceUsage().getUsed().getMemory());
     // making sure container is launched
@@ -1113,10 +1123,4 @@ public class TestContainerResizing {
     Assert
         .assertEquals(expectedMemory, node.getAvailableResource().getMemory());
   }
-
-  private FiCaSchedulerApp getFiCaSchedulerApp(MockRM rm,
-      ApplicationId appId) {
-    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
-    return cs.getSchedulerApplications().get(appId).getCurrentAppAttempt();
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d16b17b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java
new file mode 100644
index 0000000..d7ac0b2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java
@@ -0,0 +1,443 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class TestIncreaseAllocationExpirer {
+  private final int GB = 1024;
+  private YarnConfiguration conf;
+  RMNodeLabelsManager mgr;
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    mgr = new NullRMNodeLabelsManager();
+    mgr.init(conf);
+  }
+
+  @Test
+  public void testContainerIsRemovedFromAllocationExpirer()
+      throws Exception {
+    /**
+     * 1. Allocate 1 container: containerId2 (1G)
+     * 2. Increase resource of containerId2: 1G -> 3G
+     * 3. AM acquires the token
+     * 4. AM uses the token
+     * 5. Verify containerId2 is removed from allocation expirer such
+     *    that it still runs fine after allocation expiration interval
+     */
+    // Set the allocation expiration to 5 seconds
+    conf.setLong(
+        YarnConfiguration.RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS, 5000);
+    MockRM rm1 = new MockRM(conf);
+    rm1.start();
+    // Submit an application
+    MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 20 * GB);
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    // Report AM container status RUNNING to remove it from expirer
+    nm1.nodeHeartbeat(
+        app1.getCurrentAppAttempt()
+            .getAppAttemptId(), 1, ContainerState.RUNNING);
+    // AM requests a new container
+    am1.allocate("127.0.0.1", 1 * GB, 1, new ArrayList<ContainerId>());
+    ContainerId containerId2 =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+    rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED);
+    // AM acquires the new container to start container allocation expirer
+    List<Container> containers =
+        am1.allocate(null, null).getAllocatedContainers();
+    Assert.assertEquals(containerId2, containers.get(0).getId());
+    Assert.assertNotNull(containers.get(0).getContainerToken());
+    checkUsedResource(rm1, "default", 2 * GB, null);
+    FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp(
+        rm1, app1.getApplicationId());
+    Assert.assertEquals(2 * GB,
+        app.getAppAttemptResourceUsage().getUsed().getMemory());
+    verifyAvailableResourceOfSchedulerNode(rm1, nm1.getNodeId(), 18 * GB);
+    // Report container status
+    nm1.nodeHeartbeat(
+        app1.getCurrentAppAttempt()
+            .getAppAttemptId(), 2, ContainerState.RUNNING);
+    // Wait until container status is RUNNING, and is removed from
+    // allocation expirer
+    rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
+    // am1 asks to increase containerId2 from 1GB to 3GB
+    am1.sendContainerResizingRequest(Collections.singletonList(
+        ContainerResourceChangeRequest.newInstance(
+            containerId2, Resources.createResource(3 * GB))), null);
+    // Kick off scheduling and sleep for 1 second;
+    nm1.nodeHeartbeat(true);
+    Thread.sleep(1000);
+    // Start container increase allocation expirer
+    am1.allocate(null, null);
+    // Remember the resource in order to report status
+    Resource resource = Resources.clone(
+        rm1.getResourceScheduler().getRMContainer(containerId2)
+            .getAllocatedResource());
+    nm1.containerIncreaseStatus(getContainer(rm1, containerId2, resource));
+    // Wait long enough and verify that the container was removed
+    // from allocation expirer, and the container is still running
+    Thread.sleep(10000);
+    Assert.assertEquals(RMContainerState.RUNNING,
+        rm1.getResourceScheduler().getRMContainer(containerId2).getState());
+    // Verify container size is 3G
+    Assert.assertEquals(
+        3 * GB, rm1.getResourceScheduler().getRMContainer(containerId2)
+            .getAllocatedResource().getMemory());
+    // Verify total resource usage
+    checkUsedResource(rm1, "default", 4 * GB, null);
+    Assert.assertEquals(4 * GB,
+        app.getAppAttemptResourceUsage().getUsed().getMemory());
+    // Verify available resource
+    verifyAvailableResourceOfSchedulerNode(rm1, nm1.getNodeId(), 16 * GB);
+    rm1.stop();
+  }
+
+  @Test
+  public void testContainerIncreaseAllocationExpiration()
+      throws Exception {
+    /**
+     * 1. Allocate 1 container: containerId2 (1G)
+     * 2. Increase resource of containerId2: 1G -> 3G
+     * 3. AM acquires the token
+     * 4. AM does not use the token
+     * 5. Verify containerId2's resource usage falls back to
+     *    1G after the increase token expires
+     */
+    // Set the allocation expiration to 5 seconds
+    conf.setLong(YarnConfiguration.RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS, 5000);
+    MockRM rm1 = new MockRM(conf);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 20 * GB);
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    nm1.nodeHeartbeat(
+        app1.getCurrentAppAttempt()
+            .getAppAttemptId(), 1, ContainerState.RUNNING);
+    am1.allocate("127.0.0.1", 1 * GB, 1, new ArrayList<ContainerId>());
+    ContainerId containerId2 =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+    rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED);
+    List<Container> containers =
+        am1.allocate(null, null).getAllocatedContainers();
+    Assert.assertEquals(containerId2, containers.get(0).getId());
+    Assert.assertNotNull(containers.get(0).getContainerToken());
+    checkUsedResource(rm1, "default", 2 * GB, null);
+    FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp(
+        rm1, app1.getApplicationId());
+    Assert.assertEquals(2 * GB,
+        app.getAppAttemptResourceUsage().getUsed().getMemory());
+    verifyAvailableResourceOfSchedulerNode(rm1, nm1.getNodeId(), 18 * GB);
+    nm1.nodeHeartbeat(
+        app1.getCurrentAppAttempt()
+            .getAppAttemptId(), 2, ContainerState.RUNNING);
+    rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
+    // am1 asks to increase containerId2 from 1GB to 3GB
+    am1.sendContainerResizingRequest(Collections.singletonList(
+        ContainerResourceChangeRequest.newInstance(
+            containerId2, Resources.createResource(3 * GB))), null);
+    // Kick off scheduling and wait for 1 second;
+    nm1.nodeHeartbeat(true);
+    Thread.sleep(1000);
+    // Start container increase allocation expirer
+    am1.allocate(null, null);
+    // Verify resource usage
+    checkUsedResource(rm1, "default", 4 * GB, null);
+    Assert.assertEquals(4 * GB,
+        app.getAppAttemptResourceUsage().getUsed().getMemory());
+    verifyAvailableResourceOfSchedulerNode(rm1, nm1.getNodeId(), 16 * GB);
+    // Wait long enough for the increase token to expire, and for the roll
+    // back action to complete
+    Thread.sleep(10000);
+    // Verify container size is 1G
+    Assert.assertEquals(
+        1 * GB, rm1.getResourceScheduler().getRMContainer(containerId2)
+            .getAllocatedResource().getMemory());
+    // Verify total resource usage is 2G
+    checkUsedResource(rm1, "default", 2 * GB, null);
+    Assert.assertEquals(2 * GB,
+        app.getAppAttemptResourceUsage().getUsed().getMemory());
+    // Verify available resource is rolled back to 18GB
+    verifyAvailableResourceOfSchedulerNode(rm1, nm1.getNodeId(), 18 * GB);
+    rm1.stop();
+  }
+
+  @Test
+  public void testConsecutiveContainerIncreaseAllocationExpiration()
+      throws Exception {
+    /**
+     * 1. Allocate 1 container: containerId2 (1G)
+     * 2. Increase resource of containerId2: 1G -> 3G
+     * 3. AM acquires the token
+     * 4. Increase resource of containerId2 again: 3G -> 5G
+     * 5. AM acquires the token
+     * 6. AM uses the first token to increase the container in NM to 3G
+     * 7. AM NEVER uses the second token
+     * 8. Verify containerId2 eventually is allocated 3G after token expires
+     * 9. Verify NM eventually uses 3G for containerId2
+     */
+    // Set the allocation expiration to 5 seconds
+    conf.setLong(YarnConfiguration.RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS, 5000);
+    MockRM rm1 = new MockRM(conf);
+    rm1.start();
+    // Submit an application
+    MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 20 * GB);
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    nm1.nodeHeartbeat(
+        app1.getCurrentAppAttempt()
+            .getAppAttemptId(), 1, ContainerState.RUNNING);
+    // AM requests a new container
+    am1.allocate("127.0.0.1", 1 * GB, 1, new ArrayList<ContainerId>());
+    ContainerId containerId2 =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+    rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED);
+    // AM acquires the new container to start container allocation expirer
+    am1.allocate(null, null).getAllocatedContainers();
+    // Report container status
+    nm1.nodeHeartbeat(
+        app1.getCurrentAppAttempt()
+            .getAppAttemptId(), 2, ContainerState.RUNNING);
+    // Wait until container status is RUNNING, and is removed from
+    // allocation expirer
+    rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
+    // am1 asks to change containerId2 from 1GB to 3GB
+    am1.sendContainerResizingRequest(Collections.singletonList(
+        ContainerResourceChangeRequest.newInstance(
+            containerId2, Resources.createResource(3 * GB))), null);
+    // Kick off scheduling and sleep for 1 second to
+    // make sure the allocation is done
+    nm1.nodeHeartbeat(true);
+    Thread.sleep(1000);
+    // Start container increase allocation expirer
+    am1.allocate(null, null);
+    // Remember the resource (3G) in order to report status
+    Resource resource1 = Resources.clone(
+        rm1.getResourceScheduler().getRMContainer(containerId2)
+            .getAllocatedResource());
+    // am1 asks to change containerId2 from 3GB to 5GB
+    am1.sendContainerResizingRequest(Collections.singletonList(
+        ContainerResourceChangeRequest.newInstance(
+            containerId2, Resources.createResource(5 * GB))), null);
+    // Kick off scheduling and sleep for 1 second to
+    // make sure the allocation is done
+    nm1.nodeHeartbeat(true);
+    Thread.sleep(1000);
+    // Reset container increase allocation expirer
+    am1.allocate(null, null);
+    // Verify current resource allocation in RM
+    checkUsedResource(rm1, "default", 6 * GB, null);
+    FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp(
+        rm1, app1.getApplicationId());
+    Assert.assertEquals(6 * GB,
+        app.getAppAttemptResourceUsage().getUsed().getMemory());
+    // Verify available resource is now reduced to 14GB
+    verifyAvailableResourceOfSchedulerNode(rm1, nm1.getNodeId(), 14 * GB);
+    // Use the first token (3G)
+    nm1.containerIncreaseStatus(getContainer(rm1, containerId2, resource1));
+    // Wait long enough for the second token (5G) to expire, and verify that
+    // the roll back action is completed as expected
+    Thread.sleep(10000);
+    // Verify container size is rolled back to 3G
+    Assert.assertEquals(
+        3 * GB, rm1.getResourceScheduler().getRMContainer(containerId2)
+            .getAllocatedResource().getMemory());
+    // Verify total resource usage is 4G
+    checkUsedResource(rm1, "default", 4 * GB, null);
+    Assert.assertEquals(4 * GB,
+        app.getAppAttemptResourceUsage().getUsed().getMemory());
+    // Verify available resource is rolled back to 16GB
+    verifyAvailableResourceOfSchedulerNode(rm1, nm1.getNodeId(), 16 * GB);
+    // Verify NM receives the decrease message (3G)
+    List<Container> containersToDecrease =
+        nm1.nodeHeartbeat(true).getContainersToDecrease();
+    Assert.assertEquals(1, containersToDecrease.size());
+    Assert.assertEquals(
+        3 * GB, containersToDecrease.get(0).getResource().getMemory());
+    rm1.stop();
+  }
+
+  @Test
+  public void testDecreaseAfterIncreaseWithAllocationExpiration()
+      throws Exception {
+    /**
+     * 1. Allocate three containers: containerId2, containerId3, containerId4
+     * 2. Increase resource of containerId2: 3G -> 6G
+     * 3. Increase resource of containerId3: 3G -> 6G
+     * 4. Increase resource of containerId4: 3G -> 6G
+     * 5. Do NOT use the increase tokens for containerId2 and containerId3
+     * 6. Decrease containerId2: 6G -> 2G (i.e., below last confirmed resource)
+     * 7. Decrease containerId3: 6G -> 4G (i.e., above last confirmed resource)
+     * 8. Decrease containerId4: 6G -> 4G (i.e., above last confirmed resource)
+     * 9. Use token for containerId4 to increase containerId4 on NM to 6G
+     * 10. Verify containerId2 eventually uses 2G (removed from expirer)
+     * 11. Verify containerId3 eventually uses 3G (increase token expires)
+     * 12. Verify containerId4 eventually uses 4G (removed from expirer)
+     * 13. Verify NM eventually uses 3G for containerId3, 4G for containerId4
+     */
+    // Set the allocation expiration to 5 seconds
+    conf.setLong(YarnConfiguration.RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS, 5000);
+    MockRM rm1 = new MockRM(conf);
+    rm1.start();
+    // Submit an application
+    MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 20 * GB);
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    nm1.nodeHeartbeat(
+        app1.getCurrentAppAttempt()
+            .getAppAttemptId(), 1, ContainerState.RUNNING);
+    // AM requests three new containers (3G each)
+    am1.allocate("127.0.0.1", 3 * GB, 3, new ArrayList<ContainerId>());
+    ContainerId containerId2 =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+    rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED);
+    ContainerId containerId3 =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
+    rm1.waitForState(nm1, containerId3, RMContainerState.ALLOCATED);
+    ContainerId containerId4 =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 4);
+    rm1.waitForState(nm1, containerId4, RMContainerState.ALLOCATED);
+    // AM acquires tokens to start container allocation expirer
+    List<Container> containers =
+        am1.allocate(null, null).getAllocatedContainers();
+    Assert.assertEquals(3, containers.size());
+    Assert.assertNotNull(containers.get(0).getContainerToken());
+    Assert.assertNotNull(containers.get(1).getContainerToken());
+    Assert.assertNotNull(containers.get(2).getContainerToken());
+    // Report container status
+    nm1.nodeHeartbeat(app1.getCurrentAppAttempt().getAppAttemptId(),
+        2, ContainerState.RUNNING);
+    nm1.nodeHeartbeat(app1.getCurrentAppAttempt().getAppAttemptId(),
+        3, ContainerState.RUNNING);
+    nm1.nodeHeartbeat(app1.getCurrentAppAttempt().getAppAttemptId(),
+        4, ContainerState.RUNNING);
+    // Wait until container status becomes RUNNING
+    rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
+    rm1.waitForState(nm1, containerId3, RMContainerState.RUNNING);
+    rm1.waitForState(nm1, containerId4, RMContainerState.RUNNING);
+    // am1 asks to increase containerId2, 3 and 4 from 3GB to 6GB
+    List<ContainerResourceChangeRequest> increaseRequests = new ArrayList<>();
+    increaseRequests.add(ContainerResourceChangeRequest.newInstance(
+        containerId2, Resources.createResource(6 * GB)));
+    increaseRequests.add(ContainerResourceChangeRequest.newInstance(
+        containerId3, Resources.createResource(6 * GB)));
+    increaseRequests.add(ContainerResourceChangeRequest.newInstance(
+        containerId4, Resources.createResource(6 * GB)));
+    am1.sendContainerResizingRequest(increaseRequests, null);
+    nm1.nodeHeartbeat(true);
+    Thread.sleep(1000);
+    // Start container increase allocation expirer
+    am1.allocate(null, null);
+    // Decrease containers
+    List<ContainerResourceChangeRequest> decreaseRequests = new ArrayList<>();
+    decreaseRequests.add(ContainerResourceChangeRequest.newInstance(
+        containerId2, Resources.createResource(2 * GB)));
+    decreaseRequests.add(ContainerResourceChangeRequest.newInstance(
+        containerId3, Resources.createResource(4 * GB)));
+    decreaseRequests.add(ContainerResourceChangeRequest.newInstance(
+        containerId4, Resources.createResource(4 * GB)));
+    AllocateResponse response =
+        am1.sendContainerResizingRequest(null, decreaseRequests);
+    // Verify containers are decreased in scheduler
+    Assert.assertEquals(3, response.getDecreasedContainers().size());
+    // Use the token for containerId4 on NM (6G). This should set the last
+    // confirmed resource to 4G, and cancel the allocation expirer
+    nm1.containerIncreaseStatus(getContainer(
+        rm1, containerId4, Resources.createResource(6 * GB)));
+    // Wait long enough for the containerId3 increase token to expire
+    Thread.sleep(10000);
+    Assert.assertEquals(
+        2 * GB, rm1.getResourceScheduler().getRMContainer(containerId2)
+            .getAllocatedResource().getMemory());
+    Assert.assertEquals(
+        3 * GB, rm1.getResourceScheduler().getRMContainer(containerId3)
+            .getAllocatedResource().getMemory());
+    Assert.assertEquals(
+        4 * GB, rm1.getResourceScheduler().getRMContainer(containerId4)
+            .getAllocatedResource().getMemory());
+    // Verify NM receives 2 decrease messages
+    List<Container> containersToDecrease =
+        nm1.nodeHeartbeat(true).getContainersToDecrease();
+    Assert.assertEquals(2, containersToDecrease.size());
+    // Sort the list to make sure containerId3 is the first
+    Collections.sort(containersToDecrease);
+    Assert.assertEquals(
+        3 * GB, containersToDecrease.get(0).getResource().getMemory());
+    Assert.assertEquals(
+        4 * GB, containersToDecrease.get(1).getResource().getMemory());
+    rm1.stop();
+  }
+
+  private void checkUsedResource(MockRM rm, String queueName, int memory,
+      String label) {
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    CSQueue queue = cs.getQueue(queueName);
+    Assert.assertEquals(memory,
+        queue.getQueueResourceUsage()
+            .getUsed(label == null ? RMNodeLabelsManager.NO_LABEL : label)
+            .getMemory());
+  }
+
+  private void verifyAvailableResourceOfSchedulerNode(MockRM rm, NodeId nodeId,
+      int expectedMemory) {
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    SchedulerNode node = cs.getNode(nodeId);
+    Assert
+        .assertEquals(expectedMemory, node.getAvailableResource().getMemory());
+  }
+
+  private Container getContainer(
+      MockRM rm, ContainerId containerId, Resource resource) {
+    RMContainer rmContainer = rm.getResourceScheduler()
+        .getRMContainer(containerId);
+    return Container.newInstance(
+        containerId, rmContainer.getAllocatedNode(), null,
+            resource, null, null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d16b17b4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
index 489ef77..1786069 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
@@ -91,10 +92,10 @@ public class TestUtils {
       }
     };
     
-    // No op 
-    ContainerAllocationExpirer cae = 
+    // No op
+    ContainerAllocationExpirer cae =
         new ContainerAllocationExpirer(nullDispatcher);
-    
+
     Configuration conf = new Configuration();
     RMApplicationHistoryWriter writer =  mock(RMApplicationHistoryWriter.class);
     RMContextImpl rmContext =
@@ -122,7 +123,7 @@ public class TestUtils {
             return (Resource) args[1];
           }
         });
-    
+
     rmContext.setNodeLabelManager(nlm);
     rmContext.setSystemMetricsPublisher(mock(SystemMetricsPublisher.class));
     rmContext.setRMApplicationHistoryWriter(mock(RMApplicationHistoryWriter.class));
@@ -349,4 +350,10 @@ public class TestUtils {
     conf.setDefaultNodeLabelExpression(B, "y");
     return conf;
   }
+
+  public static FiCaSchedulerApp getFiCaSchedulerApp(MockRM rm,
+      ApplicationId appId) {
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    return cs.getSchedulerApplications().get(appId).getCurrentAppAttempt();
+  }
 }