You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by ki...@apache.org on 2013/06/04 22:49:52 UTC
svn commit: r1489596 - in /hadoop/common/trunk/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/
hadoop-yarn/hadoop-yarn-server...
Author: kihwal
Date: Tue Jun 4 20:49:34 2013
New Revision: 1489596
URL: http://svn.apache.org/r1489596
Log:
YARN-742. Log aggregation causes a lot of redundant setPermission calls. Contributed by Jason Lowe.
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1489596&r1=1489595&r2=1489596&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Tue Jun 4 20:49:34 2013
@@ -433,6 +433,9 @@ Release 2.1.0-beta - UNRELEASED
YARN-757. Changed TestRMRestart to use the default scheduler to avoid test
failures. (Bikas Saha via vinodkv)
+ YARN-742. Log aggregation causes a lot of redundant setPermission calls.
+ (jlowe via kihwal)
+
BREAKDOWN OF HADOOP-8562 SUBTASKS AND RELATED JIRAS
YARN-158. Yarn creating package-info.java must not depend on sh.
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java?rev=1489596&r1=1489595&r2=1489596&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java Tue Jun 4 20:49:34 2013
@@ -32,6 +32,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
@@ -162,12 +163,16 @@ public class LogAggregationService exten
LOG.warn("Some logs may not have been aggregated for " + appId);
}
}
-
+
+ protected FileSystem getFileSystem(Configuration conf) throws IOException {
+ return FileSystem.get(conf);
+ }
+
void verifyAndCreateRemoteLogDir(Configuration conf) {
// Checking the existence of the TLD
FileSystem remoteFS = null;
try {
- remoteFS = FileSystem.get(conf);
+ remoteFS = getFileSystem(conf);
} catch (IOException e) {
throw new YarnRuntimeException("Unable to get Remote FileSystem instance", e);
}
@@ -212,8 +217,26 @@ public class LogAggregationService exten
private void createDir(FileSystem fs, Path path, FsPermission fsPerm)
throws IOException {
- fs.mkdirs(path, new FsPermission(fsPerm));
- fs.setPermission(path, new FsPermission(fsPerm));
+ FsPermission dirPerm = new FsPermission(fsPerm);
+ fs.mkdirs(path, dirPerm);
+ FsPermission umask = FsPermission.getUMask(fs.getConf());
+ if (!dirPerm.equals(dirPerm.applyUMask(umask))) {
+ fs.setPermission(path, new FsPermission(fsPerm));
+ }
+ }
+
+ private boolean checkExists(FileSystem fs, Path path, FsPermission fsPerm)
+ throws IOException {
+ boolean exists = true;
+ try {
+ FileStatus appDirStatus = fs.getFileStatus(path);
+ if (!APP_DIR_PERMISSIONS.equals(appDirStatus.getPermission())) {
+ fs.setPermission(path, APP_DIR_PERMISSIONS);
+ }
+ } catch (FileNotFoundException fnfe) {
+ exists = false;
+ }
+ return exists;
}
protected void createAppDir(final String user, final ApplicationId appId,
@@ -222,57 +245,43 @@ public class LogAggregationService exten
userUgi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
- // TODO: Reuse FS for user?
- FileSystem remoteFS = null;
- Path userDir = null;
- Path suffixDir = null;
- Path appDir = null;
- try {
- remoteFS = FileSystem.get(getConfig());
- } catch (IOException e) {
- LOG.error("Failed to get remote FileSystem while processing app "
- + appId, e);
- throw e;
- }
try {
- userDir =
- LogAggregationUtils.getRemoteLogUserDir(
+ // TODO: Reuse FS for user?
+ FileSystem remoteFS = getFileSystem(getConfig());
+
+ // Only creating directories if they are missing to avoid
+ // unnecessary load on the filesystem from all of the nodes
+ Path appDir = LogAggregationUtils.getRemoteAppLogDir(
+ LogAggregationService.this.remoteRootLogDir, appId, user,
+ LogAggregationService.this.remoteRootLogDirSuffix);
+ appDir = appDir.makeQualified(remoteFS.getUri(),
+ remoteFS.getWorkingDirectory());
+
+ if (!checkExists(remoteFS, appDir, APP_DIR_PERMISSIONS)) {
+ Path suffixDir = LogAggregationUtils.getRemoteLogSuffixedDir(
+ LogAggregationService.this.remoteRootLogDir, user,
+ LogAggregationService.this.remoteRootLogDirSuffix);
+ suffixDir = suffixDir.makeQualified(remoteFS.getUri(),
+ remoteFS.getWorkingDirectory());
+
+ if (!checkExists(remoteFS, suffixDir, APP_DIR_PERMISSIONS)) {
+ Path userDir = LogAggregationUtils.getRemoteLogUserDir(
LogAggregationService.this.remoteRootLogDir, user);
- userDir =
- userDir.makeQualified(remoteFS.getUri(),
+ userDir = userDir.makeQualified(remoteFS.getUri(),
remoteFS.getWorkingDirectory());
- createDir(remoteFS, userDir, APP_DIR_PERMISSIONS);
- } catch (IOException e) {
- LOG.error("Failed to create user dir [" + userDir
- + "] while processing app " + appId);
- throw e;
- }
- try {
- suffixDir =
- LogAggregationUtils.getRemoteLogSuffixedDir(
- LogAggregationService.this.remoteRootLogDir, user,
- LogAggregationService.this.remoteRootLogDirSuffix);
- suffixDir =
- suffixDir.makeQualified(remoteFS.getUri(),
- remoteFS.getWorkingDirectory());
- createDir(remoteFS, suffixDir, APP_DIR_PERMISSIONS);
- } catch (IOException e) {
- LOG.error("Failed to create suffixed user dir [" + suffixDir
- + "] while processing app " + appId);
- throw e;
- }
- try {
- appDir =
- LogAggregationUtils.getRemoteAppLogDir(
- LogAggregationService.this.remoteRootLogDir, appId, user,
- LogAggregationService.this.remoteRootLogDirSuffix);
- appDir =
- appDir.makeQualified(remoteFS.getUri(),
- remoteFS.getWorkingDirectory());
- createDir(remoteFS, appDir, APP_DIR_PERMISSIONS);
+
+ if (!checkExists(remoteFS, userDir, APP_DIR_PERMISSIONS)) {
+ createDir(remoteFS, userDir, APP_DIR_PERMISSIONS);
+ }
+
+ createDir(remoteFS, suffixDir, APP_DIR_PERMISSIONS);
+ }
+
+ createDir(remoteFS, appDir, APP_DIR_PERMISSIONS);
+ }
} catch (IOException e) {
- LOG.error("Failed to create application log dir [" + appDir
- + "] while processing app " + appId);
+ LOG.error("Failed to setup application log directory for "
+ + appId, e);
throw e;
}
return null;
@@ -294,7 +303,7 @@ public class LogAggregationService exten
eventResponse = new ApplicationEvent(appId,
ApplicationEventType.APPLICATION_LOG_HANDLING_INITED);
} catch (YarnRuntimeException e) {
- LOG.warn("Application failed to init aggregation: " + e.getMessage());
+ LOG.warn("Application failed to init aggregation", e);
eventResponse = new ApplicationEvent(appId,
ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED);
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java?rev=1489596&r1=1489595&r2=1489596&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java Tue Jun 4 20:49:34 2013
@@ -25,6 +25,7 @@ import static org.mockito.Matchers.anyMa
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
@@ -55,8 +56,10 @@ import junit.framework.Assert;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.YarnRuntimeException;
@@ -78,6 +81,7 @@ import org.apache.hadoop.yarn.conf.YarnC
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.event.InlineDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -506,7 +510,63 @@ public class TestLogAggregationService e
assertTrue("The new aggregate file is not successfully created", existsAfter);
aNewFile.delete(); //housekeeping
}
-
+
+ @Test
+ public void testAppLogDirCreation() throws Exception {
+ final String logSuffix = "logs";
+ this.conf.set(YarnConfiguration.NM_LOG_DIRS,
+ localLogDir.getAbsolutePath());
+ this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+ this.remoteRootLogDir.getAbsolutePath());
+ this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, logSuffix);
+
+ InlineDispatcher dispatcher = new InlineDispatcher();
+ dispatcher.init(this.conf);
+ dispatcher.start();
+
+ FileSystem fs = FileSystem.get(this.conf);
+ final FileSystem spyFs = spy(FileSystem.get(this.conf));
+
+ LogAggregationService aggSvc = new LogAggregationService(dispatcher,
+ this.context, this.delSrvc, super.dirsHandler) {
+ @Override
+ protected FileSystem getFileSystem(Configuration conf) {
+ return spyFs;
+ }
+ };
+
+ aggSvc.init(this.conf);
+ aggSvc.start();
+
+ // start an application and verify user, suffix, and app dirs created
+ ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
+ Path userDir = fs.makeQualified(new Path(
+ remoteRootLogDir.getAbsolutePath(), this.user));
+ Path suffixDir = new Path(userDir, logSuffix);
+ Path appDir = new Path(suffixDir, appId.toString());
+ aggSvc.handle(new LogHandlerAppStartedEvent(appId, this.user, null,
+ ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
+ verify(spyFs).mkdirs(eq(userDir), isA(FsPermission.class));
+ verify(spyFs).mkdirs(eq(suffixDir), isA(FsPermission.class));
+ verify(spyFs).mkdirs(eq(appDir), isA(FsPermission.class));
+
+ // start another application and verify only app dir created
+ ApplicationId appId2 = BuilderUtils.newApplicationId(1, 2);
+ Path appDir2 = new Path(suffixDir, appId2.toString());
+ aggSvc.handle(new LogHandlerAppStartedEvent(appId2, this.user, null,
+ ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
+ verify(spyFs).mkdirs(eq(appDir2), isA(FsPermission.class));
+
+ // start another application with the app dir already created and verify
+ // we do not try to create it again
+ ApplicationId appId3 = BuilderUtils.newApplicationId(1, 3);
+ Path appDir3 = new Path(suffixDir, appId3.toString());
+ new File(appDir3.toUri().getPath()).mkdir();
+ aggSvc.handle(new LogHandlerAppStartedEvent(appId3, this.user, null,
+ ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
+ verify(spyFs, never()).mkdirs(eq(appDir3), isA(FsPermission.class));
+ }
+
@Test
@SuppressWarnings("unchecked")
public void testLogAggregationInitAppFailsWithoutKillingNM() throws Exception {