You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ji...@apache.org on 2014/07/17 21:01:27 UTC
svn commit: r1611436 - in
/hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/...
Author: jianhe
Date: Thu Jul 17 19:01:27 2014
New Revision: 1611436
URL: http://svn.apache.org/r1611436
Log:
Merge r1611434 from trunk. MAPREDUCE-5910. Make MR AM resync with RM in case of work-preserving RM-restart. Contributed by Rohith
Modified:
hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1611436&r1=1611435&r2=1611436&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Thu Jul 17 19:01:27 2014
@@ -5,6 +5,8 @@ Release 2.6.0 - UNRELEASED
INCOMPATIBLE CHANGES
NEW FEATURES
+ MAPREDUCE-5910. Make MR AM resync with RM in case of work-preserving
+ RM-restart. Contributed by Rohith
IMPROVEMENTS
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java?rev=1611436&r1=1611435&r2=1611436&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java Thu Jul 17 19:01:27 2014
@@ -64,6 +64,7 @@ public class LocalContainerAllocator ext
private int nmPort;
private int nmHttpPort;
private ContainerId containerId;
+ protected int lastResponseID;
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
@@ -119,6 +120,11 @@ public class LocalContainerAllocator ext
if (allocateResponse.getAMCommand() != null) {
switch(allocateResponse.getAMCommand()) {
case AM_RESYNC:
+ LOG.info("ApplicationMaster is out of sync with ResourceManager,"
+ + " hence resyncing.");
+ this.lastResponseID = 0;
+ register();
+ break;
case AM_SHUTDOWN:
LOG.info("Event from RM: shutting down Application Master");
// This can happen if the RM has been restarted. If it is in that state,
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1611436&r1=1611435&r2=1611436&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java Thu Jul 17 19:01:27 2014
@@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -216,20 +217,27 @@ public abstract class RMCommunicator ext
FinishApplicationMasterRequest request =
FinishApplicationMasterRequest.newInstance(finishState,
sb.toString(), historyUrl);
- while (true) {
- FinishApplicationMasterResponse response =
- scheduler.finishApplicationMaster(request);
- if (response.getIsUnregistered()) {
- // When excepting ClientService, other services are already stopped,
- // it is safe to let clients know the final states. ClientService
- // should wait for some time so clients have enough time to know the
- // final states.
- RunningAppContext raContext = (RunningAppContext) context;
- raContext.markSuccessfulUnregistration();
- break;
+ try {
+ while (true) {
+ FinishApplicationMasterResponse response =
+ scheduler.finishApplicationMaster(request);
+ if (response.getIsUnregistered()) {
+ // When excepting ClientService, other services are already stopped,
+ // it is safe to let clients know the final states. ClientService
+ // should wait for some time so clients have enough time to know the
+ // final states.
+ RunningAppContext raContext = (RunningAppContext) context;
+ raContext.markSuccessfulUnregistration();
+ break;
+ }
+ LOG.info("Waiting for application to be successfully unregistered.");
+ Thread.sleep(rmPollInterval);
}
- LOG.info("Waiting for application to be successfully unregistered.");
- Thread.sleep(rmPollInterval);
+ } catch (ApplicationMasterNotRegisteredException e) {
+ // RM might have restarted or failed over and so lost the fact that AM had
+ // registered before.
+ register();
+ doUnregistration();
}
}
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1611436&r1=1611435&r2=1611436&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Thu Jul 17 19:01:27 2014
@@ -383,6 +383,7 @@ public class RMContainerAllocator extend
removed = true;
assignedRequests.remove(aId);
containersReleased++;
+ pendingRelease.add(containerId);
release(containerId);
}
}
@@ -631,6 +632,15 @@ public class RMContainerAllocator extend
if (response.getAMCommand() != null) {
switch(response.getAMCommand()) {
case AM_RESYNC:
+ LOG.info("ApplicationMaster is out of sync with ResourceManager,"
+ + " hence resyncing.");
+ lastResponseID = 0;
+
+ // Registering to allow RM to discover an active AM for this
+ // application
+ register();
+ addOutstandingRequestOnResync();
+ break;
case AM_SHUTDOWN:
// This can happen if the RM has been restarted. If it is in that state,
// this application must clean itself up.
@@ -682,6 +692,7 @@ public class RMContainerAllocator extend
LOG.error("Container complete event for unknown container id "
+ cont.getContainerId());
} else {
+ pendingRelease.remove(cont.getContainerId());
assignedRequests.remove(attemptID);
// send the container completed event to Task attempt
@@ -971,6 +982,7 @@ public class RMContainerAllocator extend
private void containerNotAssigned(Container allocated) {
containersReleased++;
+ pendingRelease.add(allocated.getId());
release(allocated.getId());
}
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java?rev=1611436&r1=1611435&r2=1611436&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java Thu Jul 17 19:01:27 2014
@@ -40,6 +40,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -58,7 +59,7 @@ public abstract class RMContainerRequest
private static final Log LOG = LogFactory.getLog(RMContainerRequestor.class);
- private int lastResponseID;
+ protected int lastResponseID;
private Resource availableResources;
private final RecordFactory recordFactory =
@@ -77,8 +78,11 @@ public abstract class RMContainerRequest
// numContainers dont end up as duplicates
private final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(
new org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator());
- private final Set<ContainerId> release = new TreeSet<ContainerId>();
-
+ private final Set<ContainerId> release = new TreeSet<ContainerId>();
+ // pendingRelease holds history or release requests.request is removed only if
+ // RM sends completedContainer.
+ // How it different from release? --> release is for per allocate() request.
+ protected Set<ContainerId> pendingRelease = new TreeSet<ContainerId>();
private boolean nodeBlacklistingEnabled;
private int blacklistDisablePercent;
private AtomicBoolean ignoreBlacklisting = new AtomicBoolean(false);
@@ -186,6 +190,10 @@ public abstract class RMContainerRequest
} catch (YarnException e) {
throw new IOException(e);
}
+
+ if (isResyncCommand(allocateResponse)) {
+ return allocateResponse;
+ }
lastResponseID = allocateResponse.getResponseId();
availableResources = allocateResponse.getAvailableResources();
lastClusterNmCount = clusterNmCount;
@@ -214,6 +222,28 @@ public abstract class RMContainerRequest
return allocateResponse;
}
+ protected boolean isResyncCommand(AllocateResponse allocateResponse) {
+ return allocateResponse.getAMCommand() != null
+ && allocateResponse.getAMCommand() == AMCommand.AM_RESYNC;
+ }
+
+ protected void addOutstandingRequestOnResync() {
+ for (Map<String, Map<Resource, ResourceRequest>> rr : remoteRequestsTable
+ .values()) {
+ for (Map<Resource, ResourceRequest> capabalities : rr.values()) {
+ for (ResourceRequest request : capabalities.values()) {
+ addResourceRequestToAsk(request);
+ }
+ }
+ }
+ if (!ignoreBlacklisting.get()) {
+ blacklistAdditions.addAll(blacklistedNodes);
+ }
+ if (!pendingRelease.isEmpty()) {
+ release.addAll(pendingRelease);
+ }
+ }
+
// May be incorrect if there's multiple NodeManagers running on a single host.
// knownNodeCount is based on node managers, not hosts. blacklisting is
// currently based on hosts.
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java?rev=1611436&r1=1611435&r2=1611436&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java Thu Jul 17 19:01:27 2014
@@ -77,6 +77,7 @@ import org.apache.hadoop.security.UserGr
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
@@ -86,6 +87,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.Event;
@@ -94,9 +96,13 @@ import org.apache.hadoop.yarn.exceptions
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -617,6 +623,10 @@ public class TestRMContainerAllocator {
super(conf);
}
+ public MyResourceManager(Configuration conf, RMStateStore store) {
+ super(conf, store);
+ }
+
@Override
public void serviceStart() throws Exception {
super.serviceStart();
@@ -1425,6 +1435,13 @@ public class TestRMContainerAllocator {
rm.getMyFifoScheduler().lastBlacklistRemovals.size());
}
+ private static void assertAsksAndReleases(int expectedAsk,
+ int expectedRelease, MyResourceManager rm) {
+ Assert.assertEquals(expectedAsk, rm.getMyFifoScheduler().lastAsk.size());
+ Assert.assertEquals(expectedRelease,
+ rm.getMyFifoScheduler().lastRelease.size());
+ }
+
private static class MyFifoScheduler extends FifoScheduler {
public MyFifoScheduler(RMContext rmContext) {
@@ -1439,6 +1456,7 @@ public class TestRMContainerAllocator {
}
List<ResourceRequest> lastAsk = null;
+ List<ContainerId> lastRelease = null;
List<String> lastBlacklistAdditions;
List<String> lastBlacklistRemovals;
@@ -1457,6 +1475,7 @@ public class TestRMContainerAllocator {
askCopy.add(reqCopy);
}
lastAsk = ask;
+ lastRelease = release;
lastBlacklistAdditions = blacklistAdditions;
lastBlacklistRemovals = blacklistRemovals;
return super.allocate(
@@ -1504,6 +1523,20 @@ public class TestRMContainerAllocator {
return new ContainerFailedEvent(attemptId, host);
}
+ private ContainerAllocatorEvent createDeallocateEvent(JobId jobId,
+ int taskAttemptId, boolean reduce) {
+ TaskId taskId;
+ if (reduce) {
+ taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE);
+ } else {
+ taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
+ }
+ TaskAttemptId attemptId =
+ MRBuilderUtils.newTaskAttemptId(taskId, taskAttemptId);
+ return new ContainerAllocatorEvent(attemptId,
+ ContainerAllocator.EventType.CONTAINER_DEALLOCATE);
+ }
+
private void checkAssignments(ContainerRequestEvent[] requests,
List<TaskAttemptContainerAssignedEvent> assignments,
boolean checkHostMatch) {
@@ -1556,6 +1589,7 @@ public class TestRMContainerAllocator {
= new ArrayList<JobUpdatedNodesEvent>();
private MyResourceManager rm;
private boolean isUnregistered = false;
+ private AllocateResponse allocateResponse;
private static AppContext createAppContext(
ApplicationAttemptId appAttemptId, Job job) {
AppContext context = mock(AppContext.class);
@@ -1665,6 +1699,10 @@ public class TestRMContainerAllocator {
super.handleEvent(f);
}
+ public void sendDeallocate(ContainerAllocatorEvent f) {
+ super.handleEvent(f);
+ }
+
// API to be used by tests
public List<TaskAttemptContainerAssignedEvent> schedule()
throws Exception {
@@ -1710,6 +1748,20 @@ public class TestRMContainerAllocator {
public boolean isUnregistered() {
return isUnregistered;
}
+
+ public void updateSchedulerProxy(MyResourceManager rm) {
+ scheduler = rm.getApplicationMasterService();
+ }
+
+ @Override
+ protected AllocateResponse makeRemoteRequest() throws IOException {
+ allocateResponse = super.makeRemoteRequest();
+ return allocateResponse;
+ }
+
+ public boolean isResyncCommand() {
+ return super.isResyncCommand(allocateResponse);
+ }
}
@Test
@@ -2017,6 +2069,198 @@ public class TestRMContainerAllocator {
Assert.assertTrue(allocator.isUnregistered());
}
+ // Step-1 : AM send allocate request for 2 ContainerRequests and 1
+ // blackListeNode
+ // Step-2 : 2 containers are allocated by RM.
+ // Step-3 : AM Send 1 containerRequest(event3) and 1 releaseRequests to
+ // RM
+ // Step-4 : On RM restart, AM(does not know RM is restarted) sends
+ // additional containerRequest(event4) and blacklisted nodes.
+ // Intern RM send resync command
+ // Step-5 : On Resync,AM sends all outstanding
+ // asks,release,blacklistAaddition
+ // and another containerRequest(event5)
+ // Step-6 : RM allocates containers i.e event3,event4 and cRequest5
+ @Test
+ public void testRMContainerAllocatorResendsRequestsOnRMRestart()
+ throws Exception {
+
+ Configuration conf = new Configuration();
+ conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
+ conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+ conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+ YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
+ conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true);
+
+ conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true);
+ conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1);
+ conf.setInt(
+ MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, -1);
+
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(conf);
+
+ MyResourceManager rm1 = new MyResourceManager(conf, memStore);
+ rm1.start();
+ DrainDispatcher dispatcher =
+ (DrainDispatcher) rm1.getRMContext().getDispatcher();
+
+ // Submit the application
+ RMApp app = rm1.submitApp(1024);
+ dispatcher.await();
+
+ MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
+ nm1.registerNode();
+ nm1.nodeHeartbeat(true); // Node heartbeat
+ dispatcher.await();
+
+ ApplicationAttemptId appAttemptId =
+ app.getCurrentAppAttempt().getAppAttemptId();
+ rm1.sendAMLaunched(appAttemptId);
+ dispatcher.await();
+
+ JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+ Job mockJob = mock(Job.class);
+ when(mockJob.getReport()).thenReturn(
+ MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+ 0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
+ MyContainerAllocator allocator =
+ new MyContainerAllocator(rm1, conf, appAttemptId, mockJob);
+
+ // Step-1 : AM send allocate request for 2 ContainerRequests and 1
+ // blackListeNode
+ // create the container request
+ // send MAP request
+ ContainerRequestEvent event1 =
+ createReq(jobId, 1, 1024, new String[] { "h1" });
+ allocator.sendRequest(event1);
+
+ ContainerRequestEvent event2 =
+ createReq(jobId, 2, 2048, new String[] { "h1", "h2" });
+ allocator.sendRequest(event2);
+
+ // Send events to blacklist h2
+ ContainerFailedEvent f1 = createFailEvent(jobId, 1, "h2", false);
+ allocator.sendFailure(f1);
+
+ // send allocate request and 1 blacklisted nodes
+ List<TaskAttemptContainerAssignedEvent> assignedContainers =
+ allocator.schedule();
+ dispatcher.await();
+ Assert.assertEquals("No of assignments must be 0", 0,
+ assignedContainers.size());
+ // Why ask is 3, not 4? --> ask from blacklisted node h2 is removed
+ assertAsksAndReleases(3, 0, rm1);
+ assertBlacklistAdditionsAndRemovals(1, 0, rm1);
+
+ nm1.nodeHeartbeat(true); // Node heartbeat
+ dispatcher.await();
+
+ // Step-2 : 2 containers are allocated by RM.
+ assignedContainers = allocator.schedule();
+ dispatcher.await();
+ Assert.assertEquals("No of assignments must be 2", 2,
+ assignedContainers.size());
+ assertAsksAndReleases(0, 0, rm1);
+ assertBlacklistAdditionsAndRemovals(0, 0, rm1);
+
+ assignedContainers = allocator.schedule();
+ Assert.assertEquals("No of assignments must be 0", 0,
+ assignedContainers.size());
+ assertAsksAndReleases(3, 0, rm1);
+ assertBlacklistAdditionsAndRemovals(0, 0, rm1);
+
+ // Step-3 : AM Send 1 containerRequest(event3) and 1 releaseRequests to
+ // RM
+ // send container request
+ ContainerRequestEvent event3 =
+ createReq(jobId, 3, 1000, new String[] { "h1" });
+ allocator.sendRequest(event3);
+
+ // send deallocate request
+ ContainerAllocatorEvent deallocate1 =
+ createDeallocateEvent(jobId, 1, false);
+ allocator.sendDeallocate(deallocate1);
+
+ assignedContainers = allocator.schedule();
+ Assert.assertEquals("No of assignments must be 0", 0,
+ assignedContainers.size());
+ assertAsksAndReleases(3, 1, rm1);
+ assertBlacklistAdditionsAndRemovals(0, 0, rm1);
+
+ // Phase-2 start 2nd RM is up
+ MyResourceManager rm2 = new MyResourceManager(conf, memStore);
+ rm2.start();
+ nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+ allocator.updateSchedulerProxy(rm2);
+ dispatcher = (DrainDispatcher) rm2.getRMContext().getDispatcher();
+
+ // NM should be rebooted on heartbeat, even first heartbeat for nm2
+ NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);
+ Assert.assertEquals(NodeAction.RESYNC, hbResponse.getNodeAction());
+
+ // new NM to represent NM re-register
+ nm1 = new MockNM("h1:1234", 10240, rm2.getResourceTrackerService());
+ nm1.registerNode();
+ nm1.nodeHeartbeat(true);
+ dispatcher.await();
+
+ // Step-4 : On RM restart, AM(does not know RM is restarted) sends
+ // additional containerRequest(event4) and blacklisted nodes.
+ // Intern RM send resync command
+
+ // send deallocate request, release=1
+ ContainerAllocatorEvent deallocate2 =
+ createDeallocateEvent(jobId, 2, false);
+ allocator.sendDeallocate(deallocate2);
+
+ // Send events to blacklist nodes h3
+ ContainerFailedEvent f2 = createFailEvent(jobId, 1, "h3", false);
+ allocator.sendFailure(f2);
+
+ ContainerRequestEvent event4 =
+ createReq(jobId, 4, 2000, new String[] { "h1", "h2" });
+ allocator.sendRequest(event4);
+
+ // send allocate request to 2nd RM and get resync command
+ allocator.schedule();
+ dispatcher.await();
+ Assert.assertTrue("Last allocate response is not RESYNC",
+ allocator.isResyncCommand());
+
+ // Step-5 : On Resync,AM sends all outstanding
+ // asks,release,blacklistAaddition
+ // and another containerRequest(event5)
+ ContainerRequestEvent event5 =
+ createReq(jobId, 5, 3000, new String[] { "h1", "h2", "h3" });
+ allocator.sendRequest(event5);
+
+ // send all outstanding request again.
+ assignedContainers = allocator.schedule();
+ dispatcher.await();
+ assertAsksAndReleases(3, 2, rm2);
+ assertBlacklistAdditionsAndRemovals(2, 0, rm2);
+
+ nm1.nodeHeartbeat(true);
+ dispatcher.await();
+
+ // Step-6 : RM allocates containers i.e event3,event4 and cRequest5
+ assignedContainers = allocator.schedule();
+ dispatcher.await();
+
+ Assert.assertEquals("Number of container should be 3", 3,
+ assignedContainers.size());
+
+ for (TaskAttemptContainerAssignedEvent assig : assignedContainers) {
+ Assert.assertTrue("Assigned count not correct",
+ "h1".equals(assig.getContainer().getNodeId().getHost()));
+ }
+
+ rm1.stop();
+ rm2.stop();
+
+ }
+
public static void main(String[] args) throws Exception {
TestRMContainerAllocator t = new TestRMContainerAllocator();
t.testSimple();