You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by cd...@apache.org on 2011/06/08 03:39:20 UTC

svn commit: r1133230 - in /hadoop/mapreduce/branches/MR-279: CHANGES.txt yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java

Author: cdouglas
Date: Wed Jun  8 01:39:20 2011
New Revision: 1133230

URL: http://svn.apache.org/viewvc?rev=1133230&view=rev
Log:
Work around broken signaling in public cache.

Modified:
    hadoop/mapreduce/branches/MR-279/CHANGES.txt
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java

Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1133230&r1=1133229&r2=1133230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Wed Jun  8 01:39:20 2011
@@ -4,6 +4,9 @@ Trunk (unreleased changes)
 
 
     MAPREDUCE-279
+
+    Work around broken signaling in public cache. (cdouglas)
+
     Cleanup redundant code in TaskAttemptImpl. (sharad)
 
     Disable aggregation of logs onto DFS till JobHistoryServer starts

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1133230&r1=1133229&r2=1133230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Wed Jun  8 01:39:20 2011
@@ -25,6 +25,7 @@ import java.net.URISyntaxException;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.CancellationException;
@@ -486,21 +487,26 @@ public class ResourceLocalizationService
     final ExecutorService threadPool;
     final LocalDirAllocator publicDirs;
     final CompletionService<Path> queue;
-    final ConcurrentMap<Future<Path>,LocalizerResourceRequestEvent> pending;
+    final Map<Future<Path>,LocalizerResourceRequestEvent> pending;
+    // TODO hack to work around broken signaling
+    final Map<LocalResourceRequest,List<LocalizerResourceRequestEvent>> attempts;
 
     PublicLocalizer(Configuration conf) {
       this(conf, getLocalFileContext(conf),
            Executors.newFixedThreadPool(conf.getInt(
                NM_MAX_PUBLIC_FETCH_THREADS, DEFAULT_MAX_PUBLIC_FETCH_THREADS)),
-           new ConcurrentHashMap<Future<Path>,LocalizerResourceRequestEvent>());
+           new HashMap<Future<Path>,LocalizerResourceRequestEvent>(),
+           new HashMap<LocalResourceRequest,List<LocalizerResourceRequestEvent>>());
     }
 
     PublicLocalizer(Configuration conf, FileContext lfs,
         ExecutorService threadPool,
-        ConcurrentMap<Future<Path>,LocalizerResourceRequestEvent> pending) {
+        Map<Future<Path>,LocalizerResourceRequestEvent> pending,
+        Map<LocalResourceRequest,List<LocalizerResourceRequestEvent>> attempts) {
       this.lfs = lfs;
       this.conf = conf;
       this.pending = pending;
+      this.attempts = attempts;
       String[] publicFilecache = new String[localDirs.size()];
       for (int i = 0, n = localDirs.size(); i < n; ++i) {
         publicFilecache[i] =
@@ -514,12 +520,20 @@ public class ResourceLocalizationService
 
     public void addResource(LocalizerResourceRequestEvent request) {
       // TODO handle failures, cancellation, requests by other containers
-      LOG.info("Downloading public rsrc:" + request.getResource().getRequest());
-      pending.putIfAbsent(
-          queue.submit(new FSDownload(
-              lfs, null, conf, publicDirs, request.getResource().getRequest(),
-              new Random())),
-          request);
+      LocalResourceRequest key = request.getResource().getRequest();
+      LOG.info("Downloading public rsrc:" + key);
+      synchronized (attempts) {
+        List<LocalizerResourceRequestEvent> sigh = attempts.get(key);
+        if (null == sigh) {
+          pending.put(queue.submit(new FSDownload(
+                  lfs, null, conf, publicDirs,
+                  request.getResource().getRequest(), new Random())),
+              request);
+          attempts.put(key, new LinkedList<LocalizerResourceRequestEvent>());
+        } else {
+          sigh.add(request);
+        }
+      }
     }
 
     @Override
@@ -529,12 +543,21 @@ public class ResourceLocalizationService
         while (!Thread.currentThread().isInterrupted()) {
           try {
             Future<Path> completed = queue.take();
-            LocalizerResourceRequestEvent assoc = pending.get(completed);
+            LocalizerResourceRequestEvent assoc = pending.remove(completed);
             try {
               Path local = completed.get();
+              if (null == assoc) {
+                LOG.error("Localized unkonwn resource to " + completed);
+                // TODO delete
+                return;
+              }
+              LocalResourceRequest key = assoc.getResource().getRequest();
               assoc.getResource().handle(
-                  new ResourceLocalizedEvent(assoc.getResource().getRequest(),
+                  new ResourceLocalizedEvent(key,
                     local, FileUtil.getDU(new File(local.toUri()))));
+              synchronized (attempts) {
+                attempts.remove(key);
+              }
             } catch (ExecutionException e) {
               LOG.info("Failed to download rsrc " + assoc.getResource(),
                   e.getCause());
@@ -542,6 +565,22 @@ public class ResourceLocalizationService
                   new ContainerResourceFailedEvent(
                     assoc.getContext().getContainerId(),
                     assoc.getResource().getRequest(), e.getCause()));
+              synchronized (attempts) {
+                LocalResourceRequest req = assoc.getResource().getRequest();
+                List<LocalizerResourceRequestEvent> reqs = attempts.get(req);
+                if (null == reqs) {
+                  LOG.error("Missing pending list for " + req);
+                  return;
+                }
+                if (reqs.isEmpty()) {
+                  attempts.remove(req);
+                }
+                LocalizerResourceRequestEvent request = reqs.remove(0);
+                pending.put(queue.submit(new FSDownload(
+                        lfs, null, conf, publicDirs,
+                        request.getResource().getRequest(), new Random())),
+                    request);
+              }
             } catch (CancellationException e) {
               // ignore; shutting down
             }