You are viewing a plain-text version of this content. The link to the canonical version ("here") was not preserved in this copy.
Posted to yarn-commits@hadoop.apache.org by ki...@apache.org on 2013/06/05 19:41:01 UTC

svn commit: r1489981 - in /hadoop/common/branches/branch-0.23/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/ hadoop-yarn/had...

Author: kihwal
Date: Wed Jun  5 17:41:01 2013
New Revision: 1489981

URL: http://svn.apache.org/r1489981
Log:
YARN-742. Log aggregation causes a lot of redundant setPermission calls. Contributed by Jason Lowe.

Modified:
    hadoop/common/branches/branch-0.23/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
    hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java

Modified: hadoop/common/branches/branch-0.23/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-yarn-project/CHANGES.txt?rev=1489981&r1=1489980&r2=1489981&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-yarn-project/CHANGES.txt Wed Jun  5 17:41:01 2013
@@ -12,6 +12,9 @@ Release 0.23.9 - UNRELEASED
 
   BUG FIXES
 
+    YARN-742. Log aggregation causes a lot of redundant setPermission calls.
+    (jlowe via kihwal)
+
 Release 0.23.8 - 2013-06-05
   
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java?rev=1489981&r1=1489980&r2=1489981&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java Wed Jun  5 17:41:01 2013
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.Map;
@@ -31,6 +32,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -163,12 +165,16 @@ public class LogAggregationService exten
       LOG.warn("Some logs may not have been aggregated for " + appId);
     }
   }
