You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by jl...@apache.org on 2014/10/21 19:32:18 UTC
[1/2] YARN-90. NodeManager should identify failed disks becoming good
again. Contributed by Varun Vasudev
Repository: hadoop
Updated Branches:
refs/heads/trunk b6f9d5538 -> 6f2028bd1
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f2028bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
index ab86a18..8a5441a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
@@ -25,6 +25,7 @@ import static org.mockito.Matchers.anyMap;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@@ -58,6 +59,8 @@ import org.junit.Assert;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.AbstractFileSystem;
+import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -105,6 +108,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerM
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.TestNonAggregatingLogHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
@@ -136,12 +140,19 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
super();
this.remoteRootLogDir.mkdir();
}
+
+ DrainDispatcher dispatcher;
+ EventHandler<ApplicationEvent> appEventHandler;
@Override
+ @SuppressWarnings("unchecked")
public void setup() throws IOException {
super.setup();
NodeId nodeId = NodeId.newInstance("0.0.0.0", 5555);
((NMContext)context).setNodeId(nodeId);
+ dispatcher = createDispatcher();
+ appEventHandler = mock(EventHandler.class);
+ dispatcher.register(ApplicationEventType.class, appEventHandler);
}
@Override
@@ -149,10 +160,12 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
super.tearDown();
createContainerExecutor().deleteAsUser(user,
new Path(remoteRootLogDir.getAbsolutePath()), new Path[] {});
+ dispatcher.await();
+ dispatcher.stop();
+ dispatcher.close();
}
@Test
- @SuppressWarnings("unchecked")
public void testLocalFileDeletionAfterUpload() throws Exception {
this.delSrvc = new DeletionService(createContainerExecutor());
delSrvc = spy(delSrvc);
@@ -161,10 +174,6 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
this.remoteRootLogDir.getAbsolutePath());
- DrainDispatcher dispatcher = createDispatcher();
- EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
- dispatcher.register(ApplicationEventType.class, appEventHandler);
-
LogAggregationService logAggregationService = spy(
new LogAggregationService(dispatcher, this.context, this.delSrvc,
super.dirsHandler));
@@ -236,16 +245,11 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
}
@Test
- @SuppressWarnings("unchecked")
public void testNoContainerOnNode() throws Exception {
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
this.remoteRootLogDir.getAbsolutePath());
- DrainDispatcher dispatcher = createDispatcher();
- EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
- dispatcher.register(ApplicationEventType.class, appEventHandler);
-
LogAggregationService logAggregationService =
new LogAggregationService(dispatcher, this.context, this.delSrvc,
super.dirsHandler);
@@ -285,6 +289,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
};
checkEvents(appEventHandler, expectedEvents, true, "getType", "getApplicationID");
dispatcher.stop();
+ logAggregationService.close();
}
@Test
@@ -294,6 +299,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
this.remoteRootLogDir.getAbsolutePath());
+
String[] fileNames = new String[] { "stdout", "stderr", "syslog" };
DrainDispatcher dispatcher = createDispatcher();
EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
@@ -432,17 +438,12 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
}
@Test
- @SuppressWarnings("unchecked")
public void testVerifyAndCreateRemoteDirsFailure()
throws Exception {
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
this.remoteRootLogDir.getAbsolutePath());
- DrainDispatcher dispatcher = createDispatcher();
- EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
- dispatcher.register(ApplicationEventType.class, appEventHandler);
-
LogAggregationService logAggregationService = spy(
new LogAggregationService(dispatcher, this.context, this.delSrvc,
super.dirsHandler));
@@ -456,8 +457,9 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
logAggregationService.start();
// Now try to start an application
- ApplicationId appId = BuilderUtils.newApplicationId(
- System.currentTimeMillis(), (int)Math.random());
+ ApplicationId appId =
+ BuilderUtils.newApplicationId(System.currentTimeMillis(),
+ (int) (Math.random() * 1000));
logAggregationService.handle(new LogHandlerAppStartedEvent(appId,
this.user, null,
ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY,
@@ -475,8 +477,9 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
Mockito.reset(logAggregationService);
// Now try to start another one
- ApplicationId appId2 = BuilderUtils.newApplicationId(
- System.currentTimeMillis(), (int)Math.random());
+ ApplicationId appId2 =
+ BuilderUtils.newApplicationId(System.currentTimeMillis(),
+ (int) (Math.random() * 1000));
File appLogDir =
new File(localLogDir, ConverterUtils.toString(appId2));
appLogDir.mkdir();
@@ -578,6 +581,8 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
aggSvc.handle(new LogHandlerAppStartedEvent(appId3, this.user, null,
ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
verify(spyFs, never()).mkdirs(eq(appDir3), isA(FsPermission.class));
+ aggSvc.stop();
+ aggSvc.close();
}
@Test
@@ -588,19 +593,16 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
localLogDir.getAbsolutePath());
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
this.remoteRootLogDir.getAbsolutePath());
-
- DrainDispatcher dispatcher = createDispatcher();
- EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
- dispatcher.register(ApplicationEventType.class, appEventHandler);
-
+
LogAggregationService logAggregationService = spy(
new LogAggregationService(dispatcher, this.context, this.delSrvc,
super.dirsHandler));
logAggregationService.init(this.conf);
logAggregationService.start();
- ApplicationId appId = BuilderUtils.newApplicationId(
- System.currentTimeMillis(), (int)Math.random());
+ ApplicationId appId =
+ BuilderUtils.newApplicationId(System.currentTimeMillis(),
+ (int) (Math.random() * 1000));
doThrow(new YarnRuntimeException("KABOOM!"))
.when(logAggregationService).initAppAggregator(
eq(appId), eq(user), any(Credentials.class),
@@ -634,26 +636,22 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
}
@Test
- @SuppressWarnings("unchecked")
public void testLogAggregationCreateDirsFailsWithoutKillingNM()
throws Exception {
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
this.remoteRootLogDir.getAbsolutePath());
-
- DrainDispatcher dispatcher = createDispatcher();
- EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
- dispatcher.register(ApplicationEventType.class, appEventHandler);
-
+
LogAggregationService logAggregationService = spy(
new LogAggregationService(dispatcher, this.context, this.delSrvc,
super.dirsHandler));
logAggregationService.init(this.conf);
logAggregationService.start();
- ApplicationId appId = BuilderUtils.newApplicationId(
- System.currentTimeMillis(), (int)Math.random());
+ ApplicationId appId =
+ BuilderUtils.newApplicationId(System.currentTimeMillis(),
+ (int) (Math.random() * 1000));
Exception e = new RuntimeException("KABOOM!");
doThrow(e)
.when(logAggregationService).createAppDir(any(String.class),
@@ -905,18 +903,13 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
}
@Test(timeout=20000)
- @SuppressWarnings("unchecked")
public void testStopAfterError() throws Exception {
DeletionService delSrvc = mock(DeletionService.class);
// get the AppLogAggregationImpl thread to crash
LocalDirsHandlerService mockedDirSvc = mock(LocalDirsHandlerService.class);
when(mockedDirSvc.getLogDirs()).thenThrow(new RuntimeException());
-
- DrainDispatcher dispatcher = createDispatcher();
- EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
- dispatcher.register(ApplicationEventType.class, appEventHandler);
-
+
LogAggregationService logAggregationService =
new LogAggregationService(dispatcher, this.context, delSrvc,
mockedDirSvc);
@@ -930,20 +923,16 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
logAggregationService.stop();
assertEquals(0, logAggregationService.getNumAggregators());
+ logAggregationService.close();
}
@Test
- @SuppressWarnings("unchecked")
public void testLogAggregatorCleanup() throws Exception {
DeletionService delSrvc = mock(DeletionService.class);
// get the AppLogAggregationImpl thread to crash
LocalDirsHandlerService mockedDirSvc = mock(LocalDirsHandlerService.class);
- DrainDispatcher dispatcher = createDispatcher();
- EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
- dispatcher.register(ApplicationEventType.class, appEventHandler);
-
LogAggregationService logAggregationService =
new LogAggregationService(dispatcher, this.context, delSrvc,
mockedDirSvc);
@@ -964,6 +953,8 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
}
Assert.assertEquals("Log aggregator failed to cleanup!", 0,
logAggregationService.getNumAggregators());
+ logAggregationService.stop();
+ logAggregationService.close();
}
@SuppressWarnings("unchecked")
@@ -1039,6 +1030,72 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
return sb.toString();
}
+ /*
+ * Test to make sure we handle cases where the directories we get back from
+ * the LocalDirsHandler may be unusable, for example because the log dir is
+ * not present or has other problems. The test uses helper functions from
+ * TestNonAggregatingLogHandler.
+ */
+ @Test
+ public void testFailedDirsLocalFileDeletionAfterUpload() throws Exception {
+
+ // setup conf and services
+ DeletionService mockDelService = mock(DeletionService.class);
+ File[] localLogDirs =
+ TestNonAggregatingLogHandler.getLocalLogDirFiles(this.getClass()
+ .getName(), 7);
+ final List<String> localLogDirPaths =
+ new ArrayList<String>(localLogDirs.length);
+ for (int i = 0; i < localLogDirs.length; i++) {
+ localLogDirPaths.add(localLogDirs[i].getAbsolutePath());
+ }
+
+ String localLogDirsString = StringUtils.join(localLogDirPaths, ",");
+
+ this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDirsString);
+ this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+ this.remoteRootLogDir.getAbsolutePath());
+ this.conf.setLong(YarnConfiguration.NM_DISK_HEALTH_CHECK_INTERVAL_MS, 500);
+
+ ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1);
+ ApplicationAttemptId appAttemptId =
+ BuilderUtils.newApplicationAttemptId(application1, 1);
+
+ this.dirsHandler = new LocalDirsHandlerService();
+ LocalDirsHandlerService mockDirsHandler = mock(LocalDirsHandlerService.class);
+
+ LogAggregationService logAggregationService =
+ spy(new LogAggregationService(dispatcher, this.context, mockDelService,
+ mockDirsHandler));
+ AbstractFileSystem spylfs =
+ spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
+ FileContext lfs = FileContext.getFileContext(spylfs, conf);
+ doReturn(lfs).when(logAggregationService).getLocalFileContext(
+ isA(Configuration.class));
+
+ logAggregationService.init(this.conf);
+ logAggregationService.start();
+
+ TestNonAggregatingLogHandler.runMockedFailedDirs(logAggregationService,
+ application1, user, mockDelService, mockDirsHandler, conf, spylfs, lfs,
+ localLogDirs);
+
+ logAggregationService.stop();
+ assertEquals(0, logAggregationService.getNumAggregators());
+ verify(logAggregationService).closeFileSystems(
+ any(UserGroupInformation.class));
+
+ ApplicationEvent expectedEvents[] =
+ new ApplicationEvent[] {
+ new ApplicationEvent(appAttemptId.getApplicationId(),
+ ApplicationEventType.APPLICATION_LOG_HANDLING_INITED),
+ new ApplicationEvent(appAttemptId.getApplicationId(),
+ ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED) };
+
+ checkEvents(appEventHandler, expectedEvents, true, "getType",
+ "getApplicationID");
+ }
+
@Test (timeout = 50000)
@SuppressWarnings("unchecked")
public void testLogAggregationServiceWithPatterns() throws Exception {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f2028bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java
index 300ca28..d0f6472 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java
@@ -19,15 +19,36 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.NotSerializableException;
+import java.io.ObjectInputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.AbstractFileSystem;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -45,25 +66,52 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.eve
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mockito;
import org.mockito.exceptions.verification.WantedButNotInvoked;
+import org.mockito.internal.matchers.VarargMatcher;
public class TestNonAggregatingLogHandler {
+
+ DeletionService mockDelService;
+ Configuration conf;
+ DrainDispatcher dispatcher;
+ EventHandler<ApplicationEvent> appEventHandler;
+ String user = "testuser";
+ ApplicationId appId;
+ ApplicationAttemptId appAttemptId;
+ ContainerId container11;
+ LocalDirsHandlerService dirsHandler;
+
+ @Before
+ @SuppressWarnings("unchecked")
+ public void setup() {
+ mockDelService = mock(DeletionService.class);
+ conf = new YarnConfiguration();
+ dispatcher = createDispatcher(conf);
+ appEventHandler = mock(EventHandler.class);
+ dispatcher.register(ApplicationEventType.class, appEventHandler);
+ appId = BuilderUtils.newApplicationId(1234, 1);
+ appAttemptId = BuilderUtils.newApplicationAttemptId(appId, 1);
+ container11 = BuilderUtils.newContainerId(appAttemptId, 1);
+ dirsHandler = new LocalDirsHandlerService();
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ dirsHandler.stop();
+ dirsHandler.close();
+ dispatcher.await();
+ dispatcher.stop();
+ dispatcher.close();
+ }
@Test
- @SuppressWarnings("unchecked")
- public void testLogDeletion() {
- DeletionService delService = mock(DeletionService.class);
- Configuration conf = new YarnConfiguration();
- String user = "testuser";
-
- File[] localLogDirs = new File[2];
- localLogDirs[0] =
- new File("target", this.getClass().getName() + "-localLogDir0")
- .getAbsoluteFile();
- localLogDirs[1] =
- new File("target", this.getClass().getName() + "-localLogDir1")
- .getAbsoluteFile();
+ public void testLogDeletion() throws IOException {
+ File[] localLogDirs = getLocalLogDirFiles(this.getClass().getName(), 2);
String localLogDirsString =
localLogDirs[0].getAbsolutePath() + ","
+ localLogDirs[1].getAbsolutePath();
@@ -72,72 +120,50 @@ public class TestNonAggregatingLogHandler {
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 0l);
- DrainDispatcher dispatcher = createDispatcher(conf);
- EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
- dispatcher.register(ApplicationEventType.class, appEventHandler);
-
- LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
dirsHandler.init(conf);
- ApplicationId appId1 = BuilderUtils.newApplicationId(1234, 1);
- ApplicationAttemptId appAttemptId1 =
- BuilderUtils.newApplicationAttemptId(appId1, 1);
- ContainerId container11 = BuilderUtils.newContainerId(appAttemptId1, 1);
+ NonAggregatingLogHandler rawLogHandler =
+ new NonAggregatingLogHandler(dispatcher, mockDelService, dirsHandler);
+ NonAggregatingLogHandler logHandler = spy(rawLogHandler);
+ AbstractFileSystem spylfs =
+ spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
+ FileContext lfs = FileContext.getFileContext(spylfs, conf);
+ doReturn(lfs).when(logHandler)
+ .getLocalFileContext(isA(Configuration.class));
+ FsPermission defaultPermission =
+ FsPermission.getDirDefault().applyUMask(lfs.getUMask());
+ final FileStatus fs =
+ new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
+ defaultPermission, "", "",
+ new Path(localLogDirs[0].getAbsolutePath()));
+ doReturn(fs).when(spylfs).getFileStatus(isA(Path.class));
- NonAggregatingLogHandler logHandler =
- new NonAggregatingLogHandler(dispatcher, delService, dirsHandler);
logHandler.init(conf);
logHandler.start();
- logHandler.handle(new LogHandlerAppStartedEvent(appId1, user, null,
+ logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null,
ContainerLogsRetentionPolicy.ALL_CONTAINERS, null));
logHandler.handle(new LogHandlerContainerFinishedEvent(container11, 0));
- logHandler.handle(new LogHandlerAppFinishedEvent(appId1));
+ logHandler.handle(new LogHandlerAppFinishedEvent(appId));
Path[] localAppLogDirs = new Path[2];
localAppLogDirs[0] =
- new Path(localLogDirs[0].getAbsolutePath(), appId1.toString());
+ new Path(localLogDirs[0].getAbsolutePath(), appId.toString());
localAppLogDirs[1] =
- new Path(localLogDirs[1].getAbsolutePath(), appId1.toString());
+ new Path(localLogDirs[1].getAbsolutePath(), appId.toString());
- // 5 seconds for the delete which is a separate thread.
- long verifyStartTime = System.currentTimeMillis();
- WantedButNotInvoked notInvokedException = null;
- boolean matched = false;
- while (!matched && System.currentTimeMillis() < verifyStartTime + 5000l) {
- try {
- verify(delService).delete(eq(user), (Path) eq(null),
- eq(localAppLogDirs[0]), eq(localAppLogDirs[1]));
- matched = true;
- } catch (WantedButNotInvoked e) {
- notInvokedException = e;
- try {
- Thread.sleep(50l);
- } catch (InterruptedException i) {
- }
- }
- }
- if (!matched) {
- throw notInvokedException;
+ testDeletionServiceCall(mockDelService, user, 5000, localAppLogDirs);
+ logHandler.close();
+ for (int i = 0; i < localLogDirs.length; i++) {
+ FileUtils.deleteDirectory(localLogDirs[i]);
}
}
@Test
- @SuppressWarnings("unchecked")
- public void testDelayedDelete() {
- DeletionService delService = mock(DeletionService.class);
- Configuration conf = new YarnConfiguration();
- String user = "testuser";
-
- File[] localLogDirs = new File[2];
- localLogDirs[0] =
- new File("target", this.getClass().getName() + "-localLogDir0")
- .getAbsoluteFile();
- localLogDirs[1] =
- new File("target", this.getClass().getName() + "-localLogDir1")
- .getAbsoluteFile();
+ public void testDelayedDelete() throws IOException {
+ File[] localLogDirs = getLocalLogDirFiles(this.getClass().getName(), 2);
String localLogDirsString =
localLogDirs[0].getAbsolutePath() + ","
+ localLogDirs[1].getAbsolutePath();
@@ -148,42 +174,36 @@ public class TestNonAggregatingLogHandler {
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS,
YarnConfiguration.DEFAULT_NM_LOG_RETAIN_SECONDS);
- DrainDispatcher dispatcher = createDispatcher(conf);
- EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
- dispatcher.register(ApplicationEventType.class, appEventHandler);
-
- LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
dirsHandler.init(conf);
- ApplicationId appId1 = BuilderUtils.newApplicationId(1234, 1);
- ApplicationAttemptId appAttemptId1 =
- BuilderUtils.newApplicationAttemptId(appId1, 1);
- ContainerId container11 = BuilderUtils.newContainerId(appAttemptId1, 1);
-
NonAggregatingLogHandler logHandler =
- new NonAggregatingLogHandlerWithMockExecutor(dispatcher, delService,
+ new NonAggregatingLogHandlerWithMockExecutor(dispatcher, mockDelService,
dirsHandler);
logHandler.init(conf);
logHandler.start();
- logHandler.handle(new LogHandlerAppStartedEvent(appId1, user, null,
+ logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null,
ContainerLogsRetentionPolicy.ALL_CONTAINERS, null));
logHandler.handle(new LogHandlerContainerFinishedEvent(container11, 0));
- logHandler.handle(new LogHandlerAppFinishedEvent(appId1));
+ logHandler.handle(new LogHandlerAppFinishedEvent(appId));
Path[] localAppLogDirs = new Path[2];
localAppLogDirs[0] =
- new Path(localLogDirs[0].getAbsolutePath(), appId1.toString());
+ new Path(localLogDirs[0].getAbsolutePath(), appId.toString());
localAppLogDirs[1] =
- new Path(localLogDirs[1].getAbsolutePath(), appId1.toString());
+ new Path(localLogDirs[1].getAbsolutePath(), appId.toString());
ScheduledThreadPoolExecutor mockSched =
((NonAggregatingLogHandlerWithMockExecutor) logHandler).mockSched;
verify(mockSched).schedule(any(Runnable.class), eq(10800l),
eq(TimeUnit.SECONDS));
+ logHandler.close();
+ for (int i = 0; i < localLogDirs.length; i++) {
+ FileUtils.deleteDirectory(localLogDirs[i]);
+ }
}
@Test
@@ -202,25 +222,25 @@ public class TestNonAggregatingLogHandler {
verify(logHandler.mockSched)
.awaitTermination(eq(10l), eq(TimeUnit.SECONDS));
verify(logHandler.mockSched).shutdownNow();
+ logHandler.close();
+ aggregatingLogHandler.close();
}
@Test
- public void testHandlingApplicationFinishedEvent() {
- Configuration conf = new Configuration();
- LocalDirsHandlerService dirsService = new LocalDirsHandlerService();
+ public void testHandlingApplicationFinishedEvent() throws IOException {
DeletionService delService = new DeletionService(null);
NonAggregatingLogHandler aggregatingLogHandler =
new NonAggregatingLogHandler(new InlineDispatcher(),
delService,
- dirsService);
+ dirsHandler);
- dirsService.init(conf);
- dirsService.start();
+ dirsHandler.init(conf);
+ dirsHandler.start();
delService.init(conf);
delService.start();
aggregatingLogHandler.init(conf);
aggregatingLogHandler.start();
- ApplicationId appId = BuilderUtils.newApplicationId(1234, 1);
+
// It should NOT throw RejectedExecutionException
aggregatingLogHandler.handle(new LogHandlerAppFinishedEvent(appId));
aggregatingLogHandler.stop();
@@ -228,6 +248,7 @@ public class TestNonAggregatingLogHandler {
// It should NOT throw RejectedExecutionException after stopping
// handler service.
aggregatingLogHandler.handle(new LogHandlerAppFinishedEvent(appId));
+ aggregatingLogHandler.close();
}
private class NonAggregatingLogHandlerWithMockExecutor extends
@@ -255,4 +276,201 @@ public class TestNonAggregatingLogHandler {
dispatcher.start();
return dispatcher;
}
+
+ /*
+ * Test to ensure that we handle the cleanup of directories that may not have
+ * the application log dirs we're trying to delete or may have other problems.
+ * Test creates 7 log dirs, and fails the directory check for 4 of them and
+ * then checks to ensure we tried to delete only the ones that passed the
+ * check.
+ */
+ @Test
+ public void testFailedDirLogDeletion() throws Exception {
+
+ File[] localLogDirs = getLocalLogDirFiles(this.getClass().getName(), 7);
+ final List<String> localLogDirPaths =
+ new ArrayList<String>(localLogDirs.length);
+ for (int i = 0; i < localLogDirs.length; i++) {
+ localLogDirPaths.add(localLogDirs[i].getAbsolutePath());
+ }
+
+ String localLogDirsString = StringUtils.join(localLogDirPaths, ",");
+
+ conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDirsString);
+ conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
+ conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 0l);
+
+ LocalDirsHandlerService mockDirsHandler = mock(LocalDirsHandlerService.class);
+
+ NonAggregatingLogHandler rawLogHandler =
+ new NonAggregatingLogHandler(dispatcher, mockDelService, mockDirsHandler);
+ NonAggregatingLogHandler logHandler = spy(rawLogHandler);
+ AbstractFileSystem spylfs =
+ spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
+ FileContext lfs = FileContext.getFileContext(spylfs, conf);
+ doReturn(lfs).when(logHandler)
+ .getLocalFileContext(isA(Configuration.class));
+ logHandler.init(conf);
+ logHandler.start();
+ runMockedFailedDirs(logHandler, appId, user, mockDelService,
+ mockDirsHandler, conf, spylfs, lfs, localLogDirs);
+ logHandler.close();
+ }
+
+ /**
+ * Function to run a log handler with directories failing the getFileStatus
+ * call. The function accepts the log handler, sets up the mocks to fail with
+ * specific exceptions and ensures the deletion service has the correct calls.
+ *
+ * @param logHandler the logHandler implementation to test
+ *
+ * @param appId the application id to use when sending events to the log
+ * handler
+ *
+ * @param user the user name to use
+ *
+ * @param mockDelService a mock of the DeletionService which we will verify
+ * the delete calls against
+ *
+ * @param dirsHandler a spy or mock of the LocalDirsHandlerService used
+ * when creating the logHandler. It needs to be a spy or mock so that we can
+ * stub the getLogDirsForCleanup() call.
+ *
+ * @param conf the configuration used
+ *
+ * @param spylfs a spy on the AbstractFileSystem object used when creating lfs
+ *
+ * @param lfs the FileContext object to be used to mock the getFileStatus()
+ * calls
+ *
+ * @param localLogDirs list of the log dirs to run the test against, must have
+ * at least 7 entries
+ */
+ public static void runMockedFailedDirs(LogHandler logHandler,
+ ApplicationId appId, String user, DeletionService mockDelService,
+ LocalDirsHandlerService dirsHandler, Configuration conf,
+ AbstractFileSystem spylfs, FileContext lfs, File[] localLogDirs)
+ throws Exception {
+ Map<ApplicationAccessType, String> appAcls = new HashMap<ApplicationAccessType, String>();
+ if (localLogDirs.length < 7) {
+ throw new IllegalArgumentException(
+ "Argument localLogDirs must be at least of length 7");
+ }
+ Path[] localAppLogDirPaths = new Path[localLogDirs.length];
+ for (int i = 0; i < localAppLogDirPaths.length; i++) {
+ localAppLogDirPaths[i] =
+ new Path(localLogDirs[i].getAbsolutePath(), appId.toString());
+ }
+ final List<String> localLogDirPaths =
+ new ArrayList<String>(localLogDirs.length);
+ for (int i = 0; i < localLogDirs.length; i++) {
+ localLogDirPaths.add(localLogDirs[i].getAbsolutePath());
+ }
+
+ // setup mocks
+ FsPermission defaultPermission =
+ FsPermission.getDirDefault().applyUMask(lfs.getUMask());
+ final FileStatus fs =
+ new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
+ defaultPermission, "", "",
+ new Path(localLogDirs[0].getAbsolutePath()));
+ doReturn(fs).when(spylfs).getFileStatus(isA(Path.class));
+ doReturn(localLogDirPaths).when(dirsHandler).getLogDirsForCleanup();
+
+ logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null,
+ ContainerLogsRetentionPolicy.ALL_CONTAINERS, appAcls));
+
+ // test case where some dirs have the log dir to delete
+ // mock some dirs throwing various exceptions
+ // verify deletion happens only on the others
+ Mockito.doThrow(new FileNotFoundException()).when(spylfs)
+ .getFileStatus(eq(localAppLogDirPaths[0]));
+ doReturn(fs).when(spylfs).getFileStatus(eq(localAppLogDirPaths[1]));
+ Mockito.doThrow(new AccessControlException()).when(spylfs)
+ .getFileStatus(eq(localAppLogDirPaths[2]));
+ doReturn(fs).when(spylfs).getFileStatus(eq(localAppLogDirPaths[3]));
+ Mockito.doThrow(new IOException()).when(spylfs)
+ .getFileStatus(eq(localAppLogDirPaths[4]));
+ Mockito.doThrow(new UnsupportedFileSystemException("test")).when(spylfs)
+ .getFileStatus(eq(localAppLogDirPaths[5]));
+ doReturn(fs).when(spylfs).getFileStatus(eq(localAppLogDirPaths[6]));
+
+ logHandler.handle(new LogHandlerAppFinishedEvent(appId));
+
+ testDeletionServiceCall(mockDelService, user, 5000, localAppLogDirPaths[1],
+ localAppLogDirPaths[3], localAppLogDirPaths[6]);
+
+ return;
+ }
+
+ static class DeletePathsMatcher extends ArgumentMatcher<Path[]> implements
+ VarargMatcher {
+
+ // to get rid of serialization warning
+ static final long serialVersionUID = 0;
+
+ private transient Path[] matchPaths;
+
+ DeletePathsMatcher(Path... matchPaths) {
+ this.matchPaths = matchPaths;
+ }
+
+ @Override
+ public boolean matches(Object varargs) {
+ return new EqualsBuilder().append(matchPaths, varargs).isEquals();
+ }
+
+ // function to get rid of FindBugs warning
+ private void readObject(ObjectInputStream os) throws NotSerializableException {
+ throw new NotSerializableException(this.getClass().getName());
+ }
+ }
+
+ /**
+ * Function to verify that the DeletionService object received the right
+ * requests.
+ *
+ * @param delService the DeletionService mock which we verify against
+ *
+ * @param user the user name to use when verifying the deletion
+ *
+ * @param timeout amount in milliseconds to wait before we decide the calls
+ * didn't come through
+ *
+ * @param matchPaths the paths to match in the delete calls
+ *
+ * @throws WantedButNotInvoked if the calls could not be verified
+ */
+ static void testDeletionServiceCall(DeletionService delService, String user,
+ long timeout, Path... matchPaths) {
+
+ long verifyStartTime = System.currentTimeMillis();
+ WantedButNotInvoked notInvokedException = null;
+ boolean matched = false;
+ while (!matched && System.currentTimeMillis() < verifyStartTime + timeout) {
+ try {
+ verify(delService).delete(eq(user), (Path) eq(null),
+ Mockito.argThat(new DeletePathsMatcher(matchPaths)));
+ matched = true;
+ } catch (WantedButNotInvoked e) {
+ notInvokedException = e;
+ try {
+ Thread.sleep(50l);
+ } catch (InterruptedException i) {
+ }
+ }
+ }
+ if (!matched) {
+ throw notInvokedException;
+ }
+ return;
+ }
+
+ public static File[] getLocalLogDirFiles(String name, int number) {
+ File[] dirs = new File[number];
+ for (int i = 0; i < dirs.length; i++) {
+ dirs[i] = new File("target", name + "-localLogDir" + i).getAbsoluteFile();
+ }
+ return dirs;
+ }
}
[2/2] git commit: YARN-90. NodeManager should identify failed disks
becoming good again. Contributed by Varun Vasudev
Posted by jl...@apache.org.
YARN-90. NodeManager should identify failed disks becoming good again. Contributed by Varun Vasudev
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6f2028bd
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6f2028bd
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6f2028bd
Branch: refs/heads/trunk
Commit: 6f2028bd1514d90b831f889fd0ee7f2ba5c15000
Parents: b6f9d55
Author: Jason Lowe <jl...@apache.org>
Authored: Tue Oct 21 17:29:22 2014 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Tue Oct 21 17:31:13 2014 +0000
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../server/nodemanager/DirectoryCollection.java | 219 ++++++++---
.../nodemanager/LocalDirsHandlerService.java | 176 +++++++--
.../nodemanager/NodeHealthCheckerService.java | 4 +-
.../launcher/ContainerLaunch.java | 2 +-
.../localizer/ResourceLocalizationService.java | 270 ++++++++++---
.../logaggregation/AppLogAggregatorImpl.java | 35 +-
.../logaggregation/LogAggregationService.java | 13 +-
.../loghandler/NonAggregatingLogHandler.java | 47 ++-
.../nodemanager/TestDirectoryCollection.java | 75 +++-
.../TestLocalDirsHandlerService.java | 43 ++-
.../nodemanager/TestNodeHealthService.java | 2 +-
.../TestResourceLocalizationService.java | 312 ++++++++++++++-
.../TestLogAggregationService.java | 147 ++++---
.../TestNonAggregatingLogHandler.java | 382 +++++++++++++++----
15 files changed, 1441 insertions(+), 289 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f2028bd/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index b130ecf..af056b3 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -379,6 +379,9 @@ Release 2.6.0 - UNRELEASED
YARN-2582. Fixed Log CLI and Web UI for showing aggregated logs of LRS. (Xuan
Gong via zjshen)
+ YARN-90. NodeManager should identify failed disks becoming good again
+ (Varun Vasudev via jlowe)
+
OPTIMIZATIONS
BUG FIXES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f2028bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java
index f6ee128..279787b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java
@@ -21,18 +21,23 @@ package org.apache.hadoop.yarn.server.nodemanager;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.DiskChecker;
-import org.apache.hadoop.util.DiskChecker.DiskErrorException;
/**
* Manages a list of local storage directories.
@@ -40,9 +45,38 @@ import org.apache.hadoop.util.DiskChecker.DiskErrorException;
class DirectoryCollection {
private static final Log LOG = LogFactory.getLog(DirectoryCollection.class);
+ public enum DiskErrorCause {
+ DISK_FULL, OTHER
+ }
+
+ static class DiskErrorInformation {
+ DiskErrorCause cause;
+ String message;
+
+ DiskErrorInformation(DiskErrorCause cause, String message) {
+ this.cause = cause;
+ this.message = message;
+ }
+ }
+
+ /**
+ * Returns a merged list which contains all the elements of l1 and l2
+ * @param l1 the first list to be included
+ * @param l2 the second list to be included
+ * @return a new list containing all the elements of the first and second list
+ */
+ static List<String> concat(List<String> l1, List<String> l2) {
+ List<String> ret = new ArrayList<String>(l1.size() + l2.size());
+ ret.addAll(l1);
+ ret.addAll(l2);
+ return ret;
+ }
+
// Good local storage directories
private List<String> localDirs;
- private List<String> failedDirs;
+ private List<String> errorDirs;
+ private List<String> fullDirs;
+
private int numFailures;
private float diskUtilizationPercentageCutoff;
@@ -109,7 +143,9 @@ class DirectoryCollection {
float utilizationPercentageCutOff,
long utilizationSpaceCutOff) {
localDirs = new CopyOnWriteArrayList<String>(dirs);
- failedDirs = new CopyOnWriteArrayList<String>();
+ errorDirs = new CopyOnWriteArrayList<String>();
+ fullDirs = new CopyOnWriteArrayList<String>();
+
diskUtilizationPercentageCutoff = utilizationPercentageCutOff;
diskUtilizationSpaceCutoff = utilizationSpaceCutOff;
diskUtilizationPercentageCutoff =
@@ -131,7 +167,16 @@ class DirectoryCollection {
* @return the failed directories
*/
synchronized List<String> getFailedDirs() {
- return Collections.unmodifiableList(failedDirs);
+ return Collections.unmodifiableList(
+ DirectoryCollection.concat(errorDirs, fullDirs));
+ }
+
+ /**
+ * @return the directories that have exceeded the disk space limits
+ */
+
+ synchronized List<String> getFullDirs() {
+ return fullDirs;
}
/**
@@ -158,7 +203,7 @@ class DirectoryCollection {
LOG.warn("Unable to create directory " + dir + " error " +
e.getMessage() + ", removing from the list of valid directories.");
localDirs.remove(dir);
- failedDirs.add(dir);
+ errorDirs.add(dir);
numFailures++;
failed = true;
}
@@ -167,61 +212,147 @@ class DirectoryCollection {
}
/**
- * Check the health of current set of local directories, updating the list
- * of valid directories if necessary.
- * @return <em>true</em> if there is a new disk-failure identified in
- * this checking. <em>false</em> otherwise.
+ * Check the health of the current set of local directories (good and failed),
+ * updating the list of valid directories if necessary.
+ *
+ * @return <em>true</em> if a new disk failure is identified in this check
+ * or a previously failed directory passes the disk check; <em>false</em>
+ * otherwise.
*/
synchronized boolean checkDirs() {
- int oldNumFailures = numFailures;
- HashSet<String> checkFailedDirs = new HashSet<String>();
- for (final String dir : localDirs) {
+ boolean setChanged = false;
+ Set<String> preCheckGoodDirs = new HashSet<String>(localDirs);
+ Set<String> preCheckFullDirs = new HashSet<String>(fullDirs);
+ Set<String> preCheckOtherErrorDirs = new HashSet<String>(errorDirs);
+ List<String> failedDirs = DirectoryCollection.concat(errorDirs, fullDirs);
+ List<String> allLocalDirs =
+ DirectoryCollection.concat(localDirs, failedDirs);
+
+ Map<String, DiskErrorInformation> dirsFailedCheck = testDirs(allLocalDirs);
+
+ localDirs.clear();
+ errorDirs.clear();
+ fullDirs.clear();
+
+ for (Map.Entry<String, DiskErrorInformation> entry : dirsFailedCheck
+ .entrySet()) {
+ String dir = entry.getKey();
+ DiskErrorInformation errorInformation = entry.getValue();
+ switch (entry.getValue().cause) {
+ case DISK_FULL:
+ fullDirs.add(entry.getKey());
+ break;
+ case OTHER:
+ errorDirs.add(entry.getKey());
+ break;
+ }
+ if (preCheckGoodDirs.contains(dir)) {
+ LOG.warn("Directory " + dir + " error, " + errorInformation.message
+ + ", removing from list of valid directories");
+ setChanged = true;
+ numFailures++;
+ }
+ }
+ for (String dir : allLocalDirs) {
+ if (!dirsFailedCheck.containsKey(dir)) {
+ localDirs.add(dir);
+ if (preCheckFullDirs.contains(dir)
+ || preCheckOtherErrorDirs.contains(dir)) {
+ setChanged = true;
+ LOG.info("Directory " + dir
+ + " passed disk check, adding to list of valid directories.");
+ }
+ }
+ }
+ Set<String> postCheckFullDirs = new HashSet<String>(fullDirs);
+ Set<String> postCheckOtherDirs = new HashSet<String>(errorDirs);
+ for (String dir : preCheckFullDirs) {
+ if (postCheckOtherDirs.contains(dir)) {
+ LOG.warn("Directory " + dir + " error "
+ + dirsFailedCheck.get(dir).message);
+ }
+ }
+
+ for (String dir : preCheckOtherErrorDirs) {
+ if (postCheckFullDirs.contains(dir)) {
+ LOG.warn("Directory " + dir + " error "
+ + dirsFailedCheck.get(dir).message);
+ }
+ }
+ return setChanged;
+ }
+
+ Map<String, DiskErrorInformation> testDirs(List<String> dirs) {
+ HashMap<String, DiskErrorInformation> ret =
+ new HashMap<String, DiskErrorInformation>();
+ for (final String dir : dirs) {
+ String msg;
try {
File testDir = new File(dir);
DiskChecker.checkDir(testDir);
- if (isDiskUsageUnderPercentageLimit(testDir)) {
- LOG.warn("Directory " + dir
- + " error, used space above threshold of "
- + diskUtilizationPercentageCutoff
- + "%, removing from the list of valid directories.");
- checkFailedDirs.add(dir);
- } else if (isDiskFreeSpaceWithinLimit(testDir)) {
- LOG.warn("Directory " + dir + " error, free space below limit of "
- + diskUtilizationSpaceCutoff
- + "MB, removing from the list of valid directories.");
- checkFailedDirs.add(dir);
+ if (isDiskUsageOverPercentageLimit(testDir)) {
+ msg =
+ "used space above threshold of "
+ + diskUtilizationPercentageCutoff
+ + "%";
+ ret.put(dir,
+ new DiskErrorInformation(DiskErrorCause.DISK_FULL, msg));
+ continue;
+ } else if (isDiskFreeSpaceUnderLimit(testDir)) {
+ msg =
+ "free space below limit of " + diskUtilizationSpaceCutoff
+ + "MB";
+ ret.put(dir,
+ new DiskErrorInformation(DiskErrorCause.DISK_FULL, msg));
+ continue;
}
- } catch (DiskErrorException de) {
- LOG.warn("Directory " + dir + " error " + de.getMessage()
- + ", removing from the list of valid directories.");
- checkFailedDirs.add(dir);
+
+ // create a random dir to make sure fs isn't in read-only mode
+ verifyDirUsingMkdir(testDir);
+ } catch (IOException ie) {
+ ret.put(dir,
+ new DiskErrorInformation(DiskErrorCause.OTHER, ie.getMessage()));
}
}
- for (String dir : checkFailedDirs) {
- localDirs.remove(dir);
- failedDirs.add(dir);
- numFailures++;
+ return ret;
+ }
+
+ /**
+ * Function to test whether a dir is working correctly by actually creating a
+ * random directory.
+ *
+ * @param dir
+ * the dir to test
+ */
+ private void verifyDirUsingMkdir(File dir) throws IOException {
+
+ String randomDirName = RandomStringUtils.randomAlphanumeric(5);
+ File target = new File(dir, randomDirName);
+ int i = 0;
+ while (target.exists()) {
+
+ randomDirName = RandomStringUtils.randomAlphanumeric(5) + i;
+ target = new File(dir, randomDirName);
+ i++;
+ }
+ try {
+ DiskChecker.checkDir(target);
+ } finally {
+ FileUtils.deleteQuietly(target);
}
- return numFailures > oldNumFailures;
}
-
- private boolean isDiskUsageUnderPercentageLimit(File dir) {
+
+ private boolean isDiskUsageOverPercentageLimit(File dir) {
float freePercentage =
100 * (dir.getUsableSpace() / (float) dir.getTotalSpace());
float usedPercentage = 100.0F - freePercentage;
- if (usedPercentage > diskUtilizationPercentageCutoff
- || usedPercentage >= 100.0F) {
- return true;
- }
- return false;
+ return (usedPercentage > diskUtilizationPercentageCutoff
+ || usedPercentage >= 100.0F);
}
- private boolean isDiskFreeSpaceWithinLimit(File dir) {
+ private boolean isDiskFreeSpaceUnderLimit(File dir) {
long freeSpace = dir.getUsableSpace() / (1024 * 1024);
- if (freeSpace < this.diskUtilizationSpaceCutoff) {
- return true;
- }
- return false;
+ return freeSpace < this.diskUtilizationSpaceCutoff;
}
private void createDir(FileContext localFs, Path dir, FsPermission perm)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f2028bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java
index b053941..7d1aa53 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java
@@ -21,7 +21,9 @@ package org.apache.hadoop.yarn.server.nodemanager;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
@@ -150,7 +152,7 @@ public class LocalDirsHandlerService extends AbstractService {
boolean createSucceeded = localDirs.createNonExistentDirs(localFs, perm);
createSucceeded &= logDirs.createNonExistentDirs(localFs, perm);
if (!createSucceeded) {
- updateDirsAfterFailure();
+ updateDirsAfterTest();
}
// Check the disk health immediately to weed out bad directories
@@ -197,9 +199,52 @@ public class LocalDirsHandlerService extends AbstractService {
}
/**
+ * @return the local directories that have exceeded the disk space limits
+ */
+ public List<String> getDiskFullLocalDirs() {
+ return localDirs.getFullDirs();
+ }
+
+ /**
+ * @return the log directories that have exceeded the disk space limits
+ */
+ public List<String> getDiskFullLogDirs() {
+ return logDirs.getFullDirs();
+ }
+
+ /**
+ * Function to get the local dirs which should be considered when cleaning up
+ * resources. Contains the good local dirs and the local dirs that have reached
+ * the disk space limit.
+ *
+ * @return the local dirs which should be considered for cleaning up
+ */
+ public List<String> getLocalDirsForCleanup() {
+ return DirectoryCollection.concat(localDirs.getGoodDirs(),
+ localDirs.getFullDirs());
+ }
+
+ /**
+ * Function to get the log dirs which should be considered when cleaning up
+ * resources. Contains the good log dirs and the log dirs that have reached
+ * the disk space limit.
+ *
+ * @return the log dirs which should be considered for cleaning up
+ */
+ public List<String> getLogDirsForCleanup() {
+ return DirectoryCollection.concat(logDirs.getGoodDirs(),
+ logDirs.getFullDirs());
+ }
+
+ /**
+ * Function to generate a report on the state of the disks.
+ *
+ * @param listGoodDirs
+ * flag to determine whether the report should report the state of
+ * good dirs or failed dirs
* @return the health report of nm-local-dirs and nm-log-dirs
*/
- public String getDisksHealthReport() {
+ public String getDisksHealthReport(boolean listGoodDirs) {
if (!isDiskHealthCheckerEnabled) {
return "";
}
@@ -207,20 +252,31 @@ public class LocalDirsHandlerService extends AbstractService {
StringBuilder report = new StringBuilder();
List<String> failedLocalDirsList = localDirs.getFailedDirs();
List<String> failedLogDirsList = logDirs.getFailedDirs();
- int numLocalDirs = localDirs.getGoodDirs().size()
- + failedLocalDirsList.size();
- int numLogDirs = logDirs.getGoodDirs().size() + failedLogDirsList.size();
- if (!failedLocalDirsList.isEmpty()) {
- report.append(failedLocalDirsList.size() + "/" + numLocalDirs
- + " local-dirs turned bad: "
- + StringUtils.join(",", failedLocalDirsList) + ";");
- }
- if (!failedLogDirsList.isEmpty()) {
- report.append(failedLogDirsList.size() + "/" + numLogDirs
- + " log-dirs turned bad: "
- + StringUtils.join(",", failedLogDirsList));
+ List<String> goodLocalDirsList = localDirs.getGoodDirs();
+ List<String> goodLogDirsList = logDirs.getGoodDirs();
+ int numLocalDirs = goodLocalDirsList.size() + failedLocalDirsList.size();
+ int numLogDirs = goodLogDirsList.size() + failedLogDirsList.size();
+ if (!listGoodDirs) {
+ if (!failedLocalDirsList.isEmpty()) {
+ report.append(failedLocalDirsList.size() + "/" + numLocalDirs
+ + " local-dirs are bad: "
+ + StringUtils.join(",", failedLocalDirsList) + "; ");
+ }
+ if (!failedLogDirsList.isEmpty()) {
+ report.append(failedLogDirsList.size() + "/" + numLogDirs
+ + " log-dirs are bad: " + StringUtils.join(",", failedLogDirsList));
+ }
+ } else {
+ report.append(goodLocalDirsList.size() + "/" + numLocalDirs
+ + " local-dirs are good: " + StringUtils.join(",", goodLocalDirsList)
+ + "; ");
+ report.append(goodLogDirsList.size() + "/" + numLogDirs
+ + " log-dirs are good: " + StringUtils.join(",", goodLogDirsList));
+
}
+
return report.toString();
+
}
/**
@@ -262,8 +318,8 @@ public class LocalDirsHandlerService extends AbstractService {
* Set good local dirs and good log dirs in the configuration so that the
* LocalDirAllocator objects will use this updated configuration only.
*/
- private void updateDirsAfterFailure() {
- LOG.info("Disk(s) failed. " + getDisksHealthReport());
+ private void updateDirsAfterTest() {
+
Configuration conf = getConfig();
List<String> localDirs = getLocalDirs();
conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS,
@@ -273,23 +329,91 @@ public class LocalDirsHandlerService extends AbstractService {
logDirs.toArray(new String[logDirs.size()]));
if (!areDisksHealthy()) {
// Just log.
- LOG.error("Most of the disks failed. " + getDisksHealthReport());
+ LOG.error("Most of the disks failed. " + getDisksHealthReport(false));
}
}
+ private void logDiskStatus(boolean newDiskFailure, boolean diskTurnedGood) {
+ if (newDiskFailure) {
+ String report = getDisksHealthReport(false);
+ LOG.info("Disk(s) failed: " + report);
+ }
+ if (diskTurnedGood) {
+ String report = getDisksHealthReport(true);
+ LOG.info("Disk(s) turned good: " + report);
+ }
+
+ }
+
private void checkDirs() {
- boolean newFailure = false;
- if (localDirs.checkDirs()) {
- newFailure = true;
- }
- if (logDirs.checkDirs()) {
- newFailure = true;
+ boolean disksStatusChange = false;
+ Set<String> failedLocalDirsPreCheck =
+ new HashSet<String>(localDirs.getFailedDirs());
+ Set<String> failedLogDirsPreCheck =
+ new HashSet<String>(logDirs.getFailedDirs());
+
+ if (localDirs.checkDirs()) {
+ disksStatusChange = true;
+ }
+ if (logDirs.checkDirs()) {
+ disksStatusChange = true;
+ }
+
+ Set<String> failedLocalDirsPostCheck =
+ new HashSet<String>(localDirs.getFailedDirs());
+ Set<String> failedLogDirsPostCheck =
+ new HashSet<String>(logDirs.getFailedDirs());
+
+ boolean disksFailed = false;
+ boolean disksTurnedGood = false;
+
+ disksFailed =
+ disksTurnedBad(failedLocalDirsPreCheck, failedLocalDirsPostCheck);
+ disksTurnedGood =
+ disksTurnedGood(failedLocalDirsPreCheck, failedLocalDirsPostCheck);
+
+ // skip check if we have new failed or good local dirs since we're going to
+ // log anyway
+ if (!disksFailed) {
+ disksFailed =
+ disksTurnedBad(failedLogDirsPreCheck, failedLogDirsPostCheck);
+ }
+ if (!disksTurnedGood) {
+ disksTurnedGood =
+ disksTurnedGood(failedLogDirsPreCheck, failedLogDirsPostCheck);
+ }
+
+ logDiskStatus(disksFailed, disksTurnedGood);
+
+ if (disksStatusChange) {
+ updateDirsAfterTest();
+ }
+
+ lastDisksCheckTime = System.currentTimeMillis();
+ }
+
+ private boolean disksTurnedBad(Set<String> preCheckFailedDirs,
+ Set<String> postCheckDirs) {
+ boolean disksFailed = false;
+ for (String dir : postCheckDirs) {
+ if (!preCheckFailedDirs.contains(dir)) {
+ disksFailed = true;
+ break;
}
+ }
+ return disksFailed;
+ }
- if (newFailure) {
- updateDirsAfterFailure();
+ private boolean disksTurnedGood(Set<String> preCheckDirs,
+ Set<String> postCheckDirs) {
+ boolean disksTurnedGood = false;
+ for (String dir : preCheckDirs) {
+ if (!postCheckDirs.contains(dir)) {
+ disksTurnedGood = true;
+ break;
}
- lastDisksCheckTime = System.currentTimeMillis();
+ }
+ return disksTurnedGood;
}
public Path getLocalPathForWrite(String pathStr) throws IOException {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f2028bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthCheckerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthCheckerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthCheckerService.java
index 446d05c..6d6001a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthCheckerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthCheckerService.java
@@ -55,9 +55,9 @@ public class NodeHealthCheckerService extends CompositeService {
String scriptReport = (nodeHealthScriptRunner == null) ? ""
: nodeHealthScriptRunner.getHealthReport();
if (scriptReport.equals("")) {
- return dirsHandler.getDisksHealthReport();
+ return dirsHandler.getDisksHealthReport(false);
} else {
- return scriptReport.concat(SEPARATOR + dirsHandler.getDisksHealthReport());
+ return scriptReport.concat(SEPARATOR + dirsHandler.getDisksHealthReport(false));
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f2028bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
index 87a36c4..f87ed6a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
@@ -240,7 +240,7 @@ public class ContainerLaunch implements Callable<Integer> {
if (!dirsHandler.areDisksHealthy()) {
ret = ContainerExitStatus.DISKS_FAILED;
throw new IOException("Most of the disks failed. "
- + dirsHandler.getDisksHealthReport());
+ + dirsHandler.getDisksHealthReport(false));
}
try {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f2028bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
index d3b33e8..371684b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
@@ -55,11 +55,13 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.Credentials;
@@ -170,6 +172,8 @@ public class ResourceLocalizationService extends CompositeService
*/
private final ConcurrentMap<String,LocalResourcesTracker> appRsrc =
new ConcurrentHashMap<String,LocalResourcesTracker>();
+
+ FileContext lfs;
public ResourceLocalizationService(Dispatcher dispatcher,
ContainerExecutor exec, DeletionService delService,
@@ -219,32 +223,17 @@ public class ResourceLocalizationService extends CompositeService
this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
try {
- FileContext lfs = getLocalFileContext(conf);
- lfs.setUMask(new FsPermission((short)FsPermission.DEFAULT_UMASK));
-
- if (!stateStore.canRecover() || stateStore.isNewlyCreated()) {
- cleanUpLocalDir(lfs,delService);
- }
-
- List<String> localDirs = dirsHandler.getLocalDirs();
- for (String localDir : localDirs) {
- // $local/usercache
- Path userDir = new Path(localDir, ContainerLocalizer.USERCACHE);
- lfs.mkdir(userDir, null, true);
- // $local/filecache
- Path fileDir = new Path(localDir, ContainerLocalizer.FILECACHE);
- lfs.mkdir(fileDir, null, true);
- // $local/nmPrivate
- Path sysDir = new Path(localDir, NM_PRIVATE_DIR);
- lfs.mkdir(sysDir, NM_PRIVATE_PERM, true);
- }
+ lfs = getLocalFileContext(conf);
+ lfs.setUMask(new FsPermission((short) FsPermission.DEFAULT_UMASK));
- List<String> logDirs = dirsHandler.getLogDirs();
- for (String logDir : logDirs) {
- lfs.mkdir(new Path(logDir), null, true);
+ if (!stateStore.canRecover()|| stateStore.isNewlyCreated()) {
+ cleanUpLocalDirs(lfs, delService);
+ initializeLocalDirs(lfs);
+ initializeLogDirs(lfs);
}
- } catch (IOException e) {
- throw new YarnRuntimeException("Failed to initialize LocalizationService", e);
+ } catch (Exception e) {
+ throw new YarnRuntimeException(
+ "Failed to initialize LocalizationService", e);
}
cacheTargetSize =
@@ -497,28 +486,45 @@ public class ResourceLocalizationService extends CompositeService
String containerIDStr = c.toString();
String appIDStr = ConverterUtils.toString(
c.getContainerId().getApplicationAttemptId().getApplicationId());
- for (String localDir : dirsHandler.getLocalDirs()) {
+
+ // Try deleting from good local dirs and full local dirs because a dir might
+ // have gone bad while the app was running (disk full). In addition
+ // a dir might have become good while the app was running.
+ // Check if the container dir exists and if it does, try to delete it
+ for (String localDir : dirsHandler.getLocalDirsForCleanup()) {
// Delete the user-owned container-dir
Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
Path userdir = new Path(usersdir, userName);
Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
Path appDir = new Path(allAppsdir, appIDStr);
Path containerDir = new Path(appDir, containerIDStr);
- delService.delete(userName, containerDir, new Path[] {});
+ submitDirForDeletion(userName, containerDir);
// Delete the nmPrivate container-dir
-
+
Path sysDir = new Path(localDir, NM_PRIVATE_DIR);
Path appSysDir = new Path(sysDir, appIDStr);
Path containerSysDir = new Path(appSysDir, containerIDStr);
- delService.delete(null, containerSysDir, new Path[] {});
+ submitDirForDeletion(null, containerSysDir);
}
dispatcher.getEventHandler().handle(
new ContainerEvent(c.getContainerId(),
ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP));
}
+
+ private void submitDirForDeletion(String userName, Path dir) {
+ try {
+ lfs.getFileStatus(dir);
+ delService.delete(userName, dir, new Path[] {});
+ } catch (UnsupportedFileSystemException ue) {
+ LOG.warn("Local dir " + dir + " is an unsupported filesystem", ue);
+ } catch (IOException ie) {
+ // ignore
+ return;
+ }
+ }
@SuppressWarnings({"unchecked"})
@@ -545,19 +551,22 @@ public class ResourceLocalizationService extends CompositeService
}
// Delete the application directories
- for (String localDir : dirsHandler.getLocalDirs()) {
+ userName = application.getUser();
+ appIDStr = application.toString();
+
+ for (String localDir : dirsHandler.getLocalDirsForCleanup()) {
// Delete the user-owned app-dir
Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
Path userdir = new Path(usersdir, userName);
Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
Path appDir = new Path(allAppsdir, appIDStr);
- delService.delete(userName, appDir, new Path[] {});
+ submitDirForDeletion(userName, appDir);
// Delete the nmPrivate app-dir
Path sysDir = new Path(localDir, NM_PRIVATE_DIR);
Path appSysDir = new Path(sysDir, appIDStr);
- delService.delete(null, appSysDir, new Path[] {});
+ submitDirForDeletion(null, appSysDir);
}
// TODO: decrement reference counts of all resources associated with this
@@ -590,8 +599,8 @@ public class ResourceLocalizationService extends CompositeService
private String getAppFileCachePath(String user, String appId) {
return StringUtils.join(Path.SEPARATOR, Arrays.asList(".",
- ContainerLocalizer.USERCACHE, user, ContainerLocalizer.APPCACHE, appId,
- ContainerLocalizer.FILECACHE));
+ ContainerLocalizer.USERCACHE, user, ContainerLocalizer.APPCACHE, appId,
+ ContainerLocalizer.FILECACHE));
}
@VisibleForTesting
@@ -868,7 +877,7 @@ public class ResourceLocalizationService extends CompositeService
/**
* Find next resource to be given to a spawned localizer.
*
- * @return
+ * @return the next resource to be localized
*/
private LocalResource findNextResource() {
synchronized (pending) {
@@ -1071,8 +1080,8 @@ public class ResourceLocalizationService extends CompositeService
// 1) write credentials to private dir
writeCredentials(nmPrivateCTokensPath);
// 2) exec initApplication and wait
- List<String> localDirs = dirsHandler.getLocalDirs();
- List<String> logDirs = dirsHandler.getLogDirs();
+ List<String> localDirs = getInitializedLocalDirs();
+ List<String> logDirs = getInitializedLogDirs();
if (dirsHandler.areDisksHealthy()) {
exec.startLocalizer(nmPrivateCTokensPath, localizationServerAddress,
context.getUser(),
@@ -1082,7 +1091,7 @@ public class ResourceLocalizationService extends CompositeService
localizerId, localDirs, logDirs);
} else {
throw new IOException("All disks failed. "
- + dirsHandler.getDisksHealthReport());
+ + dirsHandler.getDisksHealthReport(false));
}
// TODO handle ExitCodeException separately?
} catch (Exception e) {
@@ -1151,24 +1160,95 @@ public class ResourceLocalizationService extends CompositeService
}
- private void cleanUpLocalDir(FileContext lfs, DeletionService del) {
- long currentTimeStamp = System.currentTimeMillis();
- for (String localDir : dirsHandler.getLocalDirs()) {
- renameLocalDir(lfs, localDir, ContainerLocalizer.USERCACHE,
- currentTimeStamp);
- renameLocalDir(lfs, localDir, ContainerLocalizer.FILECACHE,
- currentTimeStamp);
- renameLocalDir(lfs, localDir, ResourceLocalizationService.NM_PRIVATE_DIR,
- currentTimeStamp);
+ private void initializeLocalDirs(FileContext lfs) {
+ List<String> localDirs = dirsHandler.getLocalDirs();
+ for (String localDir : localDirs) {
+ initializeLocalDir(lfs, localDir);
+ }
+ }
+
+ private void initializeLocalDir(FileContext lfs, String localDir) {
+
+ Map<Path, FsPermission> pathPermissionMap = getLocalDirsPathPermissionsMap(localDir);
+ for (Map.Entry<Path, FsPermission> entry : pathPermissionMap.entrySet()) {
+ FileStatus status;
try {
- deleteLocalDir(lfs, del, localDir);
- } catch (IOException e) {
- // Do nothing, just give the warning
- LOG.warn("Failed to delete localDir: " + localDir);
+ status = lfs.getFileStatus(entry.getKey());
+ }
+ catch(FileNotFoundException fs) {
+ status = null;
+ }
+ catch(IOException ie) {
+ String msg = "Could not get file status for local dir " + entry.getKey();
+ LOG.warn(msg, ie);
+ throw new YarnRuntimeException(msg, ie);
+ }
+ if(status == null) {
+ try {
+ lfs.mkdir(entry.getKey(), entry.getValue(), true);
+ status = lfs.getFileStatus(entry.getKey());
+ } catch (IOException e) {
+ String msg = "Could not initialize local dir " + entry.getKey();
+ LOG.warn(msg, e);
+ throw new YarnRuntimeException(msg, e);
+ }
+ }
+ FsPermission perms = status.getPermission();
+ if(!perms.equals(entry.getValue())) {
+ try {
+ lfs.setPermission(entry.getKey(), entry.getValue());
+ }
+ catch(IOException ie) {
+ String msg = "Could not set permissions for local dir " + entry.getKey();
+ LOG.warn(msg, ie);
+ throw new YarnRuntimeException(msg, ie);
+ }
}
}
}
+ private void initializeLogDirs(FileContext lfs) {
+ List<String> logDirs = dirsHandler.getLogDirs();
+ for (String logDir : logDirs) {
+ initializeLogDir(lfs, logDir);
+ }
+ }
+
+ private void initializeLogDir(FileContext lfs, String logDir) {
+ try {
+ lfs.mkdir(new Path(logDir), null, true);
+ } catch (FileAlreadyExistsException fe) {
+ // do nothing
+ } catch (IOException e) {
+ String msg = "Could not initialize log dir " + logDir;
+ LOG.warn(msg, e);
+ throw new YarnRuntimeException(msg, e);
+ }
+ }
+
+ private void cleanUpLocalDirs(FileContext lfs, DeletionService del) {
+ for (String localDir : dirsHandler.getLocalDirs()) {
+ cleanUpLocalDir(lfs, del, localDir);
+ }
+ }
+
+ private void cleanUpLocalDir(FileContext lfs, DeletionService del,
+ String localDir) {
+ long currentTimeStamp = System.currentTimeMillis();
+ renameLocalDir(lfs, localDir, ContainerLocalizer.USERCACHE,
+ currentTimeStamp);
+ renameLocalDir(lfs, localDir, ContainerLocalizer.FILECACHE,
+ currentTimeStamp);
+ renameLocalDir(lfs, localDir, ResourceLocalizationService.NM_PRIVATE_DIR,
+ currentTimeStamp);
+ try {
+ deleteLocalDir(lfs, del, localDir);
+ } catch (IOException e) {
+ // Do nothing, just give the warning
+ LOG.warn("Failed to delete localDir: " + localDir);
+ }
+ }
+
private void renameLocalDir(FileContext lfs, String localDir,
String localSubDir, long currentTimeStamp) {
try {
@@ -1234,5 +1314,95 @@ public class ResourceLocalizationService extends CompositeService
del.scheduleFileDeletionTask(dependentDeletionTask);
}
}
+
+ /**
+ * Synchronized method to get a list of initialized local dirs. Method will
+ * check each local dir to ensure it has been set up correctly and will attempt
+ * to fix any issues it finds.
+ *
+ * @return list of initialized local dirs
+ */
+ synchronized private List<String> getInitializedLocalDirs() {
+ List<String> dirs = dirsHandler.getLocalDirs();
+ List<String> checkFailedDirs = new ArrayList<String>();
+ for (String dir : dirs) {
+ try {
+ checkLocalDir(dir);
+ } catch (YarnRuntimeException e) {
+ checkFailedDirs.add(dir);
+ }
+ }
+ for (String dir : checkFailedDirs) {
+ LOG.info("Attempting to initialize " + dir);
+ initializeLocalDir(lfs, dir);
+ try {
+ checkLocalDir(dir);
+ } catch (YarnRuntimeException e) {
+ String msg =
+ "Failed to setup local dir " + dir + ", which was marked as good.";
+ LOG.warn(msg, e);
+ throw new YarnRuntimeException(msg, e);
+ }
+ }
+ return dirs;
+ }
+
+ private boolean checkLocalDir(String localDir) {
+
+ Map<Path, FsPermission> pathPermissionMap = getLocalDirsPathPermissionsMap(localDir);
+
+ for (Map.Entry<Path, FsPermission> entry : pathPermissionMap.entrySet()) {
+ FileStatus status;
+ try {
+ status = lfs.getFileStatus(entry.getKey());
+ } catch (Exception e) {
+ String msg =
+ "Could not carry out resource dir checks for " + localDir
+ + ", which was marked as good";
+ LOG.warn(msg, e);
+ throw new YarnRuntimeException(msg, e);
+ }
+
+ if (!status.getPermission().equals(entry.getValue())) {
+ String msg =
+ "Permissions incorrectly set for dir " + entry.getKey()
+ + ", should be " + entry.getValue() + ", actual value = "
+ + status.getPermission();
+ LOG.warn(msg);
+ throw new YarnRuntimeException(msg);
+ }
+ }
+ return true;
+ }
+
+ private Map<Path, FsPermission> getLocalDirsPathPermissionsMap(String localDir) {
+ Map<Path, FsPermission> localDirPathFsPermissionsMap = new HashMap<Path, FsPermission>();
+
+ FsPermission defaultPermission =
+ FsPermission.getDirDefault().applyUMask(lfs.getUMask());
+ FsPermission nmPrivatePermission =
+ NM_PRIVATE_PERM.applyUMask(lfs.getUMask());
+ Path userDir = new Path(localDir, ContainerLocalizer.USERCACHE);
+ Path fileDir = new Path(localDir, ContainerLocalizer.FILECACHE);
+ Path sysDir = new Path(localDir, NM_PRIVATE_DIR);
+
+ localDirPathFsPermissionsMap.put(userDir, defaultPermission);
+ localDirPathFsPermissionsMap.put(fileDir, defaultPermission);
+ localDirPathFsPermissionsMap.put(sysDir, nmPrivatePermission);
+ return localDirPathFsPermissionsMap;
+ }
+
+ /**
+ * Synchronized method to get a list of initialized log dirs. Method will
+ * check each log dir to ensure it has been set up correctly and will attempt
+ * to fix any issues it finds.
+ *
+ * @return list of initialized log dirs
+ */
+ synchronized private List<String> getInitializedLogDirs() {
+ List<String> dirs = dirsHandler.getLogDirs();
+ initializeLogDirs(lfs);
+ return dirs;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f2028bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index 6e196bb..43cd7b5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregatio
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
@@ -37,9 +38,11 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -107,6 +110,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
private final AtomicBoolean appAggregationFinished = new AtomicBoolean();
private final AtomicBoolean aborted = new AtomicBoolean();
private final Map<ApplicationAccessType, String> appAcls;
+ private final FileContext lfs;
private final LogAggregationContext logAggregationContext;
private final Context context;
private final int retentionSize;
@@ -122,8 +126,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp,
ContainerLogsRetentionPolicy retentionPolicy,
Map<ApplicationAccessType, String> appAcls,
- LogAggregationContext logAggregationContext,
- Context context) {
+ LogAggregationContext logAggregationContext, Context context,
+ FileContext lfs) {
this.dispatcher = dispatcher;
this.conf = conf;
this.delService = deletionService;
@@ -136,6 +140,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
this.retentionPolicy = retentionPolicy;
this.pendingContainers = new LinkedBlockingQueue<ContainerId>();
this.appAcls = appAcls;
+ this.lfs = lfs;
this.logAggregationContext = logAggregationContext;
this.context = context;
this.nodeId = nodeId;
@@ -395,15 +400,25 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
uploadLogsForContainers();
// Remove the local app-log-dirs
- List<String> rootLogDirs = dirsHandler.getLogDirs();
- Path[] localAppLogDirs = new Path[rootLogDirs.size()];
- int index = 0;
- for (String rootLogDir : rootLogDirs) {
- localAppLogDirs[index] = new Path(rootLogDir, this.applicationId);
- index++;
+ List<Path> localAppLogDirs = new ArrayList<Path>();
+ for (String rootLogDir : dirsHandler.getLogDirsForCleanup()) {
+ Path logPath = new Path(rootLogDir, applicationId);
+ try {
+ // check if log dir exists
+ lfs.getFileStatus(logPath);
+ localAppLogDirs.add(logPath);
+ } catch (UnsupportedFileSystemException ue) {
+ LOG.warn("Log dir " + rootLogDir + "is an unsupported file system", ue);
+ continue;
+ } catch (IOException fe) {
+ continue;
+ }
+ }
+
+ if (localAppLogDirs.size() > 0) {
+ this.delService.delete(this.userUgi.getShortUserName(), null,
+ localAppLogDirs.toArray(new Path[localAppLogDirs.size()]));
}
- this.delService.delete(this.userUgi.getShortUserName(), null,
- localAppLogDirs);
this.dispatcher.getEventHandler().handle(
new ApplicationEvent(this.appId,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f2028bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
index 1d6a9e1..77176b7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
@@ -32,6 +32,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -326,6 +327,15 @@ public class LogAggregationService extends AbstractService implements
}
this.dispatcher.getEventHandler().handle(eventResponse);
}
+
+ FileContext getLocalFileContext(Configuration conf) {
+ try {
+ return FileContext.getLocalFSFileContext(conf);
+ } catch (IOException e) {
+ throw new YarnRuntimeException("Failed to access local fs");
+ }
+ }
+
protected void initAppAggregator(final ApplicationId appId, String user,
Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy,
@@ -344,7 +354,8 @@ public class LogAggregationService extends AbstractService implements
new AppLogAggregatorImpl(this.dispatcher, this.deletionService,
getConfig(), appId, userUgi, this.nodeId, dirsHandler,
getRemoteNodeLogFileForApp(appId, user), logRetentionPolicy,
- appAcls, logAggregationContext, this.context);
+ appAcls, logAggregationContext, this.context,
+ getLocalFileContext(getConfig()));
if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) {
throw new YarnRuntimeException("Duplicate initApp for " + appId);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f2028bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
index 40173e1..0422ef9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler;
+import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -28,11 +30,14 @@ import java.util.concurrent.RejectedExecutionException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
@@ -96,6 +101,15 @@ public class NonAggregatingLogHandler extends AbstractService implements
}
super.serviceStop();
}
+
+ FileContext getLocalFileContext(Configuration conf) {
+ try {
+ return FileContext.getLocalFSFileContext(conf);
+ } catch (IOException e) {
+ throw new YarnRuntimeException("Failed to access local fs");
+ }
+ }
+
@SuppressWarnings("unchecked")
@Override
@@ -160,21 +174,30 @@ public class NonAggregatingLogHandler extends AbstractService implements
@Override
@SuppressWarnings("unchecked")
public void run() {
- List<String> rootLogDirs =
- NonAggregatingLogHandler.this.dirsHandler.getLogDirs();
- Path[] localAppLogDirs = new Path[rootLogDirs.size()];
- int index = 0;
- for (String rootLogDir : rootLogDirs) {
- localAppLogDirs[index] = new Path(rootLogDir, applicationId.toString());
- index++;
+ List<Path> localAppLogDirs = new ArrayList<Path>();
+ FileContext lfs = getLocalFileContext(getConfig());
+ for (String rootLogDir : dirsHandler.getLogDirsForCleanup()) {
+ Path logDir = new Path(rootLogDir, applicationId.toString());
+ try {
+ lfs.getFileStatus(logDir);
+ localAppLogDirs.add(logDir);
+ } catch (UnsupportedFileSystemException ue) {
+ LOG.warn("Unsupported file system used for log dir " + logDir, ue);
+ continue;
+ } catch (IOException ie) {
+ continue;
+ }
}
+
// Inform the application before the actual delete itself, so that links
- // to logs will no longer be there on NM web-UI.
+ // to logs will no longer be there on NM web-UI.
NonAggregatingLogHandler.this.dispatcher.getEventHandler().handle(
- new ApplicationEvent(this.applicationId,
- ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED));
- NonAggregatingLogHandler.this.delService.delete(user, null,
- localAppLogDirs);
+ new ApplicationEvent(this.applicationId,
+ ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED));
+ if (localAppLogDirs.size() > 0) {
+ NonAggregatingLogHandler.this.delService.delete(user, null,
+ (Path[]) localAppLogDirs.toArray(new Path[localAppLogDirs.size()]));
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f2028bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java
index f19731f..e435375 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java
@@ -34,6 +34,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Before;
import org.junit.Test;
public class TestDirectoryCollection {
@@ -42,8 +44,13 @@ public class TestDirectoryCollection {
TestDirectoryCollection.class.getName()).getAbsoluteFile();
private static final File testFile = new File(testDir, "testfile");
+ private Configuration conf;
+ private FileContext localFs;
+
@Before
- public void setup() throws IOException {
+ public void setupForTests() throws IOException {
+ conf = new Configuration();
+ localFs = FileContext.getLocalFSFileContext(conf);
testDir.mkdirs();
testFile.createNewFile();
}
@@ -56,11 +63,12 @@ public class TestDirectoryCollection {
@Test
public void testConcurrentAccess() throws IOException {
// Initialize DirectoryCollection with a file instead of a directory
- Configuration conf = new Configuration();
+
String[] dirs = {testFile.getPath()};
- DirectoryCollection dc = new DirectoryCollection(dirs,
- conf.getFloat(YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE,
- YarnConfiguration.DEFAULT_NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE));
+ DirectoryCollection dc =
+ new DirectoryCollection(dirs, conf.getFloat(
+ YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE,
+ YarnConfiguration.DEFAULT_NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE));
// Create an iterator before checkDirs is called to reliable test case
List<String> list = dc.getGoodDirs();
@@ -78,9 +86,8 @@ public class TestDirectoryCollection {
@Test
public void testCreateDirectories() throws IOException {
- Configuration conf = new Configuration();
+
conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
- FileContext localFs = FileContext.getLocalFSFileContext(conf);
String dirA = new File(testDir, "dirA").getPath();
String dirB = new File(dirA, "dirB").getPath();
@@ -92,9 +99,10 @@ public class TestDirectoryCollection {
localFs.setPermission(pathC, permDirC);
String[] dirs = { dirA, dirB, dirC };
- DirectoryCollection dc = new DirectoryCollection(dirs,
- conf.getFloat(YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE,
- YarnConfiguration.DEFAULT_NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE));
+ DirectoryCollection dc =
+ new DirectoryCollection(dirs, conf.getFloat(
+ YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE,
+ YarnConfiguration.DEFAULT_NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE));
FsPermission defaultPerm = FsPermission.getDefault()
.applyUMask(new FsPermission((short)FsPermission.DEFAULT_UMASK));
boolean createResult = dc.createNonExistentDirs(localFs, defaultPerm);
@@ -120,25 +128,29 @@ public class TestDirectoryCollection {
dc.checkDirs();
Assert.assertEquals(0, dc.getGoodDirs().size());
Assert.assertEquals(1, dc.getFailedDirs().size());
+ Assert.assertEquals(1, dc.getFullDirs().size());
dc = new DirectoryCollection(dirs, 100.0F);
dc.checkDirs();
Assert.assertEquals(1, dc.getGoodDirs().size());
Assert.assertEquals(0, dc.getFailedDirs().size());
+ Assert.assertEquals(0, dc.getFullDirs().size());
dc = new DirectoryCollection(dirs, testDir.getTotalSpace() / (1024 * 1024));
dc.checkDirs();
Assert.assertEquals(0, dc.getGoodDirs().size());
Assert.assertEquals(1, dc.getFailedDirs().size());
+ Assert.assertEquals(1, dc.getFullDirs().size());
dc = new DirectoryCollection(dirs, 100.0F, 0);
dc.checkDirs();
Assert.assertEquals(1, dc.getGoodDirs().size());
Assert.assertEquals(0, dc.getFailedDirs().size());
+ Assert.assertEquals(0, dc.getFullDirs().size());
}
@Test
- public void testDiskLimitsCutoffSetters() {
+ public void testDiskLimitsCutoffSetters() throws IOException {
String[] dirs = { "dir" };
DirectoryCollection dc = new DirectoryCollection(dirs, 0.0F, 100);
@@ -163,6 +175,47 @@ public class TestDirectoryCollection {
}
@Test
+ public void testFailedDisksBecomingGoodAgain() throws Exception {
+
+ String dirA = new File(testDir, "dirA").getPath();
+ String[] dirs = { dirA };
+ DirectoryCollection dc = new DirectoryCollection(dirs, 0.0F);
+ dc.checkDirs();
+ Assert.assertEquals(0, dc.getGoodDirs().size());
+ Assert.assertEquals(1, dc.getFailedDirs().size());
+ Assert.assertEquals(1, dc.getFullDirs().size());
+
+ dc.setDiskUtilizationPercentageCutoff(100.0F);
+ dc.checkDirs();
+ Assert.assertEquals(1, dc.getGoodDirs().size());
+ Assert.assertEquals(0, dc.getFailedDirs().size());
+ Assert.assertEquals(0, dc.getFullDirs().size());
+
+ conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
+
+ String dirB = new File(testDir, "dirB").getPath();
+ Path pathB = new Path(dirB);
+ FsPermission permDirB = new FsPermission((short) 0400);
+
+ localFs.mkdir(pathB, null, true);
+ localFs.setPermission(pathB, permDirB);
+
+ String[] dirs2 = { dirB };
+
+ dc = new DirectoryCollection(dirs2, 100.0F);
+ dc.checkDirs();
+ Assert.assertEquals(0, dc.getGoodDirs().size());
+ Assert.assertEquals(1, dc.getFailedDirs().size());
+ Assert.assertEquals(0, dc.getFullDirs().size());
+ permDirB = new FsPermission((short) 0700);
+ localFs.setPermission(pathB, permDirB);
+ dc.checkDirs();
+ Assert.assertEquals(1, dc.getGoodDirs().size());
+ Assert.assertEquals(0, dc.getFailedDirs().size());
+ Assert.assertEquals(0, dc.getFullDirs().size());
+ }
+
+ @Test
public void testConstructors() {
String[] dirs = { "dir" };
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f2028bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLocalDirsHandlerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLocalDirsHandlerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLocalDirsHandlerService.java
index 057ea91..e22b7f9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLocalDirsHandlerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLocalDirsHandlerService.java
@@ -21,8 +21,13 @@ package org.apache.hadoop.yarn.server.nodemanager;
import java.io.File;
import java.io.IOException;
+import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -57,10 +62,11 @@ public class TestLocalDirsHandlerService {
LocalDirsHandlerService dirSvc = new LocalDirsHandlerService();
dirSvc.init(conf);
Assert.assertEquals(1, dirSvc.getLocalDirs().size());
+ dirSvc.close();
}
@Test
- public void testValidPathsDirHandlerService() {
+ public void testValidPathsDirHandlerService() throws Exception {
Configuration conf = new YarnConfiguration();
String localDir1 = new File("file:///" + testDir, "localDir1").getPath();
String localDir2 = new File("hdfs:///" + testDir, "localDir2").getPath();
@@ -76,5 +82,40 @@ public class TestLocalDirsHandlerService {
Assert.assertEquals("Service should not be inited",
STATE.STOPPED,
dirSvc.getServiceState());
+ dirSvc.close();
+ }
+
+ /**
+  * Checks the disk-full accounting of LocalDirsHandlerService.
+  * <p>
+  * Two local dirs and two log dirs are configured, of which only the first of
+  * each is created up front (with permission 0410 under a 077 umask). The
+  * per-disk utilization limit is set to 0.0, and the test then asserts that no
+  * dir is reported as usable while exactly one local dir and one log dir are
+  * reported as disk-full.
+  */
+ @Test
+ public void testGetFullDirs() throws Exception {
+   Configuration conf = new YarnConfiguration();
+
+   conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
+   FileContext localFs = FileContext.getLocalFSFileContext(conf);
+
+   String localDir1 = new File(testDir, "localDir1").getPath();
+   String localDir2 = new File(testDir, "localDir2").getPath();
+   String logDir1 = new File(testDir, "logDir1").getPath();
+   String logDir2 = new File(testDir, "logDir2").getPath();
+   Path localDir1Path = new Path(localDir1);
+   Path logDir1Path = new Path(logDir1);
+   FsPermission dirPermissions = new FsPermission((short) 0410);
+
+   LocalDirsHandlerService dirSvc = new LocalDirsHandlerService();
+   try {
+     localFs.mkdir(localDir1Path, dirPermissions, true);
+     localFs.mkdir(logDir1Path, dirPermissions, true);
+
+     conf.set(YarnConfiguration.NM_LOCAL_DIRS, localDir1 + "," + localDir2);
+     conf.set(YarnConfiguration.NM_LOG_DIRS, logDir1 + "," + logDir2);
+     // A 0.0 utilization limit is used so that dirs get classified as full.
+     conf.setFloat(YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE,
+         0.0f);
+     dirSvc.init(conf);
+     Assert.assertEquals(0, dirSvc.getLocalDirs().size());
+     Assert.assertEquals(0, dirSvc.getLogDirs().size());
+     Assert.assertEquals(1, dirSvc.getDiskFullLocalDirs().size());
+     Assert.assertEquals(1, dirSvc.getDiskFullLogDirs().size());
+   } finally {
+     // Clean up even when an assertion fails, so later tests do not trip over
+     // leftover dirs. All four configured dirs are removed (the second log
+     // dir was previously skipped because logDir1 was deleted twice).
+     FileUtils.deleteDirectory(new File(localDir1));
+     FileUtils.deleteDirectory(new File(localDir2));
+     FileUtils.deleteDirectory(new File(logDir1));
+     FileUtils.deleteDirectory(new File(logDir2));
+     dirSvc.close();
+   }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f2028bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeHealthService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeHealthService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeHealthService.java
index 6a28605..3542196 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeHealthService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeHealthService.java
@@ -196,7 +196,7 @@ public class TestNodeHealthService {
healthStatus.getHealthReport().equals(
NodeHealthScriptRunner.NODE_HEALTH_SCRIPT_TIMED_OUT_MSG
+ NodeHealthCheckerService.SEPARATOR
- + nodeHealthChecker.getDiskHandler().getDisksHealthReport()));
+ + nodeHealthChecker.getDiskHandler().getDisksHealthReport(false)));
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f2028bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
index fa5a4fc..d569fa7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
@@ -42,6 +42,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
+import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -68,13 +69,16 @@ import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.security.AccessControlException;
import org.junit.Assert;
+import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.AbstractFileSystem;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
@@ -167,15 +171,15 @@ public class TestResourceLocalizationService {
conf = new Configuration();
spylfs = spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
lfs = FileContext.getFileContext(spylfs, conf);
- doNothing().when(spylfs).mkdir(
- isA(Path.class), isA(FsPermission.class), anyBoolean());
+
String logDir = lfs.makeQualified(new Path(basedir, "logdir ")).toString();
conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
}
@After
- public void cleanup() {
+ public void cleanup() throws IOException {
conf = null;
+ FileUtils.deleteDirectory(new File(basedir.toString()));
}
@Test
@@ -752,6 +756,39 @@ public class TestResourceLocalizationService {
ResourceLocalizationService spyService = spy(rawService);
doReturn(mockServer).when(spyService).createServer();
doReturn(lfs).when(spyService).getLocalFileContext(isA(Configuration.class));
+ FsPermission defaultPermission =
+ FsPermission.getDirDefault().applyUMask(lfs.getUMask());
+ FsPermission nmPermission =
+ ResourceLocalizationService.NM_PRIVATE_PERM.applyUMask(lfs.getUMask());
+ final Path userDir =
+ new Path(sDirs[0].substring("file:".length()),
+ ContainerLocalizer.USERCACHE);
+ final Path fileDir =
+ new Path(sDirs[0].substring("file:".length()),
+ ContainerLocalizer.FILECACHE);
+ final Path sysDir =
+ new Path(sDirs[0].substring("file:".length()),
+ ResourceLocalizationService.NM_PRIVATE_DIR);
+ final FileStatus fs =
+ new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
+ defaultPermission, "", "", new Path(sDirs[0]));
+ final FileStatus nmFs =
+ new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
+ nmPermission, "", "", sysDir);
+
+ doAnswer(new Answer<FileStatus>() {
+ @Override
+ public FileStatus answer(InvocationOnMock invocation) throws Throwable {
+ Object[] args = invocation.getArguments();
+ if (args.length > 0) {
+ if (args[0].equals(userDir) || args[0].equals(fileDir)) {
+ return fs;
+ }
+ }
+ return nmFs;
+ }
+ }).when(spylfs).getFileStatus(isA(Path.class));
+
try {
spyService.init(conf);
spyService.start();
@@ -1775,5 +1812,274 @@ public class TestResourceLocalizationService {
return new Token(("ident" + id).getBytes(), ("passwd" + id).getBytes(),
new Text("kind" + id), new Text("service" + id));
}
+
+ /**
+  * Test to ensure ResourceLocalizationService can handle local dirs going bad.
+  * <p>
+  * The test first sets up all the components required, then sends events to
+  * fetch a private, an app and a public resource and checks the reference
+  * counts held by the trackers. It then makes getFileStatus fail for one of
+  * the dirs, sends the events that clean up the container and the app, and
+  * verifies that delete calls were made for the healthy dirs and that no
+  * delete call was made for the failed dir.
+  */
+ @Test
+ @SuppressWarnings("unchecked")
+ // mocked generics
+ public void testFailedDirsResourceRelease() throws Exception {
+   // setup components
+   File f = new File(basedir.toString());
+   String[] sDirs = new String[4];
+   List<Path> localDirs = new ArrayList<Path>(sDirs.length);
+   for (int i = 0; i < sDirs.length; ++i) {
+     sDirs[i] = f.getAbsolutePath() + i;
+     localDirs.add(new Path(sDirs[i]));
+   }
+   List<Path> containerLocalDirs = new ArrayList<Path>(localDirs.size());
+   List<Path> appLocalDirs = new ArrayList<Path>(localDirs.size());
+   List<Path> nmLocalContainerDirs = new ArrayList<Path>(localDirs.size());
+   List<Path> nmLocalAppDirs = new ArrayList<Path>(localDirs.size());
+   conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
+   conf.setLong(YarnConfiguration.NM_DISK_HEALTH_CHECK_INTERVAL_MS, 500);
+
+   LocalizerTracker mockLocalizerTracker = mock(LocalizerTracker.class);
+   DrainDispatcher dispatcher = new DrainDispatcher();
+   dispatcher.init(conf);
+   dispatcher.start();
+   EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
+   dispatcher.register(ApplicationEventType.class, applicationBus);
+   EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
+   dispatcher.register(ContainerEventType.class, containerBus);
+   // Ignore actual localization
+   EventHandler<LocalizerEvent> localizerBus = mock(EventHandler.class);
+   dispatcher.register(LocalizerEventType.class, localizerBus);
+
+   ContainerExecutor exec = mock(ContainerExecutor.class);
+   LocalDirsHandlerService mockDirsHandler =
+       mock(LocalDirsHandlerService.class);
+   doReturn(new ArrayList<String>(Arrays.asList(sDirs))).when(
+       mockDirsHandler).getLocalDirsForCleanup();
+
+   DeletionService delService = mock(DeletionService.class);
+
+   // setup mocks
+   ResourceLocalizationService rawService =
+       new ResourceLocalizationService(dispatcher, exec, delService,
+           mockDirsHandler, new NMNullStateStoreService());
+   ResourceLocalizationService spyService = spy(rawService);
+   doReturn(mockServer).when(spyService).createServer();
+   doReturn(mockLocalizerTracker).when(spyService).createLocalizerTracker(
+       isA(Configuration.class));
+   doReturn(lfs).when(spyService)
+       .getLocalFileContext(isA(Configuration.class));
+   FsPermission defaultPermission =
+       FsPermission.getDirDefault().applyUMask(lfs.getUMask());
+   FsPermission nmPermission =
+       ResourceLocalizationService.NM_PRIVATE_PERM.applyUMask(lfs.getUMask());
+   // Canned statuses returned for the dirs that are meant to look healthy.
+   final FileStatus fs =
+       new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
+           defaultPermission, "", "", localDirs.get(0));
+   final FileStatus nmFs =
+       new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
+           nmPermission, "", "", localDirs.get(0));
+
+   final String user = "user0";
+   // init application
+   final Application app = mock(Application.class);
+   final ApplicationId appId =
+       BuilderUtils.newApplicationId(314159265358979L, 3);
+   when(app.getUser()).thenReturn(user);
+   when(app.getAppId()).thenReturn(appId);
+   when(app.toString()).thenReturn(ConverterUtils.toString(appId));
+
+   // init container.
+   final Container c = getMockContainer(appId, 42, user);
+
+   // setup local app dirs
+   // NOTE(review): only getLocalDirsForCleanup() is stubbed on mockDirsHandler
+   // above. If the un-stubbed getLocalDirs() yields an empty list (Mockito's
+   // default for List-returning methods), the four dir lists stay empty and
+   // the per-dir stubbing/verification loops below never execute. Confirm,
+   // and consider stubbing getLocalDirs() as well.
+   List<String> tmpDirs = mockDirsHandler.getLocalDirs();
+   for (int i = 0; i < tmpDirs.size(); ++i) {
+     Path usersdir = new Path(tmpDirs.get(i), ContainerLocalizer.USERCACHE);
+     Path userdir = new Path(usersdir, user);
+     Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
+     Path appDir = new Path(allAppsdir, ConverterUtils.toString(appId));
+     Path containerDir =
+         new Path(appDir, ConverterUtils.toString(c.getContainerId()));
+     containerLocalDirs.add(containerDir);
+     appLocalDirs.add(appDir);
+
+     Path sysDir =
+         new Path(tmpDirs.get(i), ResourceLocalizationService.NM_PRIVATE_DIR);
+     Path appSysDir = new Path(sysDir, ConverterUtils.toString(appId));
+     Path containerSysDir =
+         new Path(appSysDir, ConverterUtils.toString(c.getContainerId()));
+
+     nmLocalContainerDirs.add(containerSysDir);
+     nmLocalAppDirs.add(appSysDir);
+   }
+
+   try {
+     spyService.init(conf);
+     spyService.start();
+
+     spyService.handle(new ApplicationLocalizationEvent(
+         LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
+     dispatcher.await();
+
+     // Get a handle on the trackers after they're setup with
+     // INIT_APP_RESOURCES
+     LocalResourcesTracker appTracker =
+         spyService.getLocalResourcesTracker(
+             LocalResourceVisibility.APPLICATION, user, appId);
+     LocalResourcesTracker privTracker =
+         spyService.getLocalResourcesTracker(LocalResourceVisibility.PRIVATE,
+             user, appId);
+     LocalResourcesTracker pubTracker =
+         spyService.getLocalResourcesTracker(LocalResourceVisibility.PUBLIC,
+             user, appId);
+
+     // init resources
+     Random r = new Random();
+     long seed = r.nextLong();
+     r.setSeed(seed);
+
+     // Send localization requests, one for each type of resource
+     final LocalResource privResource = getPrivateMockedResource(r);
+     final LocalResourceRequest privReq =
+         new LocalResourceRequest(privResource);
+
+     final LocalResource appResource = getAppMockedResource(r);
+     final LocalResourceRequest appReq = new LocalResourceRequest(appResource);
+
+     final LocalResource pubResource = getPublicMockedResource(r);
+     final LocalResourceRequest pubReq = new LocalResourceRequest(pubResource);
+
+     Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
+         new HashMap<LocalResourceVisibility, Collection<LocalResourceRequest>>();
+     req.put(LocalResourceVisibility.PRIVATE,
+         Collections.singletonList(privReq));
+     req.put(LocalResourceVisibility.APPLICATION,
+         Collections.singletonList(appReq));
+     req.put(LocalResourceVisibility.PUBLIC,
+         Collections.singletonList(pubReq));
+
+     // Second request asks for the private resource again, so its reference
+     // count is expected to be 2 below.
+     Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req2 =
+         new HashMap<LocalResourceVisibility, Collection<LocalResourceRequest>>();
+     req2.put(LocalResourceVisibility.PRIVATE,
+         Collections.singletonList(privReq));
+
+     // Send Request event
+     spyService.handle(new ContainerLocalizationRequestEvent(c, req));
+     spyService.handle(new ContainerLocalizationRequestEvent(c, req2));
+     dispatcher.await();
+
+     int privRsrcCount = 0;
+     for (LocalizedResource lr : privTracker) {
+       privRsrcCount++;
+       Assert.assertEquals("Incorrect reference count", 2, lr.getRefCount());
+       Assert.assertEquals(privReq, lr.getRequest());
+     }
+     Assert.assertEquals(1, privRsrcCount);
+
+     int appRsrcCount = 0;
+     for (LocalizedResource lr : appTracker) {
+       appRsrcCount++;
+       Assert.assertEquals("Incorrect reference count", 1, lr.getRefCount());
+       Assert.assertEquals(appReq, lr.getRequest());
+     }
+     Assert.assertEquals(1, appRsrcCount);
+
+     int pubRsrcCount = 0;
+     for (LocalizedResource lr : pubTracker) {
+       pubRsrcCount++;
+       Assert.assertEquals("Incorrect reference count", 1, lr.getRefCount());
+       Assert.assertEquals(pubReq, lr.getRequest());
+     }
+     Assert.assertEquals(1, pubRsrcCount);
+
+     // setup mocks for test, a set of dirs with IOExceptions and let the rest
+     // go through
+     for (int i = 0; i < containerLocalDirs.size(); ++i) {
+       if (i == 2) {
+         Mockito.doThrow(new IOException()).when(spylfs)
+             .getFileStatus(eq(containerLocalDirs.get(i)));
+         Mockito.doThrow(new IOException()).when(spylfs)
+             .getFileStatus(eq(nmLocalContainerDirs.get(i)));
+       } else {
+         doReturn(fs).when(spylfs)
+             .getFileStatus(eq(containerLocalDirs.get(i)));
+         doReturn(nmFs).when(spylfs).getFileStatus(
+             eq(nmLocalContainerDirs.get(i)));
+       }
+     }
+
+     // Send Cleanup Event
+     spyService.handle(new ContainerLocalizationCleanupEvent(c, req));
+     verify(mockLocalizerTracker).cleanupPrivLocalizers(
+         "container_314159265358979_0003_01_000042");
+
+     // match cleanup events with the mocks we setup earlier
+     for (int i = 0; i < containerLocalDirs.size(); ++i) {
+       if (i == 2) {
+         // The failed dir must not be handed to the deletion service. This is
+         // expressed with never() rather than a try/fail/catch(Throwable)
+         // block, because such a catch also swallows the AssertionError
+         // raised by Assert.fail and so can never report a failure.
+         verify(delService, Mockito.never())
+             .delete(user, containerLocalDirs.get(i));
+         verify(delService, Mockito.never())
+             .delete(null, nmLocalContainerDirs.get(i));
+       } else {
+         verify(delService).delete(user, containerLocalDirs.get(i));
+         verify(delService).delete(null, nmLocalContainerDirs.get(i));
+       }
+     }
+
+     ArgumentMatcher<ApplicationEvent> matchesAppDestroy =
+         new ArgumentMatcher<ApplicationEvent>() {
+           @Override
+           public boolean matches(Object o) {
+             ApplicationEvent evt = (ApplicationEvent) o;
+             // Compare ids by value; identity comparison would only match
+             // when the event happens to carry the very same instance.
+             return evt.getType()
+                 == ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP
+                 && appId.equals(evt.getApplicationID());
+           }
+         };
+
+     dispatcher.await();
+
+     // setup mocks again, this time throw UnsupportedFileSystemException and
+     // IOExceptions
+     for (int i = 0; i < appLocalDirs.size(); ++i) {
+       if (i == 3) {
+         Mockito.doThrow(new IOException()).when(spylfs)
+             .getFileStatus(eq(appLocalDirs.get(i)));
+         Mockito.doThrow(new UnsupportedFileSystemException("test"))
+             .when(spylfs).getFileStatus(eq(nmLocalAppDirs.get(i)));
+       } else {
+         doReturn(fs).when(spylfs).getFileStatus(eq(appLocalDirs.get(i)));
+         doReturn(nmFs).when(spylfs).getFileStatus(eq(nmLocalAppDirs.get(i)));
+       }
+     }
+     LocalizationEvent destroyApp =
+         new ApplicationLocalizationEvent(
+             LocalizationEventType.DESTROY_APPLICATION_RESOURCES, app);
+     spyService.handle(destroyApp);
+     verify(applicationBus).handle(argThat(matchesAppDestroy));
+
+     // verify we got the right delete calls
+     for (int i = 0; i < appLocalDirs.size(); ++i) {
+       if (i == 3) {
+         // Check the application-level dirs here: the container-level dirs
+         // at this index were legitimately deleted by the container cleanup
+         // above, so they say nothing about the app destroy step.
+         verify(delService, Mockito.never())
+             .delete(user, appLocalDirs.get(i));
+         verify(delService, Mockito.never())
+             .delete(null, nmLocalAppDirs.get(i));
+       } else {
+         verify(delService).delete(user, appLocalDirs.get(i));
+         verify(delService).delete(null, nmLocalAppDirs.get(i));
+       }
+     }
+
+   } finally {
+     dispatcher.stop();
+     delService.stop();
+   }
+ }
}