You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2017/08/24 19:36:34 UTC

[41/50] [abbrv] hadoop git commit: YARN-6251. Do async container release to prevent deadlock during container updates. (Arun Suresh via wangda)

YARN-6251. Do async container release to prevent deadlock during container updates. (Arun Suresh via wangda)

Change-Id: I6c67d20c5dd4d22752830ebf0ed2340824976ecb


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f49843a9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f49843a9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f49843a9

Branch: refs/heads/YARN-5972
Commit: f49843a9888ad8fe5c1bb4c16bfb5217d693009d
Parents: 4249172
Author: Wangda Tan <wa...@apache.org>
Authored: Wed Aug 23 09:56:20 2017 -0700
Committer: Wangda Tan <wa...@apache.org>
Committed: Wed Aug 23 09:56:20 2017 -0700

----------------------------------------------------------------------
 ...pportunisticContainerAllocatorAMService.java |  2 +
 .../scheduler/AbstractYarnScheduler.java        | 12 ++++-
 .../scheduler/SchedulerApplicationAttempt.java  | 12 +++--
 .../scheduler/capacity/CapacityScheduler.java   | 12 +++++
 .../distributed/NodeQueueLoadMonitor.java       |  4 ++
 .../scheduler/event/ReleaseContainerEvent.java  | 46 ++++++++++++++++++++
 .../scheduler/event/SchedulerEventType.java     |  3 ++
 .../scheduler/fair/FairScheduler.java           | 13 ++++++
 .../scheduler/fifo/FifoScheduler.java           | 15 ++++++-
 .../yarn/server/resourcemanager/MockNodes.java  |  2 +-
 ...pportunisticContainerAllocatorAMService.java | 36 +++++++++------
 .../capacity/TestContainerResizing.java         | 31 ++++++++++++-
 .../capacity/TestIncreaseAllocationExpirer.java | 27 +++++++++---
 13 files changed, 188 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f49843a9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
index 3c278de..4fc2916 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
@@ -391,6 +391,8 @@ public class OpportunisticContainerAllocatorAMService
       break;
     case NODE_LABELS_UPDATE:
       break;
+    case RELEASE_CONTAINER:
+      break;
     // <-- IGNORED EVENTS : END -->
     default:
       LOG.error("Unknown event arrived at" +

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f49843a9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index c3879dd..2c27017 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -67,7 +67,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstant
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
-import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@@ -89,6 +88,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntit
 
 
 
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ReleaseContainerEvent;
 import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -1273,4 +1273,14 @@ public abstract class AbstractYarnScheduler
   public List<NodeId> getNodeIds(String resourceName) {
     return nodeTracker.getNodeIdsByResourceName(resourceName);
   }
+
+  /**
+   * To be used to release a container via a Scheduler Event rather than
+   * in the same thread.
+   * @param container Container.
+   */
+  public void asyncContainerRelease(RMContainer container) {
+    this.rmContext.getDispatcher().getEventHandler()
+        .handle(new ReleaseContainerEvent(container));
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f49843a9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index cc14a1e..f9a7219 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -76,6 +76,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainer
 
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeUpdateContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
+
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
 
@@ -866,10 +867,13 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
         // Mark container for release (set RRs to null, so RM does not think
         // it is a recoverable container)
         ((RMContainerImpl) c).setResourceRequests(null);
-        ((AbstractYarnScheduler) rmContext.getScheduler()).completedContainer(c,
-            SchedulerUtils.createAbnormalContainerStatus(c.getContainerId(),
-                SchedulerUtils.UPDATED_CONTAINER),
-            RMContainerEventType.KILL);
+
+        // Release this container async-ly so as to prevent
+        // 'LeafQueue::completedContainer()' from trying to acquire a lock
+        // on the app and queue which can contended for in the reverse order
+        // by the Scheduler thread.
+        ((AbstractYarnScheduler)rmContext.getScheduler())
+            .asyncContainerRelease(c);
         tempIter.remove();
       }
       return updatedContainers;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f49843a9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index e4ca003..fde84c4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -124,6 +124,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsU
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ReleaseContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
@@ -1491,6 +1493,16 @@ public class CapacityScheduler extends
       }
     }
     break;
