You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by jl...@apache.org on 2013/08/01 21:34:55 UTC

svn commit: r1509389 - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/

Author: jlowe
Date: Thu Aug  1 19:34:55 2013
New Revision: 1509389

URL: http://svn.apache.org/r1509389
Log:
YARN-573. Shared data structures in Public Localizer and Private Localizer are not Thread safe. Contributed by Omkar Vinit Joshi

Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1509389&r1=1509388&r2=1509389&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Thu Aug  1 19:34:55 2013
@@ -783,6 +783,9 @@ Release 2.1.0-beta - 2013-08-06
     YARN-945. Removed setting of AMRMToken's service from ResourceManager
     and changed client libraries do it all the time and correctly. (vinodkv)
 
+    YARN-573. Shared data structures in Public Localizer and Private Localizer
+    are not Thread safe. (Omkar Vinit Joshi via jlowe)
+
   BREAKDOWN OF HADOOP-8562/YARN-191 SUBTASKS AND RELATED JIRAS
 
     YARN-158. Yarn creating package-info.java must not depend on sh.

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1509389&r1=1509388&r2=1509389&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Thu Aug  1 19:34:55 2013
@@ -29,6 +29,7 @@ import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -630,23 +631,16 @@ public class ResourceLocalizationService
     final Configuration conf;
     final ExecutorService threadPool;
     final CompletionService<Path> queue;
+    // Its shared between public localizer and dispatcher thread.
     final Map<Future<Path>,LocalizerResourceRequestEvent> pending;
 
     PublicLocalizer(Configuration conf) {
-      this(conf, getLocalFileContext(conf),
-           createLocalizerExecutor(conf),
-           new HashMap<Future<Path>,LocalizerResourceRequestEvent>());
-    }
-    
-    PublicLocalizer(Configuration conf, FileContext lfs,
-        ExecutorService threadPool,
-        Map<Future<Path>,LocalizerResourceRequestEvent> pending) {
       super("Public Localizer");
-      this.lfs = lfs;
+      this.lfs = getLocalFileContext(conf);
       this.conf = conf;
-      this.pending = pending;
-
-      this.threadPool = threadPool;
+      this.pending =
+          new ConcurrentHashMap<Future<Path>, LocalizerResourceRequestEvent>();
+      this.threadPool = createLocalizerExecutor(conf);
       this.queue = new ExecutorCompletionService<Path>(threadPool);
     }
 
@@ -748,6 +742,7 @@ public class ResourceLocalizationService
     final LocalizerContext context;
     final String localizerId;
     final Map<LocalResourceRequest,LocalizerResourceRequestEvent> scheduled;
+    // Its a shared list between Private Localizer and dispatcher thread.
     final List<LocalizerResourceRequestEvent> pending;
 
     // TODO: threadsafe, use outer?
@@ -758,13 +753,14 @@ public class ResourceLocalizationService
       super("LocalizerRunner for " + localizerId);
       this.context = context;
       this.localizerId = localizerId;
-      this.pending = new ArrayList<LocalizerResourceRequestEvent>();
+      this.pending =
+          Collections
+            .synchronizedList(new ArrayList<LocalizerResourceRequestEvent>());
       this.scheduled =
           new HashMap<LocalResourceRequest, LocalizerResourceRequestEvent>();
     }
 
     public void addResource(LocalizerResourceRequestEvent request) {
-      // TDOO: Synchronization
       pending.add(request);
     }
 
@@ -774,43 +770,44 @@ public class ResourceLocalizationService
      * @return
      */
     private LocalResource findNextResource() {
-      // TODO: Synchronization
-      for (Iterator<LocalizerResourceRequestEvent> i = pending.iterator();
-           i.hasNext();) {
-        LocalizerResourceRequestEvent evt = i.next();
-        LocalizedResource nRsrc = evt.getResource();
-        // Resource download should take place ONLY if resource is in
-        // Downloading state
-        if (!ResourceState.DOWNLOADING.equals(nRsrc.getState())) {
-          i.remove();
-          continue;
-        }
-        /*
-         * Multiple containers will try to download the same resource. So the
-         * resource download should start only if
-         * 1) We can acquire a non blocking semaphore lock on resource
-         * 2) Resource is still in DOWNLOADING state
-         */
-        if (nRsrc.tryAcquire()) {
-          if (nRsrc.getState().equals(ResourceState.DOWNLOADING)) {
-            LocalResourceRequest nextRsrc = nRsrc.getRequest();
-            LocalResource next =
-                recordFactory.newRecordInstance(LocalResource.class);
-            next.setResource(ConverterUtils.getYarnUrlFromPath(nextRsrc
-              .getPath()));
-            next.setTimestamp(nextRsrc.getTimestamp());
-            next.setType(nextRsrc.getType());
-            next.setVisibility(evt.getVisibility());
-            next.setPattern(evt.getPattern());
-            scheduled.put(nextRsrc, evt);
-            return next;
-          } else {
-            // Need to release acquired lock
-            nRsrc.unlock();
-          }
-        }
+      synchronized (pending) {
+        for (Iterator<LocalizerResourceRequestEvent> i = pending.iterator();
+            i.hasNext();) {
+         LocalizerResourceRequestEvent evt = i.next();
+         LocalizedResource nRsrc = evt.getResource();
+         // Resource download should take place ONLY if resource is in
+         // Downloading state
+         if (!ResourceState.DOWNLOADING.equals(nRsrc.getState())) {
+           i.remove();
+           continue;
+         }
+         /*
+          * Multiple containers will try to download the same resource. So the
+          * resource download should start only if
+          * 1) We can acquire a non blocking semaphore lock on resource
+          * 2) Resource is still in DOWNLOADING state
+          */
+         if (nRsrc.tryAcquire()) {
+           if (nRsrc.getState().equals(ResourceState.DOWNLOADING)) {
+             LocalResourceRequest nextRsrc = nRsrc.getRequest();
+             LocalResource next =
+                 recordFactory.newRecordInstance(LocalResource.class);
+             next.setResource(ConverterUtils.getYarnUrlFromPath(nextRsrc
+               .getPath()));
+             next.setTimestamp(nextRsrc.getTimestamp());
+             next.setType(nextRsrc.getType());
+             next.setVisibility(evt.getVisibility());
+             next.setPattern(evt.getPattern());
+             scheduled.put(nextRsrc, evt);
+             return next;
+           } else {
+             // Need to release acquired lock
+             nRsrc.unlock();
+           }
+         }
+       }
+       return null;
       }
-      return null;
     }
 
     LocalizerHeartbeatResponse update(