You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ju...@apache.org on 2017/08/24 20:35:07 UTC

[1/2] hadoop git commit: YARN-6876. Create an abstract log writer for extendability. Contributed by Xuan Gong.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 8196a07c3 -> c2cb7ea1e


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index 1601c3f..51c63c4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -19,11 +19,7 @@
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
 
 import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -38,8 +34,6 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.security.Credentials;
@@ -57,7 +51,9 @@ import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
-import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationTFileController;
 import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
 import org.apache.hadoop.yarn.server.api.ContainerLogAggregationPolicy;
 import org.apache.hadoop.yarn.server.api.ContainerLogContext;
@@ -71,7 +67,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DeletionTask;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.Times;
 
@@ -86,18 +81,6 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
   private static final Logger LOG =
        LoggerFactory.getLogger(AppLogAggregatorImpl.class);
   private static final int THREAD_SLEEP_TIME = 1000;
-  // This is temporary solution. The configuration will be deleted once
-  // we find a more scalable method to only write a single log file per LRS.
-  private static final String NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP
-      = YarnConfiguration.NM_PREFIX + "log-aggregation.num-log-files-per-app";
-  private static final int
-      DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP = 30;
-  
-  // This configuration is for debug and test purpose. By setting
-  // this configuration as true. We can break the lower bound of
-  // NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS.
-  private static final String NM_LOG_AGGREGATION_DEBUG_ENABLED
-      = YarnConfiguration.NM_PREFIX + "log-aggregation.debug-enabled";
 
   private final LocalDirsHandlerService dirsHandler;
   private final Dispatcher dispatcher;
@@ -118,10 +101,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
   private final FileContext lfs;
   private final LogAggregationContext logAggregationContext;
   private final Context context;
-  private final int retentionSize;
-  private final long rollingMonitorInterval;
-  private final boolean logAggregationInRolling;
   private final NodeId nodeId;
+  private final LogAggregationFileControllerContext logControllerContext;
 
   // These variables are only for testing
   private final AtomicBoolean waiting = new AtomicBoolean(false);
@@ -134,6 +115,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
       new HashMap<ContainerId, ContainerLogAggregator>();
   private final ContainerLogAggregationPolicy logAggPolicy;
 
+  private final LogAggregationFileController logAggregationFileController;
+
 
   /**
    * The value recovered from state store to determine the age of application
@@ -151,7 +134,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
       FileContext lfs, long rollingMonitorInterval) {
     this(dispatcher, deletionService, conf, appId, userUgi, nodeId,
         dirsHandler, remoteNodeLogFileForApp, appAcls,
-        logAggregationContext, context, lfs, rollingMonitorInterval, -1);
+        logAggregationContext, context, lfs, rollingMonitorInterval, -1, null);
   }
 
   public AppLogAggregatorImpl(Dispatcher dispatcher,
@@ -162,6 +145,21 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
       LogAggregationContext logAggregationContext, Context context,
       FileContext lfs, long rollingMonitorInterval,
       long recoveredLogInitedTime) {
+    this(dispatcher, deletionService, conf, appId, userUgi, nodeId,
+        dirsHandler, remoteNodeLogFileForApp, appAcls,
+        logAggregationContext, context, lfs, rollingMonitorInterval,
+        recoveredLogInitedTime, null);
+  }
+
+  public AppLogAggregatorImpl(Dispatcher dispatcher,
+      DeletionService deletionService, Configuration conf,
+      ApplicationId appId, UserGroupInformation userUgi, NodeId nodeId,
+      LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp,
+      Map<ApplicationAccessType, String> appAcls,
+      LogAggregationContext logAggregationContext, Context context,
+      FileContext lfs, long rollingMonitorInterval,
+      long recoveredLogInitedTime,
+      LogAggregationFileController logAggregationFileController) {
     this.dispatcher = dispatcher;
     this.conf = conf;
     this.delService = deletionService;
@@ -169,31 +167,41 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
     this.applicationId = appId.toString();
     this.userUgi = userUgi;
     this.dirsHandler = dirsHandler;
-    this.remoteNodeLogFileForApp = remoteNodeLogFileForApp;
-    this.remoteNodeTmpLogFileForApp = getRemoteNodeTmpLogFileForApp();
     this.pendingContainers = new LinkedBlockingQueue<ContainerId>();
     this.appAcls = appAcls;
     this.lfs = lfs;
     this.logAggregationContext = logAggregationContext;
     this.context = context;
     this.nodeId = nodeId;
-    int configuredRentionSize =
-        conf.getInt(NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP,
-            DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP);
-    if (configuredRentionSize <= 0) {
-      this.retentionSize =
-          DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP;
+    this.logAggPolicy = getLogAggPolicy(conf);
+    this.recoveredLogInitedTime = recoveredLogInitedTime;
+    if (logAggregationFileController == null) {
+      // by default, use T-File Controller
+      this.logAggregationFileController = new LogAggregationTFileController();
+      this.logAggregationFileController.initialize(conf, "TFile");
+      this.logAggregationFileController.verifyAndCreateRemoteLogDir();
+      this.logAggregationFileController.createAppDir(
+          this.userUgi.getShortUserName(), appId, userUgi);
+      this.remoteNodeLogFileForApp = this.logAggregationFileController
+          .getRemoteNodeLogFileForApp(appId,
+              this.userUgi.getShortUserName(), nodeId);
+      this.remoteNodeTmpLogFileForApp = getRemoteNodeTmpLogFileForApp();
     } else {
-      this.retentionSize = configuredRentionSize;
+      this.logAggregationFileController = logAggregationFileController;
+      this.remoteNodeLogFileForApp = remoteNodeLogFileForApp;
+      this.remoteNodeTmpLogFileForApp = getRemoteNodeTmpLogFileForApp();
     }
-    this.rollingMonitorInterval = rollingMonitorInterval;
-    this.logAggregationInRolling =
-        this.rollingMonitorInterval <= 0 || this.logAggregationContext == null
+    boolean logAggregationInRolling =
+        rollingMonitorInterval <= 0 || this.logAggregationContext == null
             || this.logAggregationContext.getRolledLogsIncludePattern() == null
             || this.logAggregationContext.getRolledLogsIncludePattern()
-              .isEmpty() ? false : true;
-    this.logAggPolicy = getLogAggPolicy(conf);
-    this.recoveredLogInitedTime = recoveredLogInitedTime;
+                .isEmpty() ? false : true;
+    logControllerContext = new LogAggregationFileControllerContext(
+            this.remoteNodeLogFileForApp,
+            this.remoteNodeTmpLogFileForApp,
+            logAggregationInRolling,
+            rollingMonitorInterval,
+            this.appId, this.appAcls, this.nodeId, this.userUgi);
   }
 
   private ContainerLogAggregationPolicy getLogAggPolicy(Configuration conf) {
@@ -293,14 +301,9 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
     logAggregationTimes++;
     String diagnosticMessage = "";
     boolean logAggregationSucceedInThisCycle = true;
-    try (LogWriter writer = createLogWriter()) {
+    try {
       try {
-        writer.initialize(this.conf, this.remoteNodeTmpLogFileForApp,
-            this.userUgi);
-        // Write ACLs once when the writer is created.
-        writer.writeApplicationACLs(appAcls);
-        writer.writeApplicationOwner(this.userUgi.getShortUserName());
-
+        logAggregationFileController.initializeWriter(logControllerContext);
       } catch (IOException e1) {
         logAggregationSucceedInThisCycle = false;
         LOG.error("Cannot create writer for app " + this.applicationId
@@ -318,8 +321,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
           containerLogAggregators.put(container, aggregator);
         }
         Set<Path> uploadedFilePathsInThisCycle =
-            aggregator.doContainerLogAggregation(writer, appFinished,
-            finishedContainers.contains(container));
+            aggregator.doContainerLogAggregation(logAggregationFileController,
+            appFinished, finishedContainers.contains(container));
         if (uploadedFilePathsInThisCycle.size() > 0) {
           uploadedLogsInThisCycle = true;
           List<Path> uploadedFilePathsInThisCycleList = new ArrayList<>();
@@ -337,60 +340,28 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
         }
       }
 
-      // Before upload logs, make sure the number of existing logs
-      // is smaller than the configured NM log aggregation retention size.
-      if (uploadedLogsInThisCycle && logAggregationInRolling) {
-        cleanOldLogs();
-        cleanupOldLogTimes++;
-      }
-
-      long currentTime = System.currentTimeMillis();
-      final Path renamedPath = getRenamedPath(currentTime);
-
-      final boolean rename = uploadedLogsInThisCycle;
+      logControllerContext.setUploadedLogsInThisCycle(uploadedLogsInThisCycle);
+      logControllerContext.setLogUploadTimeStamp(System.currentTimeMillis());
+      logControllerContext.increLogAggregationTimes();
       try {
-        userUgi.doAs(new PrivilegedExceptionAction<Object>() {
-          @Override
-          public Object run() throws Exception {
-            FileSystem remoteFS = remoteNodeLogFileForApp.getFileSystem(conf);
-            if (rename) {
-              remoteFS.rename(remoteNodeTmpLogFileForApp, renamedPath);
-            } else {
-              remoteFS.delete(remoteNodeTmpLogFileForApp, false);
-            }
-            return null;
-          }
-        });
-        diagnosticMessage =
-            "Log uploaded successfully for Application: " + appId
-                + " in NodeManager: "
-                + LogAggregationUtils.getNodeString(nodeId) + " at "
-                + Times.format(currentTime) + "\n";
+        this.logAggregationFileController.postWrite(logControllerContext);
+        diagnosticMessage = "Log uploaded successfully for Application: "
+            + appId + " in NodeManager: "
+            + LogAggregationUtils.getNodeString(nodeId) + " at "
+            + Times.format(logControllerContext.getLogUploadTimeStamp())
+            + "\n";
       } catch (Exception e) {
-        LOG.error(
-          "Failed to move temporary log file to final location: ["
-              + remoteNodeTmpLogFileForApp + "] to ["
-              + renamedPath + "]", e);
-        diagnosticMessage =
-            "Log uploaded failed for Application: " + appId
-                + " in NodeManager: "
-                + LogAggregationUtils.getNodeString(nodeId) + " at "
-                + Times.format(currentTime) + "\n";
+        diagnosticMessage = e.getMessage();
         renameTemporaryLogFileFailed = true;
         logAggregationSucceedInThisCycle = false;
       }
     } finally {
       sendLogAggregationReport(logAggregationSucceedInThisCycle,
           diagnosticMessage, appFinished);
+      logAggregationFileController.closeWriter();
     }
   }
 
-  private Path getRenamedPath(long currentTime) {
-    return this.rollingMonitorInterval <= 0 ? remoteNodeLogFileForApp
-        : new Path(remoteNodeLogFileForApp.getParent(),
-        remoteNodeLogFileForApp.getName() + "_" + currentTime);
-  }
-
   private void addCredentials() {
     if (UserGroupInformation.isSecurityEnabled()) {
       Credentials systemCredentials =
@@ -407,11 +378,6 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
     }
   }
 
-  @VisibleForTesting
-  protected LogWriter createLogWriter() {
-    return new LogWriter();
-  }
-
   private void sendLogAggregationReport(
       boolean logAggregationSucceedInThisCycle, String diagnosticMessage,
       boolean appFinished) {
@@ -442,60 +408,6 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
     this.context.getLogAggregationStatusForApps().add(report);
   }
 
-  private void cleanOldLogs() {
-    try {
-      final FileSystem remoteFS =
-          this.remoteNodeLogFileForApp.getFileSystem(conf);
-      Path appDir =
-          this.remoteNodeLogFileForApp.getParent().makeQualified(
-            remoteFS.getUri(), remoteFS.getWorkingDirectory());
-      Set<FileStatus> status =
-          new HashSet<FileStatus>(Arrays.asList(remoteFS.listStatus(appDir)));
-
-      Iterable<FileStatus> mask =
-          Iterables.filter(status, new Predicate<FileStatus>() {
-            @Override
-            public boolean apply(FileStatus next) {
-              return next.getPath().getName()
-                .contains(LogAggregationUtils.getNodeString(nodeId))
-                && !next.getPath().getName().endsWith(
-                    LogAggregationUtils.TMP_FILE_SUFFIX);
-            }
-          });
-      status = Sets.newHashSet(mask);
-      // Normally, we just need to delete one oldest log
-      // before we upload a new log.
-      // If we can not delete the older logs in this cycle,
-      // we will delete them in next cycle.
-      if (status.size() >= this.retentionSize) {
-        // sort by the lastModificationTime ascending
-        List<FileStatus> statusList = new ArrayList<FileStatus>(status);
-        Collections.sort(statusList, new Comparator<FileStatus>() {
-          public int compare(FileStatus s1, FileStatus s2) {
-            return s1.getModificationTime() < s2.getModificationTime() ? -1
-                : s1.getModificationTime() > s2.getModificationTime() ? 1 : 0;
-          }
-        });
-        for (int i = 0 ; i <= statusList.size() - this.retentionSize; i++) {
-          final FileStatus remove = statusList.get(i);
-          try {
-            userUgi.doAs(new PrivilegedExceptionAction<Object>() {
-              @Override
-              public Object run() throws Exception {
-                remoteFS.delete(remove.getPath(), false);
-                return null;
-              }
-            });
-          } catch (Exception e) {
-            LOG.error("Failed to delete " + remove.getPath(), e);
-          }
-        }
-      }
-    } catch (Exception e) {
-      LOG.error("Failed to clean old logs", e);
-    }
-  }
-
   @SuppressWarnings("unchecked")
   @Override
   public void run() {
@@ -523,8 +435,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
       synchronized(this) {
         try {
           waiting.set(true);
-          if (logAggregationInRolling) {
-            wait(this.rollingMonitorInterval * 1000);
+          if (logControllerContext.isLogAggregationInRolling()) {
+            wait(logControllerContext.getRollingMonitorInterval() * 1000);
             if (this.appFinishing.get() || this.aborted.get()) {
               break;
             }
@@ -653,7 +565,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
           recoveredLogInitedTime, logRetentionSecs * 1000);
     }
 
-    public Set<Path> doContainerLogAggregation(LogWriter writer,
+    public Set<Path> doContainerLogAggregation(
+        LogAggregationFileController logAggregationFileController,
         boolean appFinished, boolean containerFinished) {
       LOG.info("Uploading logs for container " + containerId
           + ". Current good log dirs are "
@@ -665,7 +578,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
             this.uploadedFileMeta,  retentionContext, appFinished,
             containerFinished);
       try {
-        writer.append(logKey, logValue);
+        logAggregationFileController.write(logKey, logValue);
       } catch (Exception e) {
         LOG.error("Couldn't upload logs for " + containerId
             + ". Skipping this container.", e);
@@ -708,4 +621,15 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
   int getCleanupOldLogTimes() {
     return this.cleanupOldLogTimes;
   }
+
+  @VisibleForTesting
+  public LogAggregationFileController getLogAggregationFileController() {
+    return this.logAggregationFileController;
+  }
+
+  @VisibleForTesting
+  public LogAggregationFileControllerContext
+      getLogAggregationFileControllerContext() {
+    return this.logControllerContext;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
index aafd7d8..1a59e45 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
@@ -18,9 +18,7 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -32,10 +30,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.AbstractService;
@@ -48,7 +43,8 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
 import org.apache.hadoop.yarn.server.api.ContainerLogContext;
 import org.apache.hadoop.yarn.server.api.ContainerType;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
@@ -79,36 +75,14 @@ public class LogAggregationService extends AbstractService implements
       = YarnConfiguration.NM_PREFIX + "log-aggregation.debug-enabled";
   private long rollingMonitorInterval;
 
-  /*
-   * Expected deployment TLD will be 1777, owner=<NMOwner>, group=<NMGroup -
-   * Group to which NMOwner belongs> App dirs will be created as 770,
-   * owner=<AppOwner>, group=<NMGroup>: so that the owner and <NMOwner> can
-   * access / modify the files.
-   * <NMGroup> should obviously be a limited access group.
-   */
-  /**
-   * Permissions for the top level directory under which app directories will be
-   * created.
-   */
-  private static final FsPermission TLDIR_PERMISSIONS = FsPermission
-      .createImmutable((short) 01777);
-  /**
-   * Permissions for the Application directory.
-   */
-  private static final FsPermission APP_DIR_PERMISSIONS = FsPermission
-      .createImmutable((short) 0770);
-
   private final Context context;
   private final DeletionService deletionService;
   private final Dispatcher dispatcher;
 
   private LocalDirsHandlerService dirsHandler;
