You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ji...@apache.org on 2014/07/17 20:46:21 UTC

svn commit: r1611434 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/or...

Author: jianhe
Date: Thu Jul 17 18:46:20 2014
New Revision: 1611434

URL: http://svn.apache.org/r1611434
Log:
MAPREDUCE-5910. Make MR AM resync with RM in case of work-preserving RM-restart. Contributed by Rohith

Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1611434&r1=1611433&r2=1611434&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Thu Jul 17 18:46:20 2014
@@ -17,6 +17,9 @@ Trunk (Unreleased)
     MAPREDUCE-5232. Add a configuration to be able to log classpath and other
     system properties on mapreduce JVMs startup.  (Sangjin Lee via vinodkv)
 
+    MAPREDUCE-5910. Make MR AM resync with RM in case of work-preserving
+    RM-restart. (Rohith via jianhe)
+
   IMPROVEMENTS
 
     MAPREDUCE-3481. [Gridmix] Improve Gridmix STRESS mode. (amarrk)

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java?rev=1611434&r1=1611433&r2=1611434&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java Thu Jul 17 18:46:20 2014
@@ -64,6 +64,7 @@ public class LocalContainerAllocator ext
   private int nmPort;
   private int nmHttpPort;
   private ContainerId containerId;
+  protected int lastResponseID;
 
   private final RecordFactory recordFactory =
       RecordFactoryProvider.getRecordFactory(null);
