You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/04/20 00:35:43 UTC

svn commit: r1470076 - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ hadoop-yarn/hadoop-yarn-server/hado...

Author: vinodkv
Date: Fri Apr 19 22:35:43 2013
New Revision: 1470076

URL: http://svn.apache.org/r1470076
Log:
YARN-547. Fixed race conditions in public and private resource localization which used to cause duplicate downloads. Contributed by Omkar Vinit Joshi.

Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1470076&r1=1470075&r2=1470076&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Fri Apr 19 22:35:43 2013
@@ -269,6 +269,9 @@ Release 2.0.5-beta - UNRELEASED
     YARN-585. Fix failure in TestFairScheduler#testNotAllowSubmitApplication
     caused by YARN-514. (Zhijie Shen via vinodkv)
 
+    YARN-547. Fixed race conditions in public and private resource localization
+    which used to cause duplicate downloads. (Omkar Vinit Joshi via vinodkv)
+
 Release 2.0.4-alpha - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java?rev=1470076&r1=1470075&r2=1470076&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java Fri Apr 19 22:35:43 2013
@@ -18,12 +18,15 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
 
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Component tracking resources all of the same {@link LocalResourceVisibility}
  * 
@@ -41,4 +44,8 @@ interface LocalResourcesTracker
   String getUser();
 
   long nextUniqueNumber();
+  
+  @VisibleForTesting
+  @Private
+  LocalizedResource getLocalizedResource(LocalResourceRequest request);
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java?rev=1470076&r1=1470075&r2=1470076&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java Fri Apr 19 22:35:43 2013
@@ -27,6 +27,7 @@ import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@@ -35,6 +36,8 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
 
+import com.google.common.annotations.VisibleForTesting;
+
 
 /**
  * A collection of {@link LocalizedResource}s all of same
@@ -307,4 +310,11 @@ class LocalResourcesTrackerImpl implemen
   public long nextUniqueNumber() {
     return uniqueNumberGenerator.incrementAndGet();
   }
+
+  @VisibleForTesting
+  @Private
+  @Override
+  public LocalizedResource getLocalizedResource(LocalResourceRequest request) {
+    return localrsrc.get(request);
+  }
 }
\ No newline at end of file

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java?rev=1470076&r1=1470075&r2=1470076&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java Fri Apr 19 22:35:43 2013
@@ -78,19 +78,14 @@ public class LocalizedResource implement
     // From INIT (ref == 0, awaiting req)
     .addTransition(ResourceState.INIT, ResourceState.DOWNLOADING,
         ResourceEventType.REQUEST, new FetchResourceTransition())
-    .addTransition(ResourceState.INIT, ResourceState.LOCALIZED,
-        ResourceEventType.LOCALIZED, new FetchDirectTransition())
-    .addTransition(ResourceState.INIT, ResourceState.INIT,
-        ResourceEventType.RELEASE, new ReleaseTransition())
 
     // From DOWNLOADING (ref > 0, may be localizing)
     .addTransition(ResourceState.DOWNLOADING, ResourceState.DOWNLOADING,
         ResourceEventType.REQUEST, new FetchResourceTransition()) // TODO: Duplicate addition!!
     .addTransition(ResourceState.DOWNLOADING, ResourceState.LOCALIZED,
         ResourceEventType.LOCALIZED, new FetchSuccessTransition())
-    .addTransition(ResourceState.DOWNLOADING,
-        EnumSet.of(ResourceState.DOWNLOADING, ResourceState.INIT),
-        ResourceEventType.RELEASE, new ReleasePendingTransition())
+    .addTransition(ResourceState.DOWNLOADING,ResourceState.DOWNLOADING,
+        ResourceEventType.RELEASE, new ReleaseTransition())
     .addTransition(ResourceState.DOWNLOADING, ResourceState.FAILED,
         ResourceEventType.LOCALIZATION_FAILED, new FetchFailedTransition())
 
@@ -98,8 +93,6 @@ public class LocalizedResource implement
     .addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED,
         ResourceEventType.REQUEST, new LocalizedResourceTransition())
     .addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED,
-        ResourceEventType.LOCALIZED)
-    .addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED,
         ResourceEventType.RELEASE, new ReleaseTransition())
     .installTopology();
 
@@ -230,14 +223,6 @@ public class LocalizedResource implement
     }
   }
 
-  private static class FetchDirectTransition extends FetchSuccessTransition {
-    @Override
-    public void transition(LocalizedResource rsrc, ResourceEvent event) {
-      LOG.warn("Resource " + rsrc + " localized without listening container");
-      super.transition(rsrc, event);
-    }
-  }
-
   /**
    * Resource localized, notify waiting containers.
    */
@@ -304,17 +289,4 @@ public class LocalizedResource implement
       rsrc.release(relEvent.getContainer());
     }
   }