-  Path remoteRootLogDir;
-  String remoteRootLogDirSuffix;
   private NodeId nodeId;
 
   private final ConcurrentMap<ApplicationId, AppLogAggregator> appLogAggregators;
-  private boolean logPermError = true;
 
   @VisibleForTesting
   ExecutorService threadPool;
@@ -125,12 +99,6 @@ public class LogAggregationService extends AbstractService implements
   }
 
   protected void serviceInit(Configuration conf) throws Exception {
-    this.remoteRootLogDir =
-        new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
-            YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
-    this.remoteRootLogDirSuffix =
-        conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
-            YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
     int threadPoolSize = getAggregatorThreadPoolSize(conf);
     this.threadPool = HadoopExecutors.newFixedThreadPool(threadPoolSize,
         new ThreadFactoryBuilder()
@@ -218,158 +186,6 @@ public class LogAggregationService extends AbstractService implements
     }
   }
 
-  protected FileSystem getFileSystem(Configuration conf) throws IOException {
-    return this.remoteRootLogDir.getFileSystem(conf);
-  }
-
-  void verifyAndCreateRemoteLogDir(Configuration conf) {
-    // Checking the existence of the TLD
-    FileSystem remoteFS = null;
-    try {
-      remoteFS = getFileSystem(conf);
-    } catch (IOException e) {
-      throw new YarnRuntimeException("Unable to get Remote FileSystem instance", e);
-    }
-    boolean remoteExists = true;
-    try {
-      FsPermission perms =
-          remoteFS.getFileStatus(this.remoteRootLogDir).getPermission();
-      if (!perms.equals(TLDIR_PERMISSIONS) && logPermError) {
-        LOG.warn("Remote Root Log Dir [" + this.remoteRootLogDir
-            + "] already exist, but with incorrect permissions. "
-            + "Expected: [" + TLDIR_PERMISSIONS + "], Found: [" + perms
-            + "]." + " The cluster may have problems with multiple users.");
-        logPermError = false;
-      } else {
-        logPermError = true;
-      }
-    } catch (FileNotFoundException e) {
-      remoteExists = false;
-    } catch (IOException e) {
-      throw new YarnRuntimeException(
-          "Failed to check permissions for dir ["
-              + this.remoteRootLogDir + "]", e);
-    }
-    if (!remoteExists) {
-      LOG.warn("Remote Root Log Dir [" + this.remoteRootLogDir
-          + "] does not exist. Attempting to create it.");
-      try {
-        Path qualified =
-            this.remoteRootLogDir.makeQualified(remoteFS.getUri(),
-                remoteFS.getWorkingDirectory());
-        remoteFS.mkdirs(qualified, new FsPermission(TLDIR_PERMISSIONS));
-        remoteFS.setPermission(qualified, new FsPermission(TLDIR_PERMISSIONS));
-
-        UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
-        String primaryGroupName = null;
-        try {
-          primaryGroupName = loginUser.getPrimaryGroupName();
-        } catch (IOException e) {
-          LOG.warn("No primary group found. The remote root log directory" +
-              " will be created with the HDFS superuser being its group " +
-              "owner. JobHistoryServer may be unable to read the directory.");
-        }
-        // set owner on the remote directory only if the primary group exists
-        if (primaryGroupName != null) {
-          remoteFS.setOwner(qualified,
-              loginUser.getShortUserName(), primaryGroupName);
-        }
-      } catch (IOException e) {
-        throw new YarnRuntimeException("Failed to create remoteLogDir ["
-            + this.remoteRootLogDir + "]", e);
-      }
-    }
-  }
-
-  Path getRemoteNodeLogFileForApp(ApplicationId appId, String user) {
-    return LogAggregationUtils.getRemoteNodeLogFileForApp(
-        this.remoteRootLogDir, appId, user, this.nodeId,
-        this.remoteRootLogDirSuffix);
-  }
-
-  Path getRemoteAppLogDir(ApplicationId appId, String user) {
-    return LogAggregationUtils.getRemoteAppLogDir(this.remoteRootLogDir, appId,
-        user, this.remoteRootLogDirSuffix);
-  }
-
-  private void createDir(FileSystem fs, Path path, FsPermission fsPerm)
-      throws IOException {
-    FsPermission dirPerm = new FsPermission(fsPerm);
-    fs.mkdirs(path, dirPerm);
-    FsPermission umask = FsPermission.getUMask(fs.getConf());
-    if (!dirPerm.equals(dirPerm.applyUMask(umask))) {
-      fs.setPermission(path, new FsPermission(fsPerm));
-    }
-  }
-
-  private boolean checkExists(FileSystem fs, Path path, FsPermission fsPerm)
-      throws IOException {
-    boolean exists = true;
-    try {
-      FileStatus appDirStatus = fs.getFileStatus(path);
-      if (!APP_DIR_PERMISSIONS.equals(appDirStatus.getPermission())) {
-        fs.setPermission(path, APP_DIR_PERMISSIONS);
-      }
-    } catch (FileNotFoundException fnfe) {
-      exists = false;
-    }
-    return exists;
-  }
-
-  protected void createAppDir(final String user, final ApplicationId appId,
-      UserGroupInformation userUgi) {
-    try {
-      userUgi.doAs(new PrivilegedExceptionAction<Object>() {
-        @Override
-        public Object run() throws Exception {
-          try {
-            // TODO: Reuse FS for user?
-            FileSystem remoteFS = getFileSystem(getConfig());
-
-            // Only creating directories if they are missing to avoid
-            // unnecessary load on the filesystem from all of the nodes
-            Path appDir = LogAggregationUtils.getRemoteAppLogDir(
-                LogAggregationService.this.remoteRootLogDir, appId, user,
-                LogAggregationService.this.remoteRootLogDirSuffix);
-            appDir = appDir.makeQualified(remoteFS.getUri(),
-                remoteFS.getWorkingDirectory());
-
-            if (!checkExists(remoteFS, appDir, APP_DIR_PERMISSIONS)) {
-              Path suffixDir = LogAggregationUtils.getRemoteLogSuffixedDir(
-                  LogAggregationService.this.remoteRootLogDir, user,
-                  LogAggregationService.this.remoteRootLogDirSuffix);
-              suffixDir = suffixDir.makeQualified(remoteFS.getUri(),
-                  remoteFS.getWorkingDirectory());
-
-              if (!checkExists(remoteFS, suffixDir, APP_DIR_PERMISSIONS)) {
-                Path userDir = LogAggregationUtils.getRemoteLogUserDir(
-                    LogAggregationService.this.remoteRootLogDir, user);
-                userDir = userDir.makeQualified(remoteFS.getUri(),
-                    remoteFS.getWorkingDirectory());
-
-                if (!checkExists(remoteFS, userDir, APP_DIR_PERMISSIONS)) {
-                  createDir(remoteFS, userDir, APP_DIR_PERMISSIONS);
-                }
-
-                createDir(remoteFS, suffixDir, APP_DIR_PERMISSIONS);
-              }
-
-              createDir(remoteFS, appDir, APP_DIR_PERMISSIONS);
-            }
-
-          } catch (IOException e) {
-            LOG.error("Failed to setup application log directory for "
-                + appId, e);
-            throw e;
-          }
-          return null;
-        }
-      });
-    } catch (Exception e) {
-      throw new YarnRuntimeException(e);
-    }
-  }
-
   @SuppressWarnings("unchecked")
   private void initApp(final ApplicationId appId, String user,
       Credentials credentials, Map<ApplicationAccessType, String> appAcls,
@@ -377,7 +193,6 @@ public class LogAggregationService extends AbstractService implements
       long recoveredLogInitedTime) {
     ApplicationEvent eventResponse;
     try {
-      verifyAndCreateRemoteLogDir(getConfig());
       initAppAggregator(appId, user, credentials, appAcls,
           logAggregationContext, recoveredLogInitedTime);
       eventResponse = new ApplicationEvent(appId,
@@ -410,14 +225,17 @@ public class LogAggregationService extends AbstractService implements
       userUgi.addCredentials(credentials);
     }
 
+    LogAggregationFileController logAggregationFileController
+        = getLogAggregationFileController(getConfig());
+    logAggregationFileController.verifyAndCreateRemoteLogDir();
     // New application
     final AppLogAggregator appLogAggregator =
         new AppLogAggregatorImpl(this.dispatcher, this.deletionService,
             getConfig(), appId, userUgi, this.nodeId, dirsHandler,
-            getRemoteNodeLogFileForApp(appId, user),
-            appAcls, logAggregationContext, this.context,
+            logAggregationFileController.getRemoteNodeLogFileForApp(appId,
+            user, nodeId), appAcls, logAggregationContext, this.context,
             getLocalFileContext(getConfig()), this.rollingMonitorInterval,
-            recoveredLogInitedTime);
+            recoveredLogInitedTime, logAggregationFileController);
     if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) {
       throw new YarnRuntimeException("Duplicate initApp for " + appId);
     }
