You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by jl...@apache.org on 2013/08/14 00:22:48 UTC

svn commit: r1513674 - in /hadoop/common/branches/branch-0.23/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/

Author: jlowe
Date: Tue Aug 13 22:22:47 2013
New Revision: 1513674

URL: http://svn.apache.org/r1513674
Log:
YARN-543. Shared data structures in Public Localizer and Private Localizer are not Thread safe. Contributed by Omkar Vinit Joshi and Mit Desai

Modified:
    hadoop/common/branches/branch-0.23/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java

Modified: hadoop/common/branches/branch-0.23/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-yarn-project/CHANGES.txt?rev=1513674&r1=1513673&r2=1513674&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-yarn-project/CHANGES.txt Tue Aug 13 22:22:47 2013
@@ -14,9 +14,6 @@ Release 0.23.10 - UNRELEASED
     YARN-985. Nodemanager should log where a resource was localized (Ravi
     Prakash via jeagles)
 
-    YARN-1036. Distributed Cache gives inconsistent result if cache files get
-    deleted from task tracker (Mayank Bansal and Ravi Prakash via jlowe)
-
   OPTIMIZATIONS
 
   BUG FIXES
@@ -24,6 +21,12 @@ Release 0.23.10 - UNRELEASED
     YARN-949. Failed log aggregation can leave a file open. (Kihwal Lee via
     jlowe)
 
+    YARN-1036. Distributed Cache gives inconsistent result if cache files get
+    deleted from task tracker (Mayank Bansal and Ravi Prakash via jlowe)
+
+    YARN-543. Shared data structures in Public Localizer and Private
+    Localizer are not Thread safe. (Omkar Vinit Joshi and Mit Desai via jlowe)
+
 Release 0.23.9 - 2013-07-08
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1513674&r1=1513673&r2=1513674&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Tue Aug 13 22:22:47 2013
@@ -28,6 +28,7 @@ import java.net.InetSocketAddress;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -567,26 +568,14 @@ public class ResourceLocalizationService
     final Configuration conf;
     final ExecutorService threadPool;
     final CompletionService<Path> queue;
+    // It's shared between the public localizer and the dispatcher thread.
     final Map<Future<Path>,LocalizerResourceRequestEvent> pending;
     // TODO hack to work around broken signaling
     final Map<LocalResourceRequest,List<LocalizerResourceRequestEvent>> attempts;
 
     PublicLocalizer(Configuration conf) {
-      this(conf, getLocalFileContext(conf),
-           createLocalizerExecutor(conf),
-           new HashMap<Future<Path>,LocalizerResourceRequestEvent>(),
-           new HashMap<LocalResourceRequest,List<LocalizerResourceRequestEvent>>());
-    }
-    
-    PublicLocalizer(Configuration conf, FileContext lfs,
-        ExecutorService threadPool,
-        Map<Future<Path>,LocalizerResourceRequestEvent> pending,
-        Map<LocalResourceRequest,List<LocalizerResourceRequestEvent>> attempts) {
-      super("Public Localizer");
-      this.lfs = lfs;
+      this.lfs = getLocalFileContext(conf);
       this.conf = conf;
-      this.pending = pending;
-      this.attempts = attempts;
 //      List<String> localDirs = dirsHandler.getLocalDirs();
 //      String[] publicFilecache = new String[localDirs.size()];
 //      for (int i = 0, n = localDirs.size(); i < n; ++i) {
@@ -604,7 +593,11 @@ public class ResourceLocalizationService
 //            new Path(localDir, ContainerLocalizer.FILECACHE).toString();
 //      }
 
-      this.threadPool = threadPool;
+      this.pending =
+          new ConcurrentHashMap<Future<Path>, LocalizerResourceRequestEvent>();
+      this.attempts =
+          new HashMap<LocalResourceRequest,List<LocalizerResourceRequestEvent>>();
+      this.threadPool = createLocalizerExecutor(conf);
       this.queue = new ExecutorCompletionService<Path>(threadPool);
     }
 
@@ -708,6 +701,7 @@ public class ResourceLocalizationService
     final LocalizerContext context;
     final String localizerId;
     final Map<LocalResourceRequest,LocalizerResourceRequestEvent> scheduled;
+    // It's a list shared between the Private Localizer and the dispatcher thread.
     final List<LocalizerResourceRequestEvent> pending;
 
     // TODO: threadsafe, use outer?
@@ -718,13 +712,14 @@ public class ResourceLocalizationService
       super("LocalizerRunner for " + localizerId);
       this.context = context;
       this.localizerId = localizerId;
-      this.pending = new ArrayList<LocalizerResourceRequestEvent>();
+      this.pending =
+          Collections
+            .synchronizedList(new ArrayList<LocalizerResourceRequestEvent>());
       this.scheduled =
           new HashMap<LocalResourceRequest, LocalizerResourceRequestEvent>();
     }
 
     public void addResource(LocalizerResourceRequestEvent request) {
-      // TDOO: Synchronization
       pending.add(request);
     }
 
@@ -734,30 +729,31 @@ public class ResourceLocalizationService
      * @return
      */
     private LocalResource findNextResource() {
-      // TODO: Synchronization
-      for (Iterator<LocalizerResourceRequestEvent> i = pending.iterator();
-           i.hasNext();) {
-        LocalizerResourceRequestEvent evt = i.next();
-        LocalizedResource nRsrc = evt.getResource();
-        if (ResourceState.LOCALIZED.equals(nRsrc.getState())) {
-          i.remove();
-          continue;
-        }
-        if (nRsrc.tryAcquire()) {
-          LocalResourceRequest nextRsrc = nRsrc.getRequest();
-          LocalResource next =
-            recordFactory.newRecordInstance(LocalResource.class);
-          next.setResource(
-              ConverterUtils.getYarnUrlFromPath(nextRsrc.getPath()));
-          next.setTimestamp(nextRsrc.getTimestamp());
-          next.setType(nextRsrc.getType());
-          next.setVisibility(evt.getVisibility());
-          next.setPattern(evt.getPattern());
-          scheduled.put(nextRsrc, evt);
-          return next;
+      synchronized (pending) {
+        for (Iterator<LocalizerResourceRequestEvent> i = pending.iterator();
+            i.hasNext();) {
+          LocalizerResourceRequestEvent evt = i.next();
+          LocalizedResource nRsrc = evt.getResource();
+          if (ResourceState.LOCALIZED.equals(nRsrc.getState())) {
+            i.remove();
+            continue;
+          }
+          if (nRsrc.tryAcquire()) {
+            LocalResourceRequest nextRsrc = nRsrc.getRequest();
+            LocalResource next =
+              recordFactory.newRecordInstance(LocalResource.class);
+            next.setResource(ConverterUtils.getYarnUrlFromPath(nextRsrc
+              .getPath()));
+            next.setTimestamp(nextRsrc.getTimestamp());
+            next.setType(nextRsrc.getType());
+            next.setVisibility(evt.getVisibility());
+            next.setPattern(evt.getPattern());
+            scheduled.put(nextRsrc, evt);
+            return next;
+          }
         }
+        return null;
       }
-      return null;
     }
 
     // TODO this sucks. Fix it later