-  
+
+  protected FileSystem getFileSystem(Configuration conf) throws IOException {
+    return FileSystem.get(conf);
+  }
+
   private void verifyAndCreateRemoteLogDir(Configuration conf) {
-    // Checking the existance of the TLD
+    // Checking the existence of the TLD
     FileSystem remoteFS = null;
     try {
-      remoteFS = FileSystem.get(conf);
+      remoteFS = getFileSystem(conf);
     } catch (IOException e) {
       throw new YarnException("Unable to get Remote FileSystem instance", e);
     }
@@ -219,8 +225,26 @@ public class LogAggregationService exten
 
   private void createDir(FileSystem fs, Path path, FsPermission fsPerm)
       throws IOException {
-    fs.mkdirs(path, new FsPermission(fsPerm));
-    fs.setPermission(path, new FsPermission(fsPerm));
+    FsPermission dirPerm = new FsPermission(fsPerm);
+    fs.mkdirs(path, dirPerm);
+    FsPermission umask = FsPermission.getUMask(fs.getConf());
+    if (!dirPerm.equals(dirPerm.applyUMask(umask))) {
+      fs.setPermission(path, new FsPermission(fsPerm));
+    }
+  }
+
+  private boolean checkExists(FileSystem fs, Path path, FsPermission fsPerm)
+      throws IOException {
+    boolean exists = true;
+    try {
+      FileStatus appDirStatus = fs.getFileStatus(path);
+      if (!APP_DIR_PERMISSIONS.equals(appDirStatus.getPermission())) {
+        fs.setPermission(path, APP_DIR_PERMISSIONS);
+      }
+    } catch (FileNotFoundException fnfe) {
+      exists = false;
+    }
+    return exists;
   }
 
   protected void createAppDir(final String user, final ApplicationId appId,
@@ -229,57 +253,43 @@ public class LogAggregationService exten
       userUgi.doAs(new PrivilegedExceptionAction<Object>() {
         @Override
         public Object run() throws Exception {
-          // TODO: Reuse FS for user?
-          FileSystem remoteFS = null;
-          Path userDir = null;
-          Path suffixDir = null;
-          Path appDir = null;
-          try {
-            remoteFS = FileSystem.get(getConfig());
-          } catch (IOException e) {
-            LOG.error("Failed to get remote FileSystem while processing app "
-                + appId, e);
-            throw e;
-          }
           try {
-            userDir =
-                LogAggregationUtils.getRemoteLogUserDir(
+            // TODO: Reuse FS for user?
+            FileSystem remoteFS = getFileSystem(getConfig());
+
+            // Only creating directories if they are missing to avoid
+            // unnecessary load on the filesystem from all of the nodes
+            Path appDir = LogAggregationUtils.getRemoteAppLogDir(
+                LogAggregationService.this.remoteRootLogDir, appId, user,
+                LogAggregationService.this.remoteRootLogDirSuffix);
+            appDir = appDir.makeQualified(remoteFS.getUri(),
+                remoteFS.getWorkingDirectory());
+
+            if (!checkExists(remoteFS, appDir, APP_DIR_PERMISSIONS)) {
+              Path suffixDir = LogAggregationUtils.getRemoteLogSuffixedDir(
+                  LogAggregationService.this.remoteRootLogDir, user,
+                  LogAggregationService.this.remoteRootLogDirSuffix);
+              suffixDir = suffixDir.makeQualified(remoteFS.getUri(),
+                  remoteFS.getWorkingDirectory());
+
+              if (!checkExists(remoteFS, suffixDir, APP_DIR_PERMISSIONS)) {
+                Path userDir = LogAggregationUtils.getRemoteLogUserDir(
                     LogAggregationService.this.remoteRootLogDir, user);
-            userDir =
-                userDir.makeQualified(remoteFS.getUri(),
+                userDir = userDir.makeQualified(remoteFS.getUri(),
                     remoteFS.getWorkingDirectory());
-            createDir(remoteFS, userDir, APP_DIR_PERMISSIONS);
-          } catch (IOException e) {
-            LOG.error("Failed to create user dir [" + userDir
-                + "] while processing app " + appId);
-            throw e;
-          }
-          try {
-            suffixDir =
-                LogAggregationUtils.getRemoteLogSuffixedDir(
-                    LogAggregationService.this.remoteRootLogDir, user,
-                    LogAggregationService.this.remoteRootLogDirSuffix);
-            suffixDir =
-                suffixDir.makeQualified(remoteFS.getUri(),
-                    remoteFS.getWorkingDirectory());
-            createDir(remoteFS, suffixDir, APP_DIR_PERMISSIONS);
-          } catch (IOException e) {
-            LOG.error("Failed to create suffixed user dir [" + suffixDir
-                + "] while processing app " + appId);
-            throw e;
-          }
-          try {
-            appDir =
-                LogAggregationUtils.getRemoteAppLogDir(
-                    LogAggregationService.this.remoteRootLogDir, appId, user,
-                    LogAggregationService.this.remoteRootLogDirSuffix);
-            appDir =
-                appDir.makeQualified(remoteFS.getUri(),
-                    remoteFS.getWorkingDirectory());
-            createDir(remoteFS, appDir, APP_DIR_PERMISSIONS);
+
+                if (!checkExists(remoteFS, userDir, APP_DIR_PERMISSIONS)) {
+                  createDir(remoteFS, userDir, APP_DIR_PERMISSIONS);
+                }
+
+                createDir(remoteFS, suffixDir, APP_DIR_PERMISSIONS);
+              }
+
+              createDir(remoteFS, appDir, APP_DIR_PERMISSIONS);
+            }
           } catch (IOException e) {
-            LOG.error("Failed to  create application log dir [" + appDir
-                + "] while processing app " + appId);
+            LOG.error("Failed to setup application log directory for "
+                + appId, e);
             throw e;
           }
           return null;
@@ -300,7 +310,7 @@ public class LogAggregationService exten
       eventResponse = new ApplicationEvent(appId,
           ApplicationEventType.APPLICATION_LOG_HANDLING_INITED);
     } catch (YarnException e) {
-      LOG.warn("Application failed to init aggregation: " + e.getMessage());
+      LOG.warn("Application failed to init aggregation", e);
       eventResponse = new ApplicationEvent(appId,
           ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED);
     }

Modified: hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java?rev=1489981&r1=1489980&r2=1489981&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java Wed Jun  5 17:41:01 2013
@@ -18,9 +18,20 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
 
-import static org.mockito.Mockito.*;
 import static junit.framework.Assert.assertEquals;
 import static junit.framework.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyMap;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
@@ -44,17 +55,18 @@ import junit.framework.Assert;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -70,16 +82,15 @@ import org.apache.hadoop.yarn.event.Even
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
-import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
+import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
 import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationFinishEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
@@ -390,7 +401,63 @@ public class TestLogAggregationService e
     checkEvents(appEventHandler, expectedFinishedEvents, false, "getType", "getApplicationID");
     dispatcher.stop();
   }
-  
+
+  @Test
+  public void testAppLogDirCreation() throws Exception {
+    final String logSuffix = "logs";
+    this.conf.set(YarnConfiguration.NM_LOG_DIRS,
+        localLogDir.getAbsolutePath());
+    this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+        this.remoteRootLogDir.getAbsolutePath());
+    this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, logSuffix);
+
+    DrainDispatcher dispatcher = new DrainDispatcher();
+    dispatcher.init(this.conf);
+    dispatcher.start();
+
+    FileSystem fs = FileSystem.get(this.conf);
+    final FileSystem spyFs = spy(FileSystem.get(this.conf));
+
+    LogAggregationService aggSvc = new LogAggregationService(dispatcher,
+        this.context, this.delSrvc, super.dirsHandler) {
+      @Override
+      protected FileSystem getFileSystem(Configuration conf) {
+        return spyFs;
+      }
+    };
+
+    aggSvc.init(this.conf);
+    aggSvc.start();
+
+    // start an application and verify user, suffix, and app dirs created
+    ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
+    Path userDir = fs.makeQualified(new Path(
+        remoteRootLogDir.getAbsolutePath(), this.user));
+    Path suffixDir = new Path(userDir, logSuffix);
+    Path appDir = new Path(suffixDir, appId.toString());
+    aggSvc.handle(new LogHandlerAppStartedEvent(appId, this.user, null,
+        ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
+    verify(spyFs).mkdirs(eq(userDir), isA(FsPermission.class));
+    verify(spyFs).mkdirs(eq(suffixDir), isA(FsPermission.class));
+    verify(spyFs).mkdirs(eq(appDir), isA(FsPermission.class));
+
+    // start another application and verify only app dir created
+    ApplicationId appId2 = BuilderUtils.newApplicationId(1, 2);
+    Path appDir2 = new Path(suffixDir, appId2.toString());
+    aggSvc.handle(new LogHandlerAppStartedEvent(appId2, this.user, null,
+        ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
+    verify(spyFs).mkdirs(eq(appDir2), isA(FsPermission.class));
+
+    // start another application with the app dir already created and verify
+    // we do not try to create it again
+    ApplicationId appId3 = BuilderUtils.newApplicationId(1, 3);
+    Path appDir3 = new Path(suffixDir, appId3.toString());
+    new File(appDir3.toUri().getPath()).mkdir();
+    aggSvc.handle(new LogHandlerAppStartedEvent(appId3, this.user, null,
+        ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
+    verify(spyFs, never()).mkdirs(eq(appDir3), isA(FsPermission.class));
+  }
+
   @Test
   @SuppressWarnings("unchecked")
   public void testLogAggregationInitFailsWithoutKillingNM() throws Exception {