You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2017/04/04 19:56:59 UTC

[35/50] [abbrv] hadoop git commit: YARN-6411. Clean up the overwrite of createDispatcher() in subclass of MockRM. Contributed by Yufei Gu

YARN-6411. Clean up the overwrite of createDispatcher() in subclass of MockRM. Contributed by Yufei Gu


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4d1fac5d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4d1fac5d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4d1fac5d

Branch: refs/heads/HDFS-7240
Commit: 4d1fac5df2508011adfc4c5d683beb00fd5ced45
Parents: 28cdc5a
Author: Jason Lowe <jl...@yahoo-inc.com>
Authored: Fri Mar 31 10:05:34 2017 -0500
Committer: Jason Lowe <jl...@yahoo-inc.com>
Committed: Fri Mar 31 10:05:34 2017 -0500

----------------------------------------------------------------------
 .../v2/app/rm/TestRMContainerAllocator.java     | 453 +++++++++----------
 .../api/impl/TestAMRMClientOnRMRestart.java     |  59 +--
 .../server/resourcemanager/ACLsTestBase.java    |  10 -
 .../server/resourcemanager/RMHATestBase.java    |  20 +-
 .../ReservationACLsTestBase.java                |   5 +-
 .../resourcemanager/TestApplicationCleanup.java |  44 +-
 .../TestApplicationMasterLauncher.java          |  11 +-
 .../TestApplicationMasterService.java           |  19 +-
 .../TestNodeBlacklistingOnAMFailures.java       |  41 +-
 .../TestReservationSystemWithRMHA.java          |   5 +-
 .../TestAMRMRPCNodeUpdates.java                 |  18 +-
 .../resourcetracker/TestNMReconnect.java        |  14 +-
 .../rmcontainer/TestRMContainerImpl.java        |   1 -
 .../capacity/TestApplicationPriority.java       |  29 +-
 .../security/TestClientToAMTokens.java          |  23 +-
 15 files changed, 277 insertions(+), 475 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d1fac5d/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
