You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/02/11 20:48:58 UTC
[40/50] [abbrv] hadoop git commit: YARN-3074. Nodemanager dies when
localizer runner tries to write to a full disk. Contributed by Varun Saxena
YARN-3074. Nodemanager dies when localizer runner tries to write to a full disk. Contributed by Varun Saxena
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b379972a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b379972a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b379972a
Branch: refs/heads/YARN-2928
Commit: b379972ab39551d4b57436a54c0098a63742c7e1
Parents: b94c111
Author: Jason Lowe <jl...@apache.org>
Authored: Wed Feb 11 16:33:43 2015 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Wed Feb 11 16:33:43 2015 +0000
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../localizer/ResourceLocalizationService.java | 19 +++--
.../TestResourceLocalizationService.java | 82 ++++++++++++++++++++
3 files changed, 98 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b379972a/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index a1c3407..ba5490c 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -546,6 +546,9 @@ Release 2.7.0 - UNRELEASED
YARN-3160. Fix non-atomic operation on nodeUpdateQueue in RMNodeImpl.
(Chengbing Liu via junping_du)
+ YARN-3074. Nodemanager dies when localizer runner tries to write to a full
+ disk (Varun Saxena via jlowe)
+
Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b379972a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
index 8c84132..dd50ead 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
@@ -55,6 +55,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FSError;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
@@ -1063,6 +1064,7 @@ public class ResourceLocalizationService extends CompositeService
@SuppressWarnings("unchecked") // dispatcher not typed
public void run() {
Path nmPrivateCTokensPath = null;
+ Throwable exception = null;
try {
// Get nmPrivateDir
nmPrivateCTokensPath =
@@ -1090,14 +1092,19 @@ public class ResourceLocalizationService extends CompositeService
+ dirsHandler.getDisksHealthReport(false));
}
// TODO handle ExitCodeException separately?
+ } catch (FSError fe) {
+ exception = fe;
} catch (Exception e) {
- LOG.info("Localizer failed", e);
- // 3) on error, report failure to Container and signal ABORT
- // 3.1) notify resource of failed localization
- ContainerId cId = context.getContainerId();
- dispatcher.getEventHandler().handle(
- new ContainerResourceFailedEvent(cId, null, e.getMessage()));
+ exception = e;
} finally {
+ if (exception != null) {
+ LOG.info("Localizer failed", exception);
+ // On error, report failure to Container and signal ABORT
+ // Notify resource of failed localization
+ ContainerId cId = context.getContainerId();
+ dispatcher.getEventHandler().handle(new ContainerResourceFailedEvent(
+ cId, null, exception.getMessage()));
+ }
for (LocalizerResourceRequestEvent event : scheduled.values()) {
event.getResource().unlock();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b379972a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
index 30af5a4..d3c3521 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
@@ -43,6 +43,7 @@ import static org.mockito.Mockito.when;
import java.io.File;
import java.io.IOException;
+import java.lang.reflect.Constructor;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
@@ -69,6 +70,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.AbstractFileSystem;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSError;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Options.ChecksumOpt;
@@ -715,6 +717,86 @@ public class TestResourceLocalizationService {
stateStore.close();
}
}
+ // Regression test for YARN-3074: an FSError thrown while a container's
+ // resources are localized must surface as a ContainerResourceFailedEvent.
+ @Test( timeout = 10000)
+ @SuppressWarnings("unchecked") // mocked generics
+ public void testLocalizerRunnerException() throws Exception {
+ DrainDispatcher dispatcher = new DrainDispatcher();
+ dispatcher.init(conf);
+ dispatcher.start();
+ EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
+ dispatcher.register(ApplicationEventType.class, applicationBus);
+ EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
+ dispatcher.register(ContainerEventType.class, containerBus);
+ // Spy on the dirs handler so getLocalPathForWrite can be made to fail below.
+ ContainerExecutor exec = mock(ContainerExecutor.class);
+ LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
+ LocalDirsHandlerService dirsHandlerSpy = spy(dirsHandler);
+ dirsHandlerSpy.init(conf);
+
+ DeletionService delServiceReal = new DeletionService(exec);
+ DeletionService delService = spy(delServiceReal);
+ delService.init(new Configuration());
+ delService.start();
+ // createServer() is stubbed to return mockServer instead of a real server.
+ ResourceLocalizationService rawService =
+ new ResourceLocalizationService(dispatcher, exec, delService,
+ dirsHandlerSpy, nmContext);
+ ResourceLocalizationService spyService = spy(rawService);
+ doReturn(mockServer).when(spyService).createServer();
+ try {
+ spyService.init(conf);
+ spyService.start();
+
+ // init application: mocked app owned by "user0", then wait on dispatcher
+ final Application app = mock(Application.class);
+ final ApplicationId appId =
+ BuilderUtils.newApplicationId(314159265358979L, 3);
+ when(app.getUser()).thenReturn("user0");
+ when(app.getAppId()).thenReturn(appId);
+ spyService.handle(new ApplicationLocalizationEvent(
+ LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
+ dispatcher.await();
+ // Log the random seed (presumably so a failing run can be reproduced).
+ Random r = new Random();
+ long seed = r.nextLong();
+ System.out.println("SEED: " + seed);
+ r.setSeed(seed);
+ final Container c = getMockContainer(appId, 42, "user0");
+ final LocalResource resource1 = getPrivateMockedResource(r);
+ System.out.println("Here 4"); // NOTE(review): looks like leftover debug output
+ // Request a single PRIVATE resource for the container.
+ final LocalResourceRequest req1 = new LocalResourceRequest(resource1);
+ Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
+ new HashMap<LocalResourceVisibility,
+ Collection<LocalResourceRequest>>();
+ List<LocalResourceRequest> privateResourceList =
+ new ArrayList<LocalResourceRequest>();
+ privateResourceList.add(req1);
+ rsrcs.put(LocalResourceVisibility.PRIVATE, privateResourceList);
+ // FSError is built reflectively; assumes constructors[0] takes a Throwable.
+ final Constructor<?>[] constructors =
+ FSError.class.getDeclaredConstructors();
+ constructors[0].setAccessible(true);
+ FSError fsError =
+ (FSError) constructors[0].newInstance(new IOException("Disk Error"));
+ // Make every getLocalPathForWrite(String) call on the spy throw the FSError.
+ Mockito
+ .doThrow(fsError)
+ .when(dirsHandlerSpy)
+ .getLocalPathForWrite(isA(String.class));
+ spyService.handle(new ContainerLocalizationRequestEvent(c, rsrcs));
+ Thread.sleep(1000); // presumably gives the localizer thread time to run
+ dispatcher.await();
+ // Verify that a ContainerResourceFailedEvent reaches the container bus
+ verify(containerBus).handle(isA(ContainerResourceFailedEvent.class));
+ } finally {
+ spyService.stop();
+ dispatcher.stop();
+ delService.stop();
+ }
+ }
@Test( timeout = 10000)
@SuppressWarnings("unchecked") // mocked generics