You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ju...@apache.org on 2016/01/06 22:42:36 UTC

hadoop git commit: YARN-2902. Killing a container that is localizing can orphan resources in the DOWNLOADING state. Contributed by Varun Saxena.

Repository: hadoop
Updated Branches:
  refs/heads/branch-2.6 74027c24c -> 9da7b1fdd


YARN-2902. Killing a container that is localizing can orphan resources in the DOWNLOADING state. Contributed by Varun Saxena.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9da7b1fd
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9da7b1fd
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9da7b1fd

Branch: refs/heads/branch-2.6
Commit: 9da7b1fdd2b88399d2b2e11bc7dce7d80b41e297
Parents: 74027c2
Author: Junping Du <ju...@apache.org>
Authored: Wed Jan 6 13:50:35 2016 -0800
Committer: Junping Du <ju...@apache.org>
Committed: Wed Jan 6 13:50:35 2016 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../nodemanager/DefaultContainerExecutor.java   |   9 +-
 .../nodemanager/DockerContainerExecutor.java    |   9 +-
 .../localizer/LocalResourcesTrackerImpl.java    |  10 +
 .../localizer/ResourceLocalizationService.java  |  13 +
 .../TestLocalResourcesTrackerImpl.java          |   6 +-
 .../TestResourceLocalizationService.java        | 298 ++++++++++++++++++-
 7 files changed, 335 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/9da7b1fd/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 6675dce..0cf59cb 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -31,6 +31,9 @@ Release 2.6.4 - UNRELEASED
     YARN-4546. ResourceManager crash due to scheduling opportunity overflow.
     (Jason Lowe via junping_du)
 
