You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to yarn-commits@hadoop.apache.org by bo...@apache.org on 2013/02/27 16:33:22 UTC
svn commit: r1450811 - in
/hadoop/common/branches/branch-2/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/
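hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/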
Author: bobby
Date: Wed Feb 27 15:33:22 2013
New Revision: 1450811
URL: http://svn.apache.org/r1450811
Log:
svn merge -c 1450807 FIXES: YARN-426. Failure to download a public resource prevents further downloads (Jason Lowe via bobby)
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1450811&r1=1450810&r2=1450811&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Wed Feb 27 15:33:22 2013
@@ -325,6 +325,9 @@ Release 0.23.7 - UNRELEASED
YARN-400. RM can return null application resource usage report leading to
NPE in client (Jason Lowe via tgraves)
+ YARN-426. Failure to download a public resource prevents further downloads
+ (Jason Lowe via bobby)
+
Release 0.23.6 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1450811&r1=1450810&r2=1450811&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Wed Feb 27 15:33:22 2013
@@ -659,25 +659,23 @@ public class ResourceLocalizationService
new ContainerResourceFailedEvent(
assoc.getContext().getContainerId(),
assoc.getResource().getRequest(), e.getCause()));
+ List<LocalizerResourceRequestEvent> reqs;
synchronized (attempts) {
LocalResourceRequest req = assoc.getResource().getRequest();
- List<LocalizerResourceRequestEvent> reqs = attempts.get(req);
+ reqs = attempts.get(req);
if (null == reqs) {
LOG.error("Missing pending list for " + req);
return;
}
- if (reqs.isEmpty()) {
- attempts.remove(req);
- }
- /*
- * Do not retry for now. Once failed is failed!
- * LocalizerResourceRequestEvent request = reqs.remove(0);
-
- pending.put(queue.submit(new FSDownload(
- lfs, null, conf, publicDirs,
- request.getResource().getRequest(), new Random())),
- request);
- */ }
+ attempts.remove(req);
+ }
+ // let the other containers know about the localization failure
+ for (LocalizerResourceRequestEvent reqEvent : reqs) {
+ dispatcher.getEventHandler().handle(
+ new ContainerResourceFailedEvent(
+ reqEvent.getContext().getContainerId(),
+ reqEvent.getResource().getRequest(), e.getCause()));
+ }
} catch (CancellationException e) {
// ignore; shutting down
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java?rev=1450811&r1=1450810&r2=1450811&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java Wed Feb 27 15:33:22 2013
@@ -27,13 +27,16 @@ import static org.mockito.Matchers.argTh
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
import static org.mockito.Matchers.isNull;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
@@ -46,6 +49,8 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
import junit.framework.Assert;
@@ -89,6 +94,7 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.LocalizerTracker;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
@@ -102,6 +108,8 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
public class TestResourceLocalizationService {
@@ -512,6 +520,111 @@ public class TestResourceLocalizationSer
}
}
+ @Test(timeout=20000)
+ @SuppressWarnings("unchecked") // mocked generics
+ public void testFailedPublicResource() throws Exception {
+ Configuration conf = new YarnConfiguration();
+ AbstractFileSystem spylfs =
+ spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
+ final FileContext lfs = FileContext.getFileContext(spylfs, conf);
+ doNothing().when(spylfs).mkdir(
+ isA(Path.class), isA(FsPermission.class), anyBoolean());
+ List<Path> localDirs = new ArrayList<Path>();
+ String[] sDirs = new String[4];
+ for (int i = 0; i < 4; ++i) {
+ localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
+ sDirs[i] = localDirs.get(i).toString();
+ }
+ conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
+ String logDir = lfs.makeQualified(new Path(basedir, "logdir " )).toString();
+ conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
+
+ DrainDispatcher dispatcher = new DrainDispatcher();
+ EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
+ dispatcher.register(ApplicationEventType.class, applicationBus);
+ EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
+ dispatcher.register(ContainerEventType.class, containerBus);
+
+ ContainerExecutor exec = mock(ContainerExecutor.class);
+ DeletionService delService = mock(DeletionService.class);
+ LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
+ dirsHandler.init(conf);
+
+ dispatcher.init(conf);
+ dispatcher.start();
+
+ try {
+ ResourceLocalizationService rawService =
+ new ResourceLocalizationService(dispatcher, exec, delService,
+ dirsHandler);
+ ResourceLocalizationService spyService = spy(rawService);
+ doReturn(mockServer).when(spyService).createServer();
+ doReturn(lfs).when(spyService).getLocalFileContext(
+ isA(Configuration.class));
+
+ spyService.init(conf);
+ spyService.start();
+
+ final String user = "user0";
+ // init application
+ final Application app = mock(Application.class);
+ final ApplicationId appId =
+ BuilderUtils.newApplicationId(314159265358979L, 3);
+ when(app.getUser()).thenReturn(user);
+ when(app.getAppId()).thenReturn(appId);
+ spyService.handle(new ApplicationLocalizationEvent(
+ LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
+ dispatcher.await();
+
+ // init container.
+ final Container c = getMockContainer(appId, 42);
+
+ // init resources
+ Random r = new Random();
+ long seed = r.nextLong();
+ System.out.println("SEED: " + seed);
+ r.setSeed(seed);
+
+ // cause chmod to fail after a delay
+ final CyclicBarrier barrier = new CyclicBarrier(2);
+ doAnswer(new Answer<Void>() {
+ public Void answer(InvocationOnMock invocation) throws IOException {
+ try {
+ barrier.await();
+ } catch (InterruptedException e) {
+ } catch (BrokenBarrierException e) {
+ }
+ throw new IOException("forced failure");
+ }
+ }).when(spylfs)
+ .setPermission(isA(Path.class), isA(FsPermission.class));
+
+ // Queue up two localization requests for the same public resource
+ final LocalResource pubResource = getPublicMockedResource(r);
+ final LocalResourceRequest pubReq = new LocalResourceRequest(pubResource);
+
+ Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
+ new HashMap<LocalResourceVisibility,
+ Collection<LocalResourceRequest>>();
+ req.put(LocalResourceVisibility.PUBLIC,
+ Collections.singletonList(pubReq));
+
+ Set<LocalResourceRequest> pubRsrcs = new HashSet<LocalResourceRequest>();
+ pubRsrcs.add(pubReq);
+
+ spyService.handle(new ContainerLocalizationRequestEvent(c, req));
+ spyService.handle(new ContainerLocalizationRequestEvent(c, req));
+ dispatcher.await();
+
+ // allow the chmod to fail now that both requests have been queued
+ barrier.await();
+ verify(containerBus, timeout(5000).times(2))
+ .handle(isA(ContainerResourceFailedEvent.class));
+ } finally {
+ dispatcher.stop();
+ }
+ }
+
private static URL getPath(String path) {
URL url = BuilderUtils.newURL("file", null, 0, path);
return url;