-
-  private static class ReleasePendingTransition implements
-      MultipleArcTransition<LocalizedResource,ResourceEvent,ResourceState> {
-    @Override
-    public ResourceState transition(LocalizedResource rsrc,
-        ResourceEvent event) {
-      ResourceReleaseEvent relEvent = (ResourceReleaseEvent) event;
-      rsrc.release(relEvent.getContainer());
-      return rsrc.ref.isEmpty()
-        ? ResourceState.INIT
-        : ResourceState.DOWNLOADING;
-    }
-  }
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1470076&r1=1470075&r2=1470076&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Fri Apr 19 22:35:43 2013
@@ -31,7 +31,6 @@ import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CancellationException;
@@ -47,9 +46,11 @@ import java.util.concurrent.ScheduledExe
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileContext;
@@ -112,6 +113,7 @@ import org.apache.hadoop.yarn.service.Co
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.FSDownload;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 public class ResourceLocalizationService extends CompositeService
@@ -492,7 +494,25 @@ public class ResourceLocalizationService
             + Path.SEPARATOR + appId;
     return path;
   }
+  
+  @VisibleForTesting
+  @Private
+  public PublicLocalizer getPublicLocalizer() {
+    return localizerTracker.publicLocalizer;
+  }
 
+  @VisibleForTesting
+  @Private
+  public LocalizerRunner getLocalizerRunner(String locId) {
+    return localizerTracker.privLocalizers.get(locId);
+  }
+  
+  @VisibleForTesting
+  @Private
+  public Map<String, LocalizerRunner> getPrivateLocalizers() {
+    return localizerTracker.privLocalizers;
+  }
+  
   /**
    * Sub-component handling the spawning of {@link ContainerLocalizer}s
    */
@@ -606,41 +626,20 @@ public class ResourceLocalizationService
     final ExecutorService threadPool;
     final CompletionService<Path> queue;
     final Map<Future<Path>,LocalizerResourceRequestEvent> pending;