+    YARN-2902. Killing a container that is localizing can orphan resources in
+    the DOWNLOADING state (Varun Saxena via jlowe)
+
 Release 2.6.3 - 2015-12-17
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9da7b1fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
index 5bf8cec..cd20449 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
@@ -25,6 +25,7 @@ import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
 
 import java.io.DataOutputStream;
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.net.InetSocketAddress;
@@ -464,8 +465,12 @@ public class DefaultContainerExecutor extends ContainerExecutor {
     for (Path baseDir : baseDirs) {
       Path del = subDir == null ? baseDir : new Path(baseDir, subDir);
       LOG.info("Deleting path : " + del);
-      if (!lfs.delete(del, true)) {
-        LOG.warn("delete returned false for path: [" + del + "]");
+      try {
+        if (!lfs.delete(del, true)) {
+          LOG.warn("delete returned false for path: [" + del + "]");
+        }
+      } catch (FileNotFoundException e) {
+        continue;
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9da7b1fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerContainerExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerContainerExecutor.java
index d8dd890..7c1947b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerContainerExecutor.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PrintStream;
@@ -407,8 +408,12 @@ public class DockerContainerExecutor extends ContainerExecutor {
     for (Path baseDir : baseDirs) {
       Path del = subDir == null ? baseDir : new Path(baseDir, subDir);
       LOG.info("Deleting path : " + del);
-      if (!lfs.delete(del, true)) {
-        LOG.warn("delete returned false for path: [" + del + "]");
+      try {
+        if (!lfs.delete(del, true)) {
+          LOG.warn("delete returned false for path: [" + del + "]");
+        }
+      } catch (FileNotFoundException e) {
+        continue;
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9da7b1fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
index 8f209e7..f9346d8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
@@ -173,6 +173,16 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
 
     rsrc.handle(event);
 
+    // Remove the resource if it is downloading and its reference count has
+    // become 0 after RELEASE. This may be because a container was killed while
+    // localizing and no other container is referring to the resource.
+    if (event.getType() == ResourceEventType.RELEASE) {
+      if (rsrc.getState() == ResourceState.DOWNLOADING &&
+          rsrc.getRefCount() <= 0) {
+        removeResource(req);
+      }
+    }
+
     if (event.getType() == ResourceEventType.LOCALIZED) {
       if (rsrc.getLocalPath() != null) {
         try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9da7b1fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
index 68f5850..cd3cc0e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
@@ -1122,9 +1122,22 @@ public class ResourceLocalizationService extends CompositeService
         dispatcher.getEventHandler().handle(
             new ContainerResourceFailedEvent(cId, null, e.getMessage()));
       } finally {
+        List<Path> paths = new ArrayList<Path>();
         for (LocalizerResourceRequestEvent event : scheduled.values()) {
+          // This means some resources were in downloading state. Schedule
+          // deletion task for localization dir and tmp dir used for downloading
+          Path locRsrcPath = event.getResource().getLocalPath();
+          if (locRsrcPath != null) {
+            Path locRsrcDirPath = locRsrcPath.getParent();
+            paths.add(locRsrcDirPath);
+            paths.add(new Path(locRsrcDirPath + "_tmp"));
+          }
           event.getResource().unlock();
         }
+        if (!paths.isEmpty()) {
+          delService.delete(context.getUser(),
+              null, paths.toArray(new Path[paths.size()]));
+        }
         delService.delete(null, nmPrivateCTokensPath, new Path[] {});
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9da7b1fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
index 6ab2c8b..e59a796 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
@@ -137,12 +137,12 @@ public class TestLocalResourcesTrackerImpl {
       tracker.handle(rel21Event);
 
       dispatcher.await();
-      verifyTrackedResourceCount(tracker, 2);
+      verifyTrackedResourceCount(tracker, 1);
 
       // Verify resource with non zero ref count is not removed.
       Assert.assertEquals(2, lr1.getRefCount());
       Assert.assertFalse(tracker.remove(lr1, mockDelService));
-      verifyTrackedResourceCount(tracker, 2);
+      verifyTrackedResourceCount(tracker, 1);
 
       // Localize resource1
       ResourceLocalizedEvent rle =
@@ -157,7 +157,7 @@ public class TestLocalResourcesTrackerImpl {
 
       // Verify resources in state LOCALIZED with ref-count=0 is removed.
       Assert.assertTrue(tracker.remove(lr1, mockDelService));
-      verifyTrackedResourceCount(tracker, 1);
+      verifyTrackedResourceCount(tracker, 0);
     } finally {
       if (dispatcher != null) {
         dispatcher.stop();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9da7b1fd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
index ee6ba17..fdf9981 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
@@ -42,6 +42,8 @@ import static org.mockito.Mockito.when;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.NotSerializableException;
+import java.io.ObjectInputStream;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -95,6 +97,7 @@ import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
@@ -145,9 +148,12 @@ import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.ArgumentMatcher;
 import org.mockito.Mockito;
+import org.mockito.internal.matchers.VarargMatcher;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import com.google.common.collect.Sets;
+
 public class TestResourceLocalizationService {
 
   static final Path basedir =
@@ -472,16 +478,14 @@ public class TestResourceLocalizationService {
         Assert.assertEquals("Incorrect reference count", 0, lr.getRefCount());
         pubRsrcs.remove(lr.getRequest());
       }
-      Assert.assertEquals(0, pubRsrcs.size());
-      Assert.assertEquals(2, pubRsrcCount);
+      Assert.assertEquals(2, pubRsrcs.size());
+      Assert.assertEquals(0, pubRsrcCount);
 
       appRsrcCount = 0;
       for (LocalizedResource lr : appTracker) {
         appRsrcCount++;
-        Assert.assertEquals("Incorrect reference count", 0, lr.getRefCount());
-        Assert.assertEquals(appReq, lr.getRequest());
       }
-      Assert.assertEquals(1, appRsrcCount);
+      Assert.assertEquals(0, appRsrcCount);
     } finally {
       dispatcher.stop();
       delService.stop();
@@ -1067,7 +1071,289 @@ public class TestResourceLocalizationService {
       dispatcher.stop();
     }
   }
-  
+
+  private static class DownloadingPathsMatcher extends ArgumentMatcher<Path[]>
+      implements VarargMatcher {
+    static final long serialVersionUID = 0;
+
+    private transient Set<Path> matchPaths;
+
+    DownloadingPathsMatcher(Set<Path> matchPaths) {
+      this.matchPaths = matchPaths;
+    }
+
+    @Override
+    public boolean matches(Object varargs) {
+      Path[] downloadingPaths = (Path[]) varargs;
+      if (matchPaths.size() != downloadingPaths.length) {
+        return false;
+      }
+      for (Path downloadingPath : downloadingPaths) {
+        if (!matchPaths.contains(downloadingPath)) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    private void readObject(ObjectInputStream os)
+        throws NotSerializableException {
+      throw new NotSerializableException(this.getClass().getName());
+    }
+  }
+
+  private static class DummyExecutor extends DefaultContainerExecutor {
+    private volatile boolean stopLocalization = false;
+    @Override
+    public void startLocalizer(Path nmPrivateContainerTokensPath,
+        InetSocketAddress nmAddr, String user, String appId, String locId,
+        LocalDirsHandlerService dirsHandler)
+        throws IOException, InterruptedException {
+      while (!stopLocalization) {
+        Thread.yield();
+      }
+    }
+    void setStopLocalization() {
+      stopLocalization = true;
+    }
+  }
+
+  @Test(timeout = 20000)
+  @SuppressWarnings("unchecked")
+  public void testDownloadingResourcesOnContainerKill() throws Exception {
+    List<Path> localDirs = new ArrayList<Path>();
+    String[] sDirs = new String[1];
+    localDirs.add(lfs.makeQualified(new Path(basedir, 0 + "")));
+    sDirs[0] = localDirs.get(0).toString();
+
+    conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
+    DrainDispatcher dispatcher = new DrainDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+    EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
+    dispatcher.register(ApplicationEventType.class, applicationBus);
+    EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
+    dispatcher.register(ContainerEventType.class, containerBus);
+
+    DummyExecutor exec = new DummyExecutor();
+    LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
+    dirsHandler.init(conf);
+
+    DeletionService delServiceReal = new DeletionService(exec);
+    DeletionService delService = spy(delServiceReal);
+    delService.init(new Configuration());
+    delService.start();
+
+    ResourceLocalizationService rawService = new ResourceLocalizationService(
+        dispatcher, exec, delService, dirsHandler, nmContext);
+    ResourceLocalizationService spyService = spy(rawService);
+    doReturn(mockServer).when(spyService).createServer();
+    doReturn(lfs).when(spyService).getLocalFileContext(isA(Configuration.class));
+    FsPermission defaultPermission =
+        FsPermission.getDirDefault().applyUMask(lfs.getUMask());
+    FsPermission nmPermission =
+        ResourceLocalizationService.NM_PRIVATE_PERM.applyUMask(lfs.getUMask());
+    final Path userDir =
+        new Path(sDirs[0].substring("file:".length()),
+          ContainerLocalizer.USERCACHE);
+    final Path fileDir =
+        new Path(sDirs[0].substring("file:".length()),
+          ContainerLocalizer.FILECACHE);
+    final Path sysDir =
+        new Path(sDirs[0].substring("file:".length()),
+          ResourceLocalizationService.NM_PRIVATE_DIR);
+    final FileStatus fs =
+        new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
+          defaultPermission, "", "", new Path(sDirs[0]));
+    final FileStatus nmFs =
+        new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
+          nmPermission, "", "", sysDir);
+
+    doAnswer(new Answer<FileStatus>() {
+      @Override
+      public FileStatus answer(InvocationOnMock invocation) throws Throwable {
+        Object[] args = invocation.getArguments();
+        if (args.length > 0) {
+          if (args[0].equals(userDir) || args[0].equals(fileDir)) {
+            return fs;
+          }
+        }
+        return nmFs;
+      }
+    }).when(spylfs).getFileStatus(isA(Path.class));
+
+    try {
+      spyService.init(conf);
+      spyService.start();
+
+      final Application app = mock(Application.class);
+      final ApplicationId appId =
+          BuilderUtils.newApplicationId(314159265358979L, 3);
+      String user = "user0";
+      when(app.getUser()).thenReturn(user);
+      when(app.getAppId()).thenReturn(appId);
+      spyService.handle(new ApplicationLocalizationEvent(
+          LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
+      ArgumentMatcher<ApplicationEvent> matchesAppInit =
+        new ArgumentMatcher<ApplicationEvent>() {
+          @Override
+          public boolean matches(Object o) {
+            ApplicationEvent evt = (ApplicationEvent) o;
+            return evt.getType() == ApplicationEventType.APPLICATION_INITED
+              && appId == evt.getApplicationID();
+          }
+        };
+      dispatcher.await();
+      verify(applicationBus).handle(argThat(matchesAppInit));
+
+      // Initialize localizer.
+      Random r = new Random();
+      long seed = r.nextLong();
+      System.out.println("SEED: " + seed);
+      r.setSeed(seed);
+      final Container c1 = getMockContainer(appId, 42, "user0");
+      final Container c2 = getMockContainer(appId, 43, "user0");
+      FSDataOutputStream out =
+        new FSDataOutputStream(new DataOutputBuffer(), null);
+      doReturn(out).when(spylfs).createInternal(isA(Path.class),
+          isA(EnumSet.class), isA(FsPermission.class), anyInt(), anyShort(),
+          anyLong(), isA(Progressable.class), isA(ChecksumOpt.class),
+          anyBoolean());
+      final LocalResource resource1 = getPrivateMockedResource(r);
+      LocalResource resource2 = null;
+      do {
+        resource2 = getPrivateMockedResource(r);
+      } while (resource2 == null || resource2.equals(resource1));
+      LocalResource resource3 = null;
+      do {
+        resource3 = getPrivateMockedResource(r);
+      } while (resource3 == null || resource3.equals(resource1)
+          || resource3.equals(resource2));
+
+      // Send localization requests for container c1 and c2.
+      final LocalResourceRequest req1 = new LocalResourceRequest(resource1);
+      final LocalResourceRequest req2 = new LocalResourceRequest(resource2);
+      final LocalResourceRequest req3 = new LocalResourceRequest(resource3);
+      Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
+          new HashMap<LocalResourceVisibility,
+              Collection<LocalResourceRequest>>();
+      List<LocalResourceRequest> privateResourceList =
+          new ArrayList<LocalResourceRequest>();
+      privateResourceList.add(req1);
+      privateResourceList.add(req2);
+      privateResourceList.add(req3);
+      rsrcs.put(LocalResourceVisibility.PRIVATE, privateResourceList);
+      spyService.handle(new ContainerLocalizationRequestEvent(c1, rsrcs));
+
+      final LocalResourceRequest req1_1 = new LocalResourceRequest(resource2);
+      Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs1 =
+          new HashMap<LocalResourceVisibility,
+              Collection<LocalResourceRequest>>();
+      List<LocalResourceRequest> privateResourceList1 =
+          new ArrayList<LocalResourceRequest>();
+      privateResourceList1.add(req1_1);
+      rsrcs1.put(LocalResourceVisibility.PRIVATE, privateResourceList1);
+      spyService.handle(new ContainerLocalizationRequestEvent(c2, rsrcs1));
+
+      dispatcher.await();
+      final String containerIdStr = c1.getContainerId().toString();
+      // Heartbeats from container localizer
+      LocalResourceStatus rsrc1success = mock(LocalResourceStatus.class);
+      LocalResourceStatus rsrc2pending = mock(LocalResourceStatus.class);
+      LocalizerStatus stat = mock(LocalizerStatus.class);
+      when(stat.getLocalizerId()).thenReturn(containerIdStr);
+      when(rsrc1success.getResource()).thenReturn(resource1);
+      when(rsrc2pending.getResource()).thenReturn(resource2);
+      when(rsrc1success.getLocalSize()).thenReturn(4344L);
+      URL locPath = getPath("/some/path");
+      when(rsrc1success.getLocalPath()).thenReturn(locPath);
+      when(rsrc1success.getStatus()).
+          thenReturn(ResourceStatusType.FETCH_SUCCESS);
+      when(rsrc2pending.getStatus()).
+          thenReturn(ResourceStatusType.FETCH_PENDING);
+
+      when(stat.getResources())
+          .thenReturn(Collections.<LocalResourceStatus>emptyList())
+          .thenReturn(Collections.singletonList(rsrc1success))
+          .thenReturn(Collections.singletonList(rsrc2pending))
+          .thenReturn(Collections.singletonList(rsrc2pending))
+          .thenReturn(Collections.<LocalResourceStatus>emptyList());
+
+      // First heartbeat which schedules first resource.
+      LocalizerHeartbeatResponse response = spyService.heartbeat(stat);
+      assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
+
+      // Second heartbeat which reports first resource as success.
+      // Second resource is scheduled.
+      response = spyService.heartbeat(stat);
+      assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
+      final String locPath1 = response.getResourceSpecs().get(0).
+          getDestinationDirectory().getFile();
+
+      // Third heartbeat which reports second resource as pending.
+      // Third resource is scheduled.
+      response = spyService.heartbeat(stat);
+      assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
+      final String locPath2 = response.getResourceSpecs().get(0).
+          getDestinationDirectory().getFile();
+
+      // Container c1 is killed which leads to cleanup
+      spyService.handle(new ContainerLocalizationCleanupEvent(c1, rsrcs));
+
+      // This heartbeat will indicate to container localizer to die as localizer
+      // runner has stopped.
+      response = spyService.heartbeat(stat);
+      assertEquals(LocalizerAction.DIE, response.getLocalizerAction());
+
+      exec.setStopLocalization();
+      dispatcher.await();
+      // verify container notification
+      ArgumentMatcher<ContainerEvent> successContainerLoc =
+          new ArgumentMatcher<ContainerEvent>() {
+          @Override
+          public boolean matches(Object o) {
+            ContainerEvent evt = (ContainerEvent) o;
+            return evt.getType() == ContainerEventType.RESOURCE_LOCALIZED
+              && c1.getContainerId() == evt.getContainerID();
+          }
+        };
+      // Only one resource gets localized for container c1.
+      verify(containerBus).handle(argThat(successContainerLoc));
+
+      Set<Path> paths =
+          Sets.newHashSet(new Path(locPath1), new Path(locPath1 + "_tmp"),
+              new Path(locPath2), new Path(locPath2 + "_tmp"));
+      // Verify if downloading resources were submitted for deletion.
+      verify(delService).delete(eq(user),
+          (Path) eq(null), argThat(new DownloadingPathsMatcher(paths)));
+
+      LocalResourcesTracker tracker = spyService.getLocalResourcesTracker(
+          LocalResourceVisibility.PRIVATE, "user0", appId);
+      // Container c1 was killed but this resource was localized before kill
+      // hence it's not removed despite ref cnt being 0.
+      LocalizedResource rsrc1 = tracker.getLocalizedResource(req1);
+      assertNotNull(rsrc1);
+      assertEquals(rsrc1.getState(), ResourceState.LOCALIZED);
+      assertEquals(rsrc1.getRefCount(), 0);
+
+      // Container c1 was killed but this resource is referenced by container c2
+      // as well hence its ref cnt is 1.
+      LocalizedResource rsrc2 = tracker.getLocalizedResource(req2);
+      assertNotNull(rsrc2);
+      assertEquals(rsrc2.getState(), ResourceState.DOWNLOADING);
+      assertEquals(rsrc2.getRefCount(), 1);
+
+      // As container c1 was killed and this resource was not referenced by any
+      // other container, it is removed.
+      LocalizedResource rsrc3 = tracker.getLocalizedResource(req3);
+      assertNull(rsrc3);
+    } finally {
+      spyService.stop();
+      dispatcher.stop();
+      delService.stop();
+    }
+  }
+
   /*
    * Test case for handling RejectedExecutionException and IOException which can
    * be thrown when adding public resources to the pending queue.