You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2017/04/03 20:57:58 UTC
[04/50] [abbrv] hadoop git commit: YARN-6411. Clean up the overwrite
of createDispatcher() in subclass of MockRM. Contributed by Yufei Gu
YARN-6411. Clean up the overwrite of createDispatcher() in subclass of MockRM. Contributed by Yufei Gu
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4d1fac5d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4d1fac5d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4d1fac5d
Branch: refs/heads/YARN-2915
Commit: 4d1fac5df2508011adfc4c5d683beb00fd5ced45
Parents: 28cdc5a
Author: Jason Lowe <jl...@yahoo-inc.com>
Authored: Fri Mar 31 10:05:34 2017 -0500
Committer: Jason Lowe <jl...@yahoo-inc.com>
Committed: Fri Mar 31 10:05:34 2017 -0500
----------------------------------------------------------------------
.../v2/app/rm/TestRMContainerAllocator.java | 453 +++++++++----------
.../api/impl/TestAMRMClientOnRMRestart.java | 59 +--
.../server/resourcemanager/ACLsTestBase.java | 10 -
.../server/resourcemanager/RMHATestBase.java | 20 +-
.../ReservationACLsTestBase.java | 5 +-
.../resourcemanager/TestApplicationCleanup.java | 44 +-
.../TestApplicationMasterLauncher.java | 11 +-
.../TestApplicationMasterService.java | 19 +-
.../TestNodeBlacklistingOnAMFailures.java | 41 +-
.../TestReservationSystemWithRMHA.java | 5 +-
.../TestAMRMRPCNodeUpdates.java | 18 +-
.../resourcetracker/TestNMReconnect.java | 14 +-
.../rmcontainer/TestRMContainerImpl.java | 1 -
.../capacity/TestApplicationPriority.java | 29 +-
.../security/TestClientToAMTokens.java | 23 +-
15 files changed, 277 insertions(+), 475 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d1fac5d/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
index e6aee6e..933bd01 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
@@ -179,21 +179,19 @@ public class TestRMContainerAllocator {
Configuration conf = new Configuration();
MyResourceManager rm = new MyResourceManager(conf);
rm.start();
- DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
- .getDispatcher();
// Submit the application
RMApp app = rm.submitApp(1024);
- dispatcher.await();
+ rm.drainEvents();
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
amNodeManager.nodeHeartbeat(true);
- dispatcher.await();
+ rm.drainEvents();
ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
.getAppAttemptId();
rm.sendAMLaunched(appAttemptId);
- dispatcher.await();
+ rm.drainEvents();
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
Job mockJob = mock(Job.class);
@@ -207,7 +205,7 @@ public class TestRMContainerAllocator {
MockNM nodeManager1 = rm.registerNode("h1:1234", 10240);
MockNM nodeManager2 = rm.registerNode("h2:1234", 10240);
MockNM nodeManager3 = rm.registerNode("h3:1234", 10240);
- dispatcher.await();
+ rm.drainEvents();
// create the container request
ContainerRequestEvent event1 = createReq(jobId, 1, 1024,
@@ -222,7 +220,7 @@ public class TestRMContainerAllocator {
// this tells the scheduler about the requests
// as nodes are not added, no allocations
List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
- dispatcher.await();
+ rm.drainEvents();
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
Assert.assertEquals(4, rm.getMyFifoScheduler().lastAsk.size());
@@ -234,7 +232,7 @@ public class TestRMContainerAllocator {
// this tells the scheduler about the requests
// as nodes are not added, no allocations
assigned = allocator.schedule();
- dispatcher.await();
+ rm.drainEvents();
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
Assert.assertEquals(3, rm.getMyFifoScheduler().lastAsk.size());
@@ -242,18 +240,18 @@ public class TestRMContainerAllocator {
nodeManager1.nodeHeartbeat(true); // Node heartbeat
nodeManager2.nodeHeartbeat(true); // Node heartbeat
nodeManager3.nodeHeartbeat(true); // Node heartbeat
- dispatcher.await();
+ rm.drainEvents();
assigned = allocator.schedule();
- dispatcher.await();
+ rm.drainEvents();
Assert.assertEquals(0, rm.getMyFifoScheduler().lastAsk.size());
checkAssignments(new ContainerRequestEvent[] { event1, event2, event3 },
assigned, false);
// check that the assigned container requests are cancelled
- assigned = allocator.schedule();
- dispatcher.await();
- Assert.assertEquals(5, rm.getMyFifoScheduler().lastAsk.size());
+ allocator.schedule();
+ rm.drainEvents();
+ Assert.assertEquals(5, rm.getMyFifoScheduler().lastAsk.size());
}
@Test
@@ -269,21 +267,19 @@ public class TestRMContainerAllocator {
Configuration conf = new Configuration();
MyResourceManager rm = new MyResourceManager(conf);
rm.start();
- DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
- .getDispatcher();
// Submit the application
RMApp app = rm.submitApp(1024);
- dispatcher.await();
+ rm.drainEvents();
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
amNodeManager.nodeHeartbeat(true);
- dispatcher.await();
+ rm.drainEvents();
ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
.getAppAttemptId();
rm.sendAMLaunched(appAttemptId);
- dispatcher.await();
+ rm.drainEvents();
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
Job mockJob = mock(Job.class);
@@ -297,7 +293,7 @@ public class TestRMContainerAllocator {
MockNM nodeManager1 = rm.registerNode("h1:1234", 3072); // can assign 2 maps
rm.registerNode("h2:1234", 10240); // wont heartbeat on node local node
MockNM nodeManager3 = rm.registerNode("h3:1234", 1536); // assign 1 map
- dispatcher.await();
+ rm.drainEvents();
// create the container requests for maps
ContainerRequestEvent event1 = createReq(jobId, 1, 1024,
@@ -313,7 +309,7 @@ public class TestRMContainerAllocator {
// this tells the scheduler about the requests
// as nodes are not added, no allocations
List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
- dispatcher.await();
+ rm.drainEvents();
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
// update resources in scheduler
@@ -323,10 +319,10 @@ public class TestRMContainerAllocator {
// Node heartbeat from node-local next. This allocates 2 node local
// containers for task1 and task2. These should be matched with those tasks.
nodeManager1.nodeHeartbeat(true);
- dispatcher.await();
+ rm.drainEvents();
assigned = allocator.schedule();
- dispatcher.await();
+ rm.drainEvents();
checkAssignments(new ContainerRequestEvent[] { event1, event2, event3 },
assigned, false);
// remove the rack-local assignment that should have happened for task3
@@ -350,21 +346,19 @@ public class TestRMContainerAllocator {
Configuration conf = new Configuration();
MyResourceManager rm = new MyResourceManager(conf);
rm.start();
- DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
- .getDispatcher();
// Submit the application
RMApp app = rm.submitApp(1024);
- dispatcher.await();
+ rm.drainEvents();
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
amNodeManager.nodeHeartbeat(true);
- dispatcher.await();
+ rm.drainEvents();
ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
.getAppAttemptId();
rm.sendAMLaunched(appAttemptId);
- dispatcher.await();
+ rm.drainEvents();
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
Job mockJob = mock(Job.class);
@@ -378,7 +372,7 @@ public class TestRMContainerAllocator {
MockNM nodeManager1 = rm.registerNode("h1:1234", 10240);
MockNM nodeManager2 = rm.registerNode("h2:1234", 10240);
MockNM nodeManager3 = rm.registerNode("h3:1234", 10240);
- dispatcher.await();
+ rm.drainEvents();
// create the container request
ContainerRequestEvent event1 = createReq(jobId, 1, 1024,
@@ -393,17 +387,17 @@ public class TestRMContainerAllocator {
// this tells the scheduler about the requests
// as nodes are not added, no allocations
List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
- dispatcher.await();
+ rm.drainEvents();
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
// update resources in scheduler
nodeManager1.nodeHeartbeat(true); // Node heartbeat
nodeManager2.nodeHeartbeat(true); // Node heartbeat
nodeManager3.nodeHeartbeat(true); // Node heartbeat
- dispatcher.await();
+ rm.drainEvents();
assigned = allocator.schedule();
- dispatcher.await();
+ rm.drainEvents();
checkAssignments(new ContainerRequestEvent[] { event1, event2 },
assigned, false);
}
@@ -416,19 +410,17 @@ public class TestRMContainerAllocator {
conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.0f);
final MyResourceManager rm = new MyResourceManager(conf);
rm.start();
- final DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
- .getDispatcher();
final RMApp app = rm.submitApp(1024);
- dispatcher.await();
+ rm.drainEvents();
final String host = "host1";
final MockNM nm = rm.registerNode(String.format("%s:1234", host), 2048);
nm.nodeHeartbeat(true);
- dispatcher.await();
+ rm.drainEvents();
final ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
.getAppAttemptId();
rm.sendAMLaunched(appAttemptId);
- dispatcher.await();
+ rm.drainEvents();
final JobId jobId = MRBuilderUtils
.newJobId(appAttemptId.getApplicationId(), 0);
final Job mockJob = mock(Job.class);
@@ -438,20 +430,20 @@ public class TestRMContainerAllocator {
final MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob, SystemClock.getInstance());
// add resources to scheduler
- dispatcher.await();
+ rm.drainEvents();
// create the container request
final String[] locations = new String[] { host };
allocator.sendRequest(createReq(jobId, 0, 1024, locations, false, true));
for (int i = 0; i < 1;) {
- dispatcher.await();
+ rm.drainEvents();
i += allocator.schedule().size();
nm.nodeHeartbeat(true);
}
allocator.sendRequest(createReq(jobId, 0, 1024, locations, true, false));
while (allocator.getTaskAttemptKillEvents().size() == 0) {
- dispatcher.await();
+ rm.drainEvents();
allocator.schedule().size();
nm.nodeHeartbeat(true);
}
@@ -468,21 +460,19 @@ public class TestRMContainerAllocator {
Configuration conf = new Configuration();
MyResourceManager rm = new MyResourceManager(conf);
rm.start();
- DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
- .getDispatcher();
// Submit the application
RMApp app = rm.submitApp(1024);
- dispatcher.await();
+ rm.drainEvents();
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
amNodeManager.nodeHeartbeat(true);
- dispatcher.await();
+ rm.drainEvents();
ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
.getAppAttemptId();
rm.sendAMLaunched(appAttemptId);
- dispatcher.await();
+ rm.drainEvents();
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
Job mockJob = mock(Job.class);
@@ -521,21 +511,19 @@ public class TestRMContainerAllocator {
MyResourceManager rm = new MyResourceManager(conf);
rm.start();
- DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
- .getDispatcher();
// Submit the application
RMApp app = rm.submitApp(1024);
- dispatcher.await();
+ rm.drainEvents();
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
amNodeManager.nodeHeartbeat(true);
- dispatcher.await();
+ rm.drainEvents();
ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
.getAppAttemptId();
rm.sendAMLaunched(appAttemptId);
- dispatcher.await();
+ rm.drainEvents();
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
Job mockJob = mock(Job.class);
@@ -584,21 +572,19 @@ public class TestRMContainerAllocator {
MyResourceManager rm = new MyResourceManager(conf);
rm.start();
rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(8192, 8));
- DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
- .getDispatcher();
// Submit the application
RMApp app = rm.submitApp(1024);
- dispatcher.await();
+ rm.drainEvents();
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
amNodeManager.nodeHeartbeat(true);
- dispatcher.await();
+ rm.drainEvents();
ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
.getAppAttemptId();
rm.sendAMLaunched(appAttemptId);
- dispatcher.await();
+ rm.drainEvents();
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
Job mockJob = mock(Job.class);
@@ -639,18 +625,16 @@ public class TestRMContainerAllocator {
conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.0f);
final MyResourceManager2 rm = new MyResourceManager2(conf);
rm.start();
- final DrainDispatcher dispatcher = (DrainDispatcher)rm.getRMContext()
- .getDispatcher();
final RMApp app = rm.submitApp(2048);
- dispatcher.await();
+ rm.drainEvents();
final String host = "host1";
final MockNM nm = rm.registerNode(String.format("%s:1234", host), 4096);
nm.nodeHeartbeat(true);
- dispatcher.await();
+ rm.drainEvents();
final ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
.getAppAttemptId();
rm.sendAMLaunched(appAttemptId);
- dispatcher.await();
+ rm.drainEvents();
final JobId jobId = MRBuilderUtils
.newJobId(appAttemptId.getApplicationId(), 0);
final Job mockJob = mock(Job.class);
@@ -666,14 +650,14 @@ public class TestRMContainerAllocator {
allocator.scheduleAllReduces();
allocator.makeRemoteRequest();
nm.nodeHeartbeat(true);
- dispatcher.await();
+ rm.drainEvents();
allocator.sendRequest(createReq(jobId, 1, 1024, locations, false, false));
int assignedContainer;
for (assignedContainer = 0; assignedContainer < 1;) {
assignedContainer += allocator.schedule().size();
nm.nodeHeartbeat(true);
- dispatcher.await();
+ rm.drainEvents();
}
// only 1 allocated container should be assigned
Assert.assertEquals(assignedContainer, 1);
@@ -773,21 +757,19 @@ public class TestRMContainerAllocator {
Configuration conf = new Configuration();
MyResourceManager rm = new MyResourceManager(conf);
rm.start();
- DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
- .getDispatcher();
// Submit the application
RMApp app = rm.submitApp(1024);
- dispatcher.await();
+ rm.drainEvents();
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
amNodeManager.nodeHeartbeat(true);
- dispatcher.await();
+ rm.drainEvents();
ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
.getAppAttemptId();
rm.sendAMLaunched(appAttemptId);
- dispatcher.await();
+ rm.drainEvents();
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
Job mockJob = mock(Job.class);
@@ -801,7 +783,7 @@ public class TestRMContainerAllocator {
MockNM nodeManager1 = rm.registerNode("h1:1234", 1024);
MockNM nodeManager2 = rm.registerNode("h2:1234", 10240);
MockNM nodeManager3 = rm.registerNode("h3:1234", 10240);
- dispatcher.await();
+ rm.drainEvents();
// create the container request
// send MAP request
@@ -822,17 +804,17 @@ public class TestRMContainerAllocator {
// this tells the scheduler about the requests
// as nodes are not added, no allocations
List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
- dispatcher.await();
+ rm.drainEvents();
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
// update resources in scheduler
nodeManager1.nodeHeartbeat(true); // Node heartbeat
nodeManager2.nodeHeartbeat(true); // Node heartbeat
nodeManager3.nodeHeartbeat(true); // Node heartbeat
- dispatcher.await();
+ rm.drainEvents();
assigned = allocator.schedule();
- dispatcher.await();
+ rm.drainEvents();
checkAssignments(new ContainerRequestEvent[] { event1, event3 },
assigned, false);
@@ -864,11 +846,6 @@ public class TestRMContainerAllocator {
}
@Override
- protected Dispatcher createDispatcher() {
- return new DrainDispatcher();
- }
-
- @Override
protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
// Dispatch inline for test sanity
return new EventHandler<SchedulerEvent>() {
@@ -912,16 +889,16 @@ public class TestRMContainerAllocator {
// Submit the application
RMApp rmApp = rm.submitApp(1024);
- rmDispatcher.await();
+ rm.drainEvents();
MockNM amNodeManager = rm.registerNode("amNM:1234", 21504);
amNodeManager.nodeHeartbeat(true);
- rmDispatcher.await();
+ rm.drainEvents();
final ApplicationAttemptId appAttemptId = rmApp.getCurrentAppAttempt()
.getAppAttemptId();
rm.sendAMLaunched(appAttemptId);
- rmDispatcher.await();
+ rm.drainEvents();
MRApp mrApp = new MRApp(appAttemptId, ContainerId.newContainerId(
appAttemptId, 0), 10, 10, false, this.getClass().getName(), true, 1) {
@@ -959,11 +936,11 @@ public class TestRMContainerAllocator {
amDispatcher.await();
allocator.schedule();
- rmDispatcher.await();
+ rm.drainEvents();
amNodeManager.nodeHeartbeat(true);
- rmDispatcher.await();
+ rm.drainEvents();
allocator.schedule();
- rmDispatcher.await();
+ rm.drainEvents();
// Wait for all map-tasks to be running
for (Task t : job.getTasks().values()) {
@@ -973,7 +950,7 @@ public class TestRMContainerAllocator {
}
allocator.schedule(); // Send heartbeat
- rmDispatcher.await();
+ rm.drainEvents();
Assert.assertEquals(0.05f, job.getProgress(), 0.001f);
Assert.assertEquals(0.05f, rmApp.getProgress(), 0.001f);
@@ -981,14 +958,14 @@ public class TestRMContainerAllocator {
Iterator<Task> it = job.getTasks().values().iterator();
finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 1);
allocator.schedule();
- rmDispatcher.await();
+ rm.drainEvents();
Assert.assertEquals(0.095f, job.getProgress(), 0.001f);
Assert.assertEquals(0.095f, rmApp.getProgress(), 0.001f);
// Finish off 7 more so that map-progress is 80%
finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 7);
allocator.schedule();
- rmDispatcher.await();
+ rm.drainEvents();
Assert.assertEquals(0.41f, job.getProgress(), 0.001f);
Assert.assertEquals(0.41f, rmApp.getProgress(), 0.001f);
@@ -996,11 +973,11 @@ public class TestRMContainerAllocator {
finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 2);
allocator.schedule();
- rmDispatcher.await();
+ rm.drainEvents();
amNodeManager.nodeHeartbeat(true);
- rmDispatcher.await();
+ rm.drainEvents();
allocator.schedule();
- rmDispatcher.await();
+ rm.drainEvents();
// Wait for all reduce-tasks to be running
for (Task t : job.getTasks().values()) {
@@ -1013,14 +990,14 @@ public class TestRMContainerAllocator {
finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 2);
allocator.schedule();
- rmDispatcher.await();
+ rm.drainEvents();
Assert.assertEquals(0.59f, job.getProgress(), 0.001f);
Assert.assertEquals(0.59f, rmApp.getProgress(), 0.001f);
// Finish off the remaining 8 reduces.
finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 8);
allocator.schedule();
- rmDispatcher.await();
+ rm.drainEvents();
// Remaining is JobCleanup
Assert.assertEquals(0.95f, job.getProgress(), 0.001f);
Assert.assertEquals(0.95f, rmApp.getProgress(), 0.001f);
@@ -1064,16 +1041,16 @@ public class TestRMContainerAllocator {
// Submit the application
RMApp rmApp = rm.submitApp(1024);
- rmDispatcher.await();
+ rm.drainEvents();
MockNM amNodeManager = rm.registerNode("amNM:1234", 11264);
amNodeManager.nodeHeartbeat(true);
- rmDispatcher.await();
+ rm.drainEvents();
final ApplicationAttemptId appAttemptId = rmApp.getCurrentAppAttempt()
.getAppAttemptId();
rm.sendAMLaunched(appAttemptId);
- rmDispatcher.await();
+ rm.drainEvents();
MRApp mrApp = new MRApp(appAttemptId, ContainerId.newContainerId(
appAttemptId, 0), 10, 0, false, this.getClass().getName(), true, 1) {
@@ -1109,11 +1086,11 @@ public class TestRMContainerAllocator {
amDispatcher.await();
allocator.schedule();
- rmDispatcher.await();
+ rm.drainEvents();
amNodeManager.nodeHeartbeat(true);
- rmDispatcher.await();
+ rm.drainEvents();
allocator.schedule();
- rmDispatcher.await();
+ rm.drainEvents();
// Wait for all map-tasks to be running
for (Task t : job.getTasks().values()) {
@@ -1121,7 +1098,7 @@ public class TestRMContainerAllocator {
}
allocator.schedule(); // Send heartbeat
- rmDispatcher.await();
+ rm.drainEvents();
Assert.assertEquals(0.05f, job.getProgress(), 0.001f);
Assert.assertEquals(0.05f, rmApp.getProgress(), 0.001f);
@@ -1130,21 +1107,21 @@ public class TestRMContainerAllocator {
// Finish off 1 map so that map-progress is 10%
finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 1);
allocator.schedule();
- rmDispatcher.await();
+ rm.drainEvents();
Assert.assertEquals(0.14f, job.getProgress(), 0.001f);
Assert.assertEquals(0.14f, rmApp.getProgress(), 0.001f);
// Finish off 5 more map so that map-progress is 60%
finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 5);
allocator.schedule();
- rmDispatcher.await();
+ rm.drainEvents();
Assert.assertEquals(0.59f, job.getProgress(), 0.001f);
Assert.assertEquals(0.59f, rmApp.getProgress(), 0.001f);
// Finish off remaining map so that map-progress is 100%
finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 4);
allocator.schedule();
- rmDispatcher.await();
+ rm.drainEvents();
Assert.assertEquals(0.95f, job.getProgress(), 0.001f);
Assert.assertEquals(0.95f, rmApp.getProgress(), 0.001f);
}
@@ -1154,21 +1131,19 @@ public class TestRMContainerAllocator {
Configuration conf = new Configuration();
MyResourceManager rm = new MyResourceManager(conf);
rm.start();
- DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
- .getDispatcher();
// Submit the application
RMApp app = rm.submitApp(1024);
- dispatcher.await();
+ rm.drainEvents();
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
amNodeManager.nodeHeartbeat(true);
- dispatcher.await();
+ rm.drainEvents();
ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
.getAppAttemptId();
rm.sendAMLaunched(appAttemptId);
- dispatcher.await();
-
+ rm.drainEvents();
+
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
Job mockJob = mock(Job.class);
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
@@ -1177,7 +1152,7 @@ public class TestRMContainerAllocator {
// add resources to scheduler
MockNM nm1 = rm.registerNode("h1:1234", 10240);
MockNM nm2 = rm.registerNode("h2:1234", 10240);
- dispatcher.await();
+ rm.drainEvents();
// create the map container request
ContainerRequestEvent event = createReq(jobId, 1, 1024,
@@ -1193,16 +1168,16 @@ public class TestRMContainerAllocator {
// this tells the scheduler about the requests
List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
- dispatcher.await();
+ rm.drainEvents();
nm1.nodeHeartbeat(true);
- dispatcher.await();
+ rm.drainEvents();
Assert.assertEquals(1, allocator.getJobUpdatedNodeEvents().size());
Assert.assertEquals(3, allocator.getJobUpdatedNodeEvents().get(0).getUpdatedNodes().size());
allocator.getJobUpdatedNodeEvents().clear();
// get the assignment
assigned = allocator.schedule();
- dispatcher.await();
+ rm.drainEvents();
Assert.assertEquals(1, assigned.size());
Assert.assertEquals(nm1.getNodeId(), assigned.get(0).getContainer().getNodeId());
// no updated nodes reported
@@ -1212,11 +1187,11 @@ public class TestRMContainerAllocator {
// mark nodes bad
nm1.nodeHeartbeat(false);
nm2.nodeHeartbeat(false);
- dispatcher.await();
-
+ rm.drainEvents();
+
// schedule response returns updated nodes
assigned = allocator.schedule();
- dispatcher.await();
+ rm.drainEvents();
Assert.assertEquals(0, assigned.size());
// updated nodes are reported
Assert.assertEquals(1, allocator.getJobUpdatedNodeEvents().size());
@@ -1227,7 +1202,7 @@ public class TestRMContainerAllocator {
allocator.getTaskAttemptKillEvents().clear();
assigned = allocator.schedule();
- dispatcher.await();
+ rm.drainEvents();
Assert.assertEquals(0, assigned.size());
// no updated nodes reported
Assert.assertTrue(allocator.getJobUpdatedNodeEvents().isEmpty());
@@ -1247,21 +1222,19 @@ public class TestRMContainerAllocator {
MyResourceManager rm = new MyResourceManager(conf);
rm.start();
- DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
- .getDispatcher();
// Submit the application
RMApp app = rm.submitApp(1024);
- dispatcher.await();
+ rm.drainEvents();
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
amNodeManager.nodeHeartbeat(true);
- dispatcher.await();
+ rm.drainEvents();
ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
.getAppAttemptId();
rm.sendAMLaunched(appAttemptId);
- dispatcher.await();
+ rm.drainEvents();
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
Job mockJob = mock(Job.class);
@@ -1275,7 +1248,7 @@ public class TestRMContainerAllocator {
MockNM nodeManager1 = rm.registerNode("h1:1234", 10240);
MockNM nodeManager2 = rm.registerNode("h2:1234", 10240);
MockNM nodeManager3 = rm.registerNode("h3:1234", 10240);
- dispatcher.await();
+ rm.drainEvents();
// create the container request
ContainerRequestEvent event1 = createReq(jobId, 1, 1024,
@@ -1295,7 +1268,7 @@ public class TestRMContainerAllocator {
// this tells the scheduler about the requests
// as nodes are not added, no allocations
List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
- dispatcher.await();
+ rm.drainEvents();
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
// Send events to blacklist nodes h1 and h2
@@ -1307,28 +1280,28 @@ public class TestRMContainerAllocator {
// update resources in scheduler
nodeManager1.nodeHeartbeat(true); // Node heartbeat
nodeManager2.nodeHeartbeat(true); // Node heartbeat
- dispatcher.await();
+ rm.drainEvents();
assigned = allocator.schedule();
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
- dispatcher.await();
+ rm.drainEvents();
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
assertBlacklistAdditionsAndRemovals(2, 0, rm);
// mark h1/h2 as bad nodes
nodeManager1.nodeHeartbeat(false);
nodeManager2.nodeHeartbeat(false);
- dispatcher.await();
+ rm.drainEvents();
assigned = allocator.schedule();
- dispatcher.await();
+ rm.drainEvents();
assertBlacklistAdditionsAndRemovals(0, 0, rm);
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
nodeManager3.nodeHeartbeat(true); // Node heartbeat
- dispatcher.await();
+ rm.drainEvents();
assigned = allocator.schedule();
- dispatcher.await();
+ rm.drainEvents();
assertBlacklistAdditionsAndRemovals(0, 0, rm);
Assert.assertTrue("No of assignments must be 3", assigned.size() == 3);
@@ -1352,24 +1325,22 @@ public class TestRMContainerAllocator {
MyResourceManager rm = new MyResourceManager(conf);
rm.start();
- DrainDispatcher dispatcher =
- (DrainDispatcher) rm.getRMContext().getDispatcher();
// Submit the application
RMApp app = rm.submitApp(1024);
- dispatcher.await();
+ rm.drainEvents();
MockNM[] nodeManagers = new MockNM[10];
int nmNum = 0;
List<TaskAttemptContainerAssignedEvent> assigned = null;
- nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
+ nodeManagers[nmNum] = registerNodeManager(nmNum++, rm);
nodeManagers[0].nodeHeartbeat(true);
- dispatcher.await();
+ rm.drainEvents();
ApplicationAttemptId appAttemptId =
app.getCurrentAppAttempt().getAppAttemptId();
rm.sendAMLaunched(appAttemptId);
- dispatcher.await();
+ rm.drainEvents();
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
Job mockJob = mock(Job.class);
@@ -1382,7 +1353,7 @@ public class TestRMContainerAllocator {
// Known=1, blacklisted=0, ignore should be false - assign first container
assigned =
getContainerOnHost(jobId, 1, 1024, new String[] { "h1" },
- nodeManagers[0], dispatcher, allocator, 0, 0, 0, 0, rm);
+ nodeManagers[0], allocator, 0, 0, 0, 0, rm);
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
LOG.info("Failing container _1 on H1 (Node should be blacklisted and"
@@ -1397,47 +1368,47 @@ public class TestRMContainerAllocator {
// The current call will send blacklisted node "h1" to RM
assigned =
getContainerOnHost(jobId, 2, 1024, new String[] { "h1" },
- nodeManagers[0], dispatcher, allocator, 1, 0, 0, 1, rm);
+ nodeManagers[0], allocator, 1, 0, 0, 1, rm);
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
// Known=1, blacklisted=1, ignore should be true - assign 1
assigned =
getContainerOnHost(jobId, 2, 1024, new String[] { "h1" },
- nodeManagers[0], dispatcher, allocator, 0, 0, 0, 0, rm);
+ nodeManagers[0], allocator, 0, 0, 0, 0, rm);
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
- nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
+ nodeManagers[nmNum] = registerNodeManager(nmNum++, rm);
// Known=2, blacklisted=1, ignore should be true - assign 1 anyway.
assigned =
getContainerOnHost(jobId, 3, 1024, new String[] { "h2" },
- nodeManagers[1], dispatcher, allocator, 0, 0, 0, 0, rm);
+ nodeManagers[1], allocator, 0, 0, 0, 0, rm);
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
- nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
+ nodeManagers[nmNum] = registerNodeManager(nmNum++, rm);
// Known=3, blacklisted=1, ignore should be true - assign 1 anyway.
assigned =
getContainerOnHost(jobId, 4, 1024, new String[] { "h3" },
- nodeManagers[2], dispatcher, allocator, 0, 0, 0, 0, rm);
+ nodeManagers[2], allocator, 0, 0, 0, 0, rm);
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
// Known=3, blacklisted=1, ignore should be true - assign 1
assigned =
getContainerOnHost(jobId, 5, 1024, new String[] { "h1" },
- nodeManagers[0], dispatcher, allocator, 0, 0, 0, 0, rm);
+ nodeManagers[0], allocator, 0, 0, 0, 0, rm);
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
- nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
+ nodeManagers[nmNum] = registerNodeManager(nmNum++, rm);
// Known=4, blacklisted=1, ignore should be false - assign 1 anyway
assigned =
getContainerOnHost(jobId, 6, 1024, new String[] { "h4" },
- nodeManagers[3], dispatcher, allocator, 0, 0, 1, 0, rm);
+ nodeManagers[3], allocator, 0, 0, 1, 0, rm);
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
// Test blacklisting re-enabled.
// Known=4, blacklisted=1, ignore should be false - no assignment on h1
assigned =
getContainerOnHost(jobId, 7, 1024, new String[] { "h1" },
- nodeManagers[0], dispatcher, allocator, 0, 0, 0, 0, rm);
+ nodeManagers[0], allocator, 0, 0, 0, 0, rm);
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
// RMContainerRequestor would have created a replacement request.
@@ -1450,61 +1421,61 @@ public class TestRMContainerAllocator {
// container for the same reason above.
assigned =
getContainerOnHost(jobId, 8, 1024, new String[] { "h1" },
- nodeManagers[0], dispatcher, allocator, 1, 0, 0, 2, rm);
+ nodeManagers[0], allocator, 1, 0, 0, 2, rm);
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
// Known=4, blacklisted=2, ignore should be true. Should assign 2
// containers.
assigned =
getContainerOnHost(jobId, 8, 1024, new String[] { "h1" },
- nodeManagers[0], dispatcher, allocator, 0, 0, 0, 0, rm);
+ nodeManagers[0], allocator, 0, 0, 0, 0, rm);
Assert.assertEquals("No of assignments must be 2", 2, assigned.size());
// Known=4, blacklisted=2, ignore should be true.
assigned =
getContainerOnHost(jobId, 9, 1024, new String[] { "h2" },
- nodeManagers[1], dispatcher, allocator, 0, 0, 0, 0, rm);
+ nodeManagers[1], allocator, 0, 0, 0, 0, rm);
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
// Test blacklist while ignore blacklisting enabled
ContainerFailedEvent f3 = createFailEvent(jobId, 4, "h3", false);
allocator.sendFailure(f3);
- nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
+ nodeManagers[nmNum] = registerNodeManager(nmNum++, rm);
// Known=5, blacklisted=3, ignore should be true.
assigned =
getContainerOnHost(jobId, 10, 1024, new String[] { "h3" },
- nodeManagers[2], dispatcher, allocator, 0, 0, 0, 0, rm);
+ nodeManagers[2], allocator, 0, 0, 0, 0, rm);
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
// Assign on 5 more nodes - to re-enable blacklisting
for (int i = 0; i < 5; i++) {
- nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
+ nodeManagers[nmNum] = registerNodeManager(nmNum++, rm);
assigned =
getContainerOnHost(jobId, 11 + i, 1024,
new String[] { String.valueOf(5 + i) }, nodeManagers[4 + i],
- dispatcher, allocator, 0, 0, (i == 4 ? 3 : 0), 0, rm);
+ allocator, 0, 0, (i == 4 ? 3 : 0), 0, rm);
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
}
// Test h3 (blacklisted while ignoring blacklisting) is blacklisted.
assigned =
getContainerOnHost(jobId, 20, 1024, new String[] { "h3" },
- nodeManagers[2], dispatcher, allocator, 0, 0, 0, 0, rm);
+ nodeManagers[2], allocator, 0, 0, 0, 0, rm);
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
}
- private MockNM registerNodeManager(int i, MyResourceManager rm,
- DrainDispatcher dispatcher) throws Exception {
+ private MockNM registerNodeManager(int i, MyResourceManager rm)
+ throws Exception {
MockNM nm = rm.registerNode("h" + (i + 1) + ":1234", 10240);
- dispatcher.await();
+ rm.drainEvents();
return nm;
}
private
List<TaskAttemptContainerAssignedEvent> getContainerOnHost(JobId jobId,
int taskAttemptId, int memory, String[] hosts, MockNM mockNM,
- DrainDispatcher dispatcher, MyContainerAllocator allocator,
+ MyContainerAllocator allocator,
int expectedAdditions1, int expectedRemovals1,
int expectedAdditions2, int expectedRemovals2, MyResourceManager rm)
throws Exception {
@@ -1514,17 +1485,17 @@ public class TestRMContainerAllocator {
// Send the request to the RM
List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
- dispatcher.await();
+ rm.drainEvents();
assertBlacklistAdditionsAndRemovals(
expectedAdditions1, expectedRemovals1, rm);
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
// Heartbeat from the required nodeManager
mockNM.nodeHeartbeat(true);
- dispatcher.await();
+ rm.drainEvents();
assigned = allocator.schedule();
- dispatcher.await();
+ rm.drainEvents();
assertBlacklistAdditionsAndRemovals(
expectedAdditions2, expectedRemovals2, rm);
return assigned;
@@ -1542,21 +1513,19 @@ public class TestRMContainerAllocator {
MyResourceManager rm = new MyResourceManager(conf);
rm.start();
- DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
- .getDispatcher();
// Submit the application
RMApp app = rm.submitApp(1024);
- dispatcher.await();
+ rm.drainEvents();
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
amNodeManager.nodeHeartbeat(true);
- dispatcher.await();
+ rm.drainEvents();
ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
.getAppAttemptId();
rm.sendAMLaunched(appAttemptId);
- dispatcher.await();
+ rm.drainEvents();
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
Job mockJob = mock(Job.class);
@@ -1569,7 +1538,7 @@ public class TestRMContainerAllocator {
// add resources to scheduler
MockNM nodeManager1 = rm.registerNode("h1:1234", 10240);
MockNM nodeManager3 = rm.registerNode("h3:1234", 10240);
- dispatcher.await();
+ rm.drainEvents();
LOG.info("Requesting 1 Containers _1 on H1");
// create the container request
@@ -1581,17 +1550,17 @@ public class TestRMContainerAllocator {
// this tells the scheduler about the requests
// as nodes are not added, no allocations
List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
- dispatcher.await();
+ rm.drainEvents();
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
LOG.info("h1 Heartbeat (To actually schedule the containers)");
// update resources in scheduler
nodeManager1.nodeHeartbeat(true); // Node heartbeat
- dispatcher.await();
+ rm.drainEvents();
LOG.info("RM Heartbeat (To process the scheduled containers)");
assigned = allocator.schedule();
- dispatcher.await();
+ rm.drainEvents();
assertBlacklistAdditionsAndRemovals(0, 0, rm);
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
@@ -1608,7 +1577,7 @@ public class TestRMContainerAllocator {
//Update the Scheduler with the new requests.
assigned = allocator.schedule();
- dispatcher.await();
+ rm.drainEvents();
assertBlacklistAdditionsAndRemovals(1, 0, rm);
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
@@ -1623,11 +1592,11 @@ public class TestRMContainerAllocator {
LOG.info("h1 Heartbeat (To actually schedule the containers)");
// update resources in scheduler
nodeManager1.nodeHeartbeat(true); // Node heartbeat
- dispatcher.await();
+ rm.drainEvents();
LOG.info("RM Heartbeat (To process the scheduled containers)");
assigned = allocator.schedule();
- dispatcher.await();
+ rm.drainEvents();
assertBlacklistAdditionsAndRemovals(0, 0, rm);
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
@@ -1636,19 +1605,19 @@ public class TestRMContainerAllocator {
//Send a release for the p:5 container + another request.
LOG.info("RM Heartbeat (To process the re-scheduled containers)");
assigned = allocator.schedule();
- dispatcher.await();
+ rm.drainEvents();
assertBlacklistAdditionsAndRemovals(0, 0, rm);
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
//Hearbeat from H3 to schedule on this host.
LOG.info("h3 Heartbeat (To re-schedule the containers)");
nodeManager3.nodeHeartbeat(true); // Node heartbeat
- dispatcher.await();
+ rm.drainEvents();
LOG.info("RM Heartbeat (To process the re-scheduled containers for H3)");
assigned = allocator.schedule();
assertBlacklistAdditionsAndRemovals(0, 0, rm);
- dispatcher.await();
+ rm.drainEvents();
// For debugging
for (TaskAttemptContainerAssignedEvent assig : assigned) {
@@ -2229,22 +2198,20 @@ public class TestRMContainerAllocator {
Configuration conf = new Configuration();
final MyResourceManager rm = new MyResourceManager(conf);
rm.start();
- DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
- .getDispatcher();
// Submit the application
RMApp app = rm.submitApp(1024);
- dispatcher.await();
+ rm.drainEvents();
// Make a node to register so as to launch the AM.
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
amNodeManager.nodeHeartbeat(true);
- dispatcher.await();
+ rm.drainEvents();
ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
.getAppAttemptId();
rm.sendAMLaunched(appAttemptId);
- dispatcher.await();
+ rm.drainEvents();
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
Job job = mock(Job.class);
@@ -2381,21 +2348,19 @@ public class TestRMContainerAllocator {
Configuration conf = new Configuration();
final MyResourceManager rm = new MyResourceManager(conf);
rm.start();
- DrainDispatcher rmDispatcher =
- (DrainDispatcher) rm.getRMContext().getDispatcher();
// Submit the application
RMApp rmApp = rm.submitApp(1024);
- rmDispatcher.await();
+ rm.drainEvents();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 11264);
amNodeManager.nodeHeartbeat(true);
- rmDispatcher.await();
+ rm.drainEvents();
final ApplicationAttemptId appAttemptId =
rmApp.getCurrentAppAttempt().getAppAttemptId();
rm.sendAMLaunched(appAttemptId);
- rmDispatcher.await();
+ rm.drainEvents();
MRApp mrApp =
new MRApp(appAttemptId, ContainerId.newContainerId(appAttemptId, 0), 10,
@@ -2454,22 +2419,20 @@ public class TestRMContainerAllocator {
MyResourceManager rm1 = new MyResourceManager(conf, memStore);
rm1.start();
- DrainDispatcher dispatcher =
- (DrainDispatcher) rm1.getRMContext().getDispatcher();
// Submit the application
RMApp app = rm1.submitApp(1024);
- dispatcher.await();
+ rm1.drainEvents();
MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
nm1.registerNode();
nm1.nodeHeartbeat(true); // Node heartbeat
- dispatcher.await();
+ rm1.drainEvents();
ApplicationAttemptId appAttemptId =
app.getCurrentAppAttempt().getAppAttemptId();
rm1.sendAMLaunched(appAttemptId);
- dispatcher.await();
+ rm1.drainEvents();
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
Job mockJob = mock(Job.class);
@@ -2498,7 +2461,7 @@ public class TestRMContainerAllocator {
// send allocate request and 1 blacklisted nodes
List<TaskAttemptContainerAssignedEvent> assignedContainers =
allocator.schedule();
- dispatcher.await();
+ rm1.drainEvents();
Assert.assertEquals("No of assignments must be 0", 0,
assignedContainers.size());
// Why ask is 3, not 4? --> ask from blacklisted node h2 is removed
@@ -2506,11 +2469,11 @@ public class TestRMContainerAllocator {
assertBlacklistAdditionsAndRemovals(1, 0, rm1);
nm1.nodeHeartbeat(true); // Node heartbeat
- dispatcher.await();
+ rm1.drainEvents();
// Step-2 : 2 containers are allocated by RM.
assignedContainers = allocator.schedule();
- dispatcher.await();
+ rm1.drainEvents();
Assert.assertEquals("No of assignments must be 2", 2,
assignedContainers.size());
assertAsksAndReleases(0, 0, rm1);
@@ -2545,7 +2508,6 @@ public class TestRMContainerAllocator {
rm2.start();
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
allocator.updateSchedulerProxy(rm2);
- dispatcher = (DrainDispatcher) rm2.getRMContext().getDispatcher();
// NM should be rebooted on heartbeat, even first heartbeat for nm2
NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);
@@ -2555,7 +2517,7 @@ public class TestRMContainerAllocator {
nm1 = new MockNM("h1:1234", 10240, rm2.getResourceTrackerService());
nm1.registerNode();
nm1.nodeHeartbeat(true);
- dispatcher.await();
+ rm2.drainEvents();
// Step-4 : On RM restart, AM(does not know RM is restarted) sends
// additional containerRequest(event4) and blacklisted nodes.
@@ -2576,7 +2538,7 @@ public class TestRMContainerAllocator {
// send allocate request to 2nd RM and get resync command
allocator.schedule();
- dispatcher.await();
+ rm2.drainEvents();
// Step-5 : On Resync,AM sends all outstanding
// asks,release,blacklistAaddition
@@ -2587,16 +2549,16 @@ public class TestRMContainerAllocator {
// send all outstanding request again.
assignedContainers = allocator.schedule();
- dispatcher.await();
+ rm2.drainEvents();
assertAsksAndReleases(3, 2, rm2);
assertBlacklistAdditionsAndRemovals(2, 0, rm2);
nm1.nodeHeartbeat(true);
- dispatcher.await();
+ rm2.drainEvents();
// Step-6 : RM allocates containers i.e event3,event4 and cRequest5
assignedContainers = allocator.schedule();
- dispatcher.await();
+ rm2.drainEvents();
Assert.assertEquals("Number of container should be 3", 3,
assignedContainers.size());
@@ -2699,20 +2661,19 @@ public class TestRMContainerAllocator {
MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS, 0);
MyResourceManager rm1 = new MyResourceManager(conf);
rm1.start();
- DrainDispatcher dispatcher =
- (DrainDispatcher) rm1.getRMContext().getDispatcher();
+
RMApp app = rm1.submitApp(1024);
- dispatcher.await();
+ rm1.drainEvents();
MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
nm1.registerNode();
nm1.nodeHeartbeat(true);
- dispatcher.await();
+ rm1.drainEvents();
ApplicationAttemptId appAttemptId =
app.getCurrentAppAttempt().getAppAttemptId();
rm1.sendAMLaunched(appAttemptId);
- dispatcher.await();
+ rm1.drainEvents();
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
Job mockJob = mock(Job.class);
@@ -2728,7 +2689,7 @@ public class TestRMContainerAllocator {
} catch (RMContainerAllocationException e) {
Assert.assertTrue(e.getMessage().contains("Could not contact RM after"));
}
- dispatcher.await();
+ rm1.drainEvents();
Assert.assertEquals("Should Have 1 Job Event", 1,
allocator.jobEvents.size());
JobEvent event = allocator.jobEvents.get(0);
@@ -2750,22 +2711,20 @@ public class TestRMContainerAllocator {
rm.start();
AMRMTokenSecretManager secretMgr =
rm.getRMContext().getAMRMTokenSecretManager();
- DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
- .getDispatcher();
// Submit the application
RMApp app = rm.submitApp(1024);
- dispatcher.await();
+ rm.drainEvents();
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
amNodeManager.nodeHeartbeat(true);
- dispatcher.await();
+ rm.drainEvents();
final ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
.getAppAttemptId();
final ApplicationId appId = app.getApplicationId();
rm.sendAMLaunched(appAttemptId);
- dispatcher.await();
+ rm.drainEvents();
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
final Job mockJob = mock(Job.class);
@@ -2953,21 +2912,19 @@ public class TestRMContainerAllocator {
Configuration conf = new Configuration();
MyResourceManager rm = new MyResourceManager(conf);
rm.start();
- DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
- .getDispatcher();
// Submit the application
RMApp app = rm.submitApp(1024);
- dispatcher.await();
+ rm.drainEvents();
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
amNodeManager.nodeHeartbeat(true);
- dispatcher.await();
+ rm.drainEvents();
ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
.getAppAttemptId();
rm.sendAMLaunched(appAttemptId);
- dispatcher.await();
+ rm.drainEvents();
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
Job mockJob = mock(Job.class);
@@ -2989,21 +2946,19 @@ public class TestRMContainerAllocator {
Configuration conf = new Configuration();
MyResourceManager rm = new MyResourceManager(conf);
rm.start();
- DrainDispatcher dispatcher =
- (DrainDispatcher) rm.getRMContext().getDispatcher();
// Submit the application
RMApp app = rm.submitApp(1024);
- dispatcher.await();
+ rm.drainEvents();
MockNM amNodeManager = rm.registerNode("amNM:1234", 1260);
amNodeManager.nodeHeartbeat(true);
- dispatcher.await();
+ rm.drainEvents();
ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
.getAppAttemptId();
rm.sendAMLaunched(appAttemptId);
- dispatcher.await();
+ rm.drainEvents();
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
Job mockJob = mock(Job.class);
@@ -3018,7 +2973,7 @@ public class TestRMContainerAllocator {
// Register nodes to RM.
MockNM nodeManager = rm.registerNode("h1:1234", 1024);
- dispatcher.await();
+ rm.drainEvents();
// Request 2 maps and 1 reducer(sone on nodes which are not registered).
ContainerRequestEvent event1 =
@@ -3034,7 +2989,7 @@ public class TestRMContainerAllocator {
// This will tell the scheduler about the requests but there will be no
// allocations as nodes are not added.
allocator.schedule();
- dispatcher.await();
+ rm.drainEvents();
// Advance clock so that maps can be considered as hanging.
clock.setTime(System.currentTimeMillis() + 500000L);
@@ -3045,15 +3000,15 @@ public class TestRMContainerAllocator {
allocator.sendRequest(event4);
allocator.schedule();
- dispatcher.await();
+ rm.drainEvents();
// Update resources in scheduler through node heartbeat from h1.
nodeManager.nodeHeartbeat(true);
- dispatcher.await();
+ rm.drainEvents();
rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(1024, 1));
allocator.schedule();
- dispatcher.await();
+ rm.drainEvents();
// One map is assigned.
Assert.assertEquals(1, allocator.getAssignedRequests().maps.size());
@@ -3087,7 +3042,7 @@ public class TestRMContainerAllocator {
// On next allocate request to scheduler, headroom reported will be 0.
rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(0, 0));
allocator.schedule();
- dispatcher.await();
+ rm.drainEvents();
// After allocate response from scheduler, all scheduled reduces are ramped
// down and move to pending. 3 asks are also updated with 0 containers to
// indicate ramping down of reduces to scheduler.
@@ -3155,21 +3110,19 @@ public class TestRMContainerAllocator {
Configuration conf = new Configuration();
MyResourceManager rm = new MyResourceManager(conf);
rm.start();
- DrainDispatcher dispatcher =
- (DrainDispatcher) rm.getRMContext().getDispatcher();
// Submit the application
RMApp app = rm.submitApp(1024);
- dispatcher.await();
+ rm.drainEvents();
MockNM amNodeManager = rm.registerNode("amNM:1234", 1260);
amNodeManager.nodeHeartbeat(true);
- dispatcher.await();
+ rm.drainEvents();
ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
.getAppAttemptId();
rm.sendAMLaunched(appAttemptId);
- dispatcher.await();
+ rm.drainEvents();
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
Job mockJob = mock(Job.class);
@@ -3184,7 +3137,7 @@ public class TestRMContainerAllocator {
// Register nodes to RM.
MockNM nodeManager = rm.registerNode("h1:1234", 1024);
- dispatcher.await();
+ rm.drainEvents();
// Request 2 maps and 1 reducer(sone on nodes which are not registered).
ContainerRequestEvent event1 =
@@ -3200,7 +3153,7 @@ public class TestRMContainerAllocator {
// This will tell the scheduler about the requests but there will be no
// allocations as nodes are not added.
allocator.schedule();
- dispatcher.await();
+ rm.drainEvents();
// Advance clock so that maps can be considered as hanging.
clock.setTime(System.currentTimeMillis() + 500000L);
@@ -3211,15 +3164,15 @@ public class TestRMContainerAllocator {
allocator.sendRequest(event4);
allocator.schedule();
- dispatcher.await();
+ rm.drainEvents();
// Update resources in scheduler through node heartbeat from h1.
nodeManager.nodeHeartbeat(true);
- dispatcher.await();
+ rm.drainEvents();
rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(1024, 1));
allocator.schedule();
- dispatcher.await();
+ rm.drainEvents();
// One map is assigned.
Assert.assertEquals(1, allocator.getAssignedRequests().maps.size());
@@ -3256,7 +3209,7 @@ public class TestRMContainerAllocator {
// On next allocate request to scheduler, headroom reported will be 2048.
rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(2048, 0));
allocator.schedule();
- dispatcher.await();
+ rm.drainEvents();
// After allocate response from scheduler, all scheduled reduces are ramped
// down and move to pending. 3 asks are also updated with 0 containers to
// indicate ramping down of reduces to scheduler.
@@ -3285,21 +3238,19 @@ public class TestRMContainerAllocator {
conf.setInt(MRJobConfig.MR_JOB_REDUCER_UNCONDITIONAL_PREEMPT_DELAY_SEC, -1);
MyResourceManager rm = new MyResourceManager(conf);
rm.start();
- DrainDispatcher dispatcher =
- (DrainDispatcher) rm.getRMContext().getDispatcher();
// Submit the application
RMApp app = rm.submitApp(1024);
- dispatcher.await();
+ rm.drainEvents();
MockNM amNodeManager = rm.registerNode("amNM:1234", 1260);
amNodeManager.nodeHeartbeat(true);
- dispatcher.await();
+ rm.drainEvents();
ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
.getAppAttemptId();
rm.sendAMLaunched(appAttemptId);
- dispatcher.await();
+ rm.drainEvents();
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
Job mockJob = mock(Job.class);
@@ -3315,10 +3266,10 @@ public class TestRMContainerAllocator {
appAttemptId, mockJob);
MockNM nodeManager = rm.registerNode("h1:1234", 4096);
- dispatcher.await();
+ rm.drainEvents();
// Register nodes to RM.
MockNM nodeManager2 = rm.registerNode("h2:1234", 1024);
- dispatcher.await();
+ rm.drainEvents();
// Request 2 maps and 1 reducer(sone on nodes which are not registered).
ContainerRequestEvent event1 =
@@ -3334,7 +3285,7 @@ public class TestRMContainerAllocator {
// This will tell the scheduler about the requests but there will be no
// allocations as nodes are not added.
allocator.schedule();
- dispatcher.await();
+ rm.drainEvents();
// Request for another reducer on h3 which has not registered.
ContainerRequestEvent event4 =
@@ -3342,15 +3293,15 @@ public class TestRMContainerAllocator {
allocator.sendRequest(event4);
allocator.schedule();
- dispatcher.await();
+ rm.drainEvents();
// Update resources in scheduler through node heartbeat from h1.
nodeManager.nodeHeartbeat(true);
- dispatcher.await();
+ rm.drainEvents();
rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(3072, 3));
allocator.schedule();
- dispatcher.await();
+ rm.drainEvents();
// Two maps are assigned.
Assert.assertEquals(2, allocator.getAssignedRequests().maps.size());
@@ -3363,15 +3314,15 @@ public class TestRMContainerAllocator {
Assert.assertEquals(0, allocator.getAssignedRequests().maps.size());
nodeManager.nodeHeartbeat(true);
- dispatcher.await();
+ rm.drainEvents();
rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(1024, 1));
allocator.schedule();
- dispatcher.await();
+ rm.drainEvents();
// h2 heartbeats.
nodeManager2.nodeHeartbeat(true);
- dispatcher.await();
+ rm.drainEvents();
// Send request for one more mapper.
ContainerRequestEvent event5 =
@@ -3380,7 +3331,7 @@ public class TestRMContainerAllocator {
rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(2048, 2));
allocator.schedule();
- dispatcher.await();
+ rm.drainEvents();
// One reducer is assigned and one map is scheduled
Assert.assertEquals(1, allocator.getScheduledRequests().maps.size());
Assert.assertEquals(1, allocator.getAssignedRequests().reduces.size());
@@ -3388,7 +3339,7 @@ public class TestRMContainerAllocator {
// enough if scheduled reducers resources are deducted.
rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(1260, 2));
allocator.schedule();
- dispatcher.await();
+ rm.drainEvents();
// After allocate response, the one assigned reducer is preempted and killed
Assert.assertEquals(1, MyContainerAllocator.getTaskAttemptKillEvents().size());
Assert.assertEquals(RMContainerAllocator.RAMPDOWN_DIAGNOSTIC,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d1fac5d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
index 39a7633..fa3c6af 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
@@ -49,8 +49,6 @@ import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
@@ -126,22 +124,20 @@ public class TestAMRMClientOnRMRestart {
// Phase-1 Start 1st RM
MyResourceManager rm1 = new MyResourceManager(conf, memStore);
rm1.start();
- DrainDispatcher dispatcher =
- (DrainDispatcher) rm1.getRMContext().getDispatcher();
// Submit the application
RMApp app = rm1.submitApp(1024);
- dispatcher.await();
+ rm1.drainEvents();
MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
nm1.registerNode();
nm1.nodeHeartbeat(true); // Node heartbeat
- dispatcher.await();
+ rm1.drainEvents();
ApplicationAttemptId appAttemptId =
app.getCurrentAppAttempt().getAppAttemptId();
rm1.sendAMLaunched(appAttemptId);
- dispatcher.await();
+ rm1.drainEvents();
org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> token =
rm1.getRMContext().getRMApps().get(appAttemptId.getApplicationId())
@@ -176,7 +172,7 @@ public class TestAMRMClientOnRMRestart {
blacklistAdditions.remove("h2");// remove from local list
AllocateResponse allocateResponse = amClient.allocate(0.1f);
- dispatcher.await();
+ rm1.drainEvents();
Assert.assertEquals("No of assignments must be 0", 0, allocateResponse
.getAllocatedContainers().size());
@@ -189,10 +185,10 @@ public class TestAMRMClientOnRMRestart {
// Step-2 : NM heart beat is sent.
// On 2nd AM allocate request, RM allocates 3 containers to AM
nm1.nodeHeartbeat(true); // Node heartbeat
- dispatcher.await();
+ rm1.drainEvents();
allocateResponse = amClient.allocate(0.2f);
- dispatcher.await();
+ rm1.drainEvents();
// 3 containers are allocated i.e for cRequest1, cRequest2 and cRequest3.
Assert.assertEquals("No of assignments must be 0", 3, allocateResponse
.getAllocatedContainers().size());
@@ -207,7 +203,7 @@ public class TestAMRMClientOnRMRestart {
amClient.removeContainerRequest(cRequest3);
allocateResponse = amClient.allocate(0.2f);
- dispatcher.await();
+ rm1.drainEvents();
Assert.assertEquals("No of assignments must be 0", 0, allocateResponse
.getAllocatedContainers().size());
assertAsksAndReleases(4, 0, rm1);
@@ -233,7 +229,7 @@ public class TestAMRMClientOnRMRestart {
// request
nm1.nodeHeartbeat(containerId.getApplicationAttemptId(),
containerId.getContainerId(), ContainerState.RUNNING);
- dispatcher.await();
+ rm1.drainEvents();
amClient.requestContainerUpdate(
container, UpdateContainerRequest.newInstance(
container.getVersion(), container.getId(),
@@ -242,7 +238,7 @@ public class TestAMRMClientOnRMRestart {
it.remove();
allocateResponse = amClient.allocate(0.3f);
- dispatcher.await();
+ rm1.drainEvents();
Assert.assertEquals("No of assignments must be 0", 0, allocateResponse
.getAllocatedContainers().size());
assertAsksAndReleases(3, pendingRelease, rm1);
@@ -258,7 +254,6 @@ public class TestAMRMClientOnRMRestart {
rm2.start();
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
((MyAMRMClientImpl) amClient).updateRMProxy(rm2);
- dispatcher = (DrainDispatcher) rm2.getRMContext().getDispatcher();
// NM should be rebooted on heartbeat, even first heartbeat for nm2
NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);
@@ -274,7 +269,7 @@ public class TestAMRMClientOnRMRestart {
Collections.singletonList(
containerId.getApplicationAttemptId().getApplicationId()));
nm1.nodeHeartbeat(true);
- dispatcher.await();
+ rm2.drainEvents();
blacklistAdditions.add("h3");
amClient.updateBlacklist(blacklistAdditions, null);
@@ -296,7 +291,7 @@ public class TestAMRMClientOnRMRestart {
// containerRequest and blacklisted nodes.
// Intern RM send resync command,AMRMClient resend allocate request
allocateResponse = amClient.allocate(0.3f);
- dispatcher.await();
+ rm2.drainEvents();
completedContainer =
allocateResponse.getCompletedContainersStatuses().size();
@@ -313,7 +308,7 @@ public class TestAMRMClientOnRMRestart {
// Step-5 : Allocater after resync command
allocateResponse = amClient.allocate(0.5f);
- dispatcher.await();
+ rm2.drainEvents();
Assert.assertEquals("No of assignments must be 0", 0, allocateResponse
.getAllocatedContainers().size());
@@ -326,10 +321,10 @@ public class TestAMRMClientOnRMRestart {
int count = 5;
while (count-- > 0) {
nm1.nodeHeartbeat(true);
- dispatcher.await();
+ rm2.drainEvents();
allocateResponse = amClient.allocate(0.5f);
- dispatcher.await();
+ rm2.drainEvents();
noAssignedContainer += allocateResponse.getAllocatedContainers().size();
if (noAssignedContainer == 3) {
break;
@@ -358,22 +353,20 @@ public class TestAMRMClientOnRMRestart {
// Phase-1 Start 1st RM
MyResourceManager rm1 = new MyResourceManager(conf, memStore);
rm1.start();
- DrainDispatcher dispatcher =
- (DrainDispatcher) rm1.getRMContext().getDispatcher();
// Submit the application
RMApp app = rm1.submitApp(1024);
- dispatcher.await();
+ rm1.drainEvents();
MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
nm1.registerNode();
nm1.nodeHeartbeat(true); // Node heartbeat
- dispatcher.await();
+ rm1.drainEvents();
ApplicationAttemptId appAttemptId =
app.getCurrentAppAttempt().getAppAttemptId();
rm1.sendAMLaunched(appAttemptId);
- dispatcher.await();
+ rm1.drainEvents();
org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> token =
rm1.getRMContext().getRMApps().get(appAttemptId.getApplicationId())
@@ -393,7 +386,6 @@ public class TestAMRMClientOnRMRestart {
rm2.start();
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
((MyAMRMClientImpl) amClient).updateRMProxy(rm2);
- dispatcher = (DrainDispatcher) rm2.getRMContext().getDispatcher();
// NM should be rebooted on heartbeat, even first heartbeat for nm2
NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);
@@ -409,7 +401,7 @@ public class TestAMRMClientOnRMRestart {
Priority.newInstance(0), 0);
nm1.registerNode(Arrays.asList(containerReport), null);
nm1.nodeHeartbeat(true);
- dispatcher.await();
+ rm2.drainEvents();
amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
null, null);
@@ -421,7 +413,6 @@ public class TestAMRMClientOnRMRestart {
amClient.stop();
rm1.stop();
rm2.stop();
-
}
@@ -439,22 +430,20 @@ public class TestAMRMClientOnRMRestart {
// start first RM
MyResourceManager2 rm1 = new MyResourceManager2(conf, memStore);
rm1.start();
- DrainDispatcher dispatcher =
- (DrainDispatcher) rm1.getRMContext().getDispatcher();
Long startTime = System.currentTimeMillis();
// Submit the application
RMApp app = rm1.submitApp(1024);
- dispatcher.await();
+ rm1.drainEvents();
MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
nm1.registerNode();
nm1.nodeHeartbeat(true); // Node heartbeat
- dispatcher.await();
+ rm1.drainEvents();
ApplicationAttemptId appAttemptId =
app.getCurrentAppAttempt().getAppAttemptId();
rm1.sendAMLaunched(appAttemptId);
- dispatcher.await();
+ rm1.drainEvents();
AMRMTokenSecretManager amrmTokenSecretManagerForRM1 =
rm1.getRMContext().getAMRMTokenSecretManager();
@@ -513,7 +502,6 @@ public class TestAMRMClientOnRMRestart {
rm2.start();
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
((MyAMRMClientImpl) amClient).updateRMProxy(rm2);
- dispatcher = (DrainDispatcher) rm2.getRMContext().getDispatcher();
AMRMTokenSecretManager amrmTokenSecretManagerForRM2 =
rm2.getRMContext().getAMRMTokenSecretManager();
@@ -616,11 +604,6 @@ public class TestAMRMClientOnRMRestart {
}
@Override
- protected Dispatcher createDispatcher() {
- return new DrainDispatcher();
- }
-
- @Override
protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
// Dispatch inline for test sanity
return new EventHandler<SchedulerEvent>() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d1fac5d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ACLsTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ACLsTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ACLsTestBase.java
index fbd5ac3..100eb7f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ACLsTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ACLsTestBase.java
@@ -30,14 +30,9 @@ import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.junit.Before;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
public abstract class ACLsTestBase {
protected static final String COMMON_USER = "common_user";
@@ -81,11 +76,6 @@ public abstract class ACLsTestBase {
}
@Override
- protected Dispatcher createDispatcher() {
- return new DrainDispatcher();
- }
-
- @Override
protected void doSecureLogin() throws IOException {
}
};
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d1fac5d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java
index c9ce7d7..c95bcdf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java
@@ -26,22 +26,17 @@ import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
-import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -108,20 +103,9 @@ public abstract class RMHATestBase extends ClientBaseWithFixes{
}
protected void startRMs() throws IOException {
- rm1 = new MockRM(confForRM1, null, false, false){
- @Override
- protected Dispatcher createDispatcher() {
- return new DrainDispatcher();
- }
- };
- rm2 = new MockRM(confForRM2, null, false, false){
- @Override
- protected Dispatcher createDispatcher() {
- return new DrainDispatcher();
- }
- };
+ rm1 = new MockRM(confForRM1, null, false, false);
+ rm2 = new MockRM(confForRM2, null, false, false);
startRMs(rm1, confForRM1, rm2, confForRM2);
-
}
protected void startRMsWithCustomizedRMAppManager() throws IOException {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d1fac5d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ReservationACLsTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ReservationACLsTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ReservationACLsTestBase.java
index 03bc889..c8ee00e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ReservationACLsTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ReservationACLsTestBase.java
@@ -47,7 +47,6 @@ import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
@@ -463,9 +462,7 @@ public class ReservationACLsTestBase extends ACLsTestBase {
int attempts = 10;
Collection<Plan> plans;
do {
- DrainDispatcher dispatcher =
- (DrainDispatcher) resourceManager.getRMContext().getDispatcher();
- dispatcher.await();
+ resourceManager.drainEvents();
LOG.info("Waiting for node capacity to be added to plan");
plans = resourceManager.getRMContext().getReservationSystem()
.getAllPlans().values();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d1fac5d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
index c4197a1..422b7eb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
@@ -40,8 +40,6 @@ import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
@@ -161,13 +159,7 @@ public class TestApplicationCleanup {
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG);
- final DrainDispatcher dispatcher = new DrainDispatcher();
- MockRM rm = new MockRM() {
- @Override
- protected Dispatcher createDispatcher() {
- return dispatcher;
- }
- };
+ MockRM rm = new MockRM();
rm.start();
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 5000);
@@ -185,8 +177,8 @@ public class TestApplicationCleanup {
int request = 2;
am.allocate("127.0.0.1" , 1000, request,
new ArrayList<ContainerId>());
- dispatcher.await();
-
+ rm.drainEvents();
+
//kick the scheduler
nm1.nodeHeartbeat(true);
List<Container> conts = am.allocate(new ArrayList<ResourceRequest>(),
@@ -199,7 +191,7 @@ public class TestApplicationCleanup {
Thread.sleep(100);
conts = am.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
- dispatcher.await();
+ rm.drainEvents();
contReceived += conts.size();
nm1.nodeHeartbeat(true);
}
@@ -209,7 +201,7 @@ public class TestApplicationCleanup {
ArrayList<ContainerId> release = new ArrayList<ContainerId>();
release.add(conts.get(0).getId());
am.allocate(new ArrayList<ResourceRequest>(), release);
- dispatcher.await();
+ rm.drainEvents();
// Send one more heartbeat with a fake running container. This is to
// simulate the situation that can happen if the NM reports that container
@@ -224,7 +216,7 @@ public class TestApplicationCleanup {
containerStatuses.put(app.getApplicationId(), containerStatusList);
NodeHeartbeatResponse resp = nm1.nodeHeartbeat(containerStatuses, true);
- waitForContainerCleanup(dispatcher, nm1, resp);
+ waitForContainerCleanup(rm, nm1, resp);
// Now to test the case when RM already gave cleanup, and NM suddenly
// realizes that the container is running.
@@ -240,17 +232,17 @@ public class TestApplicationCleanup {
resp = nm1.nodeHeartbeat(containerStatuses, true);
// The cleanup list won't be instantaneous as it is given out by scheduler
// and not RMNodeImpl.
- waitForContainerCleanup(dispatcher, nm1, resp);
+ waitForContainerCleanup(rm, nm1, resp);
rm.stop();
}
- protected void waitForContainerCleanup(DrainDispatcher dispatcher, MockNM nm,
+ protected void waitForContainerCleanup(MockRM rm, MockNM nm,
NodeHeartbeatResponse resp) throws Exception {
int waitCount = 0, cleanedConts = 0;
List<ContainerId> contsToClean;
do {
- dispatcher.await();
+ rm.drainEvents();
contsToClean = resp.getContainersToCleanup();
cleanedConts += contsToClean.size();
if (cleanedConts >= 1) {
@@ -400,13 +392,7 @@ public class TestApplicationCleanup {
memStore.init(conf);
// start RM
- final DrainDispatcher dispatcher = new DrainDispatcher();
- MockRM rm1 = new MockRM(conf, memStore) {
- @Override
- protected Dispatcher createDispatcher() {
- return dispatcher;
- }
- };
+ MockRM rm1 = new MockRM(conf, memStore);
rm1.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
@@ -419,13 +405,7 @@ public class TestApplicationCleanup {
rm1.waitForState(app0.getApplicationId(), RMAppState.RUNNING);
// start new RM
- final DrainDispatcher dispatcher2 = new DrainDispatcher();
- MockRM rm2 = new MockRM(conf, memStore) {
- @Override
- protected Dispatcher createDispatcher() {
- return dispatcher2;
- }
- };
+ MockRM rm2 = new MockRM(conf, memStore);
rm2.start();
// nm1 register to rm2, and do a heartbeat
@@ -437,7 +417,7 @@ public class TestApplicationCleanup {
NodeHeartbeatResponse response = nm1.nodeHeartbeat(am0
.getApplicationAttemptId(), 2, ContainerState.RUNNING);
- waitForContainerCleanup(dispatcher2, nm1, response);
+ waitForContainerCleanup(rm2, nm1, response);
rm1.stop();
rm2.stop();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d1fac5d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
index 08b180f..9e84010 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
@@ -59,8 +59,6 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.SerializedException;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException;
@@ -260,7 +258,6 @@ public class TestApplicationMasterLauncher {
Configuration conf = new Configuration();
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
conf.setInt(YarnConfiguration.CLIENT_NM_CONNECT_RETRY_INTERVAL_MS, 1);
- final DrainDispatcher dispatcher = new DrainDispatcher();
MockRM rm = new MockRMWithCustomAMLauncher(conf, null) {
@Override
protected ApplicationMasterLauncher createAMLauncher() {
@@ -284,12 +281,8 @@ public class TestApplicationMasterLauncher {
}
};
}
-
- @Override
- protected Dispatcher createDispatcher() {
- return dispatcher;
- }
};
+
rm.start();
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 5120);
@@ -297,7 +290,7 @@ public class TestApplicationMasterLauncher {
// kick the scheduling
nm1.nodeHeartbeat(true);
- dispatcher.await();
+ rm.drainEvents();
MockRM.waitForState(app.getCurrentAppAttempt(),
RMAppAttemptState.LAUNCHED, 500);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d1fac5d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
index 23bed22..18c49bd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
@@ -42,8 +42,6 @@ import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
@@ -327,10 +325,8 @@ public class TestApplicationMasterService {
@Test(timeout=1200000)
public void testAllocateAfterUnregister() throws Exception {
- MyResourceManager rm = new MyResourceManager(conf);
+ MockRM rm = new MockRM(conf);
rm.start();
- DrainDispatcher rmDispatcher = (DrainDispatcher) rm.getRMContext()
- .getDispatcher();
// Register node1
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB);
@@ -351,7 +347,7 @@ public class TestApplicationMasterService {
AllocateResponse alloc1Response = am1.schedule();
nm1.nodeHeartbeat(true);
- rmDispatcher.await();
+ rm.drainEvents();
alloc1Response = am1.schedule();
Assert.assertEquals(0, alloc1Response.getAllocatedContainers().size());
}
@@ -474,17 +470,6 @@ public class TestApplicationMasterService {
rm.stop();
}
- private static class MyResourceManager extends MockRM {
-
- public MyResourceManager(YarnConfiguration conf) {
- super(conf);
- }
- @Override
- protected Dispatcher createDispatcher() {
- return new DrainDispatcher();
- }
- }
-
private void sentRMContainerLaunched(MockRM rm, ContainerId containerId) {
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
RMContainer rmContainer = cs.getRMContainer(containerId);
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org