+    case RELEASE_CONTAINER:
+    {
+      RMContainer container = ((ReleaseContainerEvent) event).getContainer();
+      completedContainer(container,
+          SchedulerUtils.createAbnormalContainerStatus(
+            container.getContainerId(),
+            SchedulerUtils.RELEASED_CONTAINER),
+          RMContainerEventType.RELEASED);
+    }
+    break;
     case KILL_RESERVED_CONTAINER:
     {
       ContainerPreemptEvent killReservedContainerEvent =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f49843a9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
index fb67270..ed0ee1e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
@@ -203,6 +203,10 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
     LOG.debug("Node update event from: " + rmNode.getNodeID());
     OpportunisticContainersStatus opportunisticContainersStatus =
         rmNode.getOpportunisticContainersStatus();
+    if (opportunisticContainersStatus == null) {
+      opportunisticContainersStatus =
+          OpportunisticContainersStatus.newInstance();
+    }
     int estimatedQueueWaitTime =
         opportunisticContainersStatus.getEstimatedQueueWaitTime();
     int waitQueueLength = opportunisticContainersStatus.getWaitQueueLength();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f49843a9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ReleaseContainerEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ReleaseContainerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ReleaseContainerEvent.java
new file mode 100644
index 0000000..4f31684
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ReleaseContainerEvent.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
+
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+
+/**
+ * Event used to release a container.
+ */
+public class ReleaseContainerEvent extends SchedulerEvent {
+
+  private final RMContainer container;
+
+  /**
+   * Create Event.
+   * @param rmContainer RMContainer.
+   */
+  public ReleaseContainerEvent(RMContainer rmContainer) {
+    super(SchedulerEventType.RELEASE_CONTAINER);
+    this.container = rmContainer;
+  }
+
+  /**
+   * Get RMContainer.
+   * @return RMContainer.
+   */
+  public RMContainer getContainer() {
+    return container;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f49843a9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
index 35b7c14..229e0bb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
@@ -38,6 +38,9 @@ public enum SchedulerEventType {
   // Source: ContainerAllocationExpirer
   CONTAINER_EXPIRED,
 
+  // Source: SchedulerAppAttempt::pullNewlyUpdatedContainer.
+  RELEASE_CONTAINER,
+
   /* Source: SchedulingEditPolicy */
   KILL_RESERVED_CONTAINER,
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f49843a9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 0f417c3..c521250 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -83,6 +83,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSc
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ReleaseContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@@ -1195,6 +1197,17 @@ public class FairScheduler extends
           appAttemptRemovedEvent.getFinalAttemptState(),
           appAttemptRemovedEvent.getKeepContainersAcrossAppAttempts());
       break;
+    case RELEASE_CONTAINER:
+      if (!(event instanceof ReleaseContainerEvent)) {
+        throw new RuntimeException("Unexpected event type: " + event);
+      }
+      RMContainer container = ((ReleaseContainerEvent) event).getContainer();
+      completedContainer(container,
+          SchedulerUtils.createAbnormalContainerStatus(
+              container.getContainerId(),
+              SchedulerUtils.RELEASED_CONTAINER),
+          RMContainerEventType.RELEASED);
+      break;
     case CONTAINER_EXPIRED:
       if (!(event instanceof ContainerExpiredSchedulerEvent)) {
         throw new RuntimeException("Unexpected event type: " + event);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f49843a9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index 92a88b9..94c7e16 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -65,7 +65,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
@@ -80,6 +79,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSc
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ReleaseContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
@@ -820,6 +821,18 @@ public class FifoScheduler extends
           RMContainerEventType.EXPIRE);
     }
     break;
+    case RELEASE_CONTAINER: {
+      if (!(event instanceof ReleaseContainerEvent)) {
+        throw new RuntimeException("Unexpected event type: " + event);
+      }
+      RMContainer container = ((ReleaseContainerEvent) event).getContainer();
+      completedContainer(container,
+          SchedulerUtils.createAbnormalContainerStatus(
+              container.getContainerId(),
+              SchedulerUtils.RELEASED_CONTAINER),
+          RMContainerEventType.RELEASED);
+    }
+    break;
     default:
       LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f49843a9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
index 7f58711..611c7f2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
@@ -263,7 +263,7 @@ public class MockNodes {
     }
 
     public OpportunisticContainersStatus getOpportunisticContainersStatus() {
-      return null;
+      return OpportunisticContainersStatus.newInstance();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f49843a9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
index b885118..9b9eb3c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
@@ -44,6 +44,8 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
 import org.apache.hadoop.yarn.api.records.UpdatedContainer;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocolPB;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@@ -108,6 +110,7 @@ public class TestOpportunisticContainerAllocatorAMService {
   private static final int GB = 1024;
 
   private MockRM rm;
+  private DrainDispatcher dispatcher;
 
   @Before
   public void createAndStartRM() {
@@ -120,8 +123,7 @@ public class TestOpportunisticContainerAllocatorAMService {
         YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
     conf.setInt(
         YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS, 100);
-    rm = new MockRM(conf);
-    rm.start();
+    startRM(conf);
   }
 
   public void createAndStartRMWithAutoUpdateContainer() {
@@ -135,7 +137,17 @@ public class TestOpportunisticContainerAllocatorAMService {
         YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
     conf.setInt(
         YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS, 100);
-    rm = new MockRM(conf);
+    startRM(conf);
+  }
+
+  private void startRM(final YarnConfiguration conf) {
+    dispatcher = new DrainDispatcher();
+    rm = new MockRM(conf) {
+      @Override
+      protected Dispatcher createDispatcher() {
+        return dispatcher;
+      }
+    };
     rm.start();
   }
 
@@ -180,17 +192,6 @@ public class TestOpportunisticContainerAllocatorAMService {
     nm3.nodeHeartbeat(true);
     nm4.nodeHeartbeat(true);
 
-    ((RMNodeImpl) rmNode1)
-        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
-    ((RMNodeImpl) rmNode2)
-        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
-    ((RMNodeImpl) rmNode3)
-        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
-    ((RMNodeImpl) rmNode4)
-        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
-
-    OpportunisticContainerContext ctxt = ((CapacityScheduler) scheduler)
-        .getApplicationAttempt(attemptId).getOpportunisticContainerContext();
     // Send add and update node events to AM Service.
     amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
     amservice.handle(new NodeAddedSchedulerEvent(rmNode2));
@@ -246,6 +247,9 @@ public class TestOpportunisticContainerAllocatorAMService {
     allocateResponse =  am1.allocate(new ArrayList<>(), new ArrayList<>());
     Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size());
 
+    // Wait for scheduler to process all events
+    dispatcher.waitForEventThreadToWait();
+    Thread.sleep(1000);
     // Verify Metrics After OPP allocation (Nothing should change again)
     verifyMetrics(metrics, 15360, 15, 1024, 1, 1);
 
@@ -319,6 +323,8 @@ public class TestOpportunisticContainerAllocatorAMService {
     Assert.assertEquals(uc.getId(), container.getId());
     Assert.assertEquals(uc.getVersion(), container.getVersion() + 2);
 
+    // Wait for scheduler to finish processing events
+    dispatcher.waitForEventThreadToWait();
     // Verify Metrics After OPP allocation :
     // Everything should have reverted to what it was
     verifyMetrics(metrics, 15360, 15, 1024, 1, 1);
@@ -663,6 +669,7 @@ public class TestOpportunisticContainerAllocatorAMService {
     Assert.assertEquals(container.getId(), uc.getContainer().getId());
     Assert.assertEquals(Resource.newInstance(2 * GB, 1),
         uc.getContainer().getResource());
+    Thread.sleep(1000);
 
     // Check that the container resources are increased in
     // NM through NM heartbeat response
@@ -679,6 +686,7 @@ public class TestOpportunisticContainerAllocatorAMService {
             ContainerUpdateType.DECREASE_RESOURCE,
             Resources.createResource(1 * GB, 1), null)));
     Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size());
+    Thread.sleep(1000);
 
     // Check that the container resources are decreased
     // in NM through NM heartbeat response

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f49843a9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
index 291a74e..541539d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
@@ -37,6 +37,8 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
 import org.apache.hadoop.yarn.api.records.UpdatedContainer;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
@@ -163,11 +165,17 @@ public class TestContainerResizing {
      * Application has a container running, try to decrease the container and
      * check queue's usage and container resource will be updated.
      */
+    final DrainDispatcher dispatcher = new DrainDispatcher();
     MockRM rm1 = new MockRM() {
       @Override
       public RMNodeLabelsManager createNodeLabelManager() {
         return mgr;
       }
+
+      @Override
+      protected Dispatcher createDispatcher() {
+        return dispatcher;
+      }
     };
     rm1.start();
     MockNM nm1 = rm1.registerNode("h1:1234", 20 * GB);
@@ -194,6 +202,10 @@ public class TestContainerResizing {
                 Resources.createResource(1 * GB), null)));
 
     verifyContainerDecreased(response, containerId1, 1 * GB);
+
+    // Wait for scheduler to finish processing kill events..
+    dispatcher.waitForEventThreadToWait();
+
     checkUsedResource(rm1, "default", 1 * GB, null);
     Assert.assertEquals(1 * GB,
         app.getAppAttemptResourceUsage().getUsed().getMemorySize());
@@ -507,11 +519,17 @@ public class TestContainerResizing {
      * the increase request reserved, it decreases the reserved container,
      * container should be decreased and reservation will be cancelled
      */
+    final DrainDispatcher dispatcher = new DrainDispatcher();
     MockRM rm1 = new MockRM() {
       @Override
       public RMNodeLabelsManager createNodeLabelManager() {
         return mgr;
       }
+
+      @Override
+      protected Dispatcher createDispatcher() {
+        return dispatcher;
+      }
     };
     rm1.start();
     MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
@@ -586,7 +604,8 @@ public class TestContainerResizing {
                 Resources.createResource(1 * GB), null)));
     // Trigger a node heartbeat..
     cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
-    
+
+    dispatcher.waitForEventThreadToWait();
     /* Check statuses after reservation satisfied */
     // Increase request should be unreserved
     Assert.assertTrue(app.getReservedContainers().isEmpty());
@@ -617,11 +636,17 @@ public class TestContainerResizing {
      * So increase container request will be reserved. When app releases
      * container2, reserved part should be released as well.
      */
+    final DrainDispatcher dispatcher = new DrainDispatcher();
     MockRM rm1 = new MockRM() {
       @Override
       public RMNodeLabelsManager createNodeLabelManager() {
         return mgr;
       }
+
+      @Override
+      protected Dispatcher createDispatcher() {
+        return dispatcher;
+      }
     };
     rm1.start();
     MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
@@ -687,6 +712,10 @@ public class TestContainerResizing {
 
     cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
     am1.allocate(null, null);
+
+    // Wait for scheduler to process all events.
+    dispatcher.waitForEventThreadToWait();
+
     /* Check statuses after reservation satisfied */
     // Increase request should be unreserved
     Assert.assertTrue(app.getReservedContainers().isEmpty());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f49843a9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java
index a76ed64..d2e28be 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.UpdateContainerError;
 import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
@@ -155,7 +157,13 @@ public class TestIncreaseAllocationExpirer {
      */
     // Set the allocation expiration to 5 seconds
     conf.setLong(YarnConfiguration.RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS, 5000);
-    MockRM rm1 = new MockRM(conf);
+    final DrainDispatcher disp = new DrainDispatcher();
+    MockRM rm1 = new MockRM(conf) {
+      @Override
+      protected Dispatcher createDispatcher() {
+        return disp;
+      }
+    };
     rm1.start();
     MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 20 * GB);
     RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
@@ -204,6 +212,7 @@ public class TestIncreaseAllocationExpirer {
     Assert.assertEquals(
         1 * GB, rm1.getResourceScheduler().getRMContainer(containerId2)
             .getAllocatedResource().getMemorySize());
+    disp.waitForEventThreadToWait();
     // Verify total resource usage is 2G
     checkUsedResource(rm1, "default", 2 * GB, null);
     Assert.assertEquals(2 * GB,
@@ -420,7 +429,7 @@ public class TestIncreaseAllocationExpirer {
     nm1.containerIncreaseStatus(getContainer(
         rm1, containerId4, Resources.createResource(6 * GB)));
     // Wait for containerId3 token to expire,
-    Thread.sleep(10000);
+    Thread.sleep(12000);
 
     am1.allocate(null, null);
 
@@ -436,13 +445,21 @@ public class TestIncreaseAllocationExpirer {
     // Verify NM receives 2 decrease message
     List<Container> containersToDecrease =
         nm1.nodeHeartbeat(true).getContainersToUpdate();
-    Assert.assertEquals(2, containersToDecrease.size());
+    // NOTE: Can be more that 2 depending on which event arrives first.
+    // What is important is the final size of the containers.
+    Assert.assertTrue(containersToDecrease.size() >= 2);
+
     // Sort the list to make sure containerId3 is the first
     Collections.sort(containersToDecrease);
+    int i = 0;
+    if (containersToDecrease.size() > 2) {
+      Assert.assertEquals(
+          2 * GB, containersToDecrease.get(i++).getResource().getMemorySize());
+    }
     Assert.assertEquals(
-        3 * GB, containersToDecrease.get(0).getResource().getMemorySize());
+        3 * GB, containersToDecrease.get(i++).getResource().getMemorySize());
     Assert.assertEquals(
-        4 * GB, containersToDecrease.get(1).getResource().getMemorySize());
+        4 * GB, containersToDecrease.get(i++).getResource().getMemorySize());
     rm1.stop();
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org