@@ -425,7 +243,7 @@ public class LogAggregationService extends AbstractService implements
     YarnRuntimeException appDirException = null;
     try {
       // Create the app dir
-      createAppDir(user, appId, userUgi);
+      logAggregationFileController.createAppDir(user, appId, userUgi);
     } catch (Exception e) {
       appLogAggregator.disableLogAggregation();
       if (!(e instanceof YarnRuntimeException)) {
@@ -570,4 +388,14 @@ public class LogAggregationService extends AbstractService implements
     }
     return threadPoolSize;
   }
+
+  @VisibleForTesting
+  public LogAggregationFileController getLogAggregationFileController(
+      Configuration conf) {
+    LogAggregationFileControllerFactory factory
+        = new LogAggregationFileControllerFactory(conf);
+    LogAggregationFileController logAggregationFileController = factory
+        .getFileControllerForWrite();
+    return logAggregationFileController;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java
index b4bd9d7..bedad33 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
-import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationTFileController;
 import org.apache.hadoop.yarn.server.api.ContainerLogContext;
 import org.apache.hadoop.yarn.server.api.ContainerType;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
@@ -241,8 +241,8 @@ public class TestAppLogAggregatorImpl {
     // verify uploaded files
     ArgumentCaptor<LogValue> logValCaptor =
         ArgumentCaptor.forClass(LogValue.class);
-    verify(appLogAggregator.logWriter).append(any(LogKey.class),
-        logValCaptor.capture());
+    verify(appLogAggregator.getLogAggregationFileController()).write(
+        any(LogKey.class), logValCaptor.capture());
     Set<String> filesUploaded = new HashSet<>();
     LogValue logValue = logValCaptor.getValue();
     for(File file: logValue.getPendingLogFilesToUploadForThisContainer()) {
@@ -287,11 +287,13 @@ public class TestAppLogAggregatorImpl {
     final Context context = createContext(config);
     final FileContext fakeLfs = mock(FileContext.class);
     final Path remoteLogDirForApp = new Path(REMOTE_LOG_FILE.getAbsolutePath());
-
+    LogAggregationTFileController format = spy(
+        new LogAggregationTFileController());
+    format.initialize(config, "TFile");
     return new AppLogAggregatorInTest(dispatcher, deletionService,
         config, applicationId, ugi, nodeId, dirsService,
         remoteLogDirForApp, appAcls, logAggregationContext,
-        context, fakeLfs, recoveredLogInitedTimeMillis);
+        context, fakeLfs, recoveredLogInitedTimeMillis, format);
   }
 
   /**
@@ -402,7 +404,6 @@ public class TestAppLogAggregatorImpl {
 
     final DeletionService deletionService;
     final ApplicationId applicationId;
-    final LogWriter logWriter;
     final ArgumentCaptor<LogValue> logValue;
 
     public AppLogAggregatorInTest(Dispatcher dispatcher,
@@ -411,19 +412,15 @@ public class TestAppLogAggregatorImpl {
         LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp,
         Map<ApplicationAccessType, String> appAcls,
         LogAggregationContext logAggregationContext, Context context,
-        FileContext lfs, long recoveredLogInitedTime) throws IOException {
+        FileContext lfs, long recoveredLogInitedTime,
+        LogAggregationTFileController format) throws IOException {
       super(dispatcher, deletionService, conf, appId, ugi, nodeId,
           dirsHandler, remoteNodeLogFileForApp, appAcls,
-          logAggregationContext, context, lfs, -1, recoveredLogInitedTime);
+          logAggregationContext, context, lfs, -1, recoveredLogInitedTime,
+          format);
       this.applicationId = appId;
       this.deletionService = deletionService;
-      this.logWriter = spy(new LogWriter());
       this.logValue = ArgumentCaptor.forClass(LogValue.class);
     }
-
-    @Override
-    protected LogWriter createLogWriter() {
-      return this.logWriter;
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
index 6383e83..0fa012c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
@@ -103,6 +103,9 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationTFileController;
 import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
@@ -161,11 +164,12 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
   DrainDispatcher dispatcher;
   EventHandler<Event> appEventHandler;
 
+  private NodeId nodeId = NodeId.newInstance("0.0.0.0", 5555);
+
   @Override
   @SuppressWarnings("unchecked")
   public void setup() throws IOException {
     super.setup();
-    NodeId nodeId = NodeId.newInstance("0.0.0.0", 5555);
     ((NMContext)context).setNodeId(nodeId);
     dispatcher = createDispatcher();
     appEventHandler = mock(EventHandler.class);
@@ -246,9 +250,9 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     Assert.assertFalse("Directory [" + app1LogDir + "] was not deleted",
       app1LogDir.exists());
 
-    Path logFilePath =
-        logAggregationService.getRemoteNodeLogFileForApp(application1,
-            this.user);
+    Path logFilePath = logAggregationService
+        .getLogAggregationFileController(conf)
+        .getRemoteNodeLogFileForApp(application1, this.user, nodeId);
 
     Assert.assertTrue("Log file [" + logFilePath + "] not found", new File(
         logFilePath.toUri().getPath()).exists());
@@ -369,9 +373,10 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
 
     logAggregationService.stop();
     assertEquals(0, logAggregationService.getNumAggregators());
-
-    Assert.assertFalse(new File(logAggregationService
-        .getRemoteNodeLogFileForApp(application1, this.user).toUri().getPath())
+    LogAggregationFileController format1 =
+        logAggregationService.getLogAggregationFileController(conf);
+    Assert.assertFalse(new File(format1.getRemoteNodeLogFileForApp(
+        application1, this.user, this.nodeId).toUri().getPath())
         .exists());
 
     dispatcher.await();
@@ -541,26 +546,33 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     };
     checkEvents(appEventHandler, expectedFinishedEvents, false, "getType", "getApplicationID");
   }
-  
+
   @Test
   public void testVerifyAndCreateRemoteDirsFailure()
       throws Exception {
     this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
     this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
         this.remoteRootLogDir.getAbsolutePath());
-    
+    LogAggregationFileControllerFactory factory
+        = new LogAggregationFileControllerFactory(conf);
+    LogAggregationFileController logAggregationFileFormat = factory
+        .getFileControllerForWrite();
+    LogAggregationFileController spyLogAggregationFileFormat =
+        spy(logAggregationFileFormat);
+    YarnRuntimeException e = new YarnRuntimeException("KABOOM!");
+    doThrow(e).doNothing().when(spyLogAggregationFileFormat)
+        .verifyAndCreateRemoteLogDir();
     LogAggregationService logAggregationService = spy(
         new LogAggregationService(dispatcher, this.context, this.delSrvc,
-                                  super.dirsHandler));
+            super.dirsHandler) {
+        @Override
+        public LogAggregationFileController getLogAggregationFileController(
+            Configuration conf) {
+          return spyLogAggregationFileFormat;
+        }
+      });
     logAggregationService.init(this.conf);
-    
-    YarnRuntimeException e = new YarnRuntimeException("KABOOM!");
-    doThrow(e)
-      .when(logAggregationService).verifyAndCreateRemoteLogDir(
-          any(Configuration.class));
-        
     logAggregationService.start();
-    
     // Now try to start an application
     ApplicationId appId =
         BuilderUtils.newApplicationId(System.currentTimeMillis(),
@@ -607,8 +619,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     
     logAggregationService.stop();
   }
-  
-  
+
   @Test
   public void testVerifyAndCreateRemoteDirNonExistence()
       throws Exception {
@@ -621,14 +632,24 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
         new LogAggregationService(dispatcher, this.context, this.delSrvc,
                                   super.dirsHandler));
     logAggregationService.init(this.conf);
+    logAggregationService.start();
     boolean existsBefore = aNewFile.exists();
     assertTrue("The new file already exists!", !existsBefore);
 
-    logAggregationService.verifyAndCreateRemoteLogDir(this.conf);
-    
+    ApplicationId appId = ApplicationId.newInstance(
+        System.currentTimeMillis(), 1);
+    LogAggregationContext contextWithAMAndFailed =
+        Records.newRecord(LogAggregationContext.class);
+    contextWithAMAndFailed.setLogAggregationPolicyClassName(
+        AMOrFailedContainerLogAggregationPolicy.class.getName());
+    logAggregationService.handle(new LogHandlerAppStartedEvent(appId,
+        this.user, null, this.acls, contextWithAMAndFailed));
+    dispatcher.await();
+
     boolean existsAfter = aNewFile.exists();
     assertTrue("The new aggregate file is not successfully created", existsAfter);
     aNewFile.delete(); //housekeeping
+    logAggregationService.stop();
   }
 
   @Test
@@ -641,7 +662,17 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     LogAggregationService logAggregationService = new LogAggregationService(
         dispatcher, this.context, this.delSrvc, super.dirsHandler);
     logAggregationService.init(this.conf);
-    logAggregationService.verifyAndCreateRemoteLogDir(this.conf);
+    logAggregationService.start();
+
+    ApplicationId appId = ApplicationId.newInstance(
+        System.currentTimeMillis(), 1);
+    LogAggregationContext contextWithAMAndFailed =
+        Records.newRecord(LogAggregationContext.class);
+    contextWithAMAndFailed.setLogAggregationPolicyClassName(
+        AMOrFailedContainerLogAggregationPolicy.class.getName());
+    logAggregationService.handle(new LogHandlerAppStartedEvent(appId,
+        this.user, null, this.acls, contextWithAMAndFailed));
+    dispatcher.await();
 
     String targetGroup =
         UserGroupInformation.getLoginUser().getPrimaryGroupName();
@@ -651,6 +682,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
         fileStatus.getGroup(), targetGroup);
 
     fs.delete(aNewFile, true);
+    logAggregationService.stop();
   }
 
   @Test
@@ -669,14 +701,23 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     FileSystem fs = FileSystem.get(this.conf);
     final FileSystem spyFs = spy(FileSystem.get(this.conf));
 
+    final LogAggregationTFileController spyFileFormat
+        = new LogAggregationTFileController() {
+          @Override
+          public FileSystem getFileSystem(Configuration conf)
+              throws IOException {
+            return spyFs;
+          }
+        };
+    spyFileFormat.initialize(conf, "TFile");
     LogAggregationService aggSvc = new LogAggregationService(dispatcher,
         this.context, this.delSrvc, super.dirsHandler) {
       @Override
-      protected FileSystem getFileSystem(Configuration conf) {
-        return spyFs;
+      public LogAggregationFileController getLogAggregationFileController(
+          Configuration conf) {
+        return spyFileFormat;
       }
     };
-
     aggSvc.init(this.conf);
     aggSvc.start();
 
@@ -769,18 +810,36 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
   @Test
   public void testLogAggregationCreateDirsFailsWithoutKillingNM()
       throws Exception {
-    
-    this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
+
+    this.conf.set(YarnConfiguration.NM_LOG_DIRS,
+        localLogDir.getAbsolutePath());
     this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
         this.remoteRootLogDir.getAbsolutePath());
 
     DeletionService spyDelSrvc = spy(this.delSrvc);
+    LogAggregationFileControllerFactory factory
+        = new LogAggregationFileControllerFactory(conf);
+    LogAggregationFileController logAggregationFileFormat = factory
+        .getFileControllerForWrite();
+    LogAggregationFileController spyLogAggregationFileFormat =
+        spy(logAggregationFileFormat);
+    Exception e = new RuntimeException("KABOOM!");
+    doThrow(e).when(spyLogAggregationFileFormat)
+        .createAppDir(any(String.class), any(ApplicationId.class),
+            any(UserGroupInformation.class));
     LogAggregationService logAggregationService = spy(
         new LogAggregationService(dispatcher, this.context, spyDelSrvc,
-                                  super.dirsHandler));
+            super.dirsHandler){
+        @Override
+        public LogAggregationFileController getLogAggregationFileController(
+            Configuration conf) {
+          return spyLogAggregationFileFormat;
+        }
+      });
+
     logAggregationService.init(this.conf);
     logAggregationService.start();
-    
+
     ApplicationId appId =
         BuilderUtils.newApplicationId(System.currentTimeMillis(),
           (int) (Math.random() * 1000));
@@ -789,10 +848,6 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
         new File(localLogDir, appId.toString());
     appLogDir.mkdir();
 
-    Exception e = new RuntimeException("KABOOM!");
-    doThrow(e)
-      .when(logAggregationService).createAppDir(any(String.class),
-          any(ApplicationId.class), any(UserGroupInformation.class));
     LogAggregationContext contextWithAMAndFailed =
         Records.newRecord(LogAggregationContext.class);
     contextWithAMAndFailed.setLogAggregationPolicyClassName(
@@ -867,7 +922,8 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
       int minNumOfContainers, int maxNumOfContainers,
       String[] logFiles, int numOfLogsPerContainer, boolean multiLogs)
     throws IOException {
-    Path appLogDir = logAggregationService.getRemoteAppLogDir(appId, this.user);
+    Path appLogDir = logAggregationService.getLogAggregationFileController(
+        conf).getRemoteAppLogDir(appId, this.user);
     RemoteIterator<FileStatus> nodeFiles = null;
     try {
       Path qualifiedLogDir =
@@ -2108,7 +2164,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     }
     logAggregationService.handle(new LogHandlerAppStartedEvent(appId,
         this.user, null, this.acls, logAggContext));
-
+    dispatcher.await();
     return logAggregationService;
   }
 
@@ -2462,17 +2518,20 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     logAggregationService.stop();
 
     assertEquals(expectedLogAggregationTimes,
-        aggregator.getLogAggregationTimes());
+        aggregator.getLogAggregationFileControllerContext()
+        .getLogAggregationTimes());
     assertEquals(expectedAggregationReportNum,
         this.context.getLogAggregationStatusForApps().size());
     assertEquals(expectedCleanupOldLogsTimes,
-        aggregator.getCleanupOldLogTimes());
+        aggregator.getLogAggregationFileControllerContext()
+        .getCleanOldLogsTimes());
   }
 
   private int numOfLogsAvailable(LogAggregationService logAggregationService,
       ApplicationId appId, boolean sizeLimited, String lastLogFile)
       throws IOException {
-    Path appLogDir = logAggregationService.getRemoteAppLogDir(appId, this.user);
+    Path appLogDir = logAggregationService.getLogAggregationFileController(
+        conf).getRemoteAppLogDir(appId, this.user);
     RemoteIterator<FileStatus> nodeFiles = null;
     try {
       Path qualifiedLogDir =


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[2/2] hadoop git commit: YARN-6876. Create an abstract log writer for extendability. Contributed by Xuan Gong.

Posted by ju...@apache.org.
YARN-6876. Create an abstract log writer for extendability. Contributed by Xuan Gong.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c2cb7ea1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c2cb7ea1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c2cb7ea1

Branch: refs/heads/trunk
Commit: c2cb7ea1ef6532020b69031dbd18b0f9b8369f0f
Parents: 8196a07
Author: Junping Du <ju...@apache.org>
Authored: Thu Aug 24 13:36:49 2017 -0700
Committer: Junping Du <ju...@apache.org>
Committed: Thu Aug 24 13:36:49 2017 -0700

----------------------------------------------------------------------
 .../hadoop/yarn/conf/YarnConfiguration.java     |  12 +-
 .../yarn/conf/TestYarnConfigurationFields.java  |   2 +
 .../hadoop/yarn/client/cli/TestLogsCLI.java     |  65 +--
 .../logaggregation/AggregatedLogFormat.java     |  22 +-
 .../logaggregation/LogAggregationUtils.java     |  41 ++
 .../LogAggregationFileController.java           | 404 +++++++++++++++++++
 .../LogAggregationFileControllerContext.java    | 130 ++++++
 .../LogAggregationFileControllerFactory.java    | 195 +++++++++
 .../LogAggregationTFileController.java          | 127 ++++++
 .../filecontroller/package-info.java            |  21 +
 .../src/main/resources/yarn-default.xml         |  19 +
 .../logaggregation/TestAggregatedLogsBlock.java |  28 +-
 .../logaggregation/TestContainerLogsUtils.java  |  29 +-
 ...TestLogAggregationFileControllerFactory.java | 171 ++++++++
 .../logaggregation/AppLogAggregatorImpl.java    | 232 ++++-------
 .../logaggregation/LogAggregationService.java   | 210 +---------
 .../TestAppLogAggregatorImpl.java               |  25 +-
 .../TestLogAggregationService.java              | 135 +++++--
 18 files changed, 1419 insertions(+), 449 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 86f45b8..16bd73a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1064,7 +1064,17 @@ public class YarnConfiguration extends Configuration {
   public static final String LOG_AGGREGATION_ENABLED = YARN_PREFIX
       + "log-aggregation-enable";
   public static final boolean DEFAULT_LOG_AGGREGATION_ENABLED = false;
-  
+
+  public static final String LOG_AGGREGATION_FILE_FORMATS = YARN_PREFIX
+      + "log-aggregation.file-formats";
+  public static final String LOG_AGGREGATION_FILE_CONTROLLER_FMT =
+      YARN_PREFIX + "log-aggregation.file-controller.%s.class";
+
+  public static final String LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT
+      = YARN_PREFIX + "log-aggregation.%s.remote-app-log-dir";
+  public static final String LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT
+      = YARN_PREFIX + "log-aggregation.%s.remote-app-log-dir-suffix";
+
   /** 
    * How long to wait before deleting aggregated logs, -1 disables.
    * Be careful set this too small and you will spam the name node.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
index c40c2c5..153a35a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
@@ -184,6 +184,8 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
     // Currently defined in RegistryConstants/core-site.xml
     xmlPrefixToSkipCompare.add("hadoop.registry");
 
+    xmlPrefixToSkipCompare.add(
+        "yarn.log-aggregation.file-controller.TFile.class");
     // Add the filters used for checking for collision of default values.
     initDefaultValueCollisionCheck();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java
index c054209..26e0319 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java
@@ -36,7 +36,6 @@ import com.sun.jersey.api.client.ClientResponse;
 import com.sun.jersey.api.client.ClientResponse.Status;
 import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileWriter;
@@ -78,6 +77,9 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
 import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
 import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
 import org.apache.hadoop.yarn.logaggregation.LogCLIHelpers;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
 import org.codehaus.jettison.json.JSONObject;
 import org.junit.Assert;
 import org.junit.Before;
@@ -1345,42 +1347,55 @@ public class TestLogsCLI {
     Path path =
         new Path(appDir, LogAggregationUtils.getNodeString(nodeId)
             + System.currentTimeMillis());
-    try (AggregatedLogFormat.LogWriter writer =
-             new AggregatedLogFormat.LogWriter()) {
-      writer.initialize(configuration, path, ugi);
-      writer.writeApplicationOwner(ugi.getUserName());
-
+    LogAggregationFileControllerFactory factory
+        = new LogAggregationFileControllerFactory(configuration);
+    LogAggregationFileController fileFormat = factory
+        .getFileControllerForWrite();
+    try {
       Map<ApplicationAccessType, String> appAcls = new HashMap<>();
       appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
-      writer.writeApplicationACLs(appAcls);
-      writer.append(new AggregatedLogFormat.LogKey(containerId),
+      LogAggregationFileControllerContext context
+          = new LogAggregationFileControllerContext(
+              path, path, true, 1000,
+              containerId.getApplicationAttemptId().getApplicationId(),
+              appAcls, nodeId, ugi);
+      fileFormat.initializeWriter(context);
+      fileFormat.write(new AggregatedLogFormat.LogKey(containerId),
           new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
               UserGroupInformation.getCurrentUser().getShortUserName()));
+    } finally {
+      fileFormat.closeWriter();
     }
   }
 
+  @SuppressWarnings("static-access")
   private static void uploadEmptyContainerLogIntoRemoteDir(UserGroupInformation ugi,
       Configuration configuration, List<String> rootLogDirs, NodeId nodeId,
       ContainerId containerId, Path appDir, FileSystem fs) throws Exception {
-    Path path =
-        new Path(appDir, LogAggregationUtils.getNodeString(nodeId)
-            + System.currentTimeMillis());
-    try (AggregatedLogFormat.LogWriter writer =
-             new AggregatedLogFormat.LogWriter()) {
-      writer.initialize(configuration, path, ugi);
-      writer.writeApplicationOwner(ugi.getUserName());
-
+    LogAggregationFileControllerFactory factory
+        = new LogAggregationFileControllerFactory(configuration);
+    LogAggregationFileController fileFormat = factory
+        .getFileControllerForWrite();
+    try {
       Map<ApplicationAccessType, String> appAcls = new HashMap<>();
       appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
-      writer.writeApplicationACLs(appAcls);
-      DataOutputStream out = writer.getWriter().prepareAppendKey(-1);
-      new AggregatedLogFormat.LogKey(containerId).write(out);
-      out.close();
-      out = writer.getWriter().prepareAppendValue(-1);
-      new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
-          UserGroupInformation.getCurrentUser().getShortUserName()).write(out,
-              new HashSet<>());
-      out.close();
+      ApplicationId appId = containerId.getApplicationAttemptId()
+          .getApplicationId();
+      Path path = fileFormat.getRemoteNodeLogFileForApp(
+          appId, ugi.getCurrentUser().getShortUserName(), nodeId);
+      LogAggregationFileControllerContext context
+          = new LogAggregationFileControllerContext(
+              path, path, true, 1000,
+              appId, appAcls, nodeId, ugi);
+      fileFormat.initializeWriter(context);
+      AggregatedLogFormat.LogKey key = new AggregatedLogFormat.LogKey(
+          containerId);
+      AggregatedLogFormat.LogValue value = new AggregatedLogFormat.LogValue(
+          rootLogDirs, containerId, UserGroupInformation.getCurrentUser()
+              .getShortUserName());
+      fileFormat.write(key, value);
+    } finally {
+      fileFormat.closeWriter();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
index d806b12..3c1dcdc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
@@ -44,7 +44,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.regex.Pattern;
-
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.io.input.BoundedInputStream;
 import org.apache.commons.io.output.WriterOutputStream;
 import org.apache.commons.logging.Log;
@@ -61,7 +61,6 @@ import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.SecureIOUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.file.tfile.TFile;
@@ -71,7 +70,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LogAggregationContext;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Times;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -249,7 +247,7 @@ public class AggregatedLogFormat {
           in = secureOpenFile(logFile);
         } catch (IOException e) {
           logErrorMessage(logFile, e);
-          IOUtils.cleanup(LOG, in);
+          IOUtils.closeQuietly(in);
           continue;
         }
 
@@ -287,7 +285,7 @@ public class AggregatedLogFormat {
           String message = logErrorMessage(logFile, e);
           out.write(message.getBytes(Charset.forName("UTF-8")));
         } finally {
-          IOUtils.cleanup(LOG, in);
+          IOUtils.closeQuietly(in);
         }
       }
     }
@@ -557,7 +555,7 @@ public class AggregatedLogFormat {
       } catch (Exception e) {
         LOG.warn("Exception closing writer", e);
       } finally {
-        IOUtils.closeStream(this.fsDataOStream);
+        IOUtils.closeQuietly(this.fsDataOStream);
       }
     }
   }
@@ -605,7 +603,7 @@ public class AggregatedLogFormat {
         }
         return null;
       } finally {
-        IOUtils.cleanup(LOG, ownerScanner);
+        IOUtils.closeQuietly(ownerScanner);
       }
     }
 
@@ -651,7 +649,7 @@ public class AggregatedLogFormat {
         }
         return acls;
       } finally {
-        IOUtils.cleanup(LOG, aclScanner);
+        IOUtils.closeQuietly(aclScanner);
       }
     }
 
@@ -775,8 +773,8 @@ public class AggregatedLogFormat {
           }
         }
       } finally {
-        IOUtils.cleanup(LOG, ps);
-        IOUtils.cleanup(LOG, os);
+        IOUtils.closeQuietly(ps);
+        IOUtils.closeQuietly(os);
       }
     }
 
@@ -1001,7 +999,9 @@ public class AggregatedLogFormat {
     }
 
     public void close() {
-      IOUtils.cleanup(LOG, scanner, reader, fsDataIStream);
+      IOUtils.closeQuietly(scanner);
+      IOUtils.closeQuietly(reader);
+      IOUtils.closeQuietly(fsDataIStream);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java
index 24baaab..e8a28de 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java
@@ -133,6 +133,23 @@ public class LogAggregationUtils {
         new org.apache.hadoop.fs.Path(conf.get(
             YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
             YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
+    return getRemoteAppLogDir(conf, appId, appOwner, remoteRootLogDir, suffix);
+  }
+
+  /**
+   * Return the remote application log directory.
+   * @param conf the configuration
+   * @param appId the application
+   * @param appOwner the application owner
+   * @param remoteRootLogDir the remote root log directory
+   * @param suffix the log directory suffix
+   * @return the remote application log directory path
+   * @throws IOException if we can not find remote application log directory
+   */
+  public static org.apache.hadoop.fs.Path getRemoteAppLogDir(
+      Configuration conf, ApplicationId appId, String appOwner,
+      org.apache.hadoop.fs.Path remoteRootLogDir, String suffix)
+      throws IOException {
     org.apache.hadoop.fs.Path remoteAppDir = null;
     if (appOwner == null) {
       org.apache.hadoop.fs.Path qualifiedRemoteRootLogDir =
@@ -159,6 +176,30 @@ public class LogAggregationUtils {
    * @param conf the configuration
    * @param appId the applicationId
    * @param appOwner the application owner
+   * @param remoteRootLogDir the remote root log directory
+   * @param suffix the log directory suffix
+   * @return the iterator of available log files
+   * @throws IOException if there is no log file available
+   */
+  public static RemoteIterator<FileStatus> getRemoteNodeFileDir(
+      Configuration conf, ApplicationId appId, String appOwner,
+      org.apache.hadoop.fs.Path remoteRootLogDir, String suffix)
+      throws IOException {
+    Path remoteAppLogDir = getRemoteAppLogDir(conf, appId, appOwner,
+        remoteRootLogDir, suffix);
+    RemoteIterator<FileStatus> nodeFiles = null;
+    Path qualifiedLogDir =
+        FileContext.getFileContext(conf).makeQualified(remoteAppLogDir);
+    nodeFiles = FileContext.getFileContext(qualifiedLogDir.toUri(),
+        conf).listStatus(remoteAppLogDir);
+    return nodeFiles;
+  }
+
+  /**
+   * Get all available log files under remote app log directory.
+   * @param conf the configuration
+   * @param appId the applicationId
+   * @param appOwner the application owner
    * @return the iterator of available log files
    * @throws IOException if there is no log file available
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
new file mode 100644
index 0000000..5503f8f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
@@ -0,0 +1,404 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation.filecontroller;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
+
+/**
+ * Base class to implement Log Aggregation File Controller.
+ */
+@Private
+@Unstable
+public abstract class LogAggregationFileController {
+
+  private static final Log LOG = LogFactory.getLog(
+      LogAggregationFileController.class);
+
+  /*
+   * Expected deployment TLD will be 1777, owner=<NMOwner>, group=<NMGroup -
+   * Group to which NMOwner belongs> App dirs will be created as 770,
+   * owner=<AppOwner>, group=<NMGroup>: so that the owner and <NMOwner> can
+   * access / modify the files.
+   * <NMGroup> should obviously be a limited access group.
+   */
+  /**
+   * Permissions for the top level directory under which app directories will be
+   * created.
+   */
+  protected static final FsPermission TLDIR_PERMISSIONS = FsPermission
+      .createImmutable((short) 01777);
+
+  /**
+   * Permissions for the Application directory.
+   */
+  protected static final FsPermission APP_DIR_PERMISSIONS = FsPermission
+      .createImmutable((short) 0770);
+
+  // This is temporary solution. The configuration will be deleted once
+  // we find a more scalable method to only write a single log file per LRS.
+  private static final String NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP
+      = YarnConfiguration.NM_PREFIX + "log-aggregation.num-log-files-per-app";
+  private static final int
+      DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP = 30;
+
+  protected Configuration conf;
+  protected Path remoteRootLogDir;
+  protected String remoteRootLogDirSuffix;
+  protected int retentionSize;
+  protected String fileControllerName;
+
+  public LogAggregationFileController() {}
+
+  /**
+   * Initialize the log file controller.
+   * @param conf the Configuration
+   * @param controllerName the log controller class name
+   */
+  public void initialize(Configuration conf, String controllerName) {
+    this.conf = conf;
+    int configuredRentionSize =
+        conf.getInt(NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP,
+            DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP);
+    if (configuredRentionSize <= 0) {
+      this.retentionSize =
+        DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP;
+    } else {
+      this.retentionSize = configuredRentionSize;
+    }
+    this.fileControllerName = controllerName;
+    initInternal(conf);
+  }
+
+  /**
+   * Derived classes initialize themselves using this method.
+   * @param conf the Configuration
+   */
+  protected abstract void initInternal(Configuration conf);
+
+  /**
+   * Get the remote root log directory.
+   * @return the remote root log directory path
+   */
+  public Path getRemoteRootLogDir() {
+    return this.remoteRootLogDir;
+  }
+
+  /**
+   * Get the log aggregation directory suffix.
+   * @return the log aggregation directory suffix
+   */
+  public String getRemoteRootLogDirSuffix() {
+    return this.remoteRootLogDirSuffix;
+  }
+
+  /**
+   * Initialize the writer.
+   * @param context the {@link LogAggregationFileControllerContext}
+   * @throws IOException if fails to initialize the writer
+   */
+  public abstract void initializeWriter(
+      LogAggregationFileControllerContext context) throws IOException;
+
+  /**
+   * Close the writer.
+   */
+  public abstract void closeWriter();
+
+  /**
+   * Write the log content.
+   * @param logKey the log key
+   * @param logValue the log content
+   * @throws IOException if fails to write the logs
+   */
+  public abstract void write(LogKey logKey, LogValue logValue)
+      throws IOException;
+
+  /**
+   * Operations needed after write the log content.
+   * @param record the {@link LogAggregationFileControllerContext}
+   * @throws Exception if anything fails
+   */
+  public abstract void postWrite(LogAggregationFileControllerContext record)
+      throws Exception;
+
+  /**
+   * Verify and create the remote log directory.
+   */
+  public void verifyAndCreateRemoteLogDir() {
+    boolean logPermError = true;
+    // Checking the existence of the TLD
+    FileSystem remoteFS = null;
+    try {
+      remoteFS = getFileSystem(conf);
+    } catch (IOException e) {
+      throw new YarnRuntimeException(
+          "Unable to get Remote FileSystem instance", e);
+    }
+    boolean remoteExists = true;
+    Path remoteRootLogDir = getRemoteRootLogDir();
+    try {
+      FsPermission perms =
+          remoteFS.getFileStatus(remoteRootLogDir).getPermission();
+      if (!perms.equals(TLDIR_PERMISSIONS) && logPermError) {
+        LOG.warn("Remote Root Log Dir [" + remoteRootLogDir
+            + "] already exist, but with incorrect permissions. "
+            + "Expected: [" + TLDIR_PERMISSIONS + "], Found: [" + perms
+            + "]." + " The cluster may have problems with multiple users.");
+        logPermError = false;
+      } else {
+        logPermError = true;
+      }
+    } catch (FileNotFoundException e) {
+      remoteExists = false;
+    } catch (IOException e) {
+      throw new YarnRuntimeException(
+          "Failed to check permissions for dir ["
+              + remoteRootLogDir + "]", e);
+    }
+    if (!remoteExists) {
+      LOG.warn("Remote Root Log Dir [" + remoteRootLogDir
+          + "] does not exist. Attempting to create it.");
+      try {
+        Path qualified =
+            remoteRootLogDir.makeQualified(remoteFS.getUri(),
+                remoteFS.getWorkingDirectory());
+        remoteFS.mkdirs(qualified, new FsPermission(TLDIR_PERMISSIONS));
+        remoteFS.setPermission(qualified, new FsPermission(TLDIR_PERMISSIONS));
+
+        UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
+        String primaryGroupName = null;
+        try {
+          primaryGroupName = loginUser.getPrimaryGroupName();
+        } catch (IOException e) {
+          LOG.warn("No primary group found. The remote root log directory" +
+              " will be created with the HDFS superuser being its group " +
+              "owner. JobHistoryServer may be unable to read the directory.");
+        }
+        // set owner on the remote directory only if the primary group exists
+        if (primaryGroupName != null) {
+          remoteFS.setOwner(qualified,
+              loginUser.getShortUserName(), primaryGroupName);
+        }
+      } catch (IOException e) {
+        throw new YarnRuntimeException("Failed to create remoteLogDir ["
+            + remoteRootLogDir + "]", e);
+      }
+    }
+  }
+
+  /**
+   * Create remote Application directory for log aggregation.
+   * @param user the user
+   * @param appId the application ID
+   * @param userUgi the UGI
+   */
+  public void createAppDir(final String user, final ApplicationId appId,
+      UserGroupInformation userUgi) {
+    final Path remoteRootLogDir = getRemoteRootLogDir();
+    final String remoteRootLogDirSuffix = getRemoteRootLogDirSuffix();
+    try {
+      userUgi.doAs(new PrivilegedExceptionAction<Object>() {
+        @Override
+        public Object run() throws Exception {
+          try {
+            // TODO: Reuse FS for user?
+            FileSystem remoteFS = getFileSystem(conf);
+
+            // Only creating directories if they are missing to avoid
+            // unnecessary load on the filesystem from all of the nodes
+            Path appDir = LogAggregationUtils.getRemoteAppLogDir(
+                remoteRootLogDir, appId, user, remoteRootLogDirSuffix);
+
+            appDir = appDir.makeQualified(remoteFS.getUri(),
+                remoteFS.getWorkingDirectory());
+
+            if (!checkExists(remoteFS, appDir, APP_DIR_PERMISSIONS)) {
+              Path suffixDir = LogAggregationUtils.getRemoteLogSuffixedDir(
+                  remoteRootLogDir, user, remoteRootLogDirSuffix);
+              suffixDir = suffixDir.makeQualified(remoteFS.getUri(),
+                  remoteFS.getWorkingDirectory());
+
+              if (!checkExists(remoteFS, suffixDir, APP_DIR_PERMISSIONS)) {
+                Path userDir = LogAggregationUtils.getRemoteLogUserDir(
+                    remoteRootLogDir, user);
+                userDir = userDir.makeQualified(remoteFS.getUri(),
+                    remoteFS.getWorkingDirectory());
+
+                if (!checkExists(remoteFS, userDir, APP_DIR_PERMISSIONS)) {
+                  createDir(remoteFS, userDir, APP_DIR_PERMISSIONS);
+                }
+
+                createDir(remoteFS, suffixDir, APP_DIR_PERMISSIONS);
+              }
+
+              createDir(remoteFS, appDir, APP_DIR_PERMISSIONS);
+            }
+
+          } catch (IOException e) {
+            LOG.error("Failed to setup application log directory for "
+                + appId, e);
+            throw e;
+          }
+          return null;
+        }
+      });
+    } catch (Exception e) {
+      throw new YarnRuntimeException(e);
+    }
+  }
+
+  @VisibleForTesting
+  protected FileSystem getFileSystem(Configuration conf) throws IOException {
+    return getRemoteRootLogDir().getFileSystem(conf);
+  }
+
+  protected void createDir(FileSystem fs, Path path, FsPermission fsPerm)
+      throws IOException {
+    FsPermission dirPerm = new FsPermission(fsPerm);
+    fs.mkdirs(path, dirPerm);
+    FsPermission umask = FsPermission.getUMask(fs.getConf());
+    if (!dirPerm.equals(dirPerm.applyUMask(umask))) {
+      fs.setPermission(path, new FsPermission(fsPerm));
+    }
+  }
+
+  protected boolean checkExists(FileSystem fs, Path path, FsPermission fsPerm)
+      throws IOException {
+    boolean exists = true;
+    try {
+      FileStatus appDirStatus = fs.getFileStatus(path);
+      if (!APP_DIR_PERMISSIONS.equals(appDirStatus.getPermission())) {
+        fs.setPermission(path, APP_DIR_PERMISSIONS);
+      }
+    } catch (FileNotFoundException fnfe) {
+      exists = false;
+    }
+    return exists;
+  }
+
+  /**
+   * Get the remote aggregated log path.
+   * @param appId the ApplicationId
+   * @param user the Application Owner
+   * @param nodeId the NodeManager Id
+   * @return the remote aggregated log path
+   */
+  public Path getRemoteNodeLogFileForApp(ApplicationId appId, String user,
+      NodeId nodeId) {
+    return LogAggregationUtils.getRemoteNodeLogFileForApp(
+        getRemoteRootLogDir(), appId, user, nodeId,
+        getRemoteRootLogDirSuffix());
+  }
+
+  /**
+   * Get the remote application directory for log aggregation.
+   * @param appId the Application ID
+   * @param appOwner the Application Owner
+   * @return the remote application directory
+   * @throws IOException if can not find the remote application directory
+   */
+  public Path getRemoteAppLogDir(ApplicationId appId, String appOwner)
+      throws IOException {
+    return LogAggregationUtils.getRemoteAppLogDir(conf, appId, appOwner,
+        this.remoteRootLogDir, this.remoteRootLogDirSuffix);
+  }
+
+  protected void cleanOldLogs(Path remoteNodeLogFileForApp,
+      final NodeId nodeId, UserGroupInformation userUgi) {
+    try {
+      final FileSystem remoteFS = remoteNodeLogFileForApp.getFileSystem(conf);
+      Path appDir = remoteNodeLogFileForApp.getParent().makeQualified(
+          remoteFS.getUri(), remoteFS.getWorkingDirectory());
+      Set<FileStatus> status =
+          new HashSet<FileStatus>(Arrays.asList(remoteFS.listStatus(appDir)));
+
+      Iterable<FileStatus> mask =
+          Iterables.filter(status, new Predicate<FileStatus>() {
+            @Override
+            public boolean apply(FileStatus next) {
+              return next.getPath().getName()
+                .contains(LogAggregationUtils.getNodeString(nodeId))
+                && !next.getPath().getName().endsWith(
+                    LogAggregationUtils.TMP_FILE_SUFFIX);
+            }
+          });
+      status = Sets.newHashSet(mask);
+      // Normally, we just need to delete one oldest log
+      // before we upload a new log.
+      // If we can not delete the older logs in this cycle,
+      // we will delete them in next cycle.
+      if (status.size() >= this.retentionSize) {
+        // sort by the lastModificationTime ascending
+        List<FileStatus> statusList = new ArrayList<FileStatus>(status);
+        Collections.sort(statusList, new Comparator<FileStatus>() {
+          public int compare(FileStatus s1, FileStatus s2) {
+            return s1.getModificationTime() < s2.getModificationTime() ? -1
+                : s1.getModificationTime() > s2.getModificationTime() ? 1 : 0;
+          }
+        });
+        for (int i = 0; i <= statusList.size() - this.retentionSize; i++) {
+          final FileStatus remove = statusList.get(i);
+          try {
+            userUgi.doAs(new PrivilegedExceptionAction<Object>() {
+              @Override
+              public Object run() throws Exception {
+                remoteFS.delete(remove.getPath(), false);
+                return null;
+              }
+            });
+          } catch (Exception e) {
+            LOG.error("Failed to delete " + remove.getPath(), e);
+          }
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Failed to clean old logs", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerContext.java
new file mode 100644
index 0000000..32128bc
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerContext.java
@@ -0,0 +1,130 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.logaggregation.filecontroller;
+
+import java.util.Map;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+/**
+ * {@code LogAggregationFileControllerContext} is a record used in
+ * the log aggregation process.
+ */
+@Private
+@Unstable
+public class LogAggregationFileControllerContext {
+  private final boolean logAggregationInRolling;
+  private final long rollingMonitorInterval;
+  private final Path remoteNodeLogFileForApp;
+  private final NodeId nodeId;
+  private final UserGroupInformation userUgi;
+  private final ApplicationId appId;
+  private final Path remoteNodeTmpLogFileForApp;
+  private final Map<ApplicationAccessType, String> appAcls;
+  private int logAggregationTimes = 0;
+  private int cleanOldLogsTimes = 0;
+
+  private boolean uploadedLogsInThisCycle;
+  private long logUploadedTimeStamp;
+
+  public LogAggregationFileControllerContext(Path remoteNodeLogFileForApp,
+      Path remoteNodeTmpLogFileForApp,
+      boolean logAggregationInRolling,
+      long rollingMonitorInterval,
+      ApplicationId appId,
+      Map<ApplicationAccessType, String> appAcls,
+      NodeId nodeId, UserGroupInformation userUgi) {
+    this.remoteNodeLogFileForApp = remoteNodeLogFileForApp;
+    this.remoteNodeTmpLogFileForApp = remoteNodeTmpLogFileForApp;
+    this.logAggregationInRolling = logAggregationInRolling;
+    this.rollingMonitorInterval = rollingMonitorInterval;
+    this.nodeId = nodeId;
+    this.appId = appId;
+    this.appAcls = appAcls;
+    this.userUgi = userUgi;
+  }
+
+  public boolean isUploadedLogsInThisCycle() {
+    return uploadedLogsInThisCycle;
+  }
+
+  public void setUploadedLogsInThisCycle(boolean uploadedLogsInThisCycle) {
+    this.uploadedLogsInThisCycle = uploadedLogsInThisCycle;
+  }
+
+  public Path getRemoteNodeLogFileForApp() {
+    return remoteNodeLogFileForApp;
+  }
+
+  public long getRollingMonitorInterval() {
+    return rollingMonitorInterval;
+  }
+
+  public boolean isLogAggregationInRolling() {
+    return logAggregationInRolling;
+  }
+
+  public long getLogUploadTimeStamp() {
+    return logUploadedTimeStamp;
+  }
+
+  public void setLogUploadTimeStamp(long uploadTimeStamp) {
+    this.logUploadedTimeStamp = uploadTimeStamp;
+  }
+
+  public NodeId getNodeId() {
+    return nodeId;
+  }
+
+  public UserGroupInformation getUserUgi() {
+    return userUgi;
+  }
+
+  public ApplicationId getAppId() {
+    return appId;
+  }
+
+  public Path getRemoteNodeTmpLogFileForApp() {
+    return remoteNodeTmpLogFileForApp;
+  }
+
+  public void increLogAggregationTimes() {
+    this.logAggregationTimes++;
+  }
+
+  public void increcleanupOldLogTimes() {
+    this.cleanOldLogsTimes++;
+  }
+
+  public int getLogAggregationTimes() {
+    return logAggregationTimes;
+  }
+
+  public int getCleanOldLogsTimes() {
+    return cleanOldLogsTimes;
+  }
+
+  public Map<ApplicationAccessType, String> getAppAcls() {
+    return appAcls;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerFactory.java
new file mode 100644
index 0000000..746bf5a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerFactory.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation.filecontroller;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+/**
+ * Use {@code LogAggregationFileControllerFactory} to get the correct
+ * {@link LogAggregationFileController} for write and read.
+ *
+ */
+@Private
+@Unstable
+public class LogAggregationFileControllerFactory {
+
+  private static final Log LOG = LogFactory.getLog(
+      LogAggregationFileControllerFactory.class);
+  private final Pattern p = Pattern.compile(
+      "^[A-Za-z_]+[A-Za-z0-9_]*$");
+  private LinkedList<LogAggregationFileController> controllers
+      = new LinkedList<>();
+  private Configuration conf;
+
+  /**
+   * Construct the LogAggregationFileControllerFactory object.
+   * @param conf the Configuration
+   */
+  public LogAggregationFileControllerFactory(Configuration conf) {
+    this.conf = conf;
+    Collection<String> fileControllers = conf.getStringCollection(
+        YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS);
+    List<String> controllerClassName = new ArrayList<>();
+
+    Map<String, String> controllerChecker = new HashMap<>();
+
+    for (String fileController : fileControllers) {
+      Preconditions.checkArgument(validateAggregatedFileControllerName(
+          fileController), "The FileControllerName: " + fileController
+          + " set in " + YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS
+          +" is invalid." + "The valid File Controller name should only "
+          + "contain a-zA-Z0-9_ and can not start with numbers");
+
+      String remoteDirStr = String.format(
+          YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT,
+          fileController);
+      String remoteDir = conf.get(remoteDirStr);
+      boolean defaultRemoteDir = false;
+      if (remoteDir == null || remoteDir.isEmpty()) {
+        remoteDir = conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+            YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR);
+        defaultRemoteDir = true;
+      }
+      String suffixStr = String.format(
+          YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT,
+          fileController);
+      String suffix = conf.get(suffixStr);
+      boolean defaultSuffix = false;
+      if (suffix == null || suffix.isEmpty()) {
+        suffix = conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
+            YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
+        defaultSuffix = true;
+      }
+      String dirSuffix = remoteDir + "-" + suffix;
+      if (controllerChecker.containsKey(dirSuffix)) {
+        if (defaultRemoteDir && defaultSuffix) {
+          String fileControllerStr = controllerChecker.get(dirSuffix);
+          List<String> controllersList = new ArrayList<>();
+          controllersList.add(fileControllerStr);
+          controllersList.add(fileController);
+          fileControllerStr = StringUtils.join(controllersList, ",");
+          controllerChecker.put(dirSuffix, fileControllerStr);
+        } else {
+          String conflictController = controllerChecker.get(dirSuffix);
+          throw new RuntimeException("The combined value of " + remoteDirStr
+              + " and " + suffixStr + " should not be the same as the value"
+              + " set for " + conflictController);
+        }
+      } else {
+        controllerChecker.put(dirSuffix, fileController);
+      }
+      String classKey = String.format(
+          YarnConfiguration.LOG_AGGREGATION_FILE_CONTROLLER_FMT,
+          fileController);
+      String className = conf.get(classKey);
+      if (className == null || className.isEmpty()) {
+        throw new RuntimeException("No class configured for "
+            + fileController);
+      }
+      controllerClassName.add(className);
+      Class<? extends LogAggregationFileController> sClass = conf.getClass(
+          classKey, null, LogAggregationFileController.class);
+      if (sClass == null) {
+        throw new RuntimeException("No class defined for " + fileController);
+      }
+      LogAggregationFileController s = ReflectionUtils.newInstance(
+          sClass, conf);
+      if (s == null) {
+        throw new RuntimeException("No object created for "
+            + controllerClassName);
+      }
+      s.initialize(conf, fileController);
+      controllers.add(s);
+    }
+  }
+
+  /**
+   * Get {@link LogAggregationFileController} to write.
+   * @return the LogAggregationFileController instance
+   */
+  public LogAggregationFileController getFileControllerForWrite() {
+    return controllers.getFirst();
+  }
+
+  /**
+   * Get {@link LogAggregationFileController} to read the aggregated logs
+   * for this application.
+   * @param appId the ApplicationId
+   * @param appOwner the Application Owner
+   * @return the LogAggregationFileController instance
+   * @throws IOException if can not find any log aggregation file controller
+   */
+  public LogAggregationFileController getFileControllerForRead(
+      ApplicationId appId, String appOwner) throws IOException {
+    StringBuilder diagnosis = new StringBuilder();
+    for(LogAggregationFileController fileController : controllers) {
+      try {
+        Path remoteAppLogDir = fileController.getRemoteAppLogDir(
+            appId, appOwner);
+        Path qualifiedLogDir = FileContext.getFileContext(conf).makeQualified(
+            remoteAppLogDir);
+        RemoteIterator<FileStatus> nodeFiles = FileContext.getFileContext(
+            qualifiedLogDir.toUri(), conf).listStatus(remoteAppLogDir);
+        if (nodeFiles.hasNext()) {
+          return fileController;
+        }
+      } catch (Exception ex) {
+        diagnosis.append(ex.getMessage() + "\n");
+        continue;
+      }
+    }
+    throw new IOException(diagnosis.toString());
+  }
+
+  private boolean validateAggregatedFileControllerName(String name) {
+    if (name == null || name.trim().isEmpty()) {
+      return false;
+    }
+    return p.matcher(name).matches();
+  }
+
+  @Private
+  @VisibleForTesting
+  public LinkedList<LogAggregationFileController>
+      getConfiguredLogAggregationFileControllerList() {
+    return this.controllers;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationTFileController.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationTFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationTFileController.java
new file mode 100644
index 0000000..9e0c66d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationTFileController.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation.filecontroller;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
+import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.util.Times;
+
+/**
+ * The TFile log aggregation file Controller implementation.
+ */
+@Private
+@Unstable
+public class LogAggregationTFileController
+    extends LogAggregationFileController {
+
+  private static final Log LOG = LogFactory.getLog(
+      LogAggregationTFileController.class);
+
+  private LogWriter writer;
+
+  public LogAggregationTFileController(){}
+
+  @Override
+  public void initInternal(Configuration conf) {
+    this.remoteRootLogDir = new Path(
+        conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+            YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
+    this.remoteRootLogDirSuffix =
+        conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
+            YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
+  }
+
+  @Override
+  public void initializeWriter(LogAggregationFileControllerContext context)
+      throws IOException {
+    this.writer = new LogWriter();
+    writer.initialize(this.conf, context.getRemoteNodeTmpLogFileForApp(),
+        context.getUserUgi());
+    // Write ACLs once when the writer is created.
+    writer.writeApplicationACLs(context.getAppAcls());
+    writer.writeApplicationOwner(context.getUserUgi().getShortUserName());
+  }
+
+  @Override
+  public void closeWriter() {
+    this.writer.close();
+  }
+
+  @Override
+  public void write(LogKey logKey, LogValue logValue) throws IOException {
+    this.writer.append(logKey, logValue);
+  }
+
+  @Override
+  public void postWrite(final LogAggregationFileControllerContext record)
+      throws Exception {
+    // Before upload logs, make sure the number of existing logs
+    // is smaller than the configured NM log aggregation retention size.
+    if (record.isUploadedLogsInThisCycle() &&
+        record.isLogAggregationInRolling()) {
+      cleanOldLogs(record.getRemoteNodeLogFileForApp(), record.getNodeId(),
+          record.getUserUgi());
+      record.increcleanupOldLogTimes();
+    }
+
+    final Path renamedPath = record.getRollingMonitorInterval() <= 0
+        ? record.getRemoteNodeLogFileForApp() : new Path(
+            record.getRemoteNodeLogFileForApp().getParent(),
+            record.getRemoteNodeLogFileForApp().getName() + "_"
+            + record.getLogUploadTimeStamp());
+    final boolean rename = record.isUploadedLogsInThisCycle();
+    try {
+      record.getUserUgi().doAs(new PrivilegedExceptionAction<Object>() {
+        @Override
+        public Object run() throws Exception {
+          FileSystem remoteFS = record.getRemoteNodeLogFileForApp()
+              .getFileSystem(conf);
+          if (rename) {
+            remoteFS.rename(record.getRemoteNodeTmpLogFileForApp(),
+                renamedPath);
+          } else {
+            remoteFS.delete(record.getRemoteNodeTmpLogFileForApp(), false);
+          }
+          return null;
+        }
+      });
+    } catch (Exception e) {
+      LOG.error(
+          "Failed to move temporary log file to final location: ["
+          + record.getRemoteNodeTmpLogFileForApp() + "] to ["
+          + renamedPath + "]", e);
+      throw new Exception("Log uploaded failed for Application: "
+          + record.getAppId() + " in NodeManager: "
+          + LogAggregationUtils.getNodeString(record.getNodeId()) + " at "
+          + Times.format(record.getLogUploadTimeStamp()) + "\n");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/package-info.java
new file mode 100644
index 0000000..cad238a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Public
+package org.apache.hadoop.yarn.logaggregation.filecontroller;
+import org.apache.hadoop.classification.InterfaceAudience;
+

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index f93de44..0823dfe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1167,6 +1167,25 @@
   </property>
 
   <property>
+    <description>Specify which log file controllers we will support. The first
+    file controller we add will be used to write the aggregated logs.
+    This comma separated configuration will work with the configuration:
+    yarn.log-aggregation.file-controller.%s.class which defines the supported
+    file controller's class. By default, the TFile controller would be used.
+    The user could override this configuration by adding more file controllers.
+    To support back-ward compatibility, make sure that we always
+    add TFile file controller.</description>
+    <name>yarn.log-aggregation.file-formats</name>
+    <value>TFile</value>
+  </property>
+
+  <property>
+    <description>Class that supports TFile read and write operations.</description>
+    <name>yarn.log-aggregation.file-controller.TFile.class</name>
+    <value>org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationTFileController</value>
+  </property>
+
+  <property>
     <description>
     How long for ResourceManager to wait for NodeManager to report its
     log aggregation status. If waiting time of which the log aggregation

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java
index 1e71b3c..3dd7de3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java
@@ -40,10 +40,14 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
 import org.apache.hadoop.yarn.webapp.YarnWebParams;
 import org.apache.hadoop.yarn.webapp.log.AggregatedLogsBlockForTest;
 import org.apache.hadoop.yarn.webapp.view.BlockForTest;
@@ -249,7 +253,7 @@ public class TestAggregatedLogsBlock {
   
   
   private Configuration getConfiguration() {
-    Configuration configuration = new Configuration();
+    Configuration configuration = new YarnConfiguration();
     configuration.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
     configuration.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, "target/logs");
     configuration.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
@@ -295,19 +299,25 @@ public class TestAggregatedLogsBlock {
     List<String> rootLogDirs = Arrays.asList("target/logs/logs");
     UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
 
-    try (AggregatedLogFormat.LogWriter writer =
-             new AggregatedLogFormat.LogWriter()) {
-      writer.initialize(configuration, new Path(path), ugi);
-      writer.writeApplicationOwner(ugi.getUserName());
-
+    LogAggregationFileControllerFactory factory
+        = new LogAggregationFileControllerFactory(configuration);
+    LogAggregationFileController fileController = factory
+        .getFileControllerForWrite();
+    try {
       Map<ApplicationAccessType, String> appAcls = new HashMap<>();
       appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
-      writer.writeApplicationACLs(appAcls);
-
-      writer.append(
+      NodeId nodeId = NodeId.newInstance("localhost", 1234);
+      LogAggregationFileControllerContext context
+          = new LogAggregationFileControllerContext(
+              new Path(path), new Path(path), false, 3600,
+              appId, appAcls, nodeId, ugi);
+      fileController.initializeWriter(context);
+      fileController.write(
           new AggregatedLogFormat.LogKey("container_0_0001_01_000001"),
           new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
               UserGroupInformation.getCurrentUser().getShortUserName()));
+    } finally {
+      fileController.closeWriter();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java
index 8b665e0..a12e2a1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java
@@ -24,15 +24,21 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.io.Writer;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
 
 /**
  * This class contains several utility functions for log aggregation tests.
@@ -110,14 +116,25 @@ public final class TestContainerLogsUtils {
       ContainerId containerId, Path appDir, FileSystem fs) throws IOException {
     Path path =
         new Path(appDir, LogAggregationUtils.getNodeString(nodeId));
-    try (AggregatedLogFormat.LogWriter writer =
-        new AggregatedLogFormat.LogWriter()) {
-      writer.initialize(configuration, path, ugi);
-      writer.writeApplicationOwner(ugi.getUserName());
-
-      writer.append(new AggregatedLogFormat.LogKey(containerId),
+    LogAggregationFileControllerFactory factory
+        = new LogAggregationFileControllerFactory(configuration);
+    LogAggregationFileController fileController = factory
+        .getFileControllerForWrite();
+    try {
+      Map<ApplicationAccessType, String> appAcls = new HashMap<>();
+      appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
+      ApplicationId appId = containerId.getApplicationAttemptId()
+          .getApplicationId();
+      LogAggregationFileControllerContext context
+          = new LogAggregationFileControllerContext(
+              path, path, true, 1000,
+              appId, appAcls, nodeId, ugi);
+      fileController.initializeWriter(context);
+      fileController.write(new AggregatedLogFormat.LogKey(containerId),
           new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
               ugi.getShortUserName()));
+    } finally {
+      fileController.closeWriter();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/TestLogAggregationFileControllerFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/TestLogAggregationFileControllerFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/TestLogAggregationFileControllerFactory.java
new file mode 100644
index 0000000..45f18c1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/TestLogAggregationFileControllerFactory.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation.filecontroller;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.Writer;
+import java.util.LinkedList;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationTFileController;
+import org.junit.Test;
+
+/**
+ * Test LogAggregationFileControllerFactory.
+ *
+ */
+public class TestLogAggregationFileControllerFactory {
+
+  @Test(timeout = 10000)
+  public void testLogAggregationFileControllerFactory() throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(
+        System.currentTimeMillis(), 1);
+    String appOwner = "test";
+    String remoteLogRootDir = "target/app-logs/";
+    Configuration conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
+    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogRootDir);
+    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, "log");
+    FileSystem fs = FileSystem.get(conf);
+
+    LogAggregationFileControllerFactory factory =
+        new LogAggregationFileControllerFactory(conf);
+    LinkedList<LogAggregationFileController> list = factory
+        .getConfiguredLogAggregationFileControllerList();
+    assertTrue(list.size() == 1);
+    assertTrue(list.getFirst() instanceof LogAggregationTFileController);
+    assertTrue(factory.getFileControllerForWrite()
+        instanceof LogAggregationTFileController);
+    Path logPath = list.getFirst().getRemoteAppLogDir(appId, appOwner);
+    try {
+      if (fs.exists(logPath)) {
+        fs.delete(logPath, true);
+      }
+      assertTrue(fs.mkdirs(logPath));
+      Writer writer =
+          new FileWriter(new File(logPath.toString(), "testLog"));
+      writer.write("test");
+      writer.close();
+      assertTrue(factory.getFileControllerForRead(appId, appOwner)
+          instanceof LogAggregationTFileController);
+    } finally {
+      fs.delete(logPath, true);
+    }
+
+    conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS,
+        "TestLogAggregationFileController");
+    // Did not set class for TestLogAggregationFileController,
+    // should get the exception.
+    try {
+      factory =
+          new LogAggregationFileControllerFactory(conf);
+      fail();
+    } catch (Exception ex) {
+      // should get exception
+    }
+
+    conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS,
+        "TestLogAggregationFileController,TFile");
+    conf.setClass(
+        "yarn.log-aggregation.file-controller.TestLogAggregationFileController"
+        + ".class", TestLogAggregationFileController.class,
+        LogAggregationFileController.class);
+
+    conf.set(
+        "yarn.log-aggregation.TestLogAggregationFileController"
+        + ".remote-app-log-dir", remoteLogRootDir);
+    conf.set(
+        "yarn.log-aggregation.TestLogAggregationFileController"
+        + ".remote-app-log-dir-suffix", "testLog");
+
+    factory = new LogAggregationFileControllerFactory(conf);
+    list = factory.getConfiguredLogAggregationFileControllerList();
+    assertTrue(list.size() == 2);
+    assertTrue(list.getFirst() instanceof TestLogAggregationFileController);
+    assertTrue(list.getLast() instanceof LogAggregationTFileController);
+    assertTrue(factory.getFileControllerForWrite()
+        instanceof TestLogAggregationFileController);
+
+    logPath = list.getFirst().getRemoteAppLogDir(appId, appOwner);
+    try {
+      if (fs.exists(logPath)) {
+        fs.delete(logPath, true);
+      }
+      assertTrue(fs.mkdirs(logPath));
+      Writer writer =
+          new FileWriter(new File(logPath.toString(), "testLog"));
+      writer.write("test");
+      writer.close();
+      assertTrue(factory.getFileControllerForRead(appId, appOwner)
+          instanceof TestLogAggregationFileController);
+    } finally {
+      fs.delete(logPath, true);
+    }
+  }
+
+  private static class TestLogAggregationFileController
+      extends LogAggregationFileController {
+
+    @Override
+    public void initInternal(Configuration conf) {
+      String remoteDirStr = String.format(
+          YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT,
+          this.fileControllerName);
+      this.remoteRootLogDir = new Path(conf.get(remoteDirStr));
+      String suffix = String.format(
+          YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT,
+           this.fileControllerName);
+      this.remoteRootLogDirSuffix = conf.get(suffix);
+    }
+
+    @Override
+    public void closeWriter() {
+      // Do Nothing
+    }
+
+    @Override
+    public void write(LogKey logKey, LogValue logValue) throws IOException {
+      // Do Nothing
+    }
+
+    @Override
+    public void postWrite(LogAggregationFileControllerContext record)
+        throws Exception {
+      // Do Nothing
+    }
+
+    @Override
+    public void initializeWriter(LogAggregationFileControllerContext context)
+        throws IOException {
+      // Do Nothing
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org