You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ju...@apache.org on 2017/08/24 20:35:08 UTC

[2/2] hadoop git commit: YARN-6876. Create an abstract log writer for extendability. Contributed by Xuan Gong.

YARN-6876. Create an abstract log writer for extendability. Contributed by Xuan Gong.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c2cb7ea1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c2cb7ea1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c2cb7ea1

Branch: refs/heads/trunk
Commit: c2cb7ea1ef6532020b69031dbd18b0f9b8369f0f
Parents: 8196a07
Author: Junping Du <ju...@apache.org>
Authored: Thu Aug 24 13:36:49 2017 -0700
Committer: Junping Du <ju...@apache.org>
Committed: Thu Aug 24 13:36:49 2017 -0700

----------------------------------------------------------------------
 .../hadoop/yarn/conf/YarnConfiguration.java     |  12 +-
 .../yarn/conf/TestYarnConfigurationFields.java  |   2 +
 .../hadoop/yarn/client/cli/TestLogsCLI.java     |  65 +--
 .../logaggregation/AggregatedLogFormat.java     |  22 +-
 .../logaggregation/LogAggregationUtils.java     |  41 ++
 .../LogAggregationFileController.java           | 404 +++++++++++++++++++
 .../LogAggregationFileControllerContext.java    | 130 ++++++
 .../LogAggregationFileControllerFactory.java    | 195 +++++++++
 .../LogAggregationTFileController.java          | 127 ++++++
 .../filecontroller/package-info.java            |  21 +
 .../src/main/resources/yarn-default.xml         |  19 +
 .../logaggregation/TestAggregatedLogsBlock.java |  28 +-
 .../logaggregation/TestContainerLogsUtils.java  |  29 +-
 ...TestLogAggregationFileControllerFactory.java | 171 ++++++++
 .../logaggregation/AppLogAggregatorImpl.java    | 232 ++++-------
 .../logaggregation/LogAggregationService.java   | 210 +---------
 .../TestAppLogAggregatorImpl.java               |  25 +-
 .../TestLogAggregationService.java              | 135 +++++--
 18 files changed, 1419 insertions(+), 449 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 86f45b8..16bd73a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1064,7 +1064,17 @@ public class YarnConfiguration extends Configuration {
   public static final String LOG_AGGREGATION_ENABLED = YARN_PREFIX
       + "log-aggregation-enable";
   public static final boolean DEFAULT_LOG_AGGREGATION_ENABLED = false;
-  
+
+  public static final String LOG_AGGREGATION_FILE_FORMATS = YARN_PREFIX
+      + "log-aggregation.file-formats";
+  public static final String LOG_AGGREGATION_FILE_CONTROLLER_FMT =
+      YARN_PREFIX + "log-aggregation.file-controller.%s.class";
+
+  public static final String LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT
+      = YARN_PREFIX + "log-aggregation.%s.remote-app-log-dir";
+  public static final String LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT
+      = YARN_PREFIX + "log-aggregation.%s.remote-app-log-dir-suffix";
+
   /** 
    * How long to wait before deleting aggregated logs, -1 disables.
    * Be careful set this too small and you will spam the name node.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
index c40c2c5..153a35a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
@@ -184,6 +184,8 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
     // Currently defined in RegistryConstants/core-site.xml
     xmlPrefixToSkipCompare.add("hadoop.registry");
 
+    xmlPrefixToSkipCompare.add(
+        "yarn.log-aggregation.file-controller.TFile.class");
     // Add the filters used for checking for collision of default values.
     initDefaultValueCollisionCheck();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java
index c054209..26e0319 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java
@@ -36,7 +36,6 @@ import com.sun.jersey.api.client.ClientResponse;
 import com.sun.jersey.api.client.ClientResponse.Status;
 import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileWriter;
@@ -78,6 +77,9 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
 import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
 import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
 import org.apache.hadoop.yarn.logaggregation.LogCLIHelpers;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
 import org.codehaus.jettison.json.JSONObject;
 import org.junit.Assert;
 import org.junit.Before;
@@ -1345,42 +1347,55 @@ public class TestLogsCLI {
     Path path =
         new Path(appDir, LogAggregationUtils.getNodeString(nodeId)
             + System.currentTimeMillis());
-    try (AggregatedLogFormat.LogWriter writer =
-             new AggregatedLogFormat.LogWriter()) {
-      writer.initialize(configuration, path, ugi);
-      writer.writeApplicationOwner(ugi.getUserName());
-
+    LogAggregationFileControllerFactory factory
+        = new LogAggregationFileControllerFactory(configuration);
+    LogAggregationFileController fileFormat = factory
+        .getFileControllerForWrite();
+    try {
       Map<ApplicationAccessType, String> appAcls = new HashMap<>();
       appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
-      writer.writeApplicationACLs(appAcls);
-      writer.append(new AggregatedLogFormat.LogKey(containerId),
+      LogAggregationFileControllerContext context
+          = new LogAggregationFileControllerContext(
+              path, path, true, 1000,
+              containerId.getApplicationAttemptId().getApplicationId(),
+              appAcls, nodeId, ugi);
+      fileFormat.initializeWriter(context);
+      fileFormat.write(new AggregatedLogFormat.LogKey(containerId),
           new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
               UserGroupInformation.getCurrentUser().getShortUserName()));
+    } finally {
+      fileFormat.closeWriter();
     }
   }
 
+  @SuppressWarnings("static-access")
   private static void uploadEmptyContainerLogIntoRemoteDir(UserGroupInformation ugi,
       Configuration configuration, List<String> rootLogDirs, NodeId nodeId,
       ContainerId containerId, Path appDir, FileSystem fs) throws Exception {
-    Path path =
-        new Path(appDir, LogAggregationUtils.getNodeString(nodeId)
-            + System.currentTimeMillis());
-    try (AggregatedLogFormat.LogWriter writer =
-             new AggregatedLogFormat.LogWriter()) {
-      writer.initialize(configuration, path, ugi);
-      writer.writeApplicationOwner(ugi.getUserName());
-
+    LogAggregationFileControllerFactory factory
+        = new LogAggregationFileControllerFactory(configuration);
+    LogAggregationFileController fileFormat = factory
+        .getFileControllerForWrite();
+    try {
       Map<ApplicationAccessType, String> appAcls = new HashMap<>();
       appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
-      writer.writeApplicationACLs(appAcls);
-      DataOutputStream out = writer.getWriter().prepareAppendKey(-1);
-      new AggregatedLogFormat.LogKey(containerId).write(out);
-      out.close();
-      out = writer.getWriter().prepareAppendValue(-1);
-      new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
-          UserGroupInformation.getCurrentUser().getShortUserName()).write(out,
-              new HashSet<>());
-      out.close();
+      ApplicationId appId = containerId.getApplicationAttemptId()
+          .getApplicationId();
+      Path path = fileFormat.getRemoteNodeLogFileForApp(
+          appId, ugi.getCurrentUser().getShortUserName(), nodeId);
+      LogAggregationFileControllerContext context
+          = new LogAggregationFileControllerContext(
+              path, path, true, 1000,
+              appId, appAcls, nodeId, ugi);
+      fileFormat.initializeWriter(context);
+      AggregatedLogFormat.LogKey key = new AggregatedLogFormat.LogKey(
+          containerId);
+      AggregatedLogFormat.LogValue value = new AggregatedLogFormat.LogValue(
+          rootLogDirs, containerId, UserGroupInformation.getCurrentUser()
+              .getShortUserName());
+      fileFormat.write(key, value);
+    } finally {
+      fileFormat.closeWriter();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
index d806b12..3c1dcdc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
@@ -44,7 +44,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.regex.Pattern;
-
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.io.input.BoundedInputStream;
 import org.apache.commons.io.output.WriterOutputStream;
 import org.apache.commons.logging.Log;
@@ -61,7 +61,6 @@ import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.SecureIOUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.file.tfile.TFile;
@@ -71,7 +70,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LogAggregationContext;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Times;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -249,7 +247,7 @@ public class AggregatedLogFormat {
           in = secureOpenFile(logFile);
         } catch (IOException e) {
           logErrorMessage(logFile, e);
-          IOUtils.cleanup(LOG, in);
+          IOUtils.closeQuietly(in);
           continue;
         }
 
@@ -287,7 +285,7 @@ public class AggregatedLogFormat {
           String message = logErrorMessage(logFile, e);
           out.write(message.getBytes(Charset.forName("UTF-8")));
         } finally {
-          IOUtils.cleanup(LOG, in);
+          IOUtils.closeQuietly(in);
         }
       }
     }
@@ -557,7 +555,7 @@ public class AggregatedLogFormat {
       } catch (Exception e) {
         LOG.warn("Exception closing writer", e);
       } finally {
-        IOUtils.closeStream(this.fsDataOStream);
+        IOUtils.closeQuietly(this.fsDataOStream);
       }
     }
   }
@@ -605,7 +603,7 @@ public class AggregatedLogFormat {
         }
         return null;
       } finally {
-        IOUtils.cleanup(LOG, ownerScanner);
+        IOUtils.closeQuietly(ownerScanner);
       }
     }
 
@@ -651,7 +649,7 @@ public class AggregatedLogFormat {
         }
         return acls;
       } finally {
-        IOUtils.cleanup(LOG, aclScanner);
+        IOUtils.closeQuietly(aclScanner);
       }
     }
 
@@ -775,8 +773,8 @@ public class AggregatedLogFormat {
           }
         }
       } finally {
-        IOUtils.cleanup(LOG, ps);
-        IOUtils.cleanup(LOG, os);
+        IOUtils.closeQuietly(ps);
+        IOUtils.closeQuietly(os);
       }
     }
 
@@ -1001,7 +999,9 @@ public class AggregatedLogFormat {
     }
 
     public void close() {
-      IOUtils.cleanup(LOG, scanner, reader, fsDataIStream);
+      IOUtils.closeQuietly(scanner);
+      IOUtils.closeQuietly(reader);
+      IOUtils.closeQuietly(fsDataIStream);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java
index 24baaab..e8a28de 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java
@@ -133,6 +133,23 @@ public class LogAggregationUtils {
         new org.apache.hadoop.fs.Path(conf.get(
             YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
             YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
+    return getRemoteAppLogDir(conf, appId, appOwner, remoteRootLogDir, suffix);
+  }
+
+  /**
+   * Return the remote application log directory.
+   * @param conf the configuration
+   * @param appId the application
+   * @param appOwner the application owner
+   * @param remoteRootLogDir the remote root log directory
+   * @param suffix the log directory suffix
+   * @return the remote application log directory path
+   * @throws IOException if we can not find remote application log directory
+   */
+  public static org.apache.hadoop.fs.Path getRemoteAppLogDir(
+      Configuration conf, ApplicationId appId, String appOwner,
+      org.apache.hadoop.fs.Path remoteRootLogDir, String suffix)
+      throws IOException {
     org.apache.hadoop.fs.Path remoteAppDir = null;
     if (appOwner == null) {
       org.apache.hadoop.fs.Path qualifiedRemoteRootLogDir =
@@ -159,6 +176,30 @@ public class LogAggregationUtils {
    * @param conf the configuration
    * @param appId the applicationId
    * @param appOwner the application owner
+   * @param remoteRootLogDir the remote root log directory
+   * @param suffix the log directory suffix
+   * @return the iterator of available log files
+   * @throws IOException if there is no log file available
+   */
+  public static RemoteIterator<FileStatus> getRemoteNodeFileDir(
+      Configuration conf, ApplicationId appId, String appOwner,
+      org.apache.hadoop.fs.Path remoteRootLogDir, String suffix)
+      throws IOException {
+    Path remoteAppLogDir = getRemoteAppLogDir(conf, appId, appOwner,
+        remoteRootLogDir, suffix);
+    RemoteIterator<FileStatus> nodeFiles = null;
+    Path qualifiedLogDir =
+        FileContext.getFileContext(conf).makeQualified(remoteAppLogDir);
+    nodeFiles = FileContext.getFileContext(qualifiedLogDir.toUri(),
+        conf).listStatus(remoteAppLogDir);
+    return nodeFiles;
+  }
+
+  /**
+   * Get all available log files under remote app log directory.
+   * @param conf the configuration
+   * @param appId the applicationId
+   * @param appOwner the application owner
    * @return the iterator of available log files
    * @throws IOException if there is no log file available
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
new file mode 100644
index 0000000..5503f8f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
@@ -0,0 +1,404 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation.filecontroller;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
+
+/**
+ * Base class to implement Log Aggregation File Controller.
+ */
+@Private
+@Unstable
+public abstract class LogAggregationFileController {
+
+  private static final Log LOG = LogFactory.getLog(
+      LogAggregationFileController.class);
+
+  /*
+   * Expected deployment TLD will be 1777, owner=<NMOwner>, group=<NMGroup -
+   * Group to which NMOwner belongs> App dirs will be created as 770,
+   * owner=<AppOwner>, group=<NMGroup>: so that the owner and <NMOwner> can
+   * access / modify the files.
+   * <NMGroup> should obviously be a limited access group.
+   */
+  /**
+   * Permissions for the top level directory under which app directories will be
+   * created.
+   */
+  protected static final FsPermission TLDIR_PERMISSIONS = FsPermission
+      .createImmutable((short) 01777);
+
+  /**
+   * Permissions for the Application directory.
+   */
+  protected static final FsPermission APP_DIR_PERMISSIONS = FsPermission
+      .createImmutable((short) 0770);
+
+  // This is temporary solution. The configuration will be deleted once
+  // we find a more scalable method to only write a single log file per LRS.
+  private static final String NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP
+      = YarnConfiguration.NM_PREFIX + "log-aggregation.num-log-files-per-app";
+  private static final int
+      DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP = 30;
+
+  protected Configuration conf;
+  protected Path remoteRootLogDir;
+  protected String remoteRootLogDirSuffix;
+  protected int retentionSize;
+  protected String fileControllerName;
+
+  public LogAggregationFileController() {}
+
+  /**
+   * Initialize the log file controller.
+   * @param conf the Configuration
+   * @param controllerName the log controller class name
+   */
+  public void initialize(Configuration conf, String controllerName) {
+    this.conf = conf;
+    int configuredRentionSize =
+        conf.getInt(NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP,
+            DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP);
+    if (configuredRentionSize <= 0) {
+      this.retentionSize =
+        DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP;
+    } else {
+      this.retentionSize = configuredRentionSize;
+    }
+    this.fileControllerName = controllerName;
+    initInternal(conf);
+  }
+
+  /**
+   * Derived classes initialize themselves using this method.
+   * @param conf the Configuration
+   */
+  protected abstract void initInternal(Configuration conf);
+
+  /**
+   * Get the remote root log directory.
+   * @return the remote root log directory path
+   */
+  public Path getRemoteRootLogDir() {
+    return this.remoteRootLogDir;
+  }
+
+  /**
+   * Get the log aggregation directory suffix.
+   * @return the log aggregation directory suffix
+   */
+  public String getRemoteRootLogDirSuffix() {
+    return this.remoteRootLogDirSuffix;
+  }
+
+  /**
+   * Initialize the writer.
+   * @param context the {@link LogAggregationFileControllerContext}
+   * @throws IOException if fails to initialize the writer
+   */
+  public abstract void initializeWriter(
+      LogAggregationFileControllerContext context) throws IOException;
+
+  /**
+   * Close the writer.
+   */
+  public abstract void closeWriter();
+
+  /**
+   * Write the log content.
+   * @param logKey the log key
+   * @param logValue the log content
+   * @throws IOException if fails to write the logs
+   */
+  public abstract void write(LogKey logKey, LogValue logValue)
+      throws IOException;
+
+  /**
+   * Operations needed after write the log content.
+   * @param record the {@link LogAggregationFileControllerContext}
+   * @throws Exception if anything fails
+   */
+  public abstract void postWrite(LogAggregationFileControllerContext record)
+      throws Exception;
+
+  /**
+   * Verify and create the remote log directory.
+   */
+  public void verifyAndCreateRemoteLogDir() {
+    boolean logPermError = true;
+    // Checking the existence of the TLD
+    FileSystem remoteFS = null;
+    try {
+      remoteFS = getFileSystem(conf);
+    } catch (IOException e) {
+      throw new YarnRuntimeException(
+          "Unable to get Remote FileSystem instance", e);
+    }
+    boolean remoteExists = true;
+    Path remoteRootLogDir = getRemoteRootLogDir();
+    try {
+      FsPermission perms =
+          remoteFS.getFileStatus(remoteRootLogDir).getPermission();
+      if (!perms.equals(TLDIR_PERMISSIONS) && logPermError) {
+        LOG.warn("Remote Root Log Dir [" + remoteRootLogDir
+            + "] already exist, but with incorrect permissions. "
+            + "Expected: [" + TLDIR_PERMISSIONS + "], Found: [" + perms
+            + "]." + " The cluster may have problems with multiple users.");
+        logPermError = false;
+      } else {
+        logPermError = true;
+      }
+    } catch (FileNotFoundException e) {
+      remoteExists = false;
+    } catch (IOException e) {
+      throw new YarnRuntimeException(
+          "Failed to check permissions for dir ["
+              + remoteRootLogDir + "]", e);
+    }
+    if (!remoteExists) {
+      LOG.warn("Remote Root Log Dir [" + remoteRootLogDir
+          + "] does not exist. Attempting to create it.");
+      try {
+        Path qualified =
+            remoteRootLogDir.makeQualified(remoteFS.getUri(),
+                remoteFS.getWorkingDirectory());
+        remoteFS.mkdirs(qualified, new FsPermission(TLDIR_PERMISSIONS));
+        remoteFS.setPermission(qualified, new FsPermission(TLDIR_PERMISSIONS));
+
+        UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
+        String primaryGroupName = null;
+        try {
+          primaryGroupName = loginUser.getPrimaryGroupName();
+        } catch (IOException e) {
+          LOG.warn("No primary group found. The remote root log directory" +
+              " will be created with the HDFS superuser being its group " +
+              "owner. JobHistoryServer may be unable to read the directory.");
+        }
+        // set owner on the remote directory only if the primary group exists
+        if (primaryGroupName != null) {
+          remoteFS.setOwner(qualified,
+              loginUser.getShortUserName(), primaryGroupName);
+        }
+      } catch (IOException e) {
+        throw new YarnRuntimeException("Failed to create remoteLogDir ["
+            + remoteRootLogDir + "]", e);
+      }
+    }
+  }
+
+  /**
+   * Create remote Application directory for log aggregation.
+   * @param user the user
+   * @param appId the application ID
+   * @param userUgi the UGI
+   */
+  public void createAppDir(final String user, final ApplicationId appId,
+      UserGroupInformation userUgi) {
+    final Path remoteRootLogDir = getRemoteRootLogDir();
+    final String remoteRootLogDirSuffix = getRemoteRootLogDirSuffix();
+    try {
+      userUgi.doAs(new PrivilegedExceptionAction<Object>() {
+        @Override
+        public Object run() throws Exception {
+          try {
+            // TODO: Reuse FS for user?
+            FileSystem remoteFS = getFileSystem(conf);
+
+            // Only creating directories if they are missing to avoid
+            // unnecessary load on the filesystem from all of the nodes
+            Path appDir = LogAggregationUtils.getRemoteAppLogDir(
+                remoteRootLogDir, appId, user, remoteRootLogDirSuffix);
+
+            appDir = appDir.makeQualified(remoteFS.getUri(),
+                remoteFS.getWorkingDirectory());
+
+            if (!checkExists(remoteFS, appDir, APP_DIR_PERMISSIONS)) {
+              Path suffixDir = LogAggregationUtils.getRemoteLogSuffixedDir(
+                  remoteRootLogDir, user, remoteRootLogDirSuffix);
+              suffixDir = suffixDir.makeQualified(remoteFS.getUri(),
+                  remoteFS.getWorkingDirectory());
+
+              if (!checkExists(remoteFS, suffixDir, APP_DIR_PERMISSIONS)) {
+                Path userDir = LogAggregationUtils.getRemoteLogUserDir(
+                    remoteRootLogDir, user);
+                userDir = userDir.makeQualified(remoteFS.getUri(),
+                    remoteFS.getWorkingDirectory());
+
+                if (!checkExists(remoteFS, userDir, APP_DIR_PERMISSIONS)) {
+                  createDir(remoteFS, userDir, APP_DIR_PERMISSIONS);
+                }
+
+                createDir(remoteFS, suffixDir, APP_DIR_PERMISSIONS);
+              }
+
+              createDir(remoteFS, appDir, APP_DIR_PERMISSIONS);
+            }
+
+          } catch (IOException e) {
+            LOG.error("Failed to setup application log directory for "
+                + appId, e);
+            throw e;
+          }
+          return null;
+        }
+      });
+    } catch (Exception e) {
+      throw new YarnRuntimeException(e);
+    }
+  }
+
+  @VisibleForTesting
+  protected FileSystem getFileSystem(Configuration conf) throws IOException {
+    return getRemoteRootLogDir().getFileSystem(conf);
+  }
+
+  protected void createDir(FileSystem fs, Path path, FsPermission fsPerm)
+      throws IOException {
+    FsPermission dirPerm = new FsPermission(fsPerm);
+    fs.mkdirs(path, dirPerm);
+    FsPermission umask = FsPermission.getUMask(fs.getConf());
+    if (!dirPerm.equals(dirPerm.applyUMask(umask))) {
+      fs.setPermission(path, new FsPermission(fsPerm));
+    }
+  }
+
+  protected boolean checkExists(FileSystem fs, Path path, FsPermission fsPerm)
+      throws IOException {
+    boolean exists = true;
+    try {
+      FileStatus appDirStatus = fs.getFileStatus(path);
+      if (!APP_DIR_PERMISSIONS.equals(appDirStatus.getPermission())) {
+        fs.setPermission(path, APP_DIR_PERMISSIONS);
+      }
+    } catch (FileNotFoundException fnfe) {
+      exists = false;
+    }
+    return exists;
+  }
+
+  /**
+   * Get the remote aggregated log path.
+   * @param appId the ApplicationId
+   * @param user the Application Owner
+   * @param nodeId the NodeManager Id
+   * @return the remote aggregated log path
+   */
+  public Path getRemoteNodeLogFileForApp(ApplicationId appId, String user,
+      NodeId nodeId) {
+    return LogAggregationUtils.getRemoteNodeLogFileForApp(
+        getRemoteRootLogDir(), appId, user, nodeId,
+        getRemoteRootLogDirSuffix());
+  }
+
+  /**
+   * Get the remote application directory for log aggregation.
+   * @param appId the Application ID
+   * @param appOwner the Application Owner
+   * @return the remote application directory
+   * @throws IOException if can not find the remote application directory
+   */
+  public Path getRemoteAppLogDir(ApplicationId appId, String appOwner)
+      throws IOException {
+    return LogAggregationUtils.getRemoteAppLogDir(conf, appId, appOwner,
+        this.remoteRootLogDir, this.remoteRootLogDirSuffix);
+  }
+
+  protected void cleanOldLogs(Path remoteNodeLogFileForApp,
+      final NodeId nodeId, UserGroupInformation userUgi) {
+    try {
+      final FileSystem remoteFS = remoteNodeLogFileForApp.getFileSystem(conf);
+      Path appDir = remoteNodeLogFileForApp.getParent().makeQualified(
+          remoteFS.getUri(), remoteFS.getWorkingDirectory());
+      Set<FileStatus> status =
+          new HashSet<FileStatus>(Arrays.asList(remoteFS.listStatus(appDir)));
+
+      Iterable<FileStatus> mask =
+          Iterables.filter(status, new Predicate<FileStatus>() {
+            @Override
+            public boolean apply(FileStatus next) {
+              return next.getPath().getName()
+                .contains(LogAggregationUtils.getNodeString(nodeId))
+                && !next.getPath().getName().endsWith(
+                    LogAggregationUtils.TMP_FILE_SUFFIX);
+            }
+          });
+      status = Sets.newHashSet(mask);
+      // Normally, we just need to delete one oldest log
+      // before we upload a new log.
+      // If we can not delete the older logs in this cycle,
+      // we will delete them in next cycle.
+      if (status.size() >= this.retentionSize) {
+        // sort by the lastModificationTime ascending
+        List<FileStatus> statusList = new ArrayList<FileStatus>(status);
+        Collections.sort(statusList, new Comparator<FileStatus>() {
+          public int compare(FileStatus s1, FileStatus s2) {
+            return s1.getModificationTime() < s2.getModificationTime() ? -1
+                : s1.getModificationTime() > s2.getModificationTime() ? 1 : 0;
+          }
+        });
+        for (int i = 0; i <= statusList.size() - this.retentionSize; i++) {
+          final FileStatus remove = statusList.get(i);
+          try {
+            userUgi.doAs(new PrivilegedExceptionAction<Object>() {
+              @Override
+              public Object run() throws Exception {
+                remoteFS.delete(remove.getPath(), false);
+                return null;
+              }
+            });
+          } catch (Exception e) {
+            LOG.error("Failed to delete " + remove.getPath(), e);
+          }
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Failed to clean old logs", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerContext.java
new file mode 100644
index 0000000..32128bc
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerContext.java
@@ -0,0 +1,130 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.logaggregation.filecontroller;
+
+import java.util.Map;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+/**
+ * {@code LogAggregationFileControllerContext} is a record used in
+ * the log aggregation process.
+ */
+@Private
+@Unstable
+public class LogAggregationFileControllerContext {
+  private final boolean logAggregationInRolling;
+  private final long rollingMonitorInterval;
+  private final Path remoteNodeLogFileForApp;
+  private final NodeId nodeId;
+  private final UserGroupInformation userUgi;
+  private final ApplicationId appId;
+  private final Path remoteNodeTmpLogFileForApp;
+  private final Map<ApplicationAccessType, String> appAcls;
+  private int logAggregationTimes = 0;
+  private int cleanOldLogsTimes = 0;
+
+  private boolean uploadedLogsInThisCycle;
+  private long logUploadedTimeStamp;
+
+  public LogAggregationFileControllerContext(Path remoteNodeLogFileForApp,
+      Path remoteNodeTmpLogFileForApp,
+      boolean logAggregationInRolling,
+      long rollingMonitorInterval,
+      ApplicationId appId,
+      Map<ApplicationAccessType, String> appAcls,
+      NodeId nodeId, UserGroupInformation userUgi) {
+    this.remoteNodeLogFileForApp = remoteNodeLogFileForApp;
+    this.remoteNodeTmpLogFileForApp = remoteNodeTmpLogFileForApp;
+    this.logAggregationInRolling = logAggregationInRolling;
+    this.rollingMonitorInterval = rollingMonitorInterval;
+    this.nodeId = nodeId;
+    this.appId = appId;
+    this.appAcls = appAcls;
+    this.userUgi = userUgi;
+  }
+
+  public boolean isUploadedLogsInThisCycle() {
+    return uploadedLogsInThisCycle;
+  }
+
+  public void setUploadedLogsInThisCycle(boolean uploadedLogsInThisCycle) {
+    this.uploadedLogsInThisCycle = uploadedLogsInThisCycle;
+  }
+
+  public Path getRemoteNodeLogFileForApp() {
+    return remoteNodeLogFileForApp;
+  }
+
+  public long getRollingMonitorInterval() {
+    return rollingMonitorInterval;
+  }
+
+  public boolean isLogAggregationInRolling() {
+    return logAggregationInRolling;
+  }
+
+  public long getLogUploadTimeStamp() {
+    return logUploadedTimeStamp;
+  }
+
+  public void setLogUploadTimeStamp(long uploadTimeStamp) {
+    this.logUploadedTimeStamp = uploadTimeStamp;
+  }
+
+  public NodeId getNodeId() {
+    return nodeId;
+  }
+
+  public UserGroupInformation getUserUgi() {
+    return userUgi;
+  }
+
+  public ApplicationId getAppId() {
+    return appId;
+  }
+
+  public Path getRemoteNodeTmpLogFileForApp() {
+    return remoteNodeTmpLogFileForApp;
+  }
+
+  public void increLogAggregationTimes() {
+    this.logAggregationTimes++;
+  }
+
+  public void increcleanupOldLogTimes() {
+    this.cleanOldLogsTimes++;
+  }
+
+  public int getLogAggregationTimes() {
+    return logAggregationTimes;
+  }
+
+  public int getCleanOldLogsTimes() {
+    return cleanOldLogsTimes;
+  }
+
+  public Map<ApplicationAccessType, String> getAppAcls() {
+    return appAcls;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerFactory.java
new file mode 100644
index 0000000..746bf5a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerFactory.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation.filecontroller;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+/**
+ * Use {@code LogAggregationFileControllerFactory} to get the correct
+ * {@link LogAggregationFileController} for write and read.
+ *
+ */
+@Private
+@Unstable
+public class LogAggregationFileControllerFactory {
+
+  private static final Log LOG = LogFactory.getLog(
+      LogAggregationFileControllerFactory.class);
+  private final Pattern p = Pattern.compile(
+      "^[A-Za-z_]+[A-Za-z0-9_]*$");
+  private LinkedList<LogAggregationFileController> controllers
+      = new LinkedList<>();
+  private Configuration conf;
+
+  /**
+   * Construct the LogAggregationFileControllerFactory object.
+   * @param conf the Configuration
+   */
+  public LogAggregationFileControllerFactory(Configuration conf) {
+    this.conf = conf;
+    Collection<String> fileControllers = conf.getStringCollection(
+        YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS);
+    List<String> controllerClassName = new ArrayList<>();
+
+    Map<String, String> controllerChecker = new HashMap<>();
+
+    for (String fileController : fileControllers) {
+      Preconditions.checkArgument(validateAggregatedFileControllerName(
+          fileController), "The FileControllerName: " + fileController
+          + " set in " + YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS
+          +" is invalid." + "The valid File Controller name should only "
+          + "contain a-zA-Z0-9_ and can not start with numbers");
+
+      String remoteDirStr = String.format(
+          YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT,
+          fileController);
+      String remoteDir = conf.get(remoteDirStr);
+      boolean defaultRemoteDir = false;
+      if (remoteDir == null || remoteDir.isEmpty()) {
+        remoteDir = conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+            YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR);
+        defaultRemoteDir = true;
+      }
+      String suffixStr = String.format(
+          YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT,
+          fileController);
+      String suffix = conf.get(suffixStr);
+      boolean defaultSuffix = false;
+      if (suffix == null || suffix.isEmpty()) {
+        suffix = conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
+            YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
+        defaultSuffix = true;
+      }
+      String dirSuffix = remoteDir + "-" + suffix;
+      if (controllerChecker.containsKey(dirSuffix)) {
+        if (defaultRemoteDir && defaultSuffix) {
+          String fileControllerStr = controllerChecker.get(dirSuffix);
+          List<String> controllersList = new ArrayList<>();
+          controllersList.add(fileControllerStr);
+          controllersList.add(fileController);
+          fileControllerStr = StringUtils.join(controllersList, ",");
+          controllerChecker.put(dirSuffix, fileControllerStr);
+        } else {
+          String conflictController = controllerChecker.get(dirSuffix);
+          throw new RuntimeException("The combined value of " + remoteDirStr
+              + " and " + suffixStr + " should not be the same as the value"
+              + " set for " + conflictController);
+        }
+      } else {
+        controllerChecker.put(dirSuffix, fileController);
+      }
+      String classKey = String.format(
+          YarnConfiguration.LOG_AGGREGATION_FILE_CONTROLLER_FMT,
+          fileController);
+      String className = conf.get(classKey);
+      if (className == null || className.isEmpty()) {
+        throw new RuntimeException("No class configured for "
+            + fileController);
+      }
+      controllerClassName.add(className);
+      Class<? extends LogAggregationFileController> sClass = conf.getClass(
+          classKey, null, LogAggregationFileController.class);
+      if (sClass == null) {
+        throw new RuntimeException("No class defined for " + fileController);
+      }
+      LogAggregationFileController s = ReflectionUtils.newInstance(
+          sClass, conf);
+      if (s == null) {
+        throw new RuntimeException("No object created for "
+            + controllerClassName);
+      }
+      s.initialize(conf, fileController);
+      controllers.add(s);
+    }
+  }
+
+  /**
+   * Get {@link LogAggregationFileController} to write.
+   * @return the LogAggregationFileController instance
+   */
+  public LogAggregationFileController getFileControllerForWrite() {
+    return controllers.getFirst();
+  }
+
+  /**
+   * Get {@link LogAggregationFileController} to read the aggregated logs
+   * for this application.
+   * @param appId the ApplicationId
+   * @param appOwner the Application Owner
+   * @return the LogAggregationFileController instance
+   * @throws IOException if can not find any log aggregation file controller
+   */
+  public LogAggregationFileController getFileControllerForRead(
+      ApplicationId appId, String appOwner) throws IOException {
+    StringBuilder diagnosis = new StringBuilder();
+    for(LogAggregationFileController fileController : controllers) {
+      try {
+        Path remoteAppLogDir = fileController.getRemoteAppLogDir(
+            appId, appOwner);
+        Path qualifiedLogDir = FileContext.getFileContext(conf).makeQualified(
+            remoteAppLogDir);
+        RemoteIterator<FileStatus> nodeFiles = FileContext.getFileContext(
+            qualifiedLogDir.toUri(), conf).listStatus(remoteAppLogDir);
+        if (nodeFiles.hasNext()) {
+          return fileController;
+        }
+      } catch (Exception ex) {
+        diagnosis.append(ex.getMessage() + "\n");
+        continue;
+      }
+    }
+    throw new IOException(diagnosis.toString());
+  }
+
+  private boolean validateAggregatedFileControllerName(String name) {
+    if (name == null || name.trim().isEmpty()) {
+      return false;
+    }
+    return p.matcher(name).matches();
+  }
+
+  @Private
+  @VisibleForTesting
+  public LinkedList<LogAggregationFileController>
+      getConfiguredLogAggregationFileControllerList() {
+    return this.controllers;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationTFileController.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationTFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationTFileController.java
new file mode 100644
index 0000000..9e0c66d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationTFileController.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation.filecontroller;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
+import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.util.Times;
+
+/**
+ * The TFile log aggregation file Controller implementation.
+ */
+@Private
+@Unstable
+public class LogAggregationTFileController
+    extends LogAggregationFileController {
+
+  private static final Log LOG = LogFactory.getLog(
+      LogAggregationTFileController.class);
+
+  private LogWriter writer;
+
+  public LogAggregationTFileController(){}
+
+  @Override
+  public void initInternal(Configuration conf) {
+    this.remoteRootLogDir = new Path(
+        conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+            YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
+    this.remoteRootLogDirSuffix =
+        conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
+            YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
+  }
+
+  @Override
+  public void initializeWriter(LogAggregationFileControllerContext context)
+      throws IOException {
+    this.writer = new LogWriter();
+    writer.initialize(this.conf, context.getRemoteNodeTmpLogFileForApp(),
+        context.getUserUgi());
+    // Write ACLs once when the writer is created.
+    writer.writeApplicationACLs(context.getAppAcls());
+    writer.writeApplicationOwner(context.getUserUgi().getShortUserName());
+  }
+
+  @Override
+  public void closeWriter() {
+    this.writer.close();
+  }
+
+  @Override
+  public void write(LogKey logKey, LogValue logValue) throws IOException {
+    this.writer.append(logKey, logValue);
+  }
+
+  @Override
+  public void postWrite(final LogAggregationFileControllerContext record)
+      throws Exception {
+    // Before upload logs, make sure the number of existing logs
+    // is smaller than the configured NM log aggregation retention size.
+    if (record.isUploadedLogsInThisCycle() &&
+        record.isLogAggregationInRolling()) {
+      cleanOldLogs(record.getRemoteNodeLogFileForApp(), record.getNodeId(),
+          record.getUserUgi());
+      record.increcleanupOldLogTimes();
+    }
+
+    final Path renamedPath = record.getRollingMonitorInterval() <= 0
+        ? record.getRemoteNodeLogFileForApp() : new Path(
+            record.getRemoteNodeLogFileForApp().getParent(),
+            record.getRemoteNodeLogFileForApp().getName() + "_"
+            + record.getLogUploadTimeStamp());
+    final boolean rename = record.isUploadedLogsInThisCycle();
+    try {
+      record.getUserUgi().doAs(new PrivilegedExceptionAction<Object>() {
+        @Override
+        public Object run() throws Exception {
+          FileSystem remoteFS = record.getRemoteNodeLogFileForApp()
+              .getFileSystem(conf);
+          if (rename) {
+            remoteFS.rename(record.getRemoteNodeTmpLogFileForApp(),
+                renamedPath);
+          } else {
+            remoteFS.delete(record.getRemoteNodeTmpLogFileForApp(), false);
+          }
+          return null;
+        }
+      });
+    } catch (Exception e) {
+      LOG.error(
+          "Failed to move temporary log file to final location: ["
+          + record.getRemoteNodeTmpLogFileForApp() + "] to ["
+          + renamedPath + "]", e);
+      throw new Exception("Log uploaded failed for Application: "
+          + record.getAppId() + " in NodeManager: "
+          + LogAggregationUtils.getNodeString(record.getNodeId()) + " at "
+          + Times.format(record.getLogUploadTimeStamp()) + "\n");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/package-info.java
new file mode 100644
index 0000000..cad238a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Public
+package org.apache.hadoop.yarn.logaggregation.filecontroller;
+import org.apache.hadoop.classification.InterfaceAudience;
+

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index f93de44..0823dfe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1167,6 +1167,25 @@
   </property>
 
   <property>
+    <description>Specify which log file controllers we will support. The first
+    file controller we add will be used to write the aggregated logs.
+    This comma separated configuration will work with the configuration:
+    yarn.log-aggregation.file-controller.%s.class which defines the supported
+    file controller's class. By default, the TFile controller would be used.
+    The user could override this configuration by adding more file controllers.
+    To support back-ward compatibility, make sure that we always
+    add TFile file controller.</description>
+    <name>yarn.log-aggregation.file-formats</name>
+    <value>TFile</value>
+  </property>
+
+  <property>
+    <description>Class that supports TFile read and write operations.</description>
+    <name>yarn.log-aggregation.file-controller.TFile.class</name>
+    <value>org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationTFileController</value>
+  </property>
+
+  <property>
     <description>
     How long for ResourceManager to wait for NodeManager to report its
     log aggregation status. If waiting time of which the log aggregation

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java
index 1e71b3c..3dd7de3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java
@@ -40,10 +40,14 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
 import org.apache.hadoop.yarn.webapp.YarnWebParams;
 import org.apache.hadoop.yarn.webapp.log.AggregatedLogsBlockForTest;
 import org.apache.hadoop.yarn.webapp.view.BlockForTest;
@@ -249,7 +253,7 @@ public class TestAggregatedLogsBlock {
   
   
   private Configuration getConfiguration() {
-    Configuration configuration = new Configuration();
+    Configuration configuration = new YarnConfiguration();
     configuration.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
     configuration.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, "target/logs");
     configuration.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
@@ -295,19 +299,25 @@ public class TestAggregatedLogsBlock {
     List<String> rootLogDirs = Arrays.asList("target/logs/logs");
     UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
 
-    try (AggregatedLogFormat.LogWriter writer =
-             new AggregatedLogFormat.LogWriter()) {
-      writer.initialize(configuration, new Path(path), ugi);
-      writer.writeApplicationOwner(ugi.getUserName());
-
+    LogAggregationFileControllerFactory factory
+        = new LogAggregationFileControllerFactory(configuration);
+    LogAggregationFileController fileController = factory
+        .getFileControllerForWrite();
+    try {
       Map<ApplicationAccessType, String> appAcls = new HashMap<>();
       appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
-      writer.writeApplicationACLs(appAcls);
-
-      writer.append(
+      NodeId nodeId = NodeId.newInstance("localhost", 1234);
+      LogAggregationFileControllerContext context
+          = new LogAggregationFileControllerContext(
+              new Path(path), new Path(path), false, 3600,
+              appId, appAcls, nodeId, ugi);
+      fileController.initializeWriter(context);
+      fileController.write(
           new AggregatedLogFormat.LogKey("container_0_0001_01_000001"),
           new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
               UserGroupInformation.getCurrentUser().getShortUserName()));
+    } finally {
+      fileController.closeWriter();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java
index 8b665e0..a12e2a1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java
@@ -24,15 +24,21 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.io.Writer;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
 
 /**
  * This class contains several utility functions for log aggregation tests.
@@ -110,14 +116,25 @@ public final class TestContainerLogsUtils {
       ContainerId containerId, Path appDir, FileSystem fs) throws IOException {
     Path path =
         new Path(appDir, LogAggregationUtils.getNodeString(nodeId));
-    try (AggregatedLogFormat.LogWriter writer =
-        new AggregatedLogFormat.LogWriter()) {
-      writer.initialize(configuration, path, ugi);
-      writer.writeApplicationOwner(ugi.getUserName());
-
-      writer.append(new AggregatedLogFormat.LogKey(containerId),
+    LogAggregationFileControllerFactory factory
+        = new LogAggregationFileControllerFactory(configuration);
+    LogAggregationFileController fileController = factory
+        .getFileControllerForWrite();
+    try {
+      Map<ApplicationAccessType, String> appAcls = new HashMap<>();
+      appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
+      ApplicationId appId = containerId.getApplicationAttemptId()
+          .getApplicationId();
+      LogAggregationFileControllerContext context
+          = new LogAggregationFileControllerContext(
+              path, path, true, 1000,
+              appId, appAcls, nodeId, ugi);
+      fileController.initializeWriter(context);
+      fileController.write(new AggregatedLogFormat.LogKey(containerId),
           new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
               ugi.getShortUserName()));
+    } finally {
+      fileController.closeWriter();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/TestLogAggregationFileControllerFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/TestLogAggregationFileControllerFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/TestLogAggregationFileControllerFactory.java
new file mode 100644
index 0000000..45f18c1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/TestLogAggregationFileControllerFactory.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation.filecontroller;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.Writer;
+import java.util.LinkedList;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationTFileController;
+import org.junit.Test;
+
+/**
+ * Test LogAggregationFileControllerFactory.
+ *
+ */
+public class TestLogAggregationFileControllerFactory {
+
+  @Test(timeout = 10000)
+  public void testLogAggregationFileControllerFactory() throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(
+        System.currentTimeMillis(), 1);
+    String appOwner = "test";
+    String remoteLogRootDir = "target/app-logs/";
+    Configuration conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
+    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogRootDir);
+    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, "log");
+    FileSystem fs = FileSystem.get(conf);
+
+    LogAggregationFileControllerFactory factory =
+        new LogAggregationFileControllerFactory(conf);
+    LinkedList<LogAggregationFileController> list = factory
+        .getConfiguredLogAggregationFileControllerList();
+    assertTrue(list.size() == 1);
+    assertTrue(list.getFirst() instanceof LogAggregationTFileController);
+    assertTrue(factory.getFileControllerForWrite()
+        instanceof LogAggregationTFileController);
+    Path logPath = list.getFirst().getRemoteAppLogDir(appId, appOwner);
+    try {
+      if (fs.exists(logPath)) {
+        fs.delete(logPath, true);
+      }
+      assertTrue(fs.mkdirs(logPath));
+      Writer writer =
+          new FileWriter(new File(logPath.toString(), "testLog"));
+      writer.write("test");
+      writer.close();
+      assertTrue(factory.getFileControllerForRead(appId, appOwner)
+          instanceof LogAggregationTFileController);
+    } finally {
+      fs.delete(logPath, true);
+    }
+
+    conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS,
+        "TestLogAggregationFileController");
+    // Did not set class for TestLogAggregationFileController,
+    // should get the exception.
+    try {
+      factory =
+          new LogAggregationFileControllerFactory(conf);
+      fail();
+    } catch (Exception ex) {
+      // should get exception
+    }
+
+    conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS,
+        "TestLogAggregationFileController,TFile");
+    conf.setClass(
+        "yarn.log-aggregation.file-controller.TestLogAggregationFileController"
+        + ".class", TestLogAggregationFileController.class,
+        LogAggregationFileController.class);
+
+    conf.set(
+        "yarn.log-aggregation.TestLogAggregationFileController"
+        + ".remote-app-log-dir", remoteLogRootDir);
+    conf.set(
+        "yarn.log-aggregation.TestLogAggregationFileController"
+        + ".remote-app-log-dir-suffix", "testLog");
+
+    factory = new LogAggregationFileControllerFactory(conf);
+    list = factory.getConfiguredLogAggregationFileControllerList();
+    assertTrue(list.size() == 2);
+    assertTrue(list.getFirst() instanceof TestLogAggregationFileController);
+    assertTrue(list.getLast() instanceof LogAggregationTFileController);
+    assertTrue(factory.getFileControllerForWrite()
+        instanceof TestLogAggregationFileController);
+
+    logPath = list.getFirst().getRemoteAppLogDir(appId, appOwner);
+    try {
+      if (fs.exists(logPath)) {
+        fs.delete(logPath, true);
+      }
+      assertTrue(fs.mkdirs(logPath));
+      Writer writer =
+          new FileWriter(new File(logPath.toString(), "testLog"));
+      writer.write("test");
+      writer.close();
+      assertTrue(factory.getFileControllerForRead(appId, appOwner)
+          instanceof TestLogAggregationFileController);
+    } finally {
+      fs.delete(logPath, true);
+    }
+  }
+
+  private static class TestLogAggregationFileController
+      extends LogAggregationFileController {
+
+    @Override
+    public void initInternal(Configuration conf) {
+      String remoteDirStr = String.format(
+          YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT,
+          this.fileControllerName);
+      this.remoteRootLogDir = new Path(conf.get(remoteDirStr));
+      String suffix = String.format(
+          YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT,
+           this.fileControllerName);
+      this.remoteRootLogDirSuffix = conf.get(suffix);
+    }
+
+    @Override
+    public void closeWriter() {
+      // Do Nothing
+    }
+
+    @Override
+    public void write(LogKey logKey, LogValue logValue) throws IOException {
+      // Do Nothing
+    }
+
+    @Override
+    public void postWrite(LogAggregationFileControllerContext record)
+        throws Exception {
+      // Do Nothing
+    }
+
+    @Override
+    public void initializeWriter(LogAggregationFileControllerContext context)
+        throws IOException {
+      // Do Nothing
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org