-    // TODO hack to work around broken signaling
-    final Map<LocalResourceRequest,List<LocalizerResourceRequestEvent>> attempts;
 
     PublicLocalizer(Configuration conf) {
       this(conf, getLocalFileContext(conf),
            createLocalizerExecutor(conf),
-           new HashMap<Future<Path>,LocalizerResourceRequestEvent>(),
-           new HashMap<LocalResourceRequest,List<LocalizerResourceRequestEvent>>());
+           new HashMap<Future<Path>,LocalizerResourceRequestEvent>());
     }
     
     PublicLocalizer(Configuration conf, FileContext lfs,
         ExecutorService threadPool,
-        Map<Future<Path>,LocalizerResourceRequestEvent> pending,
-        Map<LocalResourceRequest,List<LocalizerResourceRequestEvent>> attempts) {
+        Map<Future<Path>,LocalizerResourceRequestEvent> pending) {
       super("Public Localizer");
       this.lfs = lfs;
       this.conf = conf;
       this.pending = pending;
-      this.attempts = attempts;
-//      List<String> localDirs = dirsHandler.getLocalDirs();
-//      String[] publicFilecache = new String[localDirs.size()];
-//      for (int i = 0, n = localDirs.size(); i < n; ++i) {
-//        publicFilecache[i] =
-//          new Path(localDirs.get(i), ContainerLocalizer.FILECACHE).toString();
-//      }
-//      conf.setStrings(PUBCACHE_CTXT, publicFilecache);
-
-//      this.publicDirDestPath = new LocalDirAllocator(PUBCACHE_CTXT).getLocalPathForWrite(pathStr, conf);
-//      List<String> localDirs = dirsHandler.getLocalDirs();
-//      String[] publicFilecache = new String[localDirs.size()];
-//      int i = 0;
-//      for (String localDir : localDirs) {
-//        publicFilecache[i++] =
-//            new Path(localDir, ContainerLocalizer.FILECACHE).toString();
-//      }
 
       this.threadPool = threadPool;
       this.queue = new ExecutorCompletionService<Path>(threadPool);
@@ -648,36 +647,45 @@ public class ResourceLocalizationService
 
     public void addResource(LocalizerResourceRequestEvent request) {
       // TODO handle failures, cancellation, requests by other containers
-      LocalResourceRequest key = request.getResource().getRequest();
+      LocalizedResource rsrc = request.getResource();
+      LocalResourceRequest key = rsrc.getRequest();
       LOG.info("Downloading public rsrc:" + key);
-      synchronized (attempts) {
-        List<LocalizerResourceRequestEvent> sigh = attempts.get(key);
-        if (null == sigh) {
+      /*
+       * Here multiple containers may request the same resource. So we need
+       * to start downloading only when
+       * 1) ResourceState == DOWNLOADING
+       * 2) We are able to acquire non blocking semaphore lock.
+       * If not we will skip this resource as either it is getting downloaded
+       * or it FAILED / LOCALIZED.
+       */
+
+      if (rsrc.tryAcquire()) {
+        if (rsrc.getState().equals(ResourceState.DOWNLOADING)) {
           LocalResource resource = request.getResource().getRequest();
           try {
-            Path publicDirDestPath = dirsHandler.getLocalPathForWrite(
-                "." + Path.SEPARATOR + ContainerLocalizer.FILECACHE,
-                ContainerLocalizer.getEstimatedSize(resource), true);
+            Path publicDirDestPath =
+                dirsHandler.getLocalPathForWrite("." + Path.SEPARATOR
+                    + ContainerLocalizer.FILECACHE,
+                  ContainerLocalizer.getEstimatedSize(resource), true);
             Path hierarchicalPath =
-              publicRsrc.getPathForLocalization(key, publicDirDestPath);
+                publicRsrc.getPathForLocalization(key, publicDirDestPath);
             if (!hierarchicalPath.equals(publicDirDestPath)) {
               publicDirDestPath = hierarchicalPath;
-              DiskChecker.checkDir(
-                new File(publicDirDestPath.toUri().getPath()));
+              DiskChecker.checkDir(new File(publicDirDestPath.toUri().getPath()));
             }
             publicDirDestPath =
                 new Path(publicDirDestPath, Long.toString(publicRsrc
                   .nextUniqueNumber()));
-            pending.put(queue.submit(new FSDownload(
-                lfs, null, conf, publicDirDestPath, resource)),
-                request);
-            attempts.put(key, new LinkedList<LocalizerResourceRequestEvent>());
+            pending.put(queue.submit(new FSDownload(lfs, null, conf,
+              publicDirDestPath, resource)), request);
           } catch (IOException e) {
+            rsrc.unlock();
+            // TODO Need to Fix IO Exceptions - Notifying resource
             LOG.error("Local path for public localization is not found. "
                 + " May be disks failed.", e);
           }
         } else {
-          sigh.add(request);
+          rsrc.unlock();
         }
       }
     }
@@ -700,24 +708,14 @@ public class ResourceLocalizationService
               LocalResourceRequest key = assoc.getResource().getRequest();
               publicRsrc.handle(new ResourceLocalizedEvent(key, local, FileUtil
                 .getDU(new File(local.toUri()))));
-              synchronized (attempts) {
-                attempts.remove(key);
-              }
+              assoc.getResource().unlock();
             } catch (ExecutionException e) {
               LOG.info("Failed to download rsrc " + assoc.getResource(),
                   e.getCause());
               LocalResourceRequest req = assoc.getResource().getRequest();
               publicRsrc.handle(new ResourceFailedLocalizationEvent(req, e
                 .getCause()));
-              synchronized (attempts) {
-                List<LocalizerResourceRequestEvent> reqs;
-                reqs = attempts.get(req);
-                if (null == reqs) {
-                  LOG.error("Missing pending list for " + req);
-                  return;
-                }
-                attempts.remove(req);
-              }
+              assoc.getResource().unlock();
             } catch (CancellationException e) {
               // ignore; shutting down
             }
@@ -776,22 +774,35 @@ public class ResourceLocalizationService
            i.hasNext();) {
         LocalizerResourceRequestEvent evt = i.next();
         LocalizedResource nRsrc = evt.getResource();
-        if (ResourceState.LOCALIZED.equals(nRsrc.getState())) {
+        // Resource download should take place ONLY if resource is in
+        // Downloading state
+        if (!ResourceState.DOWNLOADING.equals(nRsrc.getState())) {
           i.remove();
           continue;
         }
+        /*
+         * Multiple containers will try to download the same resource. So the
+         * resource download should start only if
+         * 1) We can acquire a non blocking semaphore lock on resource
+         * 2) Resource is still in DOWNLOADING state
+         */
         if (nRsrc.tryAcquire()) {
-          LocalResourceRequest nextRsrc = nRsrc.getRequest();
-          LocalResource next =
-            recordFactory.newRecordInstance(LocalResource.class);
-          next.setResource(
-              ConverterUtils.getYarnUrlFromPath(nextRsrc.getPath()));
-          next.setTimestamp(nextRsrc.getTimestamp());
-          next.setType(nextRsrc.getType());
-          next.setVisibility(evt.getVisibility());
-          next.setPattern(evt.getPattern());
-          scheduled.put(nextRsrc, evt);
-          return next;
+          if (nRsrc.getState().equals(ResourceState.DOWNLOADING)) {
+            LocalResourceRequest nextRsrc = nRsrc.getRequest();
+            LocalResource next =
+                recordFactory.newRecordInstance(LocalResource.class);
+            next.setResource(ConverterUtils.getYarnUrlFromPath(nextRsrc
+              .getPath()));
+            next.setTimestamp(nextRsrc.getTimestamp());
+            next.setType(nextRsrc.getType());
+            next.setVisibility(evt.getVisibility());
+            next.setPattern(evt.getPattern());
+            scheduled.put(nextRsrc, evt);
+            return next;
+          } else {
+            // Need to release acquired lock
+            nRsrc.unlock();
+          }
         }
       }
       return null;
@@ -863,6 +874,12 @@ public class ResourceLocalizationService
                 new ResourceLocalizedEvent(req, ConverterUtils
                   .getPathFromYarnURL(stat.getLocalPath()), stat.getLocalSize()));
             } catch (URISyntaxException e) { }
