You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/04/20 00:35:43 UTC
svn commit: r1470076 - in /hadoop/common/trunk/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/
hadoop-yarn/hadoop-yarn-server/hado...
Author: vinodkv
Date: Fri Apr 19 22:35:43 2013
New Revision: 1470076
URL: http://svn.apache.org/r1470076
Log:
YARN-547. Fixed race conditions in public and private resource localization which used to cause duplicate downloads. Contributed by Omkar Vinit Joshi.
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1470076&r1=1470075&r2=1470076&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Fri Apr 19 22:35:43 2013
@@ -269,6 +269,9 @@ Release 2.0.5-beta - UNRELEASED
YARN-585. Fix failure in TestFairScheduler#testNotAllowSubmitApplication
caused by YARN-514. (Zhijie Shen via vinodkv)
+ YARN-547. Fixed race conditions in public and private resource localization
+ which used to cause duplicate downloads. (Omkar Vinit Joshi via vinodkv)
+
Release 2.0.4-alpha - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java?rev=1470076&r1=1470075&r2=1470076&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java Fri Apr 19 22:35:43 2013
@@ -18,12 +18,15 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* Component tracking resources all of the same {@link LocalResourceVisibility}
*
@@ -41,4 +44,8 @@ interface LocalResourcesTracker
String getUser();
long nextUniqueNumber();
+
+ @VisibleForTesting
+ @Private
+ LocalizedResource getLocalizedResource(LocalResourceRequest request);
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java?rev=1470076&r1=1470075&r2=1470076&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java Fri Apr 19 22:35:43 2013
@@ -27,6 +27,7 @@ import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@@ -35,6 +36,8 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* A collection of {@link LocalizedResource}s all of same
@@ -307,4 +310,11 @@ class LocalResourcesTrackerImpl implemen
public long nextUniqueNumber() {
return uniqueNumberGenerator.incrementAndGet();
}
+
+ @VisibleForTesting
+ @Private
+ @Override
+ public LocalizedResource getLocalizedResource(LocalResourceRequest request) {
+ return localrsrc.get(request);
+ }
}
\ No newline at end of file
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java?rev=1470076&r1=1470075&r2=1470076&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java Fri Apr 19 22:35:43 2013
@@ -78,19 +78,14 @@ public class LocalizedResource implement
// From INIT (ref == 0, awaiting req)
.addTransition(ResourceState.INIT, ResourceState.DOWNLOADING,
ResourceEventType.REQUEST, new FetchResourceTransition())
- .addTransition(ResourceState.INIT, ResourceState.LOCALIZED,
- ResourceEventType.LOCALIZED, new FetchDirectTransition())
- .addTransition(ResourceState.INIT, ResourceState.INIT,
- ResourceEventType.RELEASE, new ReleaseTransition())
// From DOWNLOADING (ref > 0, may be localizing)
.addTransition(ResourceState.DOWNLOADING, ResourceState.DOWNLOADING,
ResourceEventType.REQUEST, new FetchResourceTransition()) // TODO: Duplicate addition!!
.addTransition(ResourceState.DOWNLOADING, ResourceState.LOCALIZED,
ResourceEventType.LOCALIZED, new FetchSuccessTransition())
- .addTransition(ResourceState.DOWNLOADING,
- EnumSet.of(ResourceState.DOWNLOADING, ResourceState.INIT),
- ResourceEventType.RELEASE, new ReleasePendingTransition())
+ .addTransition(ResourceState.DOWNLOADING,ResourceState.DOWNLOADING,
+ ResourceEventType.RELEASE, new ReleaseTransition())
.addTransition(ResourceState.DOWNLOADING, ResourceState.FAILED,
ResourceEventType.LOCALIZATION_FAILED, new FetchFailedTransition())
@@ -98,8 +93,6 @@ public class LocalizedResource implement
.addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED,
ResourceEventType.REQUEST, new LocalizedResourceTransition())
.addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED,
- ResourceEventType.LOCALIZED)
- .addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED,
ResourceEventType.RELEASE, new ReleaseTransition())
.installTopology();
@@ -230,14 +223,6 @@ public class LocalizedResource implement
}
}
- private static class FetchDirectTransition extends FetchSuccessTransition {
- @Override
- public void transition(LocalizedResource rsrc, ResourceEvent event) {
- LOG.warn("Resource " + rsrc + " localized without listening container");
- super.transition(rsrc, event);
- }
- }
-
/**
* Resource localized, notify waiting containers.
*/
@@ -304,17 +289,4 @@ public class LocalizedResource implement
rsrc.release(relEvent.getContainer());
}
}
-
- private static class ReleasePendingTransition implements
- MultipleArcTransition<LocalizedResource,ResourceEvent,ResourceState> {
- @Override
- public ResourceState transition(LocalizedResource rsrc,
- ResourceEvent event) {
- ResourceReleaseEvent relEvent = (ResourceReleaseEvent) event;
- rsrc.release(relEvent.getContainer());
- return rsrc.ref.isEmpty()
- ? ResourceState.INIT
- : ResourceState.DOWNLOADING;
- }
- }
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1470076&r1=1470075&r2=1470076&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Fri Apr 19 22:35:43 2013
@@ -31,7 +31,6 @@ import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
@@ -47,9 +46,11 @@ import java.util.concurrent.ScheduledExe
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileContext;
@@ -112,6 +113,7 @@ import org.apache.hadoop.yarn.service.Co
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.FSDownload;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class ResourceLocalizationService extends CompositeService
@@ -492,7 +494,25 @@ public class ResourceLocalizationService
+ Path.SEPARATOR + appId;
return path;
}
+
+ @VisibleForTesting
+ @Private
+ public PublicLocalizer getPublicLocalizer() {
+ return localizerTracker.publicLocalizer;
+ }
+ @VisibleForTesting
+ @Private
+ public LocalizerRunner getLocalizerRunner(String locId) {
+ return localizerTracker.privLocalizers.get(locId);
+ }
+
+ @VisibleForTesting
+ @Private
+ public Map<String, LocalizerRunner> getPrivateLocalizers() {
+ return localizerTracker.privLocalizers;
+ }
+
/**
* Sub-component handling the spawning of {@link ContainerLocalizer}s
*/
@@ -606,41 +626,20 @@ public class ResourceLocalizationService
final ExecutorService threadPool;
final CompletionService<Path> queue;
final Map<Future<Path>,LocalizerResourceRequestEvent> pending;
- // TODO hack to work around broken signaling
- final Map<LocalResourceRequest,List<LocalizerResourceRequestEvent>> attempts;
PublicLocalizer(Configuration conf) {
this(conf, getLocalFileContext(conf),
createLocalizerExecutor(conf),
- new HashMap<Future<Path>,LocalizerResourceRequestEvent>(),
- new HashMap<LocalResourceRequest,List<LocalizerResourceRequestEvent>>());
+ new HashMap<Future<Path>,LocalizerResourceRequestEvent>());
}
PublicLocalizer(Configuration conf, FileContext lfs,
ExecutorService threadPool,
- Map<Future<Path>,LocalizerResourceRequestEvent> pending,
- Map<LocalResourceRequest,List<LocalizerResourceRequestEvent>> attempts) {
+ Map<Future<Path>,LocalizerResourceRequestEvent> pending) {
super("Public Localizer");
this.lfs = lfs;
this.conf = conf;
this.pending = pending;
- this.attempts = attempts;
-// List<String> localDirs = dirsHandler.getLocalDirs();
-// String[] publicFilecache = new String[localDirs.size()];
-// for (int i = 0, n = localDirs.size(); i < n; ++i) {
-// publicFilecache[i] =
-// new Path(localDirs.get(i), ContainerLocalizer.FILECACHE).toString();
-// }
-// conf.setStrings(PUBCACHE_CTXT, publicFilecache);
-
-// this.publicDirDestPath = new LocalDirAllocator(PUBCACHE_CTXT).getLocalPathForWrite(pathStr, conf);
-// List<String> localDirs = dirsHandler.getLocalDirs();
-// String[] publicFilecache = new String[localDirs.size()];
-// int i = 0;
-// for (String localDir : localDirs) {
-// publicFilecache[i++] =
-// new Path(localDir, ContainerLocalizer.FILECACHE).toString();
-// }
this.threadPool = threadPool;
this.queue = new ExecutorCompletionService<Path>(threadPool);
@@ -648,36 +647,45 @@ public class ResourceLocalizationService
public void addResource(LocalizerResourceRequestEvent request) {
// TODO handle failures, cancellation, requests by other containers
- LocalResourceRequest key = request.getResource().getRequest();
+ LocalizedResource rsrc = request.getResource();
+ LocalResourceRequest key = rsrc.getRequest();
LOG.info("Downloading public rsrc:" + key);
- synchronized (attempts) {
- List<LocalizerResourceRequestEvent> sigh = attempts.get(key);
- if (null == sigh) {
+ /*
+ * Here multiple containers may request the same resource. So we need
+ * to start downloading only when
+ * 1) ResourceState == DOWNLOADING
+ * 2) We are able to acquire non blocking semaphore lock.
+ * If not we will skip this resource as either it is getting downloaded
+ * or it FAILED / LOCALIZED.
+ */
+
+ if (rsrc.tryAcquire()) {
+ if (rsrc.getState().equals(ResourceState.DOWNLOADING)) {
LocalResource resource = request.getResource().getRequest();
try {
- Path publicDirDestPath = dirsHandler.getLocalPathForWrite(
- "." + Path.SEPARATOR + ContainerLocalizer.FILECACHE,
- ContainerLocalizer.getEstimatedSize(resource), true);
+ Path publicDirDestPath =
+ dirsHandler.getLocalPathForWrite("." + Path.SEPARATOR
+ + ContainerLocalizer.FILECACHE,
+ ContainerLocalizer.getEstimatedSize(resource), true);
Path hierarchicalPath =
- publicRsrc.getPathForLocalization(key, publicDirDestPath);
+ publicRsrc.getPathForLocalization(key, publicDirDestPath);
if (!hierarchicalPath.equals(publicDirDestPath)) {
publicDirDestPath = hierarchicalPath;
- DiskChecker.checkDir(
- new File(publicDirDestPath.toUri().getPath()));
+ DiskChecker.checkDir(new File(publicDirDestPath.toUri().getPath()));
}
publicDirDestPath =
new Path(publicDirDestPath, Long.toString(publicRsrc
.nextUniqueNumber()));
- pending.put(queue.submit(new FSDownload(
- lfs, null, conf, publicDirDestPath, resource)),
- request);
- attempts.put(key, new LinkedList<LocalizerResourceRequestEvent>());
+ pending.put(queue.submit(new FSDownload(lfs, null, conf,
+ publicDirDestPath, resource)), request);
} catch (IOException e) {
+ rsrc.unlock();
+ // TODO Need to Fix IO Exceptions - Notifying resource
LOG.error("Local path for public localization is not found. "
+ " May be disks failed.", e);
}
} else {
- sigh.add(request);
+ rsrc.unlock();
}
}
}
@@ -700,24 +708,14 @@ public class ResourceLocalizationService
LocalResourceRequest key = assoc.getResource().getRequest();
publicRsrc.handle(new ResourceLocalizedEvent(key, local, FileUtil
.getDU(new File(local.toUri()))));
- synchronized (attempts) {
- attempts.remove(key);
- }
+ assoc.getResource().unlock();
} catch (ExecutionException e) {
LOG.info("Failed to download rsrc " + assoc.getResource(),
e.getCause());
LocalResourceRequest req = assoc.getResource().getRequest();
publicRsrc.handle(new ResourceFailedLocalizationEvent(req, e
.getCause()));
- synchronized (attempts) {
- List<LocalizerResourceRequestEvent> reqs;
- reqs = attempts.get(req);
- if (null == reqs) {
- LOG.error("Missing pending list for " + req);
- return;
- }
- attempts.remove(req);
- }
+ assoc.getResource().unlock();
} catch (CancellationException e) {
// ignore; shutting down
}
@@ -776,22 +774,35 @@ public class ResourceLocalizationService
i.hasNext();) {
LocalizerResourceRequestEvent evt = i.next();
LocalizedResource nRsrc = evt.getResource();
- if (ResourceState.LOCALIZED.equals(nRsrc.getState())) {
+ // Resource download should take place ONLY if resource is in
+ // Downloading state
+ if (!ResourceState.DOWNLOADING.equals(nRsrc.getState())) {
i.remove();
continue;
}
+ /*
+ * Multiple containers will try to download the same resource. So the
+ * resource download should start only if
+ * 1) We can acquire a non blocking semaphore lock on resource
+ * 2) Resource is still in DOWNLOADING state
+ */
if (nRsrc.tryAcquire()) {
- LocalResourceRequest nextRsrc = nRsrc.getRequest();
- LocalResource next =
- recordFactory.newRecordInstance(LocalResource.class);
- next.setResource(
- ConverterUtils.getYarnUrlFromPath(nextRsrc.getPath()));
- next.setTimestamp(nextRsrc.getTimestamp());
- next.setType(nextRsrc.getType());
- next.setVisibility(evt.getVisibility());
- next.setPattern(evt.getPattern());
- scheduled.put(nextRsrc, evt);
- return next;
+ if (nRsrc.getState().equals(ResourceState.DOWNLOADING)) {
+ LocalResourceRequest nextRsrc = nRsrc.getRequest();
+ LocalResource next =
+ recordFactory.newRecordInstance(LocalResource.class);
+ next.setResource(ConverterUtils.getYarnUrlFromPath(nextRsrc
+ .getPath()));
+ next.setTimestamp(nextRsrc.getTimestamp());
+ next.setType(nextRsrc.getType());
+ next.setVisibility(evt.getVisibility());
+ next.setPattern(evt.getPattern());
+ scheduled.put(nextRsrc, evt);
+ return next;
+ } else {
+ // Need to release acquired lock
+ nRsrc.unlock();
+ }
}
}
return null;
@@ -863,6 +874,12 @@ public class ResourceLocalizationService
new ResourceLocalizedEvent(req, ConverterUtils
.getPathFromYarnURL(stat.getLocalPath()), stat.getLocalSize()));
} catch (URISyntaxException e) { }
+
+ // unlocking the resource and removing it from scheduled resource
+ // list
+ assoc.getResource().unlock();
+ scheduled.remove(req);
+
if (pending.isEmpty()) {
// TODO: Synchronization
response.setLocalizerAction(LocalizerAction.DIE);
@@ -889,11 +906,16 @@ public class ResourceLocalizationService
break;
case FETCH_FAILURE:
LOG.info("DEBUG: FAILED " + req, stat.getException());
- assoc.getResource().unlock();
response.setLocalizerAction(LocalizerAction.DIE);
getLocalResourcesTracker(req.getVisibility(), user, applicationId)
.handle(
new ResourceFailedLocalizationEvent(req, stat.getException()));
+
+ // unlocking the resource and removing it from scheduled resource
+ // list
+ assoc.getResource().unlock();
+ scheduled.remove(req);
+
break;
default:
LOG.info("Unknown status: " + stat.getStatus());
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java?rev=1470076&r1=1470075&r2=1470076&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java Fri Apr 19 22:35:43 2013
@@ -129,14 +129,10 @@ public class TestLocalResourcesTrackerIm
dispatcher.await();
verifyTrackedResourceCount(tracker, 2);
- // Verify resources in state INIT with ref-count=0 is removed.
- Assert.assertTrue(tracker.remove(lr2, mockDelService));
- verifyTrackedResourceCount(tracker, 1);
-
// Verify resource with non zero ref count is not removed.
Assert.assertEquals(2, lr1.getRefCount());
Assert.assertFalse(tracker.remove(lr1, mockDelService));
- verifyTrackedResourceCount(tracker, 1);
+ verifyTrackedResourceCount(tracker, 2);
// Localize resource1
ResourceLocalizedEvent rle =
@@ -151,7 +147,7 @@ public class TestLocalResourcesTrackerIm
// Verify resources in state LOCALIZED with ref-count=0 is removed.
Assert.assertTrue(tracker.remove(lr1, mockDelService));
- verifyTrackedResourceCount(tracker, 0);
+ verifyTrackedResourceCount(tracker, 1);
} finally {
if (dispatcher != null) {
dispatcher.stop();
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java?rev=1470076&r1=1470075&r2=1470076&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java Fri Apr 19 22:35:43 2013
@@ -117,7 +117,7 @@ public class TestLocalizedResource {
local.handle(new ResourceReleaseEvent(rsrcA, container1));
dispatcher.await();
verify(containerBus, never()).handle(isA(ContainerEvent.class));
- assertEquals(ResourceState.INIT, local.getState());
+ assertEquals(ResourceState.DOWNLOADING, local.getState());
// Register C2, C3
final ContainerId container2 = getMockContainer(2);
@@ -176,24 +176,6 @@ public class TestLocalizedResource {
}
}
- @Test
- public void testDirectLocalization() throws Exception {
- DrainDispatcher dispatcher = new DrainDispatcher();
- dispatcher.init(new Configuration());
- try {
- dispatcher.start();
- LocalResource apiRsrc = createMockResource();
- LocalResourceRequest rsrcA = new LocalResourceRequest(apiRsrc);
- LocalizedResource local = new LocalizedResource(rsrcA, dispatcher);
- Path p = new Path("file:///cache/rsrcA");
- local.handle(new ResourceLocalizedEvent(rsrcA, p, 10));
- dispatcher.await();
- assertEquals(ResourceState.LOCALIZED, local.getState());
- } finally {
- dispatcher.stop();
- }
- }
-
static LocalResource createMockResource() {
// mock rsrc location
org.apache.hadoop.yarn.api.records.URL uriA =
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java?rev=1470076&r1=1470075&r2=1470076&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java Fri Apr 19 22:35:43 2013
@@ -34,9 +34,9 @@ import static org.mockito.Mockito.doRetu
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.times;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -53,6 +53,7 @@ import java.util.Random;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.Future;
import junit.framework.Assert;
@@ -82,6 +83,7 @@ import org.apache.hadoop.yarn.conf.YarnC
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.impl.pb.YarnRemoteExceptionPBImpl;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
@@ -90,20 +92,28 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.impl.pb.LocalResourceStatusPBImpl;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.impl.pb.LocalizerStatusPBImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.LocalizerRunner;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.LocalizerTracker;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.BeforeClass;
@@ -677,6 +687,481 @@ public class TestResourceLocalizationSer
}
}
+ @Test(timeout = 100000)
+ @SuppressWarnings("unchecked")
+ public void testParallelDownloadAttemptsForPrivateResource() throws Exception {
+
+ DrainDispatcher dispatcher1 = null;
+ try {
+ dispatcher1 = new DrainDispatcher();
+ String user = "testuser";
+ ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
+
+ // Set up the Resource Localization Service over a spied local file system
+ Configuration conf = new Configuration();
+ AbstractFileSystem spylfs =
+ spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
+ final FileContext lfs = FileContext.getFileContext(spylfs, conf);
+ // Stub mkdir on the spy so that no directories are actually created
+ doNothing().when(spylfs).mkdir(isA(Path.class), isA(FsPermission.class),
+ anyBoolean());
+
+ // creating one local directory
+ List<Path> localDirs = new ArrayList<Path>();
+ String[] sDirs = new String[1];
+ for (int i = 0; i < 1; ++i) {
+ localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
+ sDirs[i] = localDirs.get(i).toString();
+ }
+ conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
+ // setting log directory.
+ String logDir =
+ lfs.makeQualified(new Path(basedir, "logdir ")).toString();
+ conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
+
+ LocalDirsHandlerService localDirHandler = new LocalDirsHandlerService();
+ localDirHandler.init(conf);
+ // Registering event handlers
+ EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
+ dispatcher1.register(ApplicationEventType.class, applicationBus);
+ EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
+ dispatcher1.register(ContainerEventType.class, containerBus);
+
+ ContainerExecutor exec = mock(ContainerExecutor.class);
+ DeletionService delService = mock(DeletionService.class);
+ LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
+ // initializing directory handler.
+ dirsHandler.init(conf);
+
+ dispatcher1.init(conf);
+ dispatcher1.start();
+
+ ResourceLocalizationService rls =
+ new ResourceLocalizationService(dispatcher1, exec, delService,
+ localDirHandler);
+ dispatcher1.register(LocalizationEventType.class, rls);
+ rls.init(conf);
+
+ rls.handle(createApplicationLocalizationEvent(user, appId));
+
+ LocalResourceRequest req =
+ new LocalResourceRequest(new Path("file:///tmp"), 123L,
+ LocalResourceType.FILE, LocalResourceVisibility.PRIVATE, "");
+
+ // Pre-populate the LocalizerRunners by hand: the Resource Localization
+ // Service code would otherwise start them internally, which this test
+ // must avoid.
+
+ // creating new containers and populating corresponding localizer runners
+
+ // Container - 1
+ ContainerImpl container1 = createMockContainer(user, 1);
+ String localizerId1 = container1.getContainerID().toString();
+ rls.getPrivateLocalizers().put(
+ localizerId1,
+ rls.new LocalizerRunner(new LocalizerContext(user, container1
+ .getContainerID(), null), localizerId1));
+ LocalizerRunner localizerRunner1 = rls.getLocalizerRunner(localizerId1);
+
+ dispatcher1.getEventHandler().handle(
+ createContainerLocalizationEvent(container1,
+ LocalResourceVisibility.PRIVATE, req));
+ Assert
+ .assertTrue(waitForPrivateDownloadToStart(rls, localizerId1, 1, 200));
+
+ // Container - 2 now makes the request.
+ ContainerImpl container2 = createMockContainer(user, 2);
+ String localizerId2 = container2.getContainerID().toString();
+ rls.getPrivateLocalizers().put(
+ localizerId2,
+ rls.new LocalizerRunner(new LocalizerContext(user, container2
+ .getContainerID(), null), localizerId2));
+ LocalizerRunner localizerRunner2 = rls.getLocalizerRunner(localizerId2);
+ dispatcher1.getEventHandler().handle(
+ createContainerLocalizationEvent(container2,
+ LocalResourceVisibility.PRIVATE, req));
+ Assert
+ .assertTrue(waitForPrivateDownloadToStart(rls, localizerId2, 1, 200));
+
+ // Retrieving localized resource.
+ LocalResourcesTracker tracker =
+ rls.getLocalResourcesTracker(LocalResourceVisibility.PRIVATE, user,
+ appId);
+ LocalizedResource lr = tracker.getLocalizedResource(req);
+ // The resource should by now have moved into DOWNLOADING state
+ Assert.assertEquals(ResourceState.DOWNLOADING, lr.getState());
+ // The resource should have one permit available
+ Assert.assertEquals(1, lr.sem.availablePermits());
+
+ // Resource Localization Service receives the first heartbeat from the
+ // ContainerLocalizer for container1
+ LocalizerHeartbeatResponse response1 =
+ rls.heartbeat(createLocalizerStatus(localizerId1));
+
+ // Resource must have been added to scheduled map
+ Assert.assertEquals(1, localizerRunner1.scheduled.size());
+ // Checking resource in the response and also available permits for it.
+ Assert.assertEquals(req.getResource(), response1.getResourceSpecs()
+ .get(0).getResource().getResource());
+ Assert.assertEquals(0, lr.sem.availablePermits());
+
+ // Resource Localization Service now receives the first heartbeat from
+ // the ContainerLocalizer for container2
+ LocalizerHeartbeatResponse response2 =
+ rls.heartbeat(createLocalizerStatus(localizerId2));
+
+ // Resource must not have been added to scheduled map
+ Assert.assertEquals(0, localizerRunner2.scheduled.size());
+ // No resource is returned in response
+ Assert.assertEquals(0, response2.getResourceSpecs().size());
+
+ // ContainerLocalizer - 1 now sends failed resource heartbeat.
+ rls.heartbeat(createLocalizerStatusForFailedResource(localizerId1, req));
+
+ // Resource localization should fail and the state is updated accordingly.
+ // The lock on the LocalizedResource should also be released.
+ Assert
+ .assertTrue(waitForResourceState(lr, rls, req,
+ LocalResourceVisibility.PRIVATE, user, appId, ResourceState.FAILED,
+ 200));
+ Assert.assertTrue(lr.getState().equals(ResourceState.FAILED));
+ Assert.assertEquals(0, localizerRunner1.scheduled.size());
+
+ // Now Container-2 once again sends a heartbeat to the resource
+ // localization service.
+
+ // Container-2 tries to download the resource again; it should still not
+ // get it, because the resource is no longer in DOWNLOADING state.
+ response2 = rls.heartbeat(createLocalizerStatus(localizerId2));
+
+ // Resource must not have been added to scheduled map.
+ // Also, as the resource download has failed, it will be removed from
+ // the pending list.
+ Assert.assertEquals(0, localizerRunner2.scheduled.size());
+ Assert.assertEquals(0, localizerRunner2.pending.size());
+ Assert.assertEquals(0, response2.getResourceSpecs().size());
+
+ } finally {
+ if (dispatcher1 != null) {
+ dispatcher1.stop();
+ }
+ }
+ }
+
+ /**
+  * Builds a localizer status that reports a failed fetch of {@code req}.
+  *
+  * @param localizerId id stamped on the returned status
+  * @param req resource whose download is reported as failed
+  * @return status carrying a single FETCH_FAILURE resource entry
+  */
+ private LocalizerStatus createLocalizerStatusForFailedResource(
+     String localizerId, LocalResourceRequest req) {
+   // Describe the failed resource first, then attach it to a fresh status.
+   LocalResourceStatus failure = new LocalResourceStatusPBImpl();
+   failure.setException(new YarnRemoteExceptionPBImpl("test"));
+   failure.setStatus(ResourceStatusType.FETCH_FAILURE);
+   failure.setResource(req);
+
+   LocalizerStatus heartbeat = createLocalizerStatus(localizerId);
+   heartbeat.addResourceStatus(failure);
+   return heartbeat;
+ }
+
+ /**
+  * Creates a localizer status that carries only the given localizer id.
+  *
+  * @param localizerId1 id to set on the new status
+  * @return the newly created status
+  */
+ private LocalizerStatus createLocalizerStatus(String localizerId1) {
+   LocalizerStatus heartbeat = new LocalizerStatusPBImpl();
+   heartbeat.setLocalizerId(localizerId1);
+   return heartbeat;
+ }
+
+ /**
+  * Creates an INIT_APPLICATION_RESOURCES event for a mocked application.
+  *
+  * @param user value the mocked application returns from getUser()
+  * @param appId value the mocked application returns from getAppId()
+  * @return the application localization event wrapping the mock
+  */
+ private LocalizationEvent createApplicationLocalizationEvent(String user,
+     ApplicationId appId) {
+   Application mockApp = mock(Application.class);
+   when(mockApp.getAppId()).thenReturn(appId);
+   when(mockApp.getUser()).thenReturn(user);
+   LocalizationEvent initEvent =
+       new ApplicationLocalizationEvent(
+           LocalizationEventType.INIT_APPLICATION_RESOURCES, mockApp);
+   return initEvent;
+ }
+
+ @Test(timeout = 100000)
+ @SuppressWarnings("unchecked")
+ public void testParallelDownloadAttemptsForPublicResource() throws Exception {
+
+ DrainDispatcher dispatcher1 = null;
+ String user = "testuser";
+ try {
+ // Setting up ResourceLocalization service.
+ Configuration conf = new Configuration();
+ dispatcher1 = new DrainDispatcher();
+ AbstractFileSystem spylfs =
+ spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
+ final FileContext lfs = FileContext.getFileContext(spylfs, conf);
+ // Stub mkdir on the spy so that no directories are actually created
+ doNothing().when(spylfs).mkdir(isA(Path.class), isA(FsPermission.class),
+ anyBoolean());
+
+ // creating one local directory
+ List<Path> localDirs = new ArrayList<Path>();
+ String[] sDirs = new String[1];
+ for (int i = 0; i < 1; ++i) {
+ localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
+ sDirs[i] = localDirs.get(i).toString();
+ }
+ conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
+ // setting log directory.
+ String logDir =
+ lfs.makeQualified(new Path(basedir, "logdir ")).toString();
+ conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
+
+ // Registering event handlers
+ EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
+ dispatcher1.register(ApplicationEventType.class, applicationBus);
+ EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
+ dispatcher1.register(ContainerEventType.class, containerBus);
+
+ ContainerExecutor exec = mock(ContainerExecutor.class);
+ DeletionService delService = mock(DeletionService.class);
+ LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
+ // initializing directory handler.
+ dirsHandler.init(conf);
+
+ dispatcher1.init(conf);
+ dispatcher1.start();
+
+ // Creating and initializing ResourceLocalizationService but not starting
+ // it, as otherwise it would remove requests from the pending queue.
+ ResourceLocalizationService rawService =
+ new ResourceLocalizationService(dispatcher1, exec, delService,
+ dirsHandler);
+ ResourceLocalizationService spyService = spy(rawService);
+ dispatcher1.register(LocalizationEventType.class, spyService);
+ spyService.init(conf);
+
+ // Initially pending map should be empty for public localizer
+ Assert.assertEquals(0, spyService.getPublicLocalizer().pending.size());
+
+ LocalResourceRequest req =
+ new LocalResourceRequest(new Path("/tmp"), 123L,
+ LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, "");
+
+ // Initializing application
+ ApplicationImpl app = mock(ApplicationImpl.class);
+ ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
+ when(app.getAppId()).thenReturn(appId);
+ when(app.getUser()).thenReturn(user);
+ dispatcher1.getEventHandler().handle(
+ new ApplicationLocalizationEvent(
+ LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
+
+ // Container - 1
+
+ // container requesting the resource
+ ContainerImpl container1 = createMockContainer(user, 1);
+ dispatcher1.getEventHandler().handle(
+ createContainerLocalizationEvent(container1,
+ LocalResourceVisibility.PUBLIC, req));
+
+ // Waiting for resource to change into DOWNLOADING state.
+ Assert.assertTrue(waitForResourceState(null, spyService, req,
+ LocalResourceVisibility.PUBLIC, user, null, ResourceState.DOWNLOADING,
+ 200));
+
+ // Waiting for download to start.
+ Assert.assertTrue(waitForPublicDownloadToStart(spyService, 1, 200));
+
+ LocalizedResource lr =
+ getLocalizedResource(spyService, req, LocalResourceVisibility.PUBLIC,
+ user, null);
+ // The resource should by now have moved into DOWNLOADING state
+ Assert.assertEquals(ResourceState.DOWNLOADING, lr.getState());
+
+ // pending should have this resource now.
+ Assert.assertEquals(1, spyService.getPublicLocalizer().pending.size());
+ // The resource should now have 0 permits available.
+ Assert.assertEquals(0, lr.sem.availablePermits());
+
+ // Container - 2
+
+ // Container requesting the same resource.
+ ContainerImpl container2 = createMockContainer(user, 2);
+ dispatcher1.getEventHandler().handle(
+ createContainerLocalizationEvent(container2,
+ LocalResourceVisibility.PUBLIC, req));
+
+ // Waiting for download to start. This should return false as a new
+ // download will not start.
+ Assert.assertFalse(waitForPublicDownloadToStart(spyService, 2, 100));
+
+ // Now fail the resource download. As part of that, the
+ // resource state changes and then the lock is released.
+ ResourceFailedLocalizationEvent locFailedEvent =
+ new ResourceFailedLocalizationEvent(req, new Exception("test"));
+ spyService.getLocalResourcesTracker(LocalResourceVisibility.PUBLIC, user,
+ null).handle(locFailedEvent);
+
+ // Waiting for resource to change into FAILED state.
+ Assert.assertTrue(waitForResourceState(lr, spyService, req,
+ LocalResourceVisibility.PUBLIC, user, null, ResourceState.FAILED, 200));
+ // releasing the lock, as the failed-download path would.
+ lr.unlock();
+ // removing pending download request.
+ spyService.getPublicLocalizer().pending.clear();
+
+ // Simulate the race condition in which an event is added to the
+ // dispatcher before the resource state changes to FAILED or LOCALIZED,
+ // by sending the event directly to the dispatcher.
+ LocalizerResourceRequestEvent localizerEvent =
+ new LocalizerResourceRequestEvent(lr, null,
+ mock(LocalizerContext.class), null);
+
+ dispatcher1.getEventHandler().handle(localizerEvent);
+ // Waiting for download to start. This should return false as a new
+ // download will not start.
+ Assert.assertFalse(waitForPublicDownloadToStart(spyService, 1, 100));
+ // Checking available permits now.
+ Assert.assertEquals(1, lr.sem.availablePermits());
+
+ } finally {
+ if (dispatcher1 != null) {
+ dispatcher1.stop();
+ }
+ }
+
+ }
+
+ /**
+  * Polls the private localizer registered under {@code localizerId} until
+  * its pending list holds exactly {@code size} requests, sleeping 20 ms
+  * between checks.
+  *
+  * @param service service whose private localizers are inspected
+  * @param localizerId key of the localizer to look up
+  * @param size expected number of pending requests
+  * @param maxWaitTime wait budget in milliseconds, shared by the localizer
+  *          lookup and the size check
+  * @return true if the pending list reached {@code size}; false if the
+  *         localizer never appeared, the size did not match in time, or the
+  *         thread was interrupted before the condition held
+  */
+ private boolean waitForPrivateDownloadToStart(
+     ResourceLocalizationService service, String localizerId, int size,
+     int maxWaitTime) {
+   List<LocalizerResourceRequestEvent> pending = null;
+   // Waiting for localizer to be created.
+   do {
+     // Look the runner up once: a second lookup after the null check could
+     // return null if the entry is removed in between.
+     LocalizerRunner runner = service.getPrivateLocalizers().get(localizerId);
+     if (runner != null) {
+       pending = runner.pending;
+     }
+     if (pending != null) {
+       break;
+     }
+     maxWaitTime -= 20;
+     try {
+       Thread.sleep(20);
+     } catch (InterruptedException e) {
+       // Keep the interrupt visible to the caller and stop polling.
+       Thread.currentThread().interrupt();
+       return false;
+     }
+   } while (maxWaitTime > 0);
+   if (pending == null) {
+     return false;
+   }
+   do {
+     if (pending.size() == size) {
+       return true;
+     }
+     maxWaitTime -= 20;
+     try {
+       Thread.sleep(20);
+     } catch (InterruptedException e) {
+       // Keep the interrupt visible to the caller and report the size as
+       // it stands now.
+       Thread.currentThread().interrupt();
+       break;
+     }
+   } while (maxWaitTime > 0);
+   return pending.size() == size;
+ }
+
+ /**
+  * Polls the public localizer of {@code service} until its pending map
+  * holds exactly {@code size} entries, sleeping 20 ms between checks.
+  *
+  * @param service service whose public localizer is inspected
+  * @param size expected number of pending download requests
+  * @param maxWaitTime wait budget in milliseconds, shared by the localizer
+  *          lookup and the size check
+  * @return true if the pending map reached {@code size}; false if the
+  *         localizer never appeared, the size did not match in time, or the
+  *         thread was interrupted before the condition held
+  */
+ private boolean waitForPublicDownloadToStart(
+     ResourceLocalizationService service, int size, int maxWaitTime) {
+   Map<Future<Path>, LocalizerResourceRequestEvent> pending = null;
+   // Waiting for localizer to be created.
+   do {
+     if (service.getPublicLocalizer() != null) {
+       pending = service.getPublicLocalizer().pending;
+     }
+     if (pending != null) {
+       break;
+     }
+     maxWaitTime -= 20;
+     try {
+       Thread.sleep(20);
+     } catch (InterruptedException e) {
+       // Keep the interrupt visible to the caller and stop polling.
+       Thread.currentThread().interrupt();
+       return false;
+     }
+   } while (maxWaitTime > 0);
+   if (pending == null) {
+     return false;
+   }
+   do {
+     if (pending.size() == size) {
+       return true;
+     }
+     maxWaitTime -= 20;
+     try {
+       Thread.sleep(20);
+     } catch (InterruptedException e) {
+       // Keep the interrupt visible to the caller and report the size as
+       // it stands now.
+       Thread.currentThread().interrupt();
+       break;
+     }
+   } while (maxWaitTime > 0);
+   return pending.size() == size;
+ }
+
+ /**
+  * Looks up the localized resource for {@code req} in the tracker that
+  * {@code service} returns for the given visibility, user and application.
+  *
+  * @return whatever the tracker returns for {@code req}
+  */
+ private LocalizedResource getLocalizedResource(
+     ResourceLocalizationService service, LocalResourceRequest req,
+     LocalResourceVisibility vis, String user, ApplicationId appId) {
+   LocalResourcesTracker owningTracker =
+       service.getLocalResourcesTracker(vis, user, appId);
+   return owningTracker.getLocalizedResource(req);
+ }
+
+ /**
+  * Waits for a localized resource to reach {@code resourceState}.
+  *
+  * If {@code lr} is null, first polls every 20 ms for the tracker of the
+  * given visibility/user/application and for the resource tracked under
+  * {@code req}. Then polls the resource state every 50 ms.
+  *
+  * @param lr resource to watch, or null to look it up through the tracker
+  * @param service service providing the resource tracker
+  * @param req request used to look the resource up when {@code lr} is null
+  * @param vis visibility used to select the tracker
+  * @param user user used to select the tracker
+  * @param appId application used to select the tracker
+  * @param resourceState state to wait for
+  * @param maxWaitTime wait budget in milliseconds, shared by both phases
+  * @return true if the resource is in {@code resourceState} when the wait
+  *         ends; false if it was never found, never reached the state in
+  *         time, or the thread was interrupted before it did
+  */
+ private boolean waitForResourceState(LocalizedResource lr,
+     ResourceLocalizationService service, LocalResourceRequest req,
+     LocalResourceVisibility vis, String user, ApplicationId appId,
+     ResourceState resourceState, long maxWaitTime) {
+   LocalResourcesTracker tracker = null;
+   // checking tracker is created
+   do {
+     if (tracker == null) {
+       tracker = service.getLocalResourcesTracker(vis, user, appId);
+     }
+     if (tracker != null && lr == null) {
+       lr = tracker.getLocalizedResource(req);
+     }
+     if (lr != null) {
+       break;
+     }
+     maxWaitTime -= 20;
+     try {
+       Thread.sleep(20);
+     } catch (InterruptedException e) {
+       // Keep the interrupt visible to the caller and stop polling.
+       Thread.currentThread().interrupt();
+       return false;
+     }
+   } while (maxWaitTime > 0);
+   // this will wait till resource state is changed to (resourceState).
+   if (lr == null) {
+     return false;
+   }
+   do {
+     if (lr.getState().equals(resourceState)) {
+       return true;
+     }
+     maxWaitTime -= 50;
+     try {
+       Thread.sleep(50);
+     } catch (InterruptedException e) {
+       // Keep the interrupt visible to the caller and report the state as
+       // it stands now.
+       Thread.currentThread().interrupt();
+       break;
+     }
+   } while (maxWaitTime > 0);
+   return lr.getState().equals(resourceState);
+ }
+
+ /**
+  * Creates a localization request event in which {@code container} asks
+  * for the single resource {@code req} under visibility {@code vis}.
+  *
+  * @param container container issuing the request
+  * @param vis visibility key under which the resource is listed
+  * @param req the only resource requested
+  * @return the container localization request event
+  */
+ private ContainerLocalizationRequestEvent createContainerLocalizationEvent(
+     ContainerImpl container, LocalResourceVisibility vis,
+     LocalResourceRequest req) {
+   // One-element, mutable list holding the requested resource.
+   List<LocalResourceRequest> requested =
+       new ArrayList<LocalResourceRequest>();
+   requested.add(req);
+
+   Map<LocalResourceVisibility, Collection<LocalResourceRequest>> byVisibility =
+       new HashMap<LocalResourceVisibility, Collection<LocalResourceRequest>>();
+   byVisibility.put(vis, requested);
+
+   return new ContainerLocalizationRequestEvent(container, byVisibility);
+ }
+
+ /**
+  * Creates a mocked container stubbed with a user, mocked credentials and
+  * a container id built from {@code containerId}.
+  *
+  * @param user value returned by the mock's getUser()
+  * @param containerId last argument passed to BuilderUtils.newContainerId
+  * @return the stubbed container mock
+  */
+ private ContainerImpl createMockContainer(String user, int containerId) {
+   ContainerImpl mockContainer = mock(ContainerImpl.class);
+   Credentials credentials = mock(Credentials.class);
+   when(mockContainer.getUser()).thenReturn(user);
+   when(mockContainer.getCredentials()).thenReturn(credentials);
+   when(mockContainer.getContainerID()).thenReturn(
+       BuilderUtils.newContainerId(1, 1, 1, containerId));
+   return mockContainer;
+ }
+
private static URL getPath(String path) {
URL url = BuilderUtils.newURL("file", null, 0, path);
return url;