You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by cd...@apache.org on 2011/06/08 03:39:20 UTC
svn commit: r1133230 - in /hadoop/mapreduce/branches/MR-279: CHANGES.txt
yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
Author: cdouglas
Date: Wed Jun 8 01:39:20 2011
New Revision: 1133230
URL: http://svn.apache.org/viewvc?rev=1133230&view=rev
Log:
Work around broken signaling in public cache.
Modified:
hadoop/mapreduce/branches/MR-279/CHANGES.txt
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1133230&r1=1133229&r2=1133230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Wed Jun 8 01:39:20 2011
@@ -4,6 +4,9 @@ Trunk (unreleased changes)
MAPREDUCE-279
+
+ Work around broken signaling in public cache. (cdouglas)
+
Cleanup redundant code in TaskAttemptImpl. (sharad)
Disable aggregation of logs onto DFS till JobHistoryServer starts
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1133230&r1=1133229&r2=1133230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Wed Jun 8 01:39:20 2011
@@ -25,6 +25,7 @@ import java.net.URISyntaxException;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CancellationException;
@@ -486,21 +487,26 @@ public class ResourceLocalizationService
final ExecutorService threadPool;
final LocalDirAllocator publicDirs;
final CompletionService<Path> queue;
- final ConcurrentMap<Future<Path>,LocalizerResourceRequestEvent> pending;
+ final Map<Future<Path>,LocalizerResourceRequestEvent> pending;
+ // TODO hack to work around broken signaling
+ final Map<LocalResourceRequest,List<LocalizerResourceRequestEvent>> attempts;
PublicLocalizer(Configuration conf) {
this(conf, getLocalFileContext(conf),
Executors.newFixedThreadPool(conf.getInt(
NM_MAX_PUBLIC_FETCH_THREADS, DEFAULT_MAX_PUBLIC_FETCH_THREADS)),
- new ConcurrentHashMap<Future<Path>,LocalizerResourceRequestEvent>());
+ new HashMap<Future<Path>,LocalizerResourceRequestEvent>(),
+ new HashMap<LocalResourceRequest,List<LocalizerResourceRequestEvent>>());
}
PublicLocalizer(Configuration conf, FileContext lfs,
ExecutorService threadPool,
- ConcurrentMap<Future<Path>,LocalizerResourceRequestEvent> pending) {
+ Map<Future<Path>,LocalizerResourceRequestEvent> pending,
+ Map<LocalResourceRequest,List<LocalizerResourceRequestEvent>> attempts) {
this.lfs = lfs;
this.conf = conf;
this.pending = pending;
+ this.attempts = attempts;
String[] publicFilecache = new String[localDirs.size()];
for (int i = 0, n = localDirs.size(); i < n; ++i) {
publicFilecache[i] =
@@ -514,12 +520,20 @@ public class ResourceLocalizationService
public void addResource(LocalizerResourceRequestEvent request) {
// TODO handle failures, cancellation, requests by other containers
- LOG.info("Downloading public rsrc:" + request.getResource().getRequest());
- pending.putIfAbsent(
- queue.submit(new FSDownload(
- lfs, null, conf, publicDirs, request.getResource().getRequest(),
- new Random())),
- request);
+ LocalResourceRequest key = request.getResource().getRequest();
+ LOG.info("Downloading public rsrc:" + key);
+ synchronized (attempts) {
+ List<LocalizerResourceRequestEvent> sigh = attempts.get(key);
+ if (null == sigh) {
+ pending.put(queue.submit(new FSDownload(
+ lfs, null, conf, publicDirs,
+ request.getResource().getRequest(), new Random())),
+ request);
+ attempts.put(key, new LinkedList<LocalizerResourceRequestEvent>());
+ } else {
+ sigh.add(request);
+ }
+ }
}
@Override
@@ -529,12 +543,21 @@ public class ResourceLocalizationService
while (!Thread.currentThread().isInterrupted()) {
try {
Future<Path> completed = queue.take();
- LocalizerResourceRequestEvent assoc = pending.get(completed);
+ LocalizerResourceRequestEvent assoc = pending.remove(completed);
try {
Path local = completed.get();
+ if (null == assoc) {
+ LOG.error("Localized unkonwn resource to " + completed);
+ // TODO delete
+ return;
+ }
+ LocalResourceRequest key = assoc.getResource().getRequest();
assoc.getResource().handle(
- new ResourceLocalizedEvent(assoc.getResource().getRequest(),
+ new ResourceLocalizedEvent(key,
local, FileUtil.getDU(new File(local.toUri()))));
+ synchronized (attempts) {
+ attempts.remove(key);
+ }
} catch (ExecutionException e) {
LOG.info("Failed to download rsrc " + assoc.getResource(),
e.getCause());
@@ -542,6 +565,22 @@ public class ResourceLocalizationService
new ContainerResourceFailedEvent(
assoc.getContext().getContainerId(),
assoc.getResource().getRequest(), e.getCause()));
+ synchronized (attempts) {
+ LocalResourceRequest req = assoc.getResource().getRequest();
+ List<LocalizerResourceRequestEvent> reqs = attempts.get(req);
+ if (null == reqs) {
+ LOG.error("Missing pending list for " + req);
+ return;
+ }
+ if (reqs.isEmpty()) {
+ attempts.remove(req);
+ }
+ LocalizerResourceRequestEvent request = reqs.remove(0);
+ pending.put(queue.submit(new FSDownload(
+ lfs, null, conf, publicDirs,
+ request.getResource().getRequest(), new Random())),
+ request);
+ }
} catch (CancellationException e) {
// ignore; shutting down
}