+
+            // unlocking the resource and removing it from scheduled resource
+            // list
+            assoc.getResource().unlock();
+            scheduled.remove(req);
+            
             if (pending.isEmpty()) {
               // TODO: Synchronization
               response.setLocalizerAction(LocalizerAction.DIE);
@@ -889,11 +906,16 @@ public class ResourceLocalizationService
             break;
           case FETCH_FAILURE:
             LOG.info("DEBUG: FAILED " + req, stat.getException());
-            assoc.getResource().unlock();
             response.setLocalizerAction(LocalizerAction.DIE);
             getLocalResourcesTracker(req.getVisibility(), user, applicationId)
               .handle(
                 new ResourceFailedLocalizationEvent(req, stat.getException()));
+
+            // unlocking the resource and removing it from scheduled resource
+            // list
+            assoc.getResource().unlock();
+            scheduled.remove(req);
+            
             break;
           default:
             LOG.info("Unknown status: " + stat.getStatus());

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java?rev=1470076&r1=1470075&r2=1470076&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java Fri Apr 19 22:35:43 2013
@@ -129,14 +129,10 @@ public class TestLocalResourcesTrackerIm
       dispatcher.await();
       verifyTrackedResourceCount(tracker, 2);
 
-      // Verify resources in state INIT with ref-count=0 is removed.
-      Assert.assertTrue(tracker.remove(lr2, mockDelService));
-      verifyTrackedResourceCount(tracker, 1);
-
       // Verify resource with non zero ref count is not removed.
       Assert.assertEquals(2, lr1.getRefCount());
       Assert.assertFalse(tracker.remove(lr1, mockDelService));
-      verifyTrackedResourceCount(tracker, 1);
+      verifyTrackedResourceCount(tracker, 2);
 
       // Localize resource1
       ResourceLocalizedEvent rle =
@@ -151,7 +147,7 @@ public class TestLocalResourcesTrackerIm
 
       // Verify resources in state LOCALIZED with ref-count=0 is removed.
       Assert.assertTrue(tracker.remove(lr1, mockDelService));
-      verifyTrackedResourceCount(tracker, 0);
+      verifyTrackedResourceCount(tracker, 1);
     } finally {
       if (dispatcher != null) {
         dispatcher.stop();

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java?rev=1470076&r1=1470075&r2=1470076&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java Fri Apr 19 22:35:43 2013
@@ -117,7 +117,7 @@ public class TestLocalizedResource {
       local.handle(new ResourceReleaseEvent(rsrcA, container1));
       dispatcher.await();
       verify(containerBus, never()).handle(isA(ContainerEvent.class));
-      assertEquals(ResourceState.INIT, local.getState());
+      assertEquals(ResourceState.DOWNLOADING, local.getState());
 
       // Register C2, C3
       final ContainerId container2 = getMockContainer(2);
@@ -176,24 +176,6 @@ public class TestLocalizedResource {
     }
   }
 
-  @Test
-  public void testDirectLocalization() throws Exception {
-    DrainDispatcher dispatcher = new DrainDispatcher();
-    dispatcher.init(new Configuration());
-    try {
-      dispatcher.start();
-      LocalResource apiRsrc = createMockResource();
-      LocalResourceRequest rsrcA = new LocalResourceRequest(apiRsrc);
-      LocalizedResource local = new LocalizedResource(rsrcA, dispatcher);
-      Path p = new Path("file:///cache/rsrcA");
-      local.handle(new ResourceLocalizedEvent(rsrcA, p, 10));
-      dispatcher.await();
-      assertEquals(ResourceState.LOCALIZED, local.getState());
-    } finally {
-      dispatcher.stop();
-    }
-  }
-
   static LocalResource createMockResource() {
     // mock rsrc location
     org.apache.hadoop.yarn.api.records.URL uriA =

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java?rev=1470076&r1=1470075&r2=1470076&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java Fri Apr 19 22:35:43 2013
@@ -34,9 +34,9 @@ import static org.mockito.Mockito.doRetu
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.times;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -53,6 +53,7 @@ import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.Future;
 
 import junit.framework.Assert;
 
@@ -82,6 +83,7 @@ import org.apache.hadoop.yarn.conf.YarnC
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.impl.pb.YarnRemoteExceptionPBImpl;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
@@ -90,20 +92,28 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.impl.pb.LocalResourceStatusPBImpl;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.impl.pb.LocalizerStatusPBImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.LocalizerRunner;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.LocalizerTracker;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.junit.BeforeClass;
@@ -677,6 +687,481 @@ public class TestResourceLocalizationSer
     }
   }
 
+  @Test(timeout = 100000)
+  @SuppressWarnings("unchecked")
+  public void testParallelDownloadAttemptsForPrivateResource() throws Exception {
+
+    DrainDispatcher dispatcher1 = null;
+    try {
+      dispatcher1 = new DrainDispatcher();
+      String user = "testuser";
+      ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
+
+      // mocked Resource Localization Service
+      Configuration conf = new Configuration();
+      AbstractFileSystem spylfs =
+          spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
+      final FileContext lfs = FileContext.getFileContext(spylfs, conf);
+      // We don't want files to be created
+      doNothing().when(spylfs).mkdir(isA(Path.class), isA(FsPermission.class),
+        anyBoolean());
+
+      // creating one local directory
+      List<Path> localDirs = new ArrayList<Path>();
+      String[] sDirs = new String[1];
+      for (int i = 0; i < 1; ++i) {
+        localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
+        sDirs[i] = localDirs.get(i).toString();
+      }
+      conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
+      // setting log directory.
+      String logDir =
+          lfs.makeQualified(new Path(basedir, "logdir ")).toString();
+      conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
+
+      LocalDirsHandlerService localDirHandler = new LocalDirsHandlerService();
+      localDirHandler.init(conf);
+      // Registering event handlers
+      EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
+      dispatcher1.register(ApplicationEventType.class, applicationBus);
+      EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
+      dispatcher1.register(ContainerEventType.class, containerBus);
+
+      ContainerExecutor exec = mock(ContainerExecutor.class);
+      DeletionService delService = mock(DeletionService.class);
+      LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
+      // initializing directory handler.
+      dirsHandler.init(conf);
+
+      dispatcher1.init(conf);
+      dispatcher1.start();
+
+      ResourceLocalizationService rls =
+          new ResourceLocalizationService(dispatcher1, exec, delService,
+            localDirHandler);
+      dispatcher1.register(LocalizationEventType.class, rls);
+      rls.init(conf);
+
+      rls.handle(createApplicationLocalizationEvent(user, appId));
+
+      LocalResourceRequest req =
+          new LocalResourceRequest(new Path("file:///tmp"), 123L,
+            LocalResourceType.FILE, LocalResourceVisibility.PRIVATE, "");
+
+      // We need to pre-populate the LocalizerRunner as the
+      // Resource Localization Service code internally starts them which
+      // definitely we don't want.
+
+      // creating new containers and populating corresponding localizer runners
+
+      // Container - 1
+      ContainerImpl container1 = createMockContainer(user, 1);
+      String localizerId1 = container1.getContainerID().toString();
+      rls.getPrivateLocalizers().put(
+        localizerId1,
+        rls.new LocalizerRunner(new LocalizerContext(user, container1
+          .getContainerID(), null), localizerId1));
+      LocalizerRunner localizerRunner1 = rls.getLocalizerRunner(localizerId1);
+
+      dispatcher1.getEventHandler().handle(
+        createContainerLocalizationEvent(container1,
+          LocalResourceVisibility.PRIVATE, req));
+      Assert
+        .assertTrue(waitForPrivateDownloadToStart(rls, localizerId1, 1, 200));
+
+      // Container - 2 now makes the request.
+      ContainerImpl container2 = createMockContainer(user, 2);
+      String localizerId2 = container2.getContainerID().toString();
+      rls.getPrivateLocalizers().put(
+        localizerId2,
+        rls.new LocalizerRunner(new LocalizerContext(user, container2
+          .getContainerID(), null), localizerId2));
+      LocalizerRunner localizerRunner2 = rls.getLocalizerRunner(localizerId2);
+      dispatcher1.getEventHandler().handle(
+        createContainerLocalizationEvent(container2,
+          LocalResourceVisibility.PRIVATE, req));
+      Assert
+        .assertTrue(waitForPrivateDownloadToStart(rls, localizerId2, 1, 200));
+
+      // Retrieving localized resource.
+      LocalResourcesTracker tracker =
+          rls.getLocalResourcesTracker(LocalResourceVisibility.PRIVATE, user,
+            appId);
+      LocalizedResource lr = tracker.getLocalizedResource(req);
+      // Resource would now have moved into DOWNLOADING state
+      Assert.assertEquals(ResourceState.DOWNLOADING, lr.getState());
+      // Resource should have one permit
+      Assert.assertEquals(1, lr.sem.availablePermits());
+
+      // Resource Localization Service receives first heart beat from
+      // ContainerLocalizer for container1
+      LocalizerHeartbeatResponse response1 =
+          rls.heartbeat(createLocalizerStatus(localizerId1));
+
+      // Resource must have been added to scheduled map
+      Assert.assertEquals(1, localizerRunner1.scheduled.size());
+      // Checking resource in the response and also available permits for it.
+      Assert.assertEquals(req.getResource(), response1.getResourceSpecs()
+        .get(0).getResource().getResource());
+      Assert.assertEquals(0, lr.sem.availablePermits());
+
+      // Resource Localization Service now receives first heart beat from
+      // ContainerLocalizer for container2
+      LocalizerHeartbeatResponse response2 =
+          rls.heartbeat(createLocalizerStatus(localizerId2));
+
+      // Resource must not have been added to scheduled map
+      Assert.assertEquals(0, localizerRunner2.scheduled.size());
+      // No resource is returned in response
+      Assert.assertEquals(0, response2.getResourceSpecs().size());
+
+      // ContainerLocalizer - 1 now sends failed resource heartbeat.
+      rls.heartbeat(createLocalizerStatusForFailedResource(localizerId1, req));
+
+      // Resource Localization should fail and state is modified accordingly.
+      // Also Local should be release on the LocalizedResource.
+      Assert
+        .assertTrue(waitForResourceState(lr, rls, req,
+          LocalResourceVisibility.PRIVATE, user, appId, ResourceState.FAILED,
+          200));
+      Assert.assertTrue(lr.getState().equals(ResourceState.FAILED));
+      Assert.assertEquals(0, localizerRunner1.scheduled.size());
+
+      // Now Container-2 once again sends heart beat to resource localization
+      // service
+
+      // Now container-2 again try to download the resource it should still
+      // not get the resource as the resource is now not in DOWNLOADING state.
+      response2 = rls.heartbeat(createLocalizerStatus(localizerId2));
+
+      // Resource must not have been added to scheduled map.
+      // Also as the resource has failed download it will be removed from
+      // pending list.
+      Assert.assertEquals(0, localizerRunner2.scheduled.size());
+      Assert.assertEquals(0, localizerRunner2.pending.size());
+      Assert.assertEquals(0, response2.getResourceSpecs().size());
+
+    } finally {
+      if (dispatcher1 != null) {
+        dispatcher1.stop();
+      }
+    }
+  }
+
+  private LocalizerStatus createLocalizerStatusForFailedResource(
+      String localizerId, LocalResourceRequest req) {
+    LocalizerStatus status = createLocalizerStatus(localizerId);
+    LocalResourceStatus resourceStatus = new LocalResourceStatusPBImpl();
+    resourceStatus.setException(new YarnRemoteExceptionPBImpl("test"));
+    resourceStatus.setStatus(ResourceStatusType.FETCH_FAILURE);
+    resourceStatus.setResource(req);
+    status.addResourceStatus(resourceStatus);
+    return status;
+  }
+
+  private LocalizerStatus createLocalizerStatus(String localizerId1) {
+    LocalizerStatus status = new LocalizerStatusPBImpl();
+    status.setLocalizerId(localizerId1);
+    return status;
+  }
+
+  private LocalizationEvent createApplicationLocalizationEvent(String user,
+      ApplicationId appId) {
+    Application app = mock(Application.class);
+    when(app.getUser()).thenReturn(user);
+    when(app.getAppId()).thenReturn(appId);
+    return new ApplicationLocalizationEvent(
+      LocalizationEventType.INIT_APPLICATION_RESOURCES, app);
+  }
+
+  @Test(timeout = 100000)
+  @SuppressWarnings("unchecked")
+  public void testParallelDownloadAttemptsForPublicResource() throws Exception {
+
+    DrainDispatcher dispatcher1 = null;
+    String user = "testuser";
+    try {
+      // Setting up ResourceLocalization service.
+      Configuration conf = new Configuration();
+      dispatcher1 = new DrainDispatcher();
+      AbstractFileSystem spylfs =
+          spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
+      final FileContext lfs = FileContext.getFileContext(spylfs, conf);
+      // We don't want files to be created
+      doNothing().when(spylfs).mkdir(isA(Path.class), isA(FsPermission.class),
+        anyBoolean());
+
+      // creating one local directory
+      List<Path> localDirs = new ArrayList<Path>();
+      String[] sDirs = new String[1];
+      for (int i = 0; i < 1; ++i) {
+        localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
+        sDirs[i] = localDirs.get(i).toString();
+      }
+      conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
+      // setting log directory.
+      String logDir =
+          lfs.makeQualified(new Path(basedir, "logdir ")).toString();
+      conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
+
+      // Registering event handlers
+      EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
+      dispatcher1.register(ApplicationEventType.class, applicationBus);
+      EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
+      dispatcher1.register(ContainerEventType.class, containerBus);
+
+      ContainerExecutor exec = mock(ContainerExecutor.class);
+      DeletionService delService = mock(DeletionService.class);
+      LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
+      // initializing directory handler.
+      dirsHandler.init(conf);
+
+      dispatcher1.init(conf);
+      dispatcher1.start();
+
+      // Creating and initializing ResourceLocalizationService but not starting
+      // it as otherwise it will remove requests from pending queue.
+      ResourceLocalizationService rawService =
+          new ResourceLocalizationService(dispatcher1, exec, delService,
+            dirsHandler);
+      ResourceLocalizationService spyService = spy(rawService);
+      dispatcher1.register(LocalizationEventType.class, spyService);
+      spyService.init(conf);
+
+      // Initially pending map should be empty for public localizer
+      Assert.assertEquals(0, spyService.getPublicLocalizer().pending.size());
+
+      LocalResourceRequest req =
+          new LocalResourceRequest(new Path("/tmp"), 123L,
+            LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, "");
+
+      // Initializing application
+      ApplicationImpl app = mock(ApplicationImpl.class);
+      ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
+      when(app.getAppId()).thenReturn(appId);
+      when(app.getUser()).thenReturn(user);
+      dispatcher1.getEventHandler().handle(
+        new ApplicationLocalizationEvent(
+          LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
+
+      // Container - 1
+
+      // container requesting the resource
+      ContainerImpl container1 = createMockContainer(user, 1);
+      dispatcher1.getEventHandler().handle(
+        createContainerLocalizationEvent(container1,
+          LocalResourceVisibility.PUBLIC, req));
+
+      // Waiting for resource to change into DOWNLOADING state.
+      Assert.assertTrue(waitForResourceState(null, spyService, req,
+        LocalResourceVisibility.PUBLIC, user, null, ResourceState.DOWNLOADING,
+        200));
+
+      // Waiting for download to start.
+      Assert.assertTrue(waitForPublicDownloadToStart(spyService, 1, 200));
+
+      LocalizedResource lr =
+          getLocalizedResource(spyService, req, LocalResourceVisibility.PUBLIC,
+            user, null);
+      // Resource would now have moved into DOWNLOADING state
+      Assert.assertEquals(ResourceState.DOWNLOADING, lr.getState());
+
+      // pending should have this resource now.
+      Assert.assertEquals(1, spyService.getPublicLocalizer().pending.size());
+      // Now resource should have 0 permit.
+      Assert.assertEquals(0, lr.sem.availablePermits());
+
+      // Container - 2
+
+      // Container requesting the same resource.
+      ContainerImpl container2 = createMockContainer(user, 2);
+      dispatcher1.getEventHandler().handle(
+        createContainerLocalizationEvent(container2,
+          LocalResourceVisibility.PUBLIC, req));
+
+      // Waiting for download to start. This should return false as new download
+      // will not start
+      Assert.assertFalse(waitForPublicDownloadToStart(spyService, 2, 100));
+
+      // Now Failing the resource download. As a part of it
+      // resource state is changed and then lock is released.
+      ResourceFailedLocalizationEvent locFailedEvent =
+          new ResourceFailedLocalizationEvent(req, new Exception("test"));
+      spyService.getLocalResourcesTracker(LocalResourceVisibility.PUBLIC, user,
+        null).handle(locFailedEvent);
+
+      // Waiting for resource to change into FAILED state.
+      Assert.assertTrue(waitForResourceState(lr, spyService, req,
+        LocalResourceVisibility.PUBLIC, user, null, ResourceState.FAILED, 200));
+      // releasing lock as a part of download failed process.
+      lr.unlock();
+      // removing pending download request.
+      spyService.getPublicLocalizer().pending.clear();
+
+      // Now I need to simulate a race condition wherein Event is added to
+      // dispatcher before resource state changes to either FAILED or LOCALIZED
+      // Hence sending event directly to dispatcher.
+      LocalizerResourceRequestEvent localizerEvent =
+          new LocalizerResourceRequestEvent(lr, null,
+            mock(LocalizerContext.class), null);
+
+      dispatcher1.getEventHandler().handle(localizerEvent);
+      // Waiting for download to start. This should return false as new download
+      // will not start
+      Assert.assertFalse(waitForPublicDownloadToStart(spyService, 1, 100));
+      // Checking available permits now.
+      Assert.assertEquals(1, lr.sem.availablePermits());
+
+    } finally {
+      if (dispatcher1 != null) {
+        dispatcher1.stop();
+      }
+    }
+
+  }
+
+  private boolean waitForPrivateDownloadToStart(
+      ResourceLocalizationService service, String localizerId, int size,
+      int maxWaitTime) {
+    List<LocalizerResourceRequestEvent> pending = null;
+    // Waiting for localizer to be created.
+    do {
+      if (service.getPrivateLocalizers().get(localizerId) != null) {
+        pending = service.getPrivateLocalizers().get(localizerId).pending;
+      }
+      if (pending == null) {
+        try {
+          maxWaitTime -= 20;
+          Thread.sleep(20);
+        } catch (Exception e) {
+        }
+      } else {
+        break;
+      }
+    } while (maxWaitTime > 0);
+    if (pending == null) {
+      return false;
+    }
+    do {
+      if (pending.size() == size) {
+        return true;
+      } else {
+        try {
+          maxWaitTime -= 20;
+          Thread.sleep(20);
+        } catch (Exception e) {
+        }
+      }
+    } while (maxWaitTime > 0);
+    return pending.size() == size;
+  }
+
+  private boolean waitForPublicDownloadToStart(
+      ResourceLocalizationService service, int size, int maxWaitTime) {
+    Map<Future<Path>, LocalizerResourceRequestEvent> pending = null;
+    // Waiting for localizer to be created.
+    do {
+      if (service.getPublicLocalizer() != null) {
+        pending = service.getPublicLocalizer().pending;
+      }
+      if (pending == null) {
+        try {
+          maxWaitTime -= 20;
+          Thread.sleep(20);
+        } catch (Exception e) {
+        }
+      } else {
+        break;
+      }
+    } while (maxWaitTime > 0);
+    if (pending == null) {
+      return false;
+    }
+    do {
+      if (pending.size() == size) {
+        return true;
+      } else {
+        try {
+          maxWaitTime -= 20;
+          Thread.sleep(20);
+        } catch (InterruptedException e) {
+        }
+      }
+    } while (maxWaitTime > 0);
+    return pending.size() == size;
+
+  }
+
+  private LocalizedResource getLocalizedResource(
+      ResourceLocalizationService service, LocalResourceRequest req,
+      LocalResourceVisibility vis, String user, ApplicationId appId) {
+    return service.getLocalResourcesTracker(vis, user, appId)
+      .getLocalizedResource(req);
+  }
+
+  private boolean waitForResourceState(LocalizedResource lr,
+      ResourceLocalizationService service, LocalResourceRequest req,
+      LocalResourceVisibility vis, String user, ApplicationId appId,
+      ResourceState resourceState, long maxWaitTime) {
+    LocalResourcesTracker tracker = null;
+    // checking tracker is created
+    do {
+      if (tracker == null) {
+        tracker = service.getLocalResourcesTracker(vis, user, appId);
+      }
+      if (tracker != null && lr == null) {
+        lr = tracker.getLocalizedResource(req);
+      }
+      if (lr != null) {
+        break;
+      } else {
+        try {
+          maxWaitTime -= 20;
+          Thread.sleep(20);
+        } catch (InterruptedException e) {
+        }
+      }
+    } while (maxWaitTime > 0);
+    // this will wait till resource state is changed to (resourceState).
+    if (lr == null) {
+      return false;
+    }
+    do {
+      if (!lr.getState().equals(resourceState)) {
+        try {
+          maxWaitTime -= 50;
+          Thread.sleep(50);
+        } catch (InterruptedException e) {
+        }
+      } else {
+        break;
+      }
+    } while (maxWaitTime > 0);
+    return lr.getState().equals(resourceState);
+  }
+
+  private ContainerLocalizationRequestEvent createContainerLocalizationEvent(
+      ContainerImpl container, LocalResourceVisibility vis,
+      LocalResourceRequest req) {
+    Map<LocalResourceVisibility, Collection<LocalResourceRequest>> reqs =
+        new HashMap<LocalResourceVisibility, Collection<LocalResourceRequest>>();
+    List<LocalResourceRequest> resourceList =
+        new ArrayList<LocalResourceRequest>();
+    resourceList.add(req);
+    reqs.put(vis, resourceList);
+    return new ContainerLocalizationRequestEvent(container, reqs);
+  }
+
+  private ContainerImpl createMockContainer(String user, int containerId) {
+    ContainerImpl container = mock(ContainerImpl.class);
+    when(container.getContainerID()).thenReturn(
+      BuilderUtils.newContainerId(1, 1, 1, containerId));
+    when(container.getUser()).thenReturn(user);
+    Credentials mockCredentials = mock(Credentials.class);
+    when(container.getCredentials()).thenReturn(mockCredentials);
+    return container;
+  }
+
   private static URL getPath(String path) {
     URL url = BuilderUtils.newURL("file", null, 0, path);
     return url;