@@ -119,6 +120,11 @@ public class LocalContainerAllocator ext
     if (allocateResponse.getAMCommand() != null) {
       switch(allocateResponse.getAMCommand()) {
       case AM_RESYNC:
+        LOG.info("ApplicationMaster is out of sync with ResourceManager,"
+            + " hence resyncing.");        
+        this.lastResponseID = 0;
+        register();
+        break;
       case AM_SHUTDOWN:
         LOG.info("Event from RM: shutting down Application Master");
         // This can happen if the RM has been restarted. If it is in that state,

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1611434&r1=1611433&r2=1611434&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java Thu Jul 17 18:46:20 2014
@@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.client.ClientRMProxy;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -216,20 +217,27 @@ public abstract class RMCommunicator ext
     FinishApplicationMasterRequest request =
         FinishApplicationMasterRequest.newInstance(finishState,
           sb.toString(), historyUrl);
-    while (true) {
-      FinishApplicationMasterResponse response =
-          scheduler.finishApplicationMaster(request);
-      if (response.getIsUnregistered()) {
-        // When excepting ClientService, other services are already stopped,
-        // it is safe to let clients know the final states. ClientService
-        // should wait for some time so clients have enough time to know the
-        // final states.
-        RunningAppContext raContext = (RunningAppContext) context;
-        raContext.markSuccessfulUnregistration();
-        break;
+    try {
+      while (true) {
+        FinishApplicationMasterResponse response =
+            scheduler.finishApplicationMaster(request);
+        if (response.getIsUnregistered()) {
+          // When excepting ClientService, other services are already stopped,
+          // it is safe to let clients know the final states. ClientService
+          // should wait for some time so clients have enough time to know the
+          // final states.
+          RunningAppContext raContext = (RunningAppContext) context;
+          raContext.markSuccessfulUnregistration();
+          break;
+        }
+        LOG.info("Waiting for application to be successfully unregistered.");
+        Thread.sleep(rmPollInterval);
       }
-      LOG.info("Waiting for application to be successfully unregistered.");
-      Thread.sleep(rmPollInterval);
+    } catch (ApplicationMasterNotRegisteredException e) {
+      // RM might have restarted or failed over and so lost the fact that AM had
+      // registered before.
+      register();
+      doUnregistration();
     }
   }
 

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1611434&r1=1611433&r2=1611434&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Thu Jul 17 18:46:20 2014
@@ -389,6 +389,7 @@ public class RMContainerAllocator extend
           removed = true;
           assignedRequests.remove(aId);
           containersReleased++;
+          pendingRelease.add(containerId);
           release(containerId);
         }
       }
@@ -641,6 +642,15 @@ public class RMContainerAllocator extend
     if (response.getAMCommand() != null) {
       switch(response.getAMCommand()) {
       case AM_RESYNC:
+          LOG.info("ApplicationMaster is out of sync with ResourceManager,"
+              + " hence resyncing.");
+          lastResponseID = 0;
+
+          // Registering to allow RM to discover an active AM for this
+          // application
+          register();
+          addOutstandingRequestOnResync();
+          break;
       case AM_SHUTDOWN:
         // This can happen if the RM has been restarted. If it is in that state,
         // this application must clean itself up.
@@ -700,6 +710,7 @@ public class RMContainerAllocator extend
         LOG.error("Container complete event for unknown container id "
             + cont.getContainerId());
       } else {
+        pendingRelease.remove(cont.getContainerId());
         assignedRequests.remove(attemptID);
         
         // send the container completed event to Task attempt
@@ -991,6 +1002,7 @@ public class RMContainerAllocator extend
     
     private void containerNotAssigned(Container allocated) {
       containersReleased++;
+      pendingRelease.add(allocated.getId());
       release(allocated.getId());      
     }
     

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java?rev=1611434&r1=1611433&r2=1611434&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java Thu Jul 17 18:46:20 2014
@@ -40,6 +40,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.AMCommand;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -58,7 +59,7 @@ public abstract class RMContainerRequest
   
   private static final Log LOG = LogFactory.getLog(RMContainerRequestor.class);
 
-  private int lastResponseID;
+  protected int lastResponseID;
   private Resource availableResources;
 
   private final RecordFactory recordFactory =
@@ -77,8 +78,11 @@ public abstract class RMContainerRequest
   // numContainers dont end up as duplicates
   private final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(
       new org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator());
-  private final Set<ContainerId> release = new TreeSet<ContainerId>(); 
-
+  private final Set<ContainerId> release = new TreeSet<ContainerId>();
+  // pendingRelease holds history or release requests.request is removed only if
+  // RM sends completedContainer.
+  // How it different from release? --> release is for per allocate() request.
+  protected Set<ContainerId> pendingRelease = new TreeSet<ContainerId>();
   private boolean nodeBlacklistingEnabled;
   private int blacklistDisablePercent;
   private AtomicBoolean ignoreBlacklisting = new AtomicBoolean(false);
@@ -186,6 +190,10 @@ public abstract class RMContainerRequest
     } catch (YarnException e) {
       throw new IOException(e);
     }
+
+    if (isResyncCommand(allocateResponse)) {
+      return allocateResponse;
+    }
     lastResponseID = allocateResponse.getResponseId();
     availableResources = allocateResponse.getAvailableResources();
     lastClusterNmCount = clusterNmCount;
@@ -214,6 +222,28 @@ public abstract class RMContainerRequest
     return allocateResponse;
   }
 
+  protected boolean isResyncCommand(AllocateResponse allocateResponse) {
+    return allocateResponse.getAMCommand() != null
+        && allocateResponse.getAMCommand() == AMCommand.AM_RESYNC;
+  }
+
+  protected void addOutstandingRequestOnResync() {
+    for (Map<String, Map<Resource, ResourceRequest>> rr : remoteRequestsTable
+        .values()) {
+      for (Map<Resource, ResourceRequest> capabalities : rr.values()) {
+        for (ResourceRequest request : capabalities.values()) {
+          addResourceRequestToAsk(request);
+        }
+      }
+    }
+    if (!ignoreBlacklisting.get()) {
+      blacklistAdditions.addAll(blacklistedNodes);
+    }
+    if (!pendingRelease.isEmpty()) {
+      release.addAll(pendingRelease);
+    }
+  }
+
   // May be incorrect if there's multiple NodeManagers running on a single host.
   // knownNodeCount is based on node managers, not hosts. blacklisting is
   // currently based on hosts.

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java?rev=1611434&r1=1611433&r2=1611434&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java Thu Jul 17 18:46:20 2014
@@ -78,6 +78,7 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -87,6 +88,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.Event;
@@ -95,9 +97,13 @@ import org.apache.hadoop.yarn.exceptions
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -618,6 +624,10 @@ public class TestRMContainerAllocator {
       super(conf);
     }
 
+    public MyResourceManager(Configuration conf, RMStateStore store) {
+      super(conf, store);
+    }
+
     @Override
     public void serviceStart() throws Exception {
       super.serviceStart();
@@ -1426,6 +1436,13 @@ public class TestRMContainerAllocator {
         rm.getMyFifoScheduler().lastBlacklistRemovals.size());
   }
 
+  private static void assertAsksAndReleases(int expectedAsk,
+      int expectedRelease, MyResourceManager rm) {
+    Assert.assertEquals(expectedAsk, rm.getMyFifoScheduler().lastAsk.size());
+    Assert.assertEquals(expectedRelease,
+        rm.getMyFifoScheduler().lastRelease.size());
+  }
+
   private static class MyFifoScheduler extends FifoScheduler {
 
     public MyFifoScheduler(RMContext rmContext) {
@@ -1440,6 +1457,7 @@ public class TestRMContainerAllocator {
     }
     
     List<ResourceRequest> lastAsk = null;
+    List<ContainerId> lastRelease = null;
     List<String> lastBlacklistAdditions;
     List<String> lastBlacklistRemovals;
     
@@ -1458,6 +1476,7 @@ public class TestRMContainerAllocator {
         askCopy.add(reqCopy);
       }
       lastAsk = ask;
+      lastRelease = release;
       lastBlacklistAdditions = blacklistAdditions;
       lastBlacklistRemovals = blacklistRemovals;
       return super.allocate(
@@ -1505,6 +1524,20 @@ public class TestRMContainerAllocator {
     return new ContainerFailedEvent(attemptId, host);    
   }
   
+  private ContainerAllocatorEvent createDeallocateEvent(JobId jobId,
+      int taskAttemptId, boolean reduce) {
+    TaskId taskId;
+    if (reduce) {
+      taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE);
+    } else {
+      taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
+    }
+    TaskAttemptId attemptId =
+        MRBuilderUtils.newTaskAttemptId(taskId, taskAttemptId);
+    return new ContainerAllocatorEvent(attemptId,
+        ContainerAllocator.EventType.CONTAINER_DEALLOCATE);
+  }
+
   private void checkAssignments(ContainerRequestEvent[] requests,
       List<TaskAttemptContainerAssignedEvent> assignments,
       boolean checkHostMatch) {
@@ -1557,6 +1590,7 @@ public class TestRMContainerAllocator {
     = new ArrayList<JobUpdatedNodesEvent>();
     private MyResourceManager rm;
     private boolean isUnregistered = false;
+    private AllocateResponse allocateResponse;
     private static AppContext createAppContext(
         ApplicationAttemptId appAttemptId, Job job) {
       AppContext context = mock(AppContext.class);
@@ -1668,6 +1702,10 @@ public class TestRMContainerAllocator {
       super.handleEvent(f);
     }
     
+    public void sendDeallocate(ContainerAllocatorEvent f) {
+      super.handleEvent(f);
+    }
+
     // API to be used by tests
     public List<TaskAttemptContainerAssignedEvent> schedule()
         throws Exception {
@@ -1713,6 +1751,20 @@ public class TestRMContainerAllocator {
     public boolean isUnregistered() {
       return isUnregistered;
     }
+
+    public void updateSchedulerProxy(MyResourceManager rm) {
+      scheduler = rm.getApplicationMasterService();
+    }
+
+    @Override
+    protected AllocateResponse makeRemoteRequest() throws IOException {
+      allocateResponse = super.makeRemoteRequest();
+      return allocateResponse;
+    }
+
+    public boolean isResyncCommand() {
+      return super.isResyncCommand(allocateResponse);
+    }
   }
 
   @Test
@@ -2022,6 +2074,198 @@ public class TestRMContainerAllocator {
     Assert.assertTrue(allocator.isUnregistered());
   }
   
+  // Step-1 : AM send allocate request for 2 ContainerRequests and 1
+  // blackListeNode
+  // Step-2 : 2 containers are allocated by RM.
+  // Step-3 : AM Send 1 containerRequest(event3) and 1 releaseRequests to
+  // RM
+  // Step-4 : On RM restart, AM(does not know RM is restarted) sends
+  // additional containerRequest(event4) and blacklisted nodes.
+  // Intern RM send resync command
+  // Step-5 : On Resync,AM sends all outstanding
+  // asks,release,blacklistAaddition
+  // and another containerRequest(event5)
+  // Step-6 : RM allocates containers i.e event3,event4 and cRequest5
+  @Test
+  public void testRMContainerAllocatorResendsRequestsOnRMRestart()
+      throws Exception {
+
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
+    conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+        YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
+    conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true);
+
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true);
+    conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1);
+    conf.setInt(
+        MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, -1);
+
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    MyResourceManager rm1 = new MyResourceManager(conf, memStore);
+    rm1.start();
+    DrainDispatcher dispatcher =
+        (DrainDispatcher) rm1.getRMContext().getDispatcher();
+
+    // Submit the application
+    RMApp app = rm1.submitApp(1024);
+    dispatcher.await();
+
+    MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    nm1.nodeHeartbeat(true); // Node heartbeat
+    dispatcher.await();
+
+    ApplicationAttemptId appAttemptId =
+        app.getCurrentAppAttempt().getAppAttemptId();
+    rm1.sendAMLaunched(appAttemptId);
+    dispatcher.await();
+
+    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+    Job mockJob = mock(Job.class);
+    when(mockJob.getReport()).thenReturn(
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+            0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
+    MyContainerAllocator allocator =
+        new MyContainerAllocator(rm1, conf, appAttemptId, mockJob);
+
+    // Step-1 : AM send allocate request for 2 ContainerRequests and 1
+    // blackListeNode
+    // create the container request
+    // send MAP request
+    ContainerRequestEvent event1 =
+        createReq(jobId, 1, 1024, new String[] { "h1" });
+    allocator.sendRequest(event1);
+
+    ContainerRequestEvent event2 =
+        createReq(jobId, 2, 2048, new String[] { "h1", "h2" });
+    allocator.sendRequest(event2);
+
+    // Send events to blacklist h2
+    ContainerFailedEvent f1 = createFailEvent(jobId, 1, "h2", false);
+    allocator.sendFailure(f1);
+
+    // send allocate request and 1 blacklisted nodes
+    List<TaskAttemptContainerAssignedEvent> assignedContainers =
+        allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals("No of assignments must be 0", 0,
+        assignedContainers.size());
+    // Why ask is 3, not 4? --> ask from blacklisted node h2 is removed
+    assertAsksAndReleases(3, 0, rm1);
+    assertBlacklistAdditionsAndRemovals(1, 0, rm1);
+
+    nm1.nodeHeartbeat(true); // Node heartbeat
+    dispatcher.await();
+
+    // Step-2 : 2 containers are allocated by RM.
+    assignedContainers = allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals("No of assignments must be 2", 2,
+        assignedContainers.size());
+    assertAsksAndReleases(0, 0, rm1);
+    assertBlacklistAdditionsAndRemovals(0, 0, rm1);
+
+    assignedContainers = allocator.schedule();
+    Assert.assertEquals("No of assignments must be 0", 0,
+        assignedContainers.size());
+    assertAsksAndReleases(3, 0, rm1);
+    assertBlacklistAdditionsAndRemovals(0, 0, rm1);
+
+    // Step-3 : AM Send 1 containerRequest(event3) and 1 releaseRequests to
+    // RM
+    // send container request
+    ContainerRequestEvent event3 =
+        createReq(jobId, 3, 1000, new String[] { "h1" });
+    allocator.sendRequest(event3);
+
+    // send deallocate request
+    ContainerAllocatorEvent deallocate1 =
+        createDeallocateEvent(jobId, 1, false);
+    allocator.sendDeallocate(deallocate1);
+
+    assignedContainers = allocator.schedule();
+    Assert.assertEquals("No of assignments must be 0", 0,
+        assignedContainers.size());
+    assertAsksAndReleases(3, 1, rm1);
+    assertBlacklistAdditionsAndRemovals(0, 0, rm1);
+
+    // Phase-2 start 2nd RM is up
+    MyResourceManager rm2 = new MyResourceManager(conf, memStore);
+    rm2.start();
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+    allocator.updateSchedulerProxy(rm2);
+    dispatcher = (DrainDispatcher) rm2.getRMContext().getDispatcher();
+
+    // NM should be rebooted on heartbeat, even first heartbeat for nm2
+    NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);
+    Assert.assertEquals(NodeAction.RESYNC, hbResponse.getNodeAction());
+
+    // new NM to represent NM re-register
+    nm1 = new MockNM("h1:1234", 10240, rm2.getResourceTrackerService());
+    nm1.registerNode();
+    nm1.nodeHeartbeat(true);
+    dispatcher.await();
+
+    // Step-4 : On RM restart, AM(does not know RM is restarted) sends
+    // additional containerRequest(event4) and blacklisted nodes.
+    // Intern RM send resync command
+
+    // send deallocate request, release=1
+    ContainerAllocatorEvent deallocate2 =
+        createDeallocateEvent(jobId, 2, false);
+    allocator.sendDeallocate(deallocate2);
+
+    // Send events to blacklist nodes h3
+    ContainerFailedEvent f2 = createFailEvent(jobId, 1, "h3", false);
+    allocator.sendFailure(f2);
+
+    ContainerRequestEvent event4 =
+        createReq(jobId, 4, 2000, new String[] { "h1", "h2" });
+    allocator.sendRequest(event4);
+
+    // send allocate request to 2nd RM and get resync command
+    allocator.schedule();
+    dispatcher.await();
+    Assert.assertTrue("Last allocate response is not RESYNC",
+        allocator.isResyncCommand());
+
+    // Step-5 : On Resync,AM sends all outstanding
+    // asks,release,blacklistAaddition
+    // and another containerRequest(event5)
+    ContainerRequestEvent event5 =
+        createReq(jobId, 5, 3000, new String[] { "h1", "h2", "h3" });
+    allocator.sendRequest(event5);
+
+    // send all outstanding request again.
+    assignedContainers = allocator.schedule();
+    dispatcher.await();
+    assertAsksAndReleases(3, 2, rm2);
+    assertBlacklistAdditionsAndRemovals(2, 0, rm2);
+
+    nm1.nodeHeartbeat(true);
+    dispatcher.await();
+
+    // Step-6 : RM allocates containers i.e event3,event4 and cRequest5
+    assignedContainers = allocator.schedule();
+    dispatcher.await();
+
+    Assert.assertEquals("Number of container should be 3", 3,
+        assignedContainers.size());
+
+    for (TaskAttemptContainerAssignedEvent assig : assignedContainers) {
+      Assert.assertTrue("Assigned count not correct",
+          "h1".equals(assig.getContainer().getNodeId().getHost()));
+    }
+
+    rm1.stop();
+    rm2.stop();
+
+  }
+
   public static void main(String[] args) throws Exception {
     TestRMContainerAllocator t = new TestRMContainerAllocator();
     t.testSimple();