You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by jl...@apache.org on 2013/08/14 00:22:48 UTC
svn commit: r1513674 - in
/hadoop/common/branches/branch-0.23/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/
Author: jlowe
Date: Tue Aug 13 22:22:47 2013
New Revision: 1513674
URL: http://svn.apache.org/r1513674
Log:
YARN-543. Shared data structures in Public Localizer and Private Localizer are not Thread safe. Contributed by Omkar Vinit Joshi and Mit Desai
Modified:
hadoop/common/branches/branch-0.23/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
Modified: hadoop/common/branches/branch-0.23/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-yarn-project/CHANGES.txt?rev=1513674&r1=1513673&r2=1513674&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-yarn-project/CHANGES.txt Tue Aug 13 22:22:47 2013
@@ -14,9 +14,6 @@ Release 0.23.10 - UNRELEASED
YARN-985. Nodemanager should log where a resource was localized (Ravi
Prakash via jeagles)
- YARN-1036. Distributed Cache gives inconsistent result if cache files get
- deleted from task tracker (Mayank Bansal and Ravi Prakash via jlowe)
-
OPTIMIZATIONS
BUG FIXES
@@ -24,6 +21,12 @@ Release 0.23.10 - UNRELEASED
YARN-949. Failed log aggregation can leave a file open. (Kihwal Lee via
jlowe)
+ YARN-1036. Distributed Cache gives inconsistent result if cache files get
+ deleted from task tracker (Mayank Bansal and Ravi Prakash via jlowe)
+
+ YARN-543. Shared data structures in Public Localizer and Private
+ Localizer are not Thread safe. (Omkar Vinit Joshi and Mit Desai via jlowe)
+
Release 0.23.9 - 2013-07-08
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1513674&r1=1513673&r2=1513674&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Tue Aug 13 22:22:47 2013
@@ -28,6 +28,7 @@ import java.net.InetSocketAddress;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
@@ -567,26 +568,14 @@ public class ResourceLocalizationService
final Configuration conf;
final ExecutorService threadPool;
final CompletionService<Path> queue;
+ // It's shared between the public localizer and the dispatcher thread.
final Map<Future<Path>,LocalizerResourceRequestEvent> pending;
// TODO hack to work around broken signaling
final Map<LocalResourceRequest,List<LocalizerResourceRequestEvent>> attempts;
PublicLocalizer(Configuration conf) {
- this(conf, getLocalFileContext(conf),
- createLocalizerExecutor(conf),
- new HashMap<Future<Path>,LocalizerResourceRequestEvent>(),
- new HashMap<LocalResourceRequest,List<LocalizerResourceRequestEvent>>());
- }
-
- PublicLocalizer(Configuration conf, FileContext lfs,
- ExecutorService threadPool,
- Map<Future<Path>,LocalizerResourceRequestEvent> pending,
- Map<LocalResourceRequest,List<LocalizerResourceRequestEvent>> attempts) {
- super("Public Localizer");
- this.lfs = lfs;
+ this.lfs = getLocalFileContext(conf);
this.conf = conf;
- this.pending = pending;
- this.attempts = attempts;
// List<String> localDirs = dirsHandler.getLocalDirs();
// String[] publicFilecache = new String[localDirs.size()];
// for (int i = 0, n = localDirs.size(); i < n; ++i) {
@@ -604,7 +593,11 @@ public class ResourceLocalizationService
// new Path(localDir, ContainerLocalizer.FILECACHE).toString();
// }
- this.threadPool = threadPool;
+ this.pending =
+ new ConcurrentHashMap<Future<Path>, LocalizerResourceRequestEvent>();
+ this.attempts =
+ new HashMap<LocalResourceRequest,List<LocalizerResourceRequestEvent>>();
+ this.threadPool = createLocalizerExecutor(conf);
this.queue = new ExecutorCompletionService<Path>(threadPool);
}
@@ -708,6 +701,7 @@ public class ResourceLocalizationService
final LocalizerContext context;
final String localizerId;
final Map<LocalResourceRequest,LocalizerResourceRequestEvent> scheduled;
+ // It's a shared list between the Private Localizer and the dispatcher thread.
final List<LocalizerResourceRequestEvent> pending;
// TODO: threadsafe, use outer?
@@ -718,13 +712,14 @@ public class ResourceLocalizationService
super("LocalizerRunner for " + localizerId);
this.context = context;
this.localizerId = localizerId;
- this.pending = new ArrayList<LocalizerResourceRequestEvent>();
+ this.pending =
+ Collections
+ .synchronizedList(new ArrayList<LocalizerResourceRequestEvent>());
this.scheduled =
new HashMap<LocalResourceRequest, LocalizerResourceRequestEvent>();
}
public void addResource(LocalizerResourceRequestEvent request) {
- // TDOO: Synchronization
pending.add(request);
}
@@ -734,30 +729,31 @@ public class ResourceLocalizationService
* @return
*/
private LocalResource findNextResource() {
- // TODO: Synchronization
- for (Iterator<LocalizerResourceRequestEvent> i = pending.iterator();
- i.hasNext();) {
- LocalizerResourceRequestEvent evt = i.next();
- LocalizedResource nRsrc = evt.getResource();
- if (ResourceState.LOCALIZED.equals(nRsrc.getState())) {
- i.remove();
- continue;
- }
- if (nRsrc.tryAcquire()) {
- LocalResourceRequest nextRsrc = nRsrc.getRequest();
- LocalResource next =
- recordFactory.newRecordInstance(LocalResource.class);
- next.setResource(
- ConverterUtils.getYarnUrlFromPath(nextRsrc.getPath()));
- next.setTimestamp(nextRsrc.getTimestamp());
- next.setType(nextRsrc.getType());
- next.setVisibility(evt.getVisibility());
- next.setPattern(evt.getPattern());
- scheduled.put(nextRsrc, evt);
- return next;
+ synchronized (pending) {
+ for (Iterator<LocalizerResourceRequestEvent> i = pending.iterator();
+ i.hasNext();) {
+ LocalizerResourceRequestEvent evt = i.next();
+ LocalizedResource nRsrc = evt.getResource();
+ if (ResourceState.LOCALIZED.equals(nRsrc.getState())) {
+ i.remove();
+ continue;
+ }
+ if (nRsrc.tryAcquire()) {
+ LocalResourceRequest nextRsrc = nRsrc.getRequest();
+ LocalResource next =
+ recordFactory.newRecordInstance(LocalResource.class);
+ next.setResource(ConverterUtils.getYarnUrlFromPath(nextRsrc
+ .getPath()));
+ next.setTimestamp(nextRsrc.getTimestamp());
+ next.setType(nextRsrc.getType());
+ next.setVisibility(evt.getVisibility());
+ next.setPattern(evt.getPattern());
+ scheduled.put(nextRsrc, evt);
+ return next;
+ }
}
+ return null;
}
- return null;
}
// TODO this sucks. Fix it later