index e6aee6e..933bd01 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
@@ -179,21 +179,19 @@ public class TestRMContainerAllocator {
     Configuration conf = new Configuration();
     MyResourceManager rm = new MyResourceManager(conf);
     rm.start();
-    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
-        .getDispatcher();
 
     // Submit the application
     RMApp app = rm.submitApp(1024);
-    dispatcher.await();
+    rm.drainEvents();
 
     MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
     amNodeManager.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
 
     ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
         .getAppAttemptId();
     rm.sendAMLaunched(appAttemptId);
-    dispatcher.await();
+    rm.drainEvents();
 
     JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
     Job mockJob = mock(Job.class);
@@ -207,7 +205,7 @@ public class TestRMContainerAllocator {
     MockNM nodeManager1 = rm.registerNode("h1:1234", 10240);
     MockNM nodeManager2 = rm.registerNode("h2:1234", 10240);
     MockNM nodeManager3 = rm.registerNode("h3:1234", 10240);
-    dispatcher.await();
+    rm.drainEvents();
 
     // create the container request
     ContainerRequestEvent event1 = createReq(jobId, 1, 1024,
@@ -222,7 +220,7 @@ public class TestRMContainerAllocator {
     // this tells the scheduler about the requests
     // as nodes are not added, no allocations
     List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
     Assert.assertEquals(4, rm.getMyFifoScheduler().lastAsk.size());
 
@@ -234,7 +232,7 @@ public class TestRMContainerAllocator {
     // this tells the scheduler about the requests
     // as nodes are not added, no allocations
     assigned = allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
     Assert.assertEquals(3, rm.getMyFifoScheduler().lastAsk.size());
     
@@ -242,18 +240,18 @@ public class TestRMContainerAllocator {
     nodeManager1.nodeHeartbeat(true); // Node heartbeat
     nodeManager2.nodeHeartbeat(true); // Node heartbeat
     nodeManager3.nodeHeartbeat(true); // Node heartbeat
-    dispatcher.await();
+    rm.drainEvents();
 
     assigned = allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
     Assert.assertEquals(0, rm.getMyFifoScheduler().lastAsk.size());
     checkAssignments(new ContainerRequestEvent[] { event1, event2, event3 },
         assigned, false);
     
     // check that the assigned container requests are cancelled
-    assigned = allocator.schedule();
-    dispatcher.await();
-    Assert.assertEquals(5, rm.getMyFifoScheduler().lastAsk.size());    
+    allocator.schedule();
+    rm.drainEvents();
+    Assert.assertEquals(5, rm.getMyFifoScheduler().lastAsk.size());
   }
   
   @Test 
@@ -269,21 +267,19 @@ public class TestRMContainerAllocator {
     Configuration conf = new Configuration();
     MyResourceManager rm = new MyResourceManager(conf);
     rm.start();
-    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
-        .getDispatcher();
 
     // Submit the application
     RMApp app = rm.submitApp(1024);
-    dispatcher.await();
+    rm.drainEvents();
 
     MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
     amNodeManager.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
 
     ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
         .getAppAttemptId();
     rm.sendAMLaunched(appAttemptId);
-    dispatcher.await();
+    rm.drainEvents();
 
     JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
     Job mockJob = mock(Job.class);
@@ -297,7 +293,7 @@ public class TestRMContainerAllocator {
     MockNM nodeManager1 = rm.registerNode("h1:1234", 3072); // can assign 2 maps 
     rm.registerNode("h2:1234", 10240); // wont heartbeat on node local node
     MockNM nodeManager3 = rm.registerNode("h3:1234", 1536); // assign 1 map
-    dispatcher.await();
+    rm.drainEvents();
 
     // create the container requests for maps
     ContainerRequestEvent event1 = createReq(jobId, 1, 1024,
@@ -313,7 +309,7 @@ public class TestRMContainerAllocator {
     // this tells the scheduler about the requests
     // as nodes are not added, no allocations
     List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
 
     // update resources in scheduler
@@ -323,10 +319,10 @@ public class TestRMContainerAllocator {
     // Node heartbeat from node-local next. This allocates 2 node local 
     // containers for task1 and task2. These should be matched with those tasks.
     nodeManager1.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
 
     assigned = allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
     checkAssignments(new ContainerRequestEvent[] { event1, event2, event3 },
         assigned, false);
     // remove the rack-local assignment that should have happened for task3
@@ -350,21 +346,19 @@ public class TestRMContainerAllocator {
     Configuration conf = new Configuration();
     MyResourceManager rm = new MyResourceManager(conf);
     rm.start();
-    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
-        .getDispatcher();
 
     // Submit the application
     RMApp app = rm.submitApp(1024);
-    dispatcher.await();
+    rm.drainEvents();
 
     MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
     amNodeManager.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
 
     ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
         .getAppAttemptId();
     rm.sendAMLaunched(appAttemptId);
-    dispatcher.await();
+    rm.drainEvents();
 
     JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
     Job mockJob = mock(Job.class);
@@ -378,7 +372,7 @@ public class TestRMContainerAllocator {
     MockNM nodeManager1 = rm.registerNode("h1:1234", 10240);
     MockNM nodeManager2 = rm.registerNode("h2:1234", 10240);
     MockNM nodeManager3 = rm.registerNode("h3:1234", 10240);
-    dispatcher.await();
+    rm.drainEvents();
 
     // create the container request
     ContainerRequestEvent event1 = createReq(jobId, 1, 1024,
@@ -393,17 +387,17 @@ public class TestRMContainerAllocator {
     // this tells the scheduler about the requests
     // as nodes are not added, no allocations
     List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
 
     // update resources in scheduler
     nodeManager1.nodeHeartbeat(true); // Node heartbeat
     nodeManager2.nodeHeartbeat(true); // Node heartbeat
     nodeManager3.nodeHeartbeat(true); // Node heartbeat
-    dispatcher.await();
+    rm.drainEvents();
 
     assigned = allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
     checkAssignments(new ContainerRequestEvent[] { event1, event2 },
         assigned, false);
   }
@@ -416,19 +410,17 @@ public class TestRMContainerAllocator {
     conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.0f);
     final MyResourceManager rm = new MyResourceManager(conf);
     rm.start();
-    final DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
-        .getDispatcher();
     final RMApp app = rm.submitApp(1024);
-    dispatcher.await();
+    rm.drainEvents();
 
     final String host = "host1";
     final MockNM nm = rm.registerNode(String.format("%s:1234", host), 2048);
     nm.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
     final ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
         .getAppAttemptId();
     rm.sendAMLaunched(appAttemptId);
-    dispatcher.await();
+    rm.drainEvents();
     final JobId jobId = MRBuilderUtils
         .newJobId(appAttemptId.getApplicationId(), 0);
     final Job mockJob = mock(Job.class);
@@ -438,20 +430,20 @@ public class TestRMContainerAllocator {
     final MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
         appAttemptId, mockJob, SystemClock.getInstance());
     // add resources to scheduler
-    dispatcher.await();
+    rm.drainEvents();
 
     // create the container request
     final String[] locations = new String[] { host };
     allocator.sendRequest(createReq(jobId, 0, 1024, locations, false, true));
     for (int i = 0; i < 1;) {
-      dispatcher.await();
+      rm.drainEvents();
       i += allocator.schedule().size();
       nm.nodeHeartbeat(true);
     }
 
     allocator.sendRequest(createReq(jobId, 0, 1024, locations, true, false));
     while (allocator.getTaskAttemptKillEvents().size() == 0) {
-      dispatcher.await();
+      rm.drainEvents();
       allocator.schedule().size();
       nm.nodeHeartbeat(true);
     }
@@ -468,21 +460,19 @@ public class TestRMContainerAllocator {
     Configuration conf = new Configuration();
     MyResourceManager rm = new MyResourceManager(conf);
     rm.start();
-    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
-        .getDispatcher();
 
     // Submit the application
     RMApp app = rm.submitApp(1024);
-    dispatcher.await();
+    rm.drainEvents();
 
     MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
     amNodeManager.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
 
     ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
         .getAppAttemptId();
     rm.sendAMLaunched(appAttemptId);
-    dispatcher.await();
+    rm.drainEvents();
 
     JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
     Job mockJob = mock(Job.class);
@@ -521,21 +511,19 @@ public class TestRMContainerAllocator {
 
     MyResourceManager rm = new MyResourceManager(conf);
     rm.start();
-    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
-        .getDispatcher();
 
     // Submit the application
     RMApp app = rm.submitApp(1024);
-    dispatcher.await();
+    rm.drainEvents();
 
     MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
     amNodeManager.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
 
     ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
         .getAppAttemptId();
     rm.sendAMLaunched(appAttemptId);
-    dispatcher.await();
+    rm.drainEvents();
 
     JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
     Job mockJob = mock(Job.class);
@@ -584,21 +572,19 @@ public class TestRMContainerAllocator {
     MyResourceManager rm = new MyResourceManager(conf);
     rm.start();
     rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(8192, 8));
-    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
-        .getDispatcher();
 
     // Submit the application
     RMApp app = rm.submitApp(1024);
-    dispatcher.await();
+    rm.drainEvents();
 
     MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
     amNodeManager.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
 
     ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
         .getAppAttemptId();
     rm.sendAMLaunched(appAttemptId);
-    dispatcher.await();
+    rm.drainEvents();
 
     JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
     Job mockJob = mock(Job.class);
@@ -639,18 +625,16 @@ public class TestRMContainerAllocator {
     conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.0f);
     final MyResourceManager2 rm = new MyResourceManager2(conf);
     rm.start();
-    final DrainDispatcher dispatcher = (DrainDispatcher)rm.getRMContext()
-            .getDispatcher();
     final RMApp app = rm.submitApp(2048);
-    dispatcher.await();
+    rm.drainEvents();
     final String host = "host1";
     final MockNM nm = rm.registerNode(String.format("%s:1234", host), 4096);
     nm.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
     final ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
           .getAppAttemptId();
     rm.sendAMLaunched(appAttemptId);
-    dispatcher.await();
+    rm.drainEvents();
     final JobId jobId = MRBuilderUtils
                  .newJobId(appAttemptId.getApplicationId(), 0);
     final Job mockJob = mock(Job.class);
@@ -666,14 +650,14 @@ public class TestRMContainerAllocator {
     allocator.scheduleAllReduces();
     allocator.makeRemoteRequest();
     nm.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
     allocator.sendRequest(createReq(jobId, 1, 1024, locations, false, false));
 
     int assignedContainer;
     for (assignedContainer = 0; assignedContainer < 1;) {
       assignedContainer += allocator.schedule().size();
       nm.nodeHeartbeat(true);
-      dispatcher.await();
+      rm.drainEvents();
     }
     // only 1 allocated container should be assigned
     Assert.assertEquals(assignedContainer, 1);
@@ -773,21 +757,19 @@ public class TestRMContainerAllocator {
     Configuration conf = new Configuration();
     MyResourceManager rm = new MyResourceManager(conf);
     rm.start();
-    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
-        .getDispatcher();
 
     // Submit the application
     RMApp app = rm.submitApp(1024);
-    dispatcher.await();
+    rm.drainEvents();
 
     MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
     amNodeManager.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
 
     ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
         .getAppAttemptId();
     rm.sendAMLaunched(appAttemptId);
-    dispatcher.await();
+    rm.drainEvents();
 
     JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
     Job mockJob = mock(Job.class);
@@ -801,7 +783,7 @@ public class TestRMContainerAllocator {
     MockNM nodeManager1 = rm.registerNode("h1:1234", 1024);
     MockNM nodeManager2 = rm.registerNode("h2:1234", 10240);
     MockNM nodeManager3 = rm.registerNode("h3:1234", 10240);
-    dispatcher.await();
+    rm.drainEvents();
 
     // create the container request
     // send MAP request
@@ -822,17 +804,17 @@ public class TestRMContainerAllocator {
     // this tells the scheduler about the requests
     // as nodes are not added, no allocations
     List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
 
     // update resources in scheduler
     nodeManager1.nodeHeartbeat(true); // Node heartbeat
     nodeManager2.nodeHeartbeat(true); // Node heartbeat
     nodeManager3.nodeHeartbeat(true); // Node heartbeat
-    dispatcher.await();
+    rm.drainEvents();
 
     assigned = allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
     checkAssignments(new ContainerRequestEvent[] { event1, event3 },
         assigned, false);
 
@@ -864,11 +846,6 @@ public class TestRMContainerAllocator {
     }
 
     @Override
-    protected Dispatcher createDispatcher() {
-      return new DrainDispatcher();
-    }
-
-    @Override
     protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
       // Dispatch inline for test sanity
       return new EventHandler<SchedulerEvent>() {
@@ -912,16 +889,16 @@ public class TestRMContainerAllocator {
 
     // Submit the application
     RMApp rmApp = rm.submitApp(1024);
-    rmDispatcher.await();
+    rm.drainEvents();
 
     MockNM amNodeManager = rm.registerNode("amNM:1234", 21504);
     amNodeManager.nodeHeartbeat(true);
-    rmDispatcher.await();
+    rm.drainEvents();
 
     final ApplicationAttemptId appAttemptId = rmApp.getCurrentAppAttempt()
         .getAppAttemptId();
     rm.sendAMLaunched(appAttemptId);
-    rmDispatcher.await();
+    rm.drainEvents();
 
     MRApp mrApp = new MRApp(appAttemptId, ContainerId.newContainerId(
       appAttemptId, 0), 10, 10, false, this.getClass().getName(), true, 1) {
@@ -959,11 +936,11 @@ public class TestRMContainerAllocator {
     amDispatcher.await();
 
     allocator.schedule();
-    rmDispatcher.await();
+    rm.drainEvents();
     amNodeManager.nodeHeartbeat(true);
-    rmDispatcher.await();
+    rm.drainEvents();
     allocator.schedule();
-    rmDispatcher.await();
+    rm.drainEvents();
 
     // Wait for all map-tasks to be running
     for (Task t : job.getTasks().values()) {
@@ -973,7 +950,7 @@ public class TestRMContainerAllocator {
     }
 
     allocator.schedule(); // Send heartbeat
-    rmDispatcher.await();
+    rm.drainEvents();
     Assert.assertEquals(0.05f, job.getProgress(), 0.001f);
     Assert.assertEquals(0.05f, rmApp.getProgress(), 0.001f);
 
@@ -981,14 +958,14 @@ public class TestRMContainerAllocator {
     Iterator<Task> it = job.getTasks().values().iterator();
     finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 1);
     allocator.schedule();
-    rmDispatcher.await();
+    rm.drainEvents();
     Assert.assertEquals(0.095f, job.getProgress(), 0.001f);
     Assert.assertEquals(0.095f, rmApp.getProgress(), 0.001f);
 
     // Finish off 7 more so that map-progress is 80%
     finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 7);
     allocator.schedule();
-    rmDispatcher.await();
+    rm.drainEvents();
     Assert.assertEquals(0.41f, job.getProgress(), 0.001f);
     Assert.assertEquals(0.41f, rmApp.getProgress(), 0.001f);
 
@@ -996,11 +973,11 @@ public class TestRMContainerAllocator {
     finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 2);
 
     allocator.schedule();
-    rmDispatcher.await();
+    rm.drainEvents();
     amNodeManager.nodeHeartbeat(true);
-    rmDispatcher.await();
+    rm.drainEvents();
     allocator.schedule();
-    rmDispatcher.await();
+    rm.drainEvents();
 
     // Wait for all reduce-tasks to be running
     for (Task t : job.getTasks().values()) {
@@ -1013,14 +990,14 @@ public class TestRMContainerAllocator {
     finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 2);
 
     allocator.schedule();
-    rmDispatcher.await();
+    rm.drainEvents();
     Assert.assertEquals(0.59f, job.getProgress(), 0.001f);
     Assert.assertEquals(0.59f, rmApp.getProgress(), 0.001f);
 
     // Finish off the remaining 8 reduces.
     finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 8);
     allocator.schedule();
-    rmDispatcher.await();
+    rm.drainEvents();
     // Remaining is JobCleanup
     Assert.assertEquals(0.95f, job.getProgress(), 0.001f);
     Assert.assertEquals(0.95f, rmApp.getProgress(), 0.001f);
@@ -1064,16 +1041,16 @@ public class TestRMContainerAllocator {
 
     // Submit the application
     RMApp rmApp = rm.submitApp(1024);
-    rmDispatcher.await();
+    rm.drainEvents();
 
     MockNM amNodeManager = rm.registerNode("amNM:1234", 11264);
     amNodeManager.nodeHeartbeat(true);
-    rmDispatcher.await();
+    rm.drainEvents();
 
     final ApplicationAttemptId appAttemptId = rmApp.getCurrentAppAttempt()
         .getAppAttemptId();
     rm.sendAMLaunched(appAttemptId);
-    rmDispatcher.await();
+    rm.drainEvents();
 
     MRApp mrApp = new MRApp(appAttemptId, ContainerId.newContainerId(
       appAttemptId, 0), 10, 0, false, this.getClass().getName(), true, 1) {
@@ -1109,11 +1086,11 @@ public class TestRMContainerAllocator {
     amDispatcher.await();
 
     allocator.schedule();
-    rmDispatcher.await();
+    rm.drainEvents();
     amNodeManager.nodeHeartbeat(true);
-    rmDispatcher.await();
+    rm.drainEvents();
     allocator.schedule();
-    rmDispatcher.await();
+    rm.drainEvents();
 
     // Wait for all map-tasks to be running
     for (Task t : job.getTasks().values()) {
@@ -1121,7 +1098,7 @@ public class TestRMContainerAllocator {
     }
 
     allocator.schedule(); // Send heartbeat
-    rmDispatcher.await();
+    rm.drainEvents();
     Assert.assertEquals(0.05f, job.getProgress(), 0.001f);
     Assert.assertEquals(0.05f, rmApp.getProgress(), 0.001f);
 
@@ -1130,21 +1107,21 @@ public class TestRMContainerAllocator {
     // Finish off 1 map so that map-progress is 10%
     finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 1);
     allocator.schedule();
-    rmDispatcher.await();
+    rm.drainEvents();
     Assert.assertEquals(0.14f, job.getProgress(), 0.001f);
     Assert.assertEquals(0.14f, rmApp.getProgress(), 0.001f);
 
     // Finish off 5 more map so that map-progress is 60%
     finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 5);
     allocator.schedule();
-    rmDispatcher.await();
+    rm.drainEvents();
     Assert.assertEquals(0.59f, job.getProgress(), 0.001f);
     Assert.assertEquals(0.59f, rmApp.getProgress(), 0.001f);
 
     // Finish off remaining map so that map-progress is 100%
     finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 4);
     allocator.schedule();
-    rmDispatcher.await();
+    rm.drainEvents();
     Assert.assertEquals(0.95f, job.getProgress(), 0.001f);
     Assert.assertEquals(0.95f, rmApp.getProgress(), 0.001f);
   }
@@ -1154,21 +1131,19 @@ public class TestRMContainerAllocator {
     Configuration conf = new Configuration();
     MyResourceManager rm = new MyResourceManager(conf);
     rm.start();
-    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
-        .getDispatcher();
 
     // Submit the application
     RMApp app = rm.submitApp(1024);
-    dispatcher.await();
+    rm.drainEvents();
     MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
     amNodeManager.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
 
     ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
         .getAppAttemptId();
     rm.sendAMLaunched(appAttemptId);
-    dispatcher.await();
-    
+    rm.drainEvents();
+
     JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
     Job mockJob = mock(Job.class);
     MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
@@ -1177,7 +1152,7 @@ public class TestRMContainerAllocator {
     // add resources to scheduler
     MockNM nm1 = rm.registerNode("h1:1234", 10240);
     MockNM nm2 = rm.registerNode("h2:1234", 10240);
-    dispatcher.await();
+    rm.drainEvents();
 
     // create the map container request
     ContainerRequestEvent event = createReq(jobId, 1, 1024,
@@ -1193,16 +1168,16 @@ public class TestRMContainerAllocator {
 
     // this tells the scheduler about the requests
     List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
 
     nm1.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
     Assert.assertEquals(1, allocator.getJobUpdatedNodeEvents().size());
     Assert.assertEquals(3, allocator.getJobUpdatedNodeEvents().get(0).getUpdatedNodes().size());
     allocator.getJobUpdatedNodeEvents().clear();
     // get the assignment
     assigned = allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
     Assert.assertEquals(1, assigned.size());
     Assert.assertEquals(nm1.getNodeId(), assigned.get(0).getContainer().getNodeId());
     // no updated nodes reported
@@ -1212,11 +1187,11 @@ public class TestRMContainerAllocator {
     // mark nodes bad
     nm1.nodeHeartbeat(false);
     nm2.nodeHeartbeat(false);
-    dispatcher.await();
-    
+    rm.drainEvents();
+
     // schedule response returns updated nodes
     assigned = allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
     Assert.assertEquals(0, assigned.size());
     // updated nodes are reported
     Assert.assertEquals(1, allocator.getJobUpdatedNodeEvents().size());
@@ -1227,7 +1202,7 @@ public class TestRMContainerAllocator {
     allocator.getTaskAttemptKillEvents().clear();
     
     assigned = allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
     Assert.assertEquals(0, assigned.size());
     // no updated nodes reported
     Assert.assertTrue(allocator.getJobUpdatedNodeEvents().isEmpty());
@@ -1247,21 +1222,19 @@ public class TestRMContainerAllocator {
     
     MyResourceManager rm = new MyResourceManager(conf);
     rm.start();
-    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
-        .getDispatcher();
 
     // Submit the application
     RMApp app = rm.submitApp(1024);
-    dispatcher.await();
+    rm.drainEvents();
 
     MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
     amNodeManager.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
 
     ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
         .getAppAttemptId();
     rm.sendAMLaunched(appAttemptId);
-    dispatcher.await();
+    rm.drainEvents();
     
     JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
     Job mockJob = mock(Job.class);
@@ -1275,7 +1248,7 @@ public class TestRMContainerAllocator {
     MockNM nodeManager1 = rm.registerNode("h1:1234", 10240);
     MockNM nodeManager2 = rm.registerNode("h2:1234", 10240);
     MockNM nodeManager3 = rm.registerNode("h3:1234", 10240);
-    dispatcher.await();
+    rm.drainEvents();
 
     // create the container request
     ContainerRequestEvent event1 = createReq(jobId, 1, 1024,
@@ -1295,7 +1268,7 @@ public class TestRMContainerAllocator {
     // this tells the scheduler about the requests
     // as nodes are not added, no allocations
     List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
 
     // Send events to blacklist nodes h1 and h2
@@ -1307,28 +1280,28 @@ public class TestRMContainerAllocator {
     // update resources in scheduler
     nodeManager1.nodeHeartbeat(true); // Node heartbeat
     nodeManager2.nodeHeartbeat(true); // Node heartbeat
-    dispatcher.await();
+    rm.drainEvents();
 
     assigned = allocator.schedule();
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
-    dispatcher.await();
+    rm.drainEvents();
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
     assertBlacklistAdditionsAndRemovals(2, 0, rm);
 
     // mark h1/h2 as bad nodes
     nodeManager1.nodeHeartbeat(false);
     nodeManager2.nodeHeartbeat(false);
-    dispatcher.await();
+    rm.drainEvents();
 
     assigned = allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
     assertBlacklistAdditionsAndRemovals(0, 0, rm);
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());    
 
     nodeManager3.nodeHeartbeat(true); // Node heartbeat
-    dispatcher.await();
+    rm.drainEvents();
     assigned = allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
     assertBlacklistAdditionsAndRemovals(0, 0, rm);
         
     Assert.assertTrue("No of assignments must be 3", assigned.size() == 3);
@@ -1352,24 +1325,22 @@ public class TestRMContainerAllocator {
 
     MyResourceManager rm = new MyResourceManager(conf);
     rm.start();
-    DrainDispatcher dispatcher =
-        (DrainDispatcher) rm.getRMContext().getDispatcher();
 
     // Submit the application
     RMApp app = rm.submitApp(1024);
-    dispatcher.await();
+    rm.drainEvents();
 
     MockNM[] nodeManagers = new MockNM[10];
     int nmNum = 0;
     List<TaskAttemptContainerAssignedEvent> assigned = null;
-    nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
+    nodeManagers[nmNum] = registerNodeManager(nmNum++, rm);
     nodeManagers[0].nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
 
     ApplicationAttemptId appAttemptId =
         app.getCurrentAppAttempt().getAppAttemptId();
     rm.sendAMLaunched(appAttemptId);
-    dispatcher.await();
+    rm.drainEvents();
 
     JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
     Job mockJob = mock(Job.class);
@@ -1382,7 +1353,7 @@ public class TestRMContainerAllocator {
     // Known=1, blacklisted=0, ignore should be false - assign first container
     assigned =
         getContainerOnHost(jobId, 1, 1024, new String[] { "h1" },
-            nodeManagers[0], dispatcher, allocator, 0, 0, 0, 0, rm);
+            nodeManagers[0], allocator, 0, 0, 0, 0, rm);
     Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
 
     LOG.info("Failing container _1 on H1 (Node should be blacklisted and"
@@ -1397,47 +1368,47 @@ public class TestRMContainerAllocator {
     // The current call will send blacklisted node "h1" to RM
     assigned =
         getContainerOnHost(jobId, 2, 1024, new String[] { "h1" },
-            nodeManagers[0], dispatcher, allocator, 1, 0, 0, 1, rm);
+            nodeManagers[0], allocator, 1, 0, 0, 1, rm);
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
 
     // Known=1, blacklisted=1, ignore should be true - assign 1
     assigned =
         getContainerOnHost(jobId, 2, 1024, new String[] { "h1" },
-            nodeManagers[0], dispatcher, allocator, 0, 0, 0, 0, rm);
+            nodeManagers[0], allocator, 0, 0, 0, 0, rm);
     Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
 
-    nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
+    nodeManagers[nmNum] = registerNodeManager(nmNum++, rm);
     // Known=2, blacklisted=1, ignore should be true - assign 1 anyway.
     assigned =
         getContainerOnHost(jobId, 3, 1024, new String[] { "h2" },
-            nodeManagers[1], dispatcher, allocator, 0, 0, 0, 0, rm);
+            nodeManagers[1], allocator, 0, 0, 0, 0, rm);
     Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
 
-    nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
+    nodeManagers[nmNum] = registerNodeManager(nmNum++, rm);
     // Known=3, blacklisted=1, ignore should be true - assign 1 anyway.
     assigned =
         getContainerOnHost(jobId, 4, 1024, new String[] { "h3" },
-            nodeManagers[2], dispatcher, allocator, 0, 0, 0, 0, rm);
+            nodeManagers[2], allocator, 0, 0, 0, 0, rm);
     Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
 
     // Known=3, blacklisted=1, ignore should be true - assign 1
     assigned =
         getContainerOnHost(jobId, 5, 1024, new String[] { "h1" },
-            nodeManagers[0], dispatcher, allocator, 0, 0, 0, 0, rm);
+            nodeManagers[0], allocator, 0, 0, 0, 0, rm);
     Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
 
-    nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
+    nodeManagers[nmNum] = registerNodeManager(nmNum++, rm);
     // Known=4, blacklisted=1, ignore should be false - assign 1 anyway
     assigned =
         getContainerOnHost(jobId, 6, 1024, new String[] { "h4" },
-            nodeManagers[3], dispatcher, allocator, 0, 0, 1, 0, rm);
+            nodeManagers[3], allocator, 0, 0, 1, 0, rm);
     Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
 
     // Test blacklisting re-enabled.
     // Known=4, blacklisted=1, ignore should be false - no assignment on h1
     assigned =
         getContainerOnHost(jobId, 7, 1024, new String[] { "h1" },
-            nodeManagers[0], dispatcher, allocator, 0, 0, 0, 0, rm);
+            nodeManagers[0], allocator, 0, 0, 0, 0, rm);
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
     // RMContainerRequestor would have created a replacement request.
 
@@ -1450,61 +1421,61 @@ public class TestRMContainerAllocator {
     // container for the same reason above.
     assigned =
         getContainerOnHost(jobId, 8, 1024, new String[] { "h1" },
-            nodeManagers[0], dispatcher, allocator, 1, 0, 0, 2, rm);
+            nodeManagers[0], allocator, 1, 0, 0, 2, rm);
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
 
     // Known=4, blacklisted=2, ignore should be true. Should assign 2
     // containers.
     assigned =
         getContainerOnHost(jobId, 8, 1024, new String[] { "h1" },
-            nodeManagers[0], dispatcher, allocator, 0, 0, 0, 0, rm);
+            nodeManagers[0], allocator, 0, 0, 0, 0, rm);
     Assert.assertEquals("No of assignments must be 2", 2, assigned.size());
 
     // Known=4, blacklisted=2, ignore should be true.
     assigned =
         getContainerOnHost(jobId, 9, 1024, new String[] { "h2" },
-            nodeManagers[1], dispatcher, allocator, 0, 0, 0, 0, rm);
+            nodeManagers[1], allocator, 0, 0, 0, 0, rm);
     Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
 
     // Test blacklist while ignore blacklisting enabled
     ContainerFailedEvent f3 = createFailEvent(jobId, 4, "h3", false);
     allocator.sendFailure(f3);
 
-    nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
+    nodeManagers[nmNum] = registerNodeManager(nmNum++, rm);
     // Known=5, blacklisted=3, ignore should be true.
     assigned =
         getContainerOnHost(jobId, 10, 1024, new String[] { "h3" },
-            nodeManagers[2], dispatcher, allocator, 0, 0, 0, 0, rm);
+            nodeManagers[2], allocator, 0, 0, 0, 0, rm);
     Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
     
     // Assign on 5 more nodes - to re-enable blacklisting
     for (int i = 0; i < 5; i++) {
-      nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
+      nodeManagers[nmNum] = registerNodeManager(nmNum++, rm);
       assigned =
           getContainerOnHost(jobId, 11 + i, 1024,
               new String[] { String.valueOf(5 + i) }, nodeManagers[4 + i],
-              dispatcher, allocator, 0, 0, (i == 4 ? 3 : 0), 0, rm);
+              allocator, 0, 0, (i == 4 ? 3 : 0), 0, rm);
       Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
     }
 
     // Test h3 (blacklisted while ignoring blacklisting) is blacklisted.
     assigned =
         getContainerOnHost(jobId, 20, 1024, new String[] { "h3" },
-            nodeManagers[2], dispatcher, allocator, 0, 0, 0, 0, rm);
+            nodeManagers[2], allocator, 0, 0, 0, 0, rm);
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
   }
 
-  private MockNM registerNodeManager(int i, MyResourceManager rm,
-      DrainDispatcher dispatcher) throws Exception {
+  private MockNM registerNodeManager(int i, MyResourceManager rm)
+      throws Exception {
     MockNM nm = rm.registerNode("h" + (i + 1) + ":1234", 10240);
-    dispatcher.await();
+    rm.drainEvents();
     return nm;
   }
 
   private
       List<TaskAttemptContainerAssignedEvent> getContainerOnHost(JobId jobId,
           int taskAttemptId, int memory, String[] hosts, MockNM mockNM,
-          DrainDispatcher dispatcher, MyContainerAllocator allocator,
+          MyContainerAllocator allocator,
           int expectedAdditions1, int expectedRemovals1,
           int expectedAdditions2, int expectedRemovals2, MyResourceManager rm)
           throws Exception {
@@ -1514,17 +1485,17 @@ public class TestRMContainerAllocator {
 
     // Send the request to the RM
     List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
     assertBlacklistAdditionsAndRemovals(
         expectedAdditions1, expectedRemovals1, rm);
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
 
     // Heartbeat from the required nodeManager
     mockNM.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
 
     assigned = allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
     assertBlacklistAdditionsAndRemovals(
         expectedAdditions2, expectedRemovals2, rm);
     return assigned;
@@ -1542,21 +1513,19 @@ public class TestRMContainerAllocator {
     
     MyResourceManager rm = new MyResourceManager(conf);
     rm.start();
-    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
-        .getDispatcher();
 
     // Submit the application
     RMApp app = rm.submitApp(1024);
-    dispatcher.await();
+    rm.drainEvents();
 
     MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
     amNodeManager.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
 
     ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
         .getAppAttemptId();
     rm.sendAMLaunched(appAttemptId);
-    dispatcher.await();
+    rm.drainEvents();
     
     JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
     Job mockJob = mock(Job.class);
@@ -1569,7 +1538,7 @@ public class TestRMContainerAllocator {
     // add resources to scheduler
     MockNM nodeManager1 = rm.registerNode("h1:1234", 10240);
     MockNM nodeManager3 = rm.registerNode("h3:1234", 10240);
-    dispatcher.await();
+    rm.drainEvents();
 
     LOG.info("Requesting 1 Containers _1 on H1");
     // create the container request
@@ -1581,17 +1550,17 @@ public class TestRMContainerAllocator {
     // this tells the scheduler about the requests
     // as nodes are not added, no allocations
     List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
 
     LOG.info("h1 Heartbeat (To actually schedule the containers)");
     // update resources in scheduler
     nodeManager1.nodeHeartbeat(true); // Node heartbeat
-    dispatcher.await();
+    rm.drainEvents();
     
     LOG.info("RM Heartbeat (To process the scheduled containers)");
     assigned = allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
     assertBlacklistAdditionsAndRemovals(0, 0, rm);
     Assert.assertEquals("No of assignments must be 1", 1, assigned.size());    
     
@@ -1608,7 +1577,7 @@ public class TestRMContainerAllocator {
 
     //Update the Scheduler with the new requests.
     assigned = allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
     assertBlacklistAdditionsAndRemovals(1, 0, rm);
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
 
@@ -1623,11 +1592,11 @@ public class TestRMContainerAllocator {
     LOG.info("h1 Heartbeat (To actually schedule the containers)");
     // update resources in scheduler
     nodeManager1.nodeHeartbeat(true); // Node heartbeat
-    dispatcher.await();
+    rm.drainEvents();
     
     LOG.info("RM Heartbeat (To process the scheduled containers)");
     assigned = allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
     assertBlacklistAdditionsAndRemovals(0, 0, rm);
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());    
     
@@ -1636,19 +1605,19 @@ public class TestRMContainerAllocator {
     //Send a release for the p:5 container + another request.
     LOG.info("RM Heartbeat (To process the re-scheduled containers)");
     assigned = allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
     assertBlacklistAdditionsAndRemovals(0, 0, rm);
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
     
     //Hearbeat from H3 to schedule on this host.
     LOG.info("h3 Heartbeat (To re-schedule the containers)");
     nodeManager3.nodeHeartbeat(true); // Node heartbeat
-    dispatcher.await();
+    rm.drainEvents();
     
     LOG.info("RM Heartbeat (To process the re-scheduled containers for H3)");
     assigned = allocator.schedule();
     assertBlacklistAdditionsAndRemovals(0, 0, rm);
-    dispatcher.await();
+    rm.drainEvents();
      
     // For debugging
     for (TaskAttemptContainerAssignedEvent assig : assigned) {
@@ -2229,22 +2198,20 @@ public class TestRMContainerAllocator {
     Configuration conf = new Configuration();
     final MyResourceManager rm = new MyResourceManager(conf);
     rm.start();
-    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
-        .getDispatcher();
 
     // Submit the application
     RMApp app = rm.submitApp(1024);
-    dispatcher.await();
+    rm.drainEvents();
 
     // Make a node to register so as to launch the AM.
     MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
     amNodeManager.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
 
     ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
         .getAppAttemptId();
     rm.sendAMLaunched(appAttemptId);
-    dispatcher.await();
+    rm.drainEvents();
 
     JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
     Job job = mock(Job.class);
@@ -2381,21 +2348,19 @@ public class TestRMContainerAllocator {
     Configuration conf = new Configuration();
     final MyResourceManager rm = new MyResourceManager(conf);
     rm.start();
-    DrainDispatcher rmDispatcher =
-        (DrainDispatcher) rm.getRMContext().getDispatcher();
 
     // Submit the application
     RMApp rmApp = rm.submitApp(1024);
-    rmDispatcher.await();
+    rm.drainEvents();
 
     MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 11264);
     amNodeManager.nodeHeartbeat(true);
-    rmDispatcher.await();
+    rm.drainEvents();
 
     final ApplicationAttemptId appAttemptId =
         rmApp.getCurrentAppAttempt().getAppAttemptId();
     rm.sendAMLaunched(appAttemptId);
-    rmDispatcher.await();
+    rm.drainEvents();
 
     MRApp mrApp =
         new MRApp(appAttemptId, ContainerId.newContainerId(appAttemptId, 0), 10,
@@ -2454,22 +2419,20 @@ public class TestRMContainerAllocator {
 
     MyResourceManager rm1 = new MyResourceManager(conf, memStore);
     rm1.start();
-    DrainDispatcher dispatcher =
-        (DrainDispatcher) rm1.getRMContext().getDispatcher();
 
     // Submit the application
     RMApp app = rm1.submitApp(1024);
-    dispatcher.await();
+    rm1.drainEvents();
 
     MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
     nm1.registerNode();
     nm1.nodeHeartbeat(true); // Node heartbeat
-    dispatcher.await();
+    rm1.drainEvents();
 
     ApplicationAttemptId appAttemptId =
         app.getCurrentAppAttempt().getAppAttemptId();
     rm1.sendAMLaunched(appAttemptId);
-    dispatcher.await();
+    rm1.drainEvents();
 
     JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
     Job mockJob = mock(Job.class);
@@ -2498,7 +2461,7 @@ public class TestRMContainerAllocator {
     // send allocate request and 1 blacklisted nodes
     List<TaskAttemptContainerAssignedEvent> assignedContainers =
         allocator.schedule();
-    dispatcher.await();
+    rm1.drainEvents();
     Assert.assertEquals("No of assignments must be 0", 0,
         assignedContainers.size());
     // Why ask is 3, not 4? --> ask from blacklisted node h2 is removed
@@ -2506,11 +2469,11 @@ public class TestRMContainerAllocator {
     assertBlacklistAdditionsAndRemovals(1, 0, rm1);
 
     nm1.nodeHeartbeat(true); // Node heartbeat
-    dispatcher.await();
+    rm1.drainEvents();
 
     // Step-2 : 2 containers are allocated by RM.
     assignedContainers = allocator.schedule();
-    dispatcher.await();
+    rm1.drainEvents();
     Assert.assertEquals("No of assignments must be 2", 2,
         assignedContainers.size());
     assertAsksAndReleases(0, 0, rm1);
@@ -2545,7 +2508,6 @@ public class TestRMContainerAllocator {
     rm2.start();
     nm1.setResourceTrackerService(rm2.getResourceTrackerService());
     allocator.updateSchedulerProxy(rm2);
-    dispatcher = (DrainDispatcher) rm2.getRMContext().getDispatcher();
 
     // NM should be rebooted on heartbeat, even first heartbeat for nm2
     NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);
@@ -2555,7 +2517,7 @@ public class TestRMContainerAllocator {
     nm1 = new MockNM("h1:1234", 10240, rm2.getResourceTrackerService());
     nm1.registerNode();
     nm1.nodeHeartbeat(true);
-    dispatcher.await();
+    rm2.drainEvents();
 
     // Step-4 : On RM restart, AM(does not know RM is restarted) sends
     // additional containerRequest(event4) and blacklisted nodes.
@@ -2576,7 +2538,7 @@ public class TestRMContainerAllocator {
 
     // send allocate request to 2nd RM and get resync command
     allocator.schedule();
-    dispatcher.await();
+    rm2.drainEvents();
 
     // Step-5 : On Resync,AM sends all outstanding
     // asks,release,blacklistAaddition
@@ -2587,16 +2549,16 @@ public class TestRMContainerAllocator {
 
     // send all outstanding request again.
     assignedContainers = allocator.schedule();
-    dispatcher.await();
+    rm2.drainEvents();
     assertAsksAndReleases(3, 2, rm2);
     assertBlacklistAdditionsAndRemovals(2, 0, rm2);
 
     nm1.nodeHeartbeat(true);
-    dispatcher.await();
+    rm2.drainEvents();
 
     // Step-6 : RM allocates containers i.e event3,event4 and cRequest5
     assignedContainers = allocator.schedule();
-    dispatcher.await();
+    rm2.drainEvents();
 
     Assert.assertEquals("Number of container should be 3", 3,
         assignedContainers.size());
@@ -2699,20 +2661,19 @@ public class TestRMContainerAllocator {
       MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS, 0);
     MyResourceManager rm1 = new MyResourceManager(conf);
     rm1.start();
-    DrainDispatcher dispatcher =
-        (DrainDispatcher) rm1.getRMContext().getDispatcher();
+
     RMApp app = rm1.submitApp(1024);
-    dispatcher.await();
+    rm1.drainEvents();
 
     MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
     nm1.registerNode();
     nm1.nodeHeartbeat(true);
-    dispatcher.await();
+    rm1.drainEvents();
 
     ApplicationAttemptId appAttemptId =
         app.getCurrentAppAttempt().getAppAttemptId();
     rm1.sendAMLaunched(appAttemptId);
-    dispatcher.await();
+    rm1.drainEvents();
 
     JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
     Job mockJob = mock(Job.class);
@@ -2728,7 +2689,7 @@ public class TestRMContainerAllocator {
     } catch (RMContainerAllocationException e) {
       Assert.assertTrue(e.getMessage().contains("Could not contact RM after"));
     }
-    dispatcher.await();
+    rm1.drainEvents();
     Assert.assertEquals("Should Have 1 Job Event", 1,
         allocator.jobEvents.size());
     JobEvent event = allocator.jobEvents.get(0); 
@@ -2750,22 +2711,20 @@ public class TestRMContainerAllocator {
     rm.start();
     AMRMTokenSecretManager secretMgr =
         rm.getRMContext().getAMRMTokenSecretManager();
-    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
-        .getDispatcher();
 
     // Submit the application
     RMApp app = rm.submitApp(1024);
-    dispatcher.await();
+    rm.drainEvents();
 
     MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
     amNodeManager.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
 
     final ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
         .getAppAttemptId();
     final ApplicationId appId = app.getApplicationId();
     rm.sendAMLaunched(appAttemptId);
-    dispatcher.await();
+    rm.drainEvents();
 
     JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
     final Job mockJob = mock(Job.class);
@@ -2953,21 +2912,19 @@ public class TestRMContainerAllocator {
     Configuration conf = new Configuration();
     MyResourceManager rm = new MyResourceManager(conf);
     rm.start();
-    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
-        .getDispatcher();
 
     // Submit the application
     RMApp app = rm.submitApp(1024);
-    dispatcher.await();
+    rm.drainEvents();
 
     MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
     amNodeManager.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
 
     ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
         .getAppAttemptId();
     rm.sendAMLaunched(appAttemptId);
-    dispatcher.await();
+    rm.drainEvents();
 
     JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
     Job mockJob = mock(Job.class);
@@ -2989,21 +2946,19 @@ public class TestRMContainerAllocator {
     Configuration conf = new Configuration();
     MyResourceManager rm = new MyResourceManager(conf);
     rm.start();
-    DrainDispatcher dispatcher =
-        (DrainDispatcher) rm.getRMContext().getDispatcher();
 
     // Submit the application
     RMApp app = rm.submitApp(1024);
-    dispatcher.await();
+    rm.drainEvents();
 
     MockNM amNodeManager = rm.registerNode("amNM:1234", 1260);
     amNodeManager.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
 
     ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
         .getAppAttemptId();
     rm.sendAMLaunched(appAttemptId);
-    dispatcher.await();
+    rm.drainEvents();
 
     JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
     Job mockJob = mock(Job.class);
@@ -3018,7 +2973,7 @@ public class TestRMContainerAllocator {
 
     // Register nodes to RM.
     MockNM nodeManager = rm.registerNode("h1:1234", 1024);
-    dispatcher.await();
+    rm.drainEvents();
 
     // Request 2 maps and 1 reducer(sone on nodes which are not registered).
     ContainerRequestEvent event1 =
@@ -3034,7 +2989,7 @@ public class TestRMContainerAllocator {
     // This will tell the scheduler about the requests but there will be no
     // allocations as nodes are not added.
     allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
 
     // Advance clock so that maps can be considered as hanging.
     clock.setTime(System.currentTimeMillis() + 500000L);
@@ -3045,15 +3000,15 @@ public class TestRMContainerAllocator {
     allocator.sendRequest(event4);
 
     allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
 
     // Update resources in scheduler through node heartbeat from h1.
     nodeManager.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
 
     rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(1024, 1));
     allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
 
     // One map is assigned.
     Assert.assertEquals(1, allocator.getAssignedRequests().maps.size());
@@ -3087,7 +3042,7 @@ public class TestRMContainerAllocator {
     // On next allocate request to scheduler, headroom reported will be 0.
     rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(0, 0));
     allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
     // After allocate response from scheduler, all scheduled reduces are ramped
     // down and move to pending. 3 asks are also updated with 0 containers to
     // indicate ramping down of reduces to scheduler.
@@ -3155,21 +3110,19 @@ public class TestRMContainerAllocator {
     Configuration conf = new Configuration();
     MyResourceManager rm = new MyResourceManager(conf);
     rm.start();
-    DrainDispatcher dispatcher =
-        (DrainDispatcher) rm.getRMContext().getDispatcher();
 
     // Submit the application
     RMApp app = rm.submitApp(1024);
-    dispatcher.await();
+    rm.drainEvents();
 
     MockNM amNodeManager = rm.registerNode("amNM:1234", 1260);
     amNodeManager.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
 
     ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
         .getAppAttemptId();
     rm.sendAMLaunched(appAttemptId);
-    dispatcher.await();
+    rm.drainEvents();
 
     JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
     Job mockJob = mock(Job.class);
@@ -3184,7 +3137,7 @@ public class TestRMContainerAllocator {
 
     // Register nodes to RM.
     MockNM nodeManager = rm.registerNode("h1:1234", 1024);
-    dispatcher.await();
+    rm.drainEvents();
 
     // Request 2 maps and 1 reducer(sone on nodes which are not registered).
     ContainerRequestEvent event1 =
@@ -3200,7 +3153,7 @@ public class TestRMContainerAllocator {
     // This will tell the scheduler about the requests but there will be no
     // allocations as nodes are not added.
     allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
 
     // Advance clock so that maps can be considered as hanging.
     clock.setTime(System.currentTimeMillis() + 500000L);
@@ -3211,15 +3164,15 @@ public class TestRMContainerAllocator {
     allocator.sendRequest(event4);
 
     allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
 
     // Update resources in scheduler through node heartbeat from h1.
     nodeManager.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
 
     rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(1024, 1));
     allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
 
     // One map is assigned.
     Assert.assertEquals(1, allocator.getAssignedRequests().maps.size());
@@ -3256,7 +3209,7 @@ public class TestRMContainerAllocator {
     // On next allocate request to scheduler, headroom reported will be 2048.
     rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(2048, 0));
     allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
     // After allocate response from scheduler, all scheduled reduces are ramped
     // down and move to pending. 3 asks are also updated with 0 containers to
     // indicate ramping down of reduces to scheduler.
@@ -3285,21 +3238,19 @@ public class TestRMContainerAllocator {
     conf.setInt(MRJobConfig.MR_JOB_REDUCER_UNCONDITIONAL_PREEMPT_DELAY_SEC, -1);
     MyResourceManager rm = new MyResourceManager(conf);
     rm.start();
-    DrainDispatcher dispatcher =
-        (DrainDispatcher) rm.getRMContext().getDispatcher();
 
     // Submit the application
     RMApp app = rm.submitApp(1024);
-    dispatcher.await();
+    rm.drainEvents();
 
     MockNM amNodeManager = rm.registerNode("amNM:1234", 1260);
     amNodeManager.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
 
     ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
         .getAppAttemptId();
     rm.sendAMLaunched(appAttemptId);
-    dispatcher.await();
+    rm.drainEvents();
 
     JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
     Job mockJob = mock(Job.class);
@@ -3315,10 +3266,10 @@ public class TestRMContainerAllocator {
         appAttemptId, mockJob);
 
     MockNM nodeManager = rm.registerNode("h1:1234", 4096);
-    dispatcher.await();
+    rm.drainEvents();
     // Register nodes to RM.
     MockNM nodeManager2 = rm.registerNode("h2:1234", 1024);
-    dispatcher.await();
+    rm.drainEvents();
 
     // Request 2 maps and 1 reducer(sone on nodes which are not registered).
     ContainerRequestEvent event1 =
@@ -3334,7 +3285,7 @@ public class TestRMContainerAllocator {
     // This will tell the scheduler about the requests but there will be no
     // allocations as nodes are not added.
     allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
 
     // Request for another reducer on h3 which has not registered.
     ContainerRequestEvent event4 =
@@ -3342,15 +3293,15 @@ public class TestRMContainerAllocator {
     allocator.sendRequest(event4);
 
     allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
 
     // Update resources in scheduler through node heartbeat from h1.
     nodeManager.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
 
     rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(3072, 3));
     allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
 
     // Two maps are assigned.
     Assert.assertEquals(2, allocator.getAssignedRequests().maps.size());
@@ -3363,15 +3314,15 @@ public class TestRMContainerAllocator {
     Assert.assertEquals(0, allocator.getAssignedRequests().maps.size());
 
     nodeManager.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
 
     rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(1024, 1));
     allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
 
     // h2 heartbeats.
     nodeManager2.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
 
     // Send request for one more mapper.
     ContainerRequestEvent event5 =
@@ -3380,7 +3331,7 @@ public class TestRMContainerAllocator {
 
     rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(2048, 2));
     allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
     // One reducer is assigned and one map is scheduled
     Assert.assertEquals(1, allocator.getScheduledRequests().maps.size());
     Assert.assertEquals(1, allocator.getAssignedRequests().reduces.size());
@@ -3388,7 +3339,7 @@ public class TestRMContainerAllocator {
     // enough if scheduled reducers resources are deducted.
     rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(1260, 2));
     allocator.schedule();
-    dispatcher.await();
+    rm.drainEvents();
     // After allocate response, the one assigned reducer is preempted and killed
     Assert.assertEquals(1, MyContainerAllocator.getTaskAttemptKillEvents().size());
     Assert.assertEquals(RMContainerAllocator.RAMPDOWN_DIAGNOSTIC,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d1fac5d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
index 39a7633..fa3c6af 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
@@ -49,8 +49,6 @@ import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
@@ -126,22 +124,20 @@ public class TestAMRMClientOnRMRestart {
     // Phase-1 Start 1st RM
     MyResourceManager rm1 = new MyResourceManager(conf, memStore);
     rm1.start();
-    DrainDispatcher dispatcher =
-        (DrainDispatcher) rm1.getRMContext().getDispatcher();
 
     // Submit the application
     RMApp app = rm1.submitApp(1024);
-    dispatcher.await();
+    rm1.drainEvents();
 
     MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
     nm1.registerNode();
     nm1.nodeHeartbeat(true); // Node heartbeat
-    dispatcher.await();
+    rm1.drainEvents();
 
     ApplicationAttemptId appAttemptId =
         app.getCurrentAppAttempt().getAppAttemptId();
     rm1.sendAMLaunched(appAttemptId);
-    dispatcher.await();
+    rm1.drainEvents();
 
     org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> token =
         rm1.getRMContext().getRMApps().get(appAttemptId.getApplicationId())
@@ -176,7 +172,7 @@ public class TestAMRMClientOnRMRestart {
     blacklistAdditions.remove("h2");// remove from local list
 
     AllocateResponse allocateResponse = amClient.allocate(0.1f);
-    dispatcher.await();
+    rm1.drainEvents();
     Assert.assertEquals("No of assignments must be 0", 0, allocateResponse
         .getAllocatedContainers().size());
 
@@ -189,10 +185,10 @@ public class TestAMRMClientOnRMRestart {
     // Step-2 : NM heart beat is sent.
     // On 2nd AM allocate request, RM allocates 3 containers to AM
     nm1.nodeHeartbeat(true); // Node heartbeat
-    dispatcher.await();
+    rm1.drainEvents();
 
     allocateResponse = amClient.allocate(0.2f);
-    dispatcher.await();
+    rm1.drainEvents();
     // 3 containers are allocated i.e for cRequest1, cRequest2 and cRequest3.
     Assert.assertEquals("No of assignments must be 0", 3, allocateResponse
         .getAllocatedContainers().size());
@@ -207,7 +203,7 @@ public class TestAMRMClientOnRMRestart {
     amClient.removeContainerRequest(cRequest3);
 
     allocateResponse = amClient.allocate(0.2f);
-    dispatcher.await();
+    rm1.drainEvents();
     Assert.assertEquals("No of assignments must be 0", 0, allocateResponse
         .getAllocatedContainers().size());
     assertAsksAndReleases(4, 0, rm1);
@@ -233,7 +229,7 @@ public class TestAMRMClientOnRMRestart {
     // request
     nm1.nodeHeartbeat(containerId.getApplicationAttemptId(),
         containerId.getContainerId(), ContainerState.RUNNING);
-    dispatcher.await();
+    rm1.drainEvents();
     amClient.requestContainerUpdate(
         container, UpdateContainerRequest.newInstance(
             container.getVersion(), container.getId(),
@@ -242,7 +238,7 @@ public class TestAMRMClientOnRMRestart {
     it.remove();
 
     allocateResponse = amClient.allocate(0.3f);
-    dispatcher.await();
+    rm1.drainEvents();
     Assert.assertEquals("No of assignments must be 0", 0, allocateResponse
         .getAllocatedContainers().size());
     assertAsksAndReleases(3, pendingRelease, rm1);
@@ -258,7 +254,6 @@ public class TestAMRMClientOnRMRestart {
     rm2.start();
     nm1.setResourceTrackerService(rm2.getResourceTrackerService());
     ((MyAMRMClientImpl) amClient).updateRMProxy(rm2);
-    dispatcher = (DrainDispatcher) rm2.getRMContext().getDispatcher();
 
     // NM should be rebooted on heartbeat, even first heartbeat for nm2
     NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);
@@ -274,7 +269,7 @@ public class TestAMRMClientOnRMRestart {
         Collections.singletonList(
             containerId.getApplicationAttemptId().getApplicationId()));
     nm1.nodeHeartbeat(true);
-    dispatcher.await();
+    rm2.drainEvents();
 
     blacklistAdditions.add("h3");
     amClient.updateBlacklist(blacklistAdditions, null);
@@ -296,7 +291,7 @@ public class TestAMRMClientOnRMRestart {
     // containerRequest and blacklisted nodes.
     // Intern RM send resync command,AMRMClient resend allocate request
     allocateResponse = amClient.allocate(0.3f);
-    dispatcher.await();
+    rm2.drainEvents();
 
     completedContainer =
         allocateResponse.getCompletedContainersStatuses().size();
@@ -313,7 +308,7 @@ public class TestAMRMClientOnRMRestart {
 
     // Step-5 : Allocater after resync command
     allocateResponse = amClient.allocate(0.5f);
-    dispatcher.await();
+    rm2.drainEvents();
     Assert.assertEquals("No of assignments must be 0", 0, allocateResponse
         .getAllocatedContainers().size());
 
@@ -326,10 +321,10 @@ public class TestAMRMClientOnRMRestart {
     int count = 5;
     while (count-- > 0) {
       nm1.nodeHeartbeat(true);
-      dispatcher.await();
+      rm2.drainEvents();
 
       allocateResponse = amClient.allocate(0.5f);
-      dispatcher.await();
+      rm2.drainEvents();
       noAssignedContainer += allocateResponse.getAllocatedContainers().size();
       if (noAssignedContainer == 3) {
         break;
@@ -358,22 +353,20 @@ public class TestAMRMClientOnRMRestart {
     // Phase-1 Start 1st RM
     MyResourceManager rm1 = new MyResourceManager(conf, memStore);
     rm1.start();
-    DrainDispatcher dispatcher =
-        (DrainDispatcher) rm1.getRMContext().getDispatcher();
 
     // Submit the application
     RMApp app = rm1.submitApp(1024);
-    dispatcher.await();
+    rm1.drainEvents();
 
     MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
     nm1.registerNode();
     nm1.nodeHeartbeat(true); // Node heartbeat
-    dispatcher.await();
+    rm1.drainEvents();
 
     ApplicationAttemptId appAttemptId =
         app.getCurrentAppAttempt().getAppAttemptId();
     rm1.sendAMLaunched(appAttemptId);
-    dispatcher.await();
+    rm1.drainEvents();
 
     org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> token =
         rm1.getRMContext().getRMApps().get(appAttemptId.getApplicationId())
@@ -393,7 +386,6 @@ public class TestAMRMClientOnRMRestart {
     rm2.start();
     nm1.setResourceTrackerService(rm2.getResourceTrackerService());
     ((MyAMRMClientImpl) amClient).updateRMProxy(rm2);
-    dispatcher = (DrainDispatcher) rm2.getRMContext().getDispatcher();
 
     // NM should be rebooted on heartbeat, even first heartbeat for nm2
     NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);
@@ -409,7 +401,7 @@ public class TestAMRMClientOnRMRestart {
             Priority.newInstance(0), 0);
     nm1.registerNode(Arrays.asList(containerReport), null);
     nm1.nodeHeartbeat(true);
-    dispatcher.await();
+    rm2.drainEvents();
 
     amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
         null, null);
@@ -421,7 +413,6 @@ public class TestAMRMClientOnRMRestart {
     amClient.stop();
     rm1.stop();
     rm2.stop();
-
   }
 
 
@@ -439,22 +430,20 @@ public class TestAMRMClientOnRMRestart {
     // start first RM
     MyResourceManager2 rm1 = new MyResourceManager2(conf, memStore);
     rm1.start();
-    DrainDispatcher dispatcher =
-        (DrainDispatcher) rm1.getRMContext().getDispatcher();
     Long startTime = System.currentTimeMillis();
     // Submit the application
     RMApp app = rm1.submitApp(1024);
-    dispatcher.await();
+    rm1.drainEvents();
 
     MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
     nm1.registerNode();
     nm1.nodeHeartbeat(true); // Node heartbeat
-    dispatcher.await();
+    rm1.drainEvents();
 
     ApplicationAttemptId appAttemptId =
         app.getCurrentAppAttempt().getAppAttemptId();
     rm1.sendAMLaunched(appAttemptId);
-    dispatcher.await();
+    rm1.drainEvents();
 
     AMRMTokenSecretManager amrmTokenSecretManagerForRM1 =
         rm1.getRMContext().getAMRMTokenSecretManager();
@@ -513,7 +502,6 @@ public class TestAMRMClientOnRMRestart {
     rm2.start();
     nm1.setResourceTrackerService(rm2.getResourceTrackerService());
     ((MyAMRMClientImpl) amClient).updateRMProxy(rm2);
-    dispatcher = (DrainDispatcher) rm2.getRMContext().getDispatcher();
 
     AMRMTokenSecretManager amrmTokenSecretManagerForRM2 =
         rm2.getRMContext().getAMRMTokenSecretManager();
@@ -616,11 +604,6 @@ public class TestAMRMClientOnRMRestart {
     }
 
     @Override
-    protected Dispatcher createDispatcher() {
-      return new DrainDispatcher();
-    }
-
-    @Override
     protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
       // Dispatch inline for test sanity
       return new EventHandler<SchedulerEvent>() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d1fac5d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ACLsTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ACLsTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ACLsTestBase.java
index fbd5ac3..100eb7f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ACLsTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ACLsTestBase.java
@@ -30,14 +30,9 @@ import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.service.Service.STATE;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.junit.Before;
 
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 public abstract class ACLsTestBase {
 
   protected static final String COMMON_USER = "common_user";
@@ -81,11 +76,6 @@ public abstract class ACLsTestBase {
       }
 
       @Override
-      protected Dispatcher createDispatcher() {
-        return new DrainDispatcher();
-      }
-
-      @Override
       protected void doSecureLogin() throws IOException {
       }
     };

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d1fac5d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java
index c9ce7d7..c95bcdf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java
@@ -26,22 +26,17 @@ import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
-import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -108,20 +103,9 @@ public abstract class RMHATestBase extends ClientBaseWithFixes{
   }
 
   protected void startRMs() throws IOException {
-    rm1 = new MockRM(confForRM1, null, false, false){
-      @Override
-      protected Dispatcher createDispatcher() {
-        return new DrainDispatcher();
-      }
-    };
-    rm2 = new MockRM(confForRM2, null, false, false){
-      @Override
-      protected Dispatcher createDispatcher() {
-        return new DrainDispatcher();
-      }
-    };
+    rm1 = new MockRM(confForRM1, null, false, false);
+    rm2 = new MockRM(confForRM2, null, false, false);
     startRMs(rm1, confForRM1, rm2, confForRM2);
-
   }
 
   protected void startRMsWithCustomizedRMAppManager() throws IOException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d1fac5d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ReservationACLsTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ReservationACLsTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ReservationACLsTestBase.java
index 03bc889..c8ee00e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ReservationACLsTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ReservationACLsTestBase.java
@@ -47,7 +47,6 @@ import org.apache.hadoop.yarn.api.records.ReservationRequest;
 import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
 import org.apache.hadoop.yarn.api.records.ReservationRequests;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
@@ -463,9 +462,7 @@ public class ReservationACLsTestBase extends ACLsTestBase {
       int attempts = 10;
       Collection<Plan> plans;
       do {
-        DrainDispatcher dispatcher =
-                (DrainDispatcher) resourceManager.getRMContext().getDispatcher();
-        dispatcher.await();
+        resourceManager.drainEvents();
         LOG.info("Waiting for node capacity to be added to plan");
         plans = resourceManager.getRMContext().getReservationSystem()
                 .getAllPlans().values();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d1fac5d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
index c4197a1..422b7eb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
@@ -40,8 +40,6 @@ import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
@@ -161,13 +159,7 @@ public class TestApplicationCleanup {
 
     Logger rootLogger = LogManager.getRootLogger();
     rootLogger.setLevel(Level.DEBUG);
-    final DrainDispatcher dispatcher = new DrainDispatcher();
-    MockRM rm = new MockRM() {
-      @Override
-      protected Dispatcher createDispatcher() {
-        return dispatcher;
-      }
-    };
+    MockRM rm = new MockRM();
     rm.start();
 
     MockNM nm1 = rm.registerNode("127.0.0.1:1234", 5000);
@@ -185,8 +177,8 @@ public class TestApplicationCleanup {
     int request = 2;
     am.allocate("127.0.0.1" , 1000, request, 
         new ArrayList<ContainerId>());
-    dispatcher.await();
-    
+    rm.drainEvents();
+
     //kick the scheduler
     nm1.nodeHeartbeat(true);
     List<Container> conts = am.allocate(new ArrayList<ResourceRequest>(),
@@ -199,7 +191,7 @@ public class TestApplicationCleanup {
       Thread.sleep(100);
       conts = am.allocate(new ArrayList<ResourceRequest>(),
           new ArrayList<ContainerId>()).getAllocatedContainers();
-      dispatcher.await();
+      rm.drainEvents();
       contReceived += conts.size();
       nm1.nodeHeartbeat(true);
     }
@@ -209,7 +201,7 @@ public class TestApplicationCleanup {
     ArrayList<ContainerId> release = new ArrayList<ContainerId>();
     release.add(conts.get(0).getId());
     am.allocate(new ArrayList<ResourceRequest>(), release);
-    dispatcher.await();
+    rm.drainEvents();
 
     // Send one more heartbeat with a fake running container. This is to
     // simulate the situation that can happen if the NM reports that container
@@ -224,7 +216,7 @@ public class TestApplicationCleanup {
     containerStatuses.put(app.getApplicationId(), containerStatusList);
 
     NodeHeartbeatResponse resp = nm1.nodeHeartbeat(containerStatuses, true);
-    waitForContainerCleanup(dispatcher, nm1, resp);
+    waitForContainerCleanup(rm, nm1, resp);
 
     // Now to test the case when RM already gave cleanup, and NM suddenly
     // realizes that the container is running.
@@ -240,17 +232,17 @@ public class TestApplicationCleanup {
     resp = nm1.nodeHeartbeat(containerStatuses, true);
     // The cleanup list won't be instantaneous as it is given out by scheduler
     // and not RMNodeImpl.
-    waitForContainerCleanup(dispatcher, nm1, resp);
+    waitForContainerCleanup(rm, nm1, resp);
 
     rm.stop();
   }
 
-  protected void waitForContainerCleanup(DrainDispatcher dispatcher, MockNM nm,
+  protected void waitForContainerCleanup(MockRM rm, MockNM nm,
       NodeHeartbeatResponse resp) throws Exception {
     int waitCount = 0, cleanedConts = 0;
     List<ContainerId> contsToClean;
     do {
-      dispatcher.await();
+      rm.drainEvents();
       contsToClean = resp.getContainersToCleanup();
       cleanedConts += contsToClean.size();
       if (cleanedConts >= 1) {
@@ -400,13 +392,7 @@ public class TestApplicationCleanup {
     memStore.init(conf);
 
     // start RM
-    final DrainDispatcher dispatcher = new DrainDispatcher();
-    MockRM rm1 = new MockRM(conf, memStore) {
-      @Override
-      protected Dispatcher createDispatcher() {
-        return dispatcher;
-      }
-    };
+    MockRM rm1 = new MockRM(conf, memStore);
     rm1.start();
     MockNM nm1 =
         new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
@@ -419,13 +405,7 @@ public class TestApplicationCleanup {
     rm1.waitForState(app0.getApplicationId(), RMAppState.RUNNING);
 
     // start new RM
-    final DrainDispatcher dispatcher2 = new DrainDispatcher();
-    MockRM rm2 = new MockRM(conf, memStore) {
-      @Override
-      protected Dispatcher createDispatcher() {
-        return dispatcher2;
-      }
-    };
+    MockRM rm2 = new MockRM(conf, memStore);
     rm2.start();
 
     // nm1 register to rm2, and do a heartbeat
@@ -437,7 +417,7 @@ public class TestApplicationCleanup {
     NodeHeartbeatResponse response = nm1.nodeHeartbeat(am0
         .getApplicationAttemptId(), 2, ContainerState.RUNNING);
 
-    waitForContainerCleanup(dispatcher2, nm1, response);
+    waitForContainerCleanup(rm2, nm1, response);
 
     rm1.stop();
     rm2.stop();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d1fac5d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
index 08b180f..9e84010 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
@@ -59,8 +59,6 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.SerializedException;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
 import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
 import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException;
@@ -260,7 +258,6 @@ public class TestApplicationMasterLauncher {
     Configuration conf = new Configuration();
     conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
     conf.setInt(YarnConfiguration.CLIENT_NM_CONNECT_RETRY_INTERVAL_MS, 1);
-    final DrainDispatcher dispatcher = new DrainDispatcher();
     MockRM rm = new MockRMWithCustomAMLauncher(conf, null) {
       @Override
       protected ApplicationMasterLauncher createAMLauncher() {
@@ -284,12 +281,8 @@ public class TestApplicationMasterLauncher {
           }
         };
       }
-
-      @Override
-      protected Dispatcher createDispatcher() {
-        return dispatcher;
-      }
     };
+
     rm.start();
     MockNM nm1 = rm.registerNode("127.0.0.1:1234", 5120);
 
@@ -297,7 +290,7 @@ public class TestApplicationMasterLauncher {
 
     // kick the scheduling
     nm1.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
 
     MockRM.waitForState(app.getCurrentAppAttempt(),
       RMAppAttemptState.LAUNCHED, 500);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d1fac5d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
index 23bed22..18c49bd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
@@ -42,8 +42,6 @@ import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
 import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
@@ -327,10 +325,8 @@ public class TestApplicationMasterService {
 
   @Test(timeout=1200000)
   public void  testAllocateAfterUnregister() throws Exception {
-    MyResourceManager rm = new MyResourceManager(conf);
+    MockRM rm = new MockRM(conf);
     rm.start();
-    DrainDispatcher rmDispatcher = (DrainDispatcher) rm.getRMContext()
-            .getDispatcher();
     // Register node1
     MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB);
 
@@ -351,7 +347,7 @@ public class TestApplicationMasterService {
     AllocateResponse alloc1Response = am1.schedule();
 
     nm1.nodeHeartbeat(true);
-    rmDispatcher.await();
+    rm.drainEvents();
     alloc1Response = am1.schedule();
     Assert.assertEquals(0, alloc1Response.getAllocatedContainers().size());
   }
@@ -474,17 +470,6 @@ public class TestApplicationMasterService {
     rm.stop();
   }
 
-  private static class MyResourceManager extends MockRM {
-
-    public MyResourceManager(YarnConfiguration conf) {
-      super(conf);
-    }
-    @Override
-    protected Dispatcher createDispatcher() {
-      return new DrainDispatcher();
-    }
-  }
-  
   private void sentRMContainerLaunched(MockRM rm, ContainerId containerId) {
     CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
     RMContainer rmContainer = cs.getRMContainer(containerId);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org