You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ju...@apache.org on 2017/08/24 20:35:08 UTC
[2/2] hadoop git commit: YARN-6876. Create an abstract log writer for
extendability. Contributed by Xuan Gong.
YARN-6876. Create an abstract log writer for extendability. Contributed by Xuan Gong.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c2cb7ea1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c2cb7ea1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c2cb7ea1
Branch: refs/heads/trunk
Commit: c2cb7ea1ef6532020b69031dbd18b0f9b8369f0f
Parents: 8196a07
Author: Junping Du <ju...@apache.org>
Authored: Thu Aug 24 13:36:49 2017 -0700
Committer: Junping Du <ju...@apache.org>
Committed: Thu Aug 24 13:36:49 2017 -0700
----------------------------------------------------------------------
.../hadoop/yarn/conf/YarnConfiguration.java | 12 +-
.../yarn/conf/TestYarnConfigurationFields.java | 2 +
.../hadoop/yarn/client/cli/TestLogsCLI.java | 65 +--
.../logaggregation/AggregatedLogFormat.java | 22 +-
.../logaggregation/LogAggregationUtils.java | 41 ++
.../LogAggregationFileController.java | 404 +++++++++++++++++++
.../LogAggregationFileControllerContext.java | 130 ++++++
.../LogAggregationFileControllerFactory.java | 195 +++++++++
.../LogAggregationTFileController.java | 127 ++++++
.../filecontroller/package-info.java | 21 +
.../src/main/resources/yarn-default.xml | 19 +
.../logaggregation/TestAggregatedLogsBlock.java | 28 +-
.../logaggregation/TestContainerLogsUtils.java | 29 +-
...TestLogAggregationFileControllerFactory.java | 171 ++++++++
.../logaggregation/AppLogAggregatorImpl.java | 232 ++++-------
.../logaggregation/LogAggregationService.java | 210 +---------
.../TestAppLogAggregatorImpl.java | 25 +-
.../TestLogAggregationService.java | 135 +++++--
18 files changed, 1419 insertions(+), 449 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 86f45b8..16bd73a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1064,7 +1064,17 @@ public class YarnConfiguration extends Configuration {
public static final String LOG_AGGREGATION_ENABLED = YARN_PREFIX
+ "log-aggregation-enable";
public static final boolean DEFAULT_LOG_AGGREGATION_ENABLED = false;
-
+
+ /**
+ * Configuration key for the log aggregation file formats.
+ * NOTE(review): presumably a list of file controller names - confirm
+ * against the factory that reads it.
+ */
+ public static final String LOG_AGGREGATION_FILE_FORMATS = YARN_PREFIX
+ + "log-aggregation.file-formats";
+ /**
+ * Format string for the key naming a file controller class; {@code %s}
+ * is replaced by the file controller name (for example {@code TFile}).
+ */
+ public static final String LOG_AGGREGATION_FILE_CONTROLLER_FMT =
+ YARN_PREFIX + "log-aggregation.file-controller.%s.class";
+
+ /**
+ * Format string for a per-controller remote application log directory key.
+ * NOTE(review): {@code %s} is presumably the file controller name - confirm.
+ */
+ public static final String LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT
+ = YARN_PREFIX + "log-aggregation.%s.remote-app-log-dir";
+ /**
+ * Format string for a per-controller remote application log directory
+ * suffix key.
+ * NOTE(review): {@code %s} is presumably the file controller name - confirm.
+ */
+ public static final String LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT
+ = YARN_PREFIX + "log-aggregation.%s.remote-app-log-dir-suffix";
+
/**
* How long to wait before deleting aggregated logs, -1 disables.
* Be careful set this too small and you will spam the name node.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
index c40c2c5..153a35a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
@@ -184,6 +184,8 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
// Currently defined in RegistryConstants/core-site.xml
xmlPrefixToSkipCompare.add("hadoop.registry");
+ xmlPrefixToSkipCompare.add(
+ "yarn.log-aggregation.file-controller.TFile.class");
// Add the filters used for checking for collision of default values.
initDefaultValueCollisionCheck();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java
index c054209..26e0319 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java
@@ -36,7 +36,6 @@ import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.ClientResponse.Status;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter;
@@ -78,6 +77,9 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.logaggregation.LogCLIHelpers;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
import org.codehaus.jettison.json.JSONObject;
import org.junit.Assert;
import org.junit.Before;
@@ -1345,42 +1347,55 @@ public class TestLogsCLI {
Path path =
new Path(appDir, LogAggregationUtils.getNodeString(nodeId)
+ System.currentTimeMillis());
- try (AggregatedLogFormat.LogWriter writer =
- new AggregatedLogFormat.LogWriter()) {
- writer.initialize(configuration, path, ugi);
- writer.writeApplicationOwner(ugi.getUserName());
-
+ LogAggregationFileControllerFactory factory
+ = new LogAggregationFileControllerFactory(configuration);
+ LogAggregationFileController fileFormat = factory
+ .getFileControllerForWrite();
+ try {
Map<ApplicationAccessType, String> appAcls = new HashMap<>();
appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
- writer.writeApplicationACLs(appAcls);
- writer.append(new AggregatedLogFormat.LogKey(containerId),
+ LogAggregationFileControllerContext context
+ = new LogAggregationFileControllerContext(
+ path, path, true, 1000,
+ containerId.getApplicationAttemptId().getApplicationId(),
+ appAcls, nodeId, ugi);
+ fileFormat.initializeWriter(context);
+ fileFormat.write(new AggregatedLogFormat.LogKey(containerId),
new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
UserGroupInformation.getCurrentUser().getShortUserName()));
+ } finally {
+ fileFormat.closeWriter();
}
}
+ @SuppressWarnings("static-access")
private static void uploadEmptyContainerLogIntoRemoteDir(UserGroupInformation ugi,
Configuration configuration, List<String> rootLogDirs, NodeId nodeId,
ContainerId containerId, Path appDir, FileSystem fs) throws Exception {
- Path path =
- new Path(appDir, LogAggregationUtils.getNodeString(nodeId)
- + System.currentTimeMillis());
- try (AggregatedLogFormat.LogWriter writer =
- new AggregatedLogFormat.LogWriter()) {
- writer.initialize(configuration, path, ugi);
- writer.writeApplicationOwner(ugi.getUserName());
-
+ LogAggregationFileControllerFactory factory
+ = new LogAggregationFileControllerFactory(configuration);
+ LogAggregationFileController fileFormat = factory
+ .getFileControllerForWrite();
+ try {
Map<ApplicationAccessType, String> appAcls = new HashMap<>();
appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
- writer.writeApplicationACLs(appAcls);
- DataOutputStream out = writer.getWriter().prepareAppendKey(-1);
- new AggregatedLogFormat.LogKey(containerId).write(out);
- out.close();
- out = writer.getWriter().prepareAppendValue(-1);
- new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
- UserGroupInformation.getCurrentUser().getShortUserName()).write(out,
- new HashSet<>());
- out.close();
+ ApplicationId appId = containerId.getApplicationAttemptId()
+ .getApplicationId();
+ Path path = fileFormat.getRemoteNodeLogFileForApp(
+ appId, ugi.getCurrentUser().getShortUserName(), nodeId);
+ LogAggregationFileControllerContext context
+ = new LogAggregationFileControllerContext(
+ path, path, true, 1000,
+ appId, appAcls, nodeId, ugi);
+ fileFormat.initializeWriter(context);
+ AggregatedLogFormat.LogKey key = new AggregatedLogFormat.LogKey(
+ containerId);
+ AggregatedLogFormat.LogValue value = new AggregatedLogFormat.LogValue(
+ rootLogDirs, containerId, UserGroupInformation.getCurrentUser()
+ .getShortUserName());
+ fileFormat.write(key, value);
+ } finally {
+ fileFormat.closeWriter();
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
index d806b12..3c1dcdc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
@@ -44,7 +44,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.regex.Pattern;
-
+import org.apache.commons.io.IOUtils;
import org.apache.commons.io.input.BoundedInputStream;
import org.apache.commons.io.output.WriterOutputStream;
import org.apache.commons.logging.Log;
@@ -61,7 +61,6 @@ import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SecureIOUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.file.tfile.TFile;
@@ -71,7 +70,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Times;
import com.google.common.annotations.VisibleForTesting;
@@ -249,7 +247,7 @@ public class AggregatedLogFormat {
in = secureOpenFile(logFile);
} catch (IOException e) {
logErrorMessage(logFile, e);
- IOUtils.cleanup(LOG, in);
+ IOUtils.closeQuietly(in);
continue;
}
@@ -287,7 +285,7 @@ public class AggregatedLogFormat {
String message = logErrorMessage(logFile, e);
out.write(message.getBytes(Charset.forName("UTF-8")));
} finally {
- IOUtils.cleanup(LOG, in);
+ IOUtils.closeQuietly(in);
}
}
}
@@ -557,7 +555,7 @@ public class AggregatedLogFormat {
} catch (Exception e) {
LOG.warn("Exception closing writer", e);
} finally {
- IOUtils.closeStream(this.fsDataOStream);
+ IOUtils.closeQuietly(this.fsDataOStream);
}
}
}
@@ -605,7 +603,7 @@ public class AggregatedLogFormat {
}
return null;
} finally {
- IOUtils.cleanup(LOG, ownerScanner);
+ IOUtils.closeQuietly(ownerScanner);
}
}
@@ -651,7 +649,7 @@ public class AggregatedLogFormat {
}
return acls;
} finally {
- IOUtils.cleanup(LOG, aclScanner);
+ IOUtils.closeQuietly(aclScanner);
}
}
@@ -775,8 +773,8 @@ public class AggregatedLogFormat {
}
}
} finally {
- IOUtils.cleanup(LOG, ps);
- IOUtils.cleanup(LOG, os);
+ IOUtils.closeQuietly(ps);
+ IOUtils.closeQuietly(os);
}
}
@@ -1001,7 +999,9 @@ public class AggregatedLogFormat {
}
public void close() {
- IOUtils.cleanup(LOG, scanner, reader, fsDataIStream);
+ IOUtils.closeQuietly(scanner);
+ IOUtils.closeQuietly(reader);
+ IOUtils.closeQuietly(fsDataIStream);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java
index 24baaab..e8a28de 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java
@@ -133,6 +133,23 @@ public class LogAggregationUtils {
new org.apache.hadoop.fs.Path(conf.get(
YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
+ return getRemoteAppLogDir(conf, appId, appOwner, remoteRootLogDir, suffix);
+ }
+
+ /**
+ * Return the remote application log directory.
+ * @param conf the configuration
+ * @param appId the application
+ * @param appOwner the application owner
+ * @param remoteRootLogDir the remote root log directory
+ * @param suffix the log directory suffix
+ * @return the remote application log directory path
+ * @throws IOException if we can not find remote application log directory
+ */
+ public static org.apache.hadoop.fs.Path getRemoteAppLogDir(
+ Configuration conf, ApplicationId appId, String appOwner,
+ org.apache.hadoop.fs.Path remoteRootLogDir, String suffix)
+ throws IOException {
org.apache.hadoop.fs.Path remoteAppDir = null;
if (appOwner == null) {
org.apache.hadoop.fs.Path qualifiedRemoteRootLogDir =
@@ -159,6 +176,30 @@ public class LogAggregationUtils {
* @param conf the configuration
* @param appId the applicationId
* @param appOwner the application owner
+ * @param remoteRootLogDir the remote root log directory
+ * @param suffix the log directory suffix
+ * @return the iterator of available log files
+ * @throws IOException if there is no log file available
+ */
+  public static RemoteIterator<FileStatus> getRemoteNodeFileDir(
+      Configuration conf, ApplicationId appId, String appOwner,
+      org.apache.hadoop.fs.Path remoteRootLogDir, String suffix)
+      throws IOException {
+    // Resolve the application's aggregated-log directory first.
+    final Path appLogDir = getRemoteAppLogDir(conf, appId, appOwner,
+        remoteRootLogDir, suffix);
+    // Qualify the directory through the default file context, then list it
+    // through a file context obtained for the qualified directory's URI.
+    final Path qualifiedAppLogDir =
+        FileContext.getFileContext(conf).makeQualified(appLogDir);
+    final FileContext logDirContext =
+        FileContext.getFileContext(qualifiedAppLogDir.toUri(), conf);
+    return logDirContext.listStatus(appLogDir);
+  }
+
+ /**
+ * Get all available log files under remote app log directory.
+ * @param conf the configuration
+ * @param appId the applicationId
+ * @param appOwner the application owner
* @return the iterator of available log files
* @throws IOException if there is no log file available
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
new file mode 100644
index 0000000..5503f8f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
@@ -0,0 +1,404 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation.filecontroller;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
+
+/**
+ * Base class to implement Log Aggregation File Controller.
+ *
+ * <p>This class holds the parts of log aggregation that do not depend on
+ * the aggregated file format: reading the per-application retention size,
+ * creating the remote root and per-application log directories, resolving
+ * remote log paths and pruning old aggregated files. Writing the logs is
+ * left to subclasses through the abstract writer methods.
+ */
+@Private
+@Unstable
+public abstract class LogAggregationFileController {
+
+  private static final Log LOG = LogFactory.getLog(
+      LogAggregationFileController.class);
+
+  /*
+   * Expected deployment TLD will be 1777, owner=<NMOwner>, group=<NMGroup -
+   * Group to which NMOwner belongs> App dirs will be created as 770,
+   * owner=<AppOwner>, group=<NMGroup>: so that the owner and <NMOwner> can
+   * access / modify the files.
+   * <NMGroup> should obviously be a limited access group.
+   */
+  /**
+   * Permissions for the top level directory under which app directories will be
+   * created.
+   */
+  protected static final FsPermission TLDIR_PERMISSIONS = FsPermission
+      .createImmutable((short) 01777);
+
+  /**
+   * Permissions for the Application directory.
+   */
+  protected static final FsPermission APP_DIR_PERMISSIONS = FsPermission
+      .createImmutable((short) 0770);
+
+  // This is temporary solution. The configuration will be deleted once
+  // we find a more scalable method to only write a single log file per LRS.
+  private static final String NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP
+      = YarnConfiguration.NM_PREFIX + "log-aggregation.num-log-files-per-app";
+  private static final int
+      DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP = 30;
+
+  /** Configuration handed to {@link #initialize(Configuration, String)}. */
+  protected Configuration conf;
+  /**
+   * Remote root log directory. It is not assigned in this class; presumably
+   * subclasses set it in {@link #initInternal(Configuration)}.
+   */
+  protected Path remoteRootLogDir;
+  /**
+   * Remote log directory suffix. Like {@link #remoteRootLogDir}, it is not
+   * assigned in this class.
+   */
+  protected String remoteRootLogDirSuffix;
+  /**
+   * Threshold on the number of aggregated log files of one node kept in an
+   * application directory; see
+   * {@link #cleanOldLogs(Path, NodeId, UserGroupInformation)}.
+   */
+  protected int retentionSize;
+  /** Controller name handed to {@link #initialize(Configuration, String)}. */
+  protected String fileControllerName;
+
+  public LogAggregationFileController() {}
+
+  /**
+   * Initialize the log file controller.
+   * @param conf the Configuration
+   * @param controllerName the log controller class name
+   */
+  public void initialize(Configuration conf, String controllerName) {
+    this.conf = conf;
+    int configuredRetentionSize =
+        conf.getInt(NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP,
+            DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP);
+    // A non-positive setting falls back to the default retention size.
+    this.retentionSize = configuredRetentionSize > 0
+        ? configuredRetentionSize
+        : DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP;
+    this.fileControllerName = controllerName;
+    initInternal(conf);
+  }
+
+  /**
+   * Derived classes initialize themselves using this method.
+   * @param conf the Configuration
+   */
+  protected abstract void initInternal(Configuration conf);
+
+  /**
+   * Get the remote root log directory.
+   * @return the remote root log directory path
+   */
+  public Path getRemoteRootLogDir() {
+    return this.remoteRootLogDir;
+  }
+
+  /**
+   * Get the log aggregation directory suffix.
+   * @return the log aggregation directory suffix
+   */
+  public String getRemoteRootLogDirSuffix() {
+    return this.remoteRootLogDirSuffix;
+  }
+
+  /**
+   * Initialize the writer.
+   * @param context the {@link LogAggregationFileControllerContext}
+   * @throws IOException if fails to initialize the writer
+   */
+  public abstract void initializeWriter(
+      LogAggregationFileControllerContext context) throws IOException;
+
+  /**
+   * Close the writer.
+   */
+  public abstract void closeWriter();
+
+  /**
+   * Write the log content.
+   * @param logKey the log key
+   * @param logValue the log content
+   * @throws IOException if fails to write the logs
+   */
+  public abstract void write(LogKey logKey, LogValue logValue)
+      throws IOException;
+
+  /**
+   * Operations needed after write the log content.
+   * @param record the {@link LogAggregationFileControllerContext}
+   * @throws Exception if anything fails
+   */
+  public abstract void postWrite(LogAggregationFileControllerContext record)
+      throws Exception;
+
+  /**
+   * Verify and create the remote log directory.
+   *
+   * <p>If the remote root log directory exists, a warning is logged when its
+   * permissions differ from {@link #TLDIR_PERMISSIONS}. If it does not exist
+   * it is created with those permissions.
+   * @throws YarnRuntimeException if the file system cannot be obtained, the
+   *         permissions cannot be checked, or the directory cannot be created
+   */
+  public void verifyAndCreateRemoteLogDir() {
+    // Checking the existence of the TLD
+    final FileSystem remoteFS;
+    try {
+      remoteFS = getFileSystem(conf);
+    } catch (IOException e) {
+      throw new YarnRuntimeException(
+          "Unable to get Remote FileSystem instance", e);
+    }
+    final Path rootLogDir = getRemoteRootLogDir();
+    boolean remoteExists = true;
+    try {
+      FsPermission perms =
+          remoteFS.getFileStatus(rootLogDir).getPermission();
+      if (!perms.equals(TLDIR_PERMISSIONS)) {
+        LOG.warn("Remote Root Log Dir [" + rootLogDir
+            + "] already exist, but with incorrect permissions. "
+            + "Expected: [" + TLDIR_PERMISSIONS + "], Found: [" + perms
+            + "]." + " The cluster may have problems with multiple users.");
+      }
+    } catch (FileNotFoundException e) {
+      remoteExists = false;
+    } catch (IOException e) {
+      throw new YarnRuntimeException(
+          "Failed to check permissions for dir ["
+              + rootLogDir + "]", e);
+    }
+    if (!remoteExists) {
+      createRemoteRootLogDir(remoteFS, rootLogDir);
+    }
+  }
+
+  /**
+   * Create the missing remote root log directory with
+   * {@link #TLDIR_PERMISSIONS} and, when the login user has a primary group,
+   * hand ownership to the login user and that group.
+   * @param remoteFS the file system holding the remote root log directory
+   * @param rootLogDir the remote root log directory to create
+   * @throws YarnRuntimeException if the directory cannot be created
+   */
+  private void createRemoteRootLogDir(FileSystem remoteFS, Path rootLogDir) {
+    LOG.warn("Remote Root Log Dir [" + rootLogDir
+        + "] does not exist. Attempting to create it.");
+    try {
+      Path qualified =
+          rootLogDir.makeQualified(remoteFS.getUri(),
+              remoteFS.getWorkingDirectory());
+      remoteFS.mkdirs(qualified, new FsPermission(TLDIR_PERMISSIONS));
+      // Set the permissions explicitly as well: createDir in this class
+      // shows that mkdirs may have a umask applied to the requested mode.
+      remoteFS.setPermission(qualified, new FsPermission(TLDIR_PERMISSIONS));
+
+      UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
+      String primaryGroupName = null;
+      try {
+        primaryGroupName = loginUser.getPrimaryGroupName();
+      } catch (IOException e) {
+        LOG.warn("No primary group found. The remote root log directory" +
+            " will be created with the HDFS superuser being its group " +
+            "owner. JobHistoryServer may be unable to read the directory.");
+      }
+      // set owner on the remote directory only if the primary group exists
+      if (primaryGroupName != null) {
+        remoteFS.setOwner(qualified,
+            loginUser.getShortUserName(), primaryGroupName);
+      }
+    } catch (IOException e) {
+      throw new YarnRuntimeException("Failed to create remoteLogDir ["
+          + rootLogDir + "]", e);
+    }
+  }
+
+  /**
+   * Create remote Application directory for log aggregation.
+   *
+   * <p>Running as {@code userUgi}, this creates the application directory
+   * and, where missing, the suffix and user directories above it, all with
+   * {@link #APP_DIR_PERMISSIONS}.
+   * @param user the user
+   * @param appId the application ID
+   * @param userUgi the UGI
+   * @throws YarnRuntimeException wrapping any failure raised while creating
+   *         the directories
+   */
+  public void createAppDir(final String user, final ApplicationId appId,
+      UserGroupInformation userUgi) {
+    final Path rootLogDir = getRemoteRootLogDir();
+    final String rootLogDirSuffix = getRemoteRootLogDirSuffix();
+    try {
+      userUgi.doAs(new PrivilegedExceptionAction<Object>() {
+        @Override
+        public Object run() throws Exception {
+          try {
+            // TODO: Reuse FS for user?
+            FileSystem remoteFS = getFileSystem(conf);
+
+            // Only creating directories if they are missing to avoid
+            // unnecessary load on the filesystem from all of the nodes
+            Path appDir = LogAggregationUtils.getRemoteAppLogDir(
+                rootLogDir, appId, user, rootLogDirSuffix);
+
+            appDir = appDir.makeQualified(remoteFS.getUri(),
+                remoteFS.getWorkingDirectory());
+
+            if (!checkExists(remoteFS, appDir, APP_DIR_PERMISSIONS)) {
+              Path suffixDir = LogAggregationUtils.getRemoteLogSuffixedDir(
+                  rootLogDir, user, rootLogDirSuffix);
+              suffixDir = suffixDir.makeQualified(remoteFS.getUri(),
+                  remoteFS.getWorkingDirectory());
+
+              if (!checkExists(remoteFS, suffixDir, APP_DIR_PERMISSIONS)) {
+                Path userDir = LogAggregationUtils.getRemoteLogUserDir(
+                    rootLogDir, user);
+                userDir = userDir.makeQualified(remoteFS.getUri(),
+                    remoteFS.getWorkingDirectory());
+
+                if (!checkExists(remoteFS, userDir, APP_DIR_PERMISSIONS)) {
+                  createDir(remoteFS, userDir, APP_DIR_PERMISSIONS);
+                }
+
+                createDir(remoteFS, suffixDir, APP_DIR_PERMISSIONS);
+              }
+
+              createDir(remoteFS, appDir, APP_DIR_PERMISSIONS);
+            }
+
+          } catch (IOException e) {
+            LOG.error("Failed to setup application log directory for "
+                + appId, e);
+            throw e;
+          }
+          return null;
+        }
+      });
+    } catch (Exception e) {
+      throw new YarnRuntimeException(e);
+    }
+  }
+
+  /**
+   * Get the file system of the remote root log directory.
+   * @param conf the Configuration
+   * @return the file system the remote root log directory resolves to
+   * @throws IOException if the file system cannot be obtained
+   */
+  @VisibleForTesting
+  protected FileSystem getFileSystem(Configuration conf) throws IOException {
+    return getRemoteRootLogDir().getFileSystem(conf);
+  }
+
+  /**
+   * Create a directory and make sure it ends up with the given permissions.
+   * @param fs the file system to create the directory on
+   * @param path the directory to create
+   * @param fsPerm the permissions the directory should have
+   * @throws IOException if the directory cannot be created or updated
+   */
+  protected void createDir(FileSystem fs, Path path, FsPermission fsPerm)
+      throws IOException {
+    FsPermission dirPerm = new FsPermission(fsPerm);
+    fs.mkdirs(path, dirPerm);
+    FsPermission umask = FsPermission.getUMask(fs.getConf());
+    // Only pay for the extra call when the umask would alter the mode.
+    if (!dirPerm.equals(dirPerm.applyUMask(umask))) {
+      fs.setPermission(path, new FsPermission(fsPerm));
+    }
+  }
+
+  /**
+   * Check whether a directory exists, resetting its permissions to the
+   * expected ones when it exists with different permissions.
+   * @param fs the file system to look the path up on
+   * @param path the path to check
+   * @param fsPerm the expected permissions; {@link #APP_DIR_PERMISSIONS}
+   *        is used when this is {@code null}
+   * @return {@code true} if the path exists, {@code false} otherwise
+   * @throws IOException if the status lookup or the permission update fails
+   */
+  protected boolean checkExists(FileSystem fs, Path path, FsPermission fsPerm)
+      throws IOException {
+    // Honour the caller-supplied permissions instead of silently ignoring
+    // them; a null argument keeps the former application-directory default.
+    final FsPermission expected =
+        fsPerm != null ? fsPerm : APP_DIR_PERMISSIONS;
+    try {
+      FileStatus status = fs.getFileStatus(path);
+      if (!expected.equals(status.getPermission())) {
+        fs.setPermission(path, expected);
+      }
+      return true;
+    } catch (FileNotFoundException fnfe) {
+      return false;
+    }
+  }
+
+  /**
+   * Get the remote aggregated log path.
+   * @param appId the ApplicationId
+   * @param user the Application Owner
+   * @param nodeId the NodeManager Id
+   * @return the remote aggregated log path
+   */
+  public Path getRemoteNodeLogFileForApp(ApplicationId appId, String user,
+      NodeId nodeId) {
+    return LogAggregationUtils.getRemoteNodeLogFileForApp(
+        getRemoteRootLogDir(), appId, user, nodeId,
+        getRemoteRootLogDirSuffix());
+  }
+
+  /**
+   * Get the remote application directory for log aggregation.
+   * @param appId the Application ID
+   * @param appOwner the Application Owner
+   * @return the remote application directory
+   * @throws IOException if can not find the remote application directory
+   */
+  public Path getRemoteAppLogDir(ApplicationId appId, String appOwner)
+      throws IOException {
+    return LogAggregationUtils.getRemoteAppLogDir(conf, appId, appOwner,
+        this.remoteRootLogDir, this.remoteRootLogDirSuffix);
+  }
+
+  /**
+   * Delete the oldest aggregated log files of a node once their number has
+   * reached the retention size.
+   *
+   * <p>Only files in the parent directory of {@code remoteNodeLogFileForApp}
+   * whose name contains the node string and does not end with the temporary
+   * file suffix are considered. Failures are logged and never propagated.
+   * @param remoteNodeLogFileForApp the remote aggregated log path of the node
+   * @param nodeId the NodeManager Id
+   * @param userUgi the UGI used to delete the files
+   */
+  protected void cleanOldLogs(Path remoteNodeLogFileForApp,
+      final NodeId nodeId, UserGroupInformation userUgi) {
+    try {
+      final FileSystem remoteFS = remoteNodeLogFileForApp.getFileSystem(conf);
+      Path appDir = remoteNodeLogFileForApp.getParent().makeQualified(
+          remoteFS.getUri(), remoteFS.getWorkingDirectory());
+      Set<FileStatus> status =
+          new HashSet<FileStatus>(Arrays.asList(remoteFS.listStatus(appDir)));
+
+      // The node string is the same for every file, so compute it once.
+      final String nodeString = LogAggregationUtils.getNodeString(nodeId);
+      Iterable<FileStatus> mask =
+          Iterables.filter(status, new Predicate<FileStatus>() {
+            @Override
+            public boolean apply(FileStatus next) {
+              String fileName = next.getPath().getName();
+              return fileName.contains(nodeString)
+                  && !fileName.endsWith(LogAggregationUtils.TMP_FILE_SUFFIX);
+            }
+          });
+      status = Sets.newHashSet(mask);
+      // Normally, we just need to delete one oldest log
+      // before we upload a new log.
+      // If we can not delete the older logs in this cycle,
+      // we will delete them in next cycle.
+      if (status.size() >= this.retentionSize) {
+        // sort by the lastModificationTime ascending
+        List<FileStatus> statusList = new ArrayList<FileStatus>(status);
+        Collections.sort(statusList, new Comparator<FileStatus>() {
+          @Override
+          public int compare(FileStatus s1, FileStatus s2) {
+            return Long.compare(s1.getModificationTime(),
+                s2.getModificationTime());
+          }
+        });
+        // The inclusive bound removes one file more than the excess, which
+        // leaves room for the log that is about to be uploaded.
+        for (int i = 0; i <= statusList.size() - this.retentionSize; i++) {
+          final FileStatus remove = statusList.get(i);
+          try {
+            userUgi.doAs(new PrivilegedExceptionAction<Object>() {
+              @Override
+              public Object run() throws Exception {
+                remoteFS.delete(remove.getPath(), false);
+                return null;
+              }
+            });
+          } catch (Exception e) {
+            LOG.error("Failed to delete " + remove.getPath(), e);
+          }
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Failed to clean old logs", e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerContext.java
new file mode 100644
index 0000000..32128bc
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerContext.java
@@ -0,0 +1,130 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.logaggregation.filecontroller;
+
+import java.util.Map;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+/**
+ * {@code LogAggregationFileControllerContext} is a record used in
+ * the log aggregation process. It bundles the values supplied at
+ * construction time with a few mutable counters and flags that are
+ * updated through the setters and increment methods below.
+ */
+@Private
+@Unstable
+public class LogAggregationFileControllerContext {
+
+  // Values fixed at construction time.
+  private final ApplicationId appId;
+  private final NodeId nodeId;
+  private final UserGroupInformation userUgi;
+  private final Map<ApplicationAccessType, String> appAcls;
+  private final Path remoteNodeLogFileForApp;
+  private final Path remoteNodeTmpLogFileForApp;
+  private final boolean logAggregationInRolling;
+  private final long rollingMonitorInterval;
+
+  // Mutable bookkeeping; all of it starts at the Java default (0 / false).
+  private int logAggregationTimes;
+  private int cleanOldLogsTimes;
+  private boolean uploadedLogsInThisCycle;
+  private long logUploadedTimeStamp;
+
+  /**
+   * Create a context from the given values; each argument is stored as-is
+   * and returned unchanged by the matching getter.
+   *
+   * @param remoteNodeLogFileForApp the remote node log file for the app
+   * @param remoteNodeTmpLogFileForApp the remote node temporary log file
+   * @param logAggregationInRolling whether log aggregation is rolling
+   * @param rollingMonitorInterval the rolling monitor interval
+   * @param appId the application id
+   * @param appAcls the application ACLs
+   * @param nodeId the node id
+   * @param userUgi the user's {@link UserGroupInformation}
+   */
+  public LogAggregationFileControllerContext(Path remoteNodeLogFileForApp,
+      Path remoteNodeTmpLogFileForApp,
+      boolean logAggregationInRolling,
+      long rollingMonitorInterval,
+      ApplicationId appId,
+      Map<ApplicationAccessType, String> appAcls,
+      NodeId nodeId, UserGroupInformation userUgi) {
+    this.appId = appId;
+    this.nodeId = nodeId;
+    this.userUgi = userUgi;
+    this.appAcls = appAcls;
+    this.remoteNodeLogFileForApp = remoteNodeLogFileForApp;
+    this.remoteNodeTmpLogFileForApp = remoteNodeTmpLogFileForApp;
+    this.logAggregationInRolling = logAggregationInRolling;
+    this.rollingMonitorInterval = rollingMonitorInterval;
+  }
+
+  /** @return the application id given to the constructor */
+  public ApplicationId getAppId() {
+    return this.appId;
+  }
+
+  /** @return the node id given to the constructor */
+  public NodeId getNodeId() {
+    return this.nodeId;
+  }
+
+  /** @return the user's {@link UserGroupInformation} */
+  public UserGroupInformation getUserUgi() {
+    return this.userUgi;
+  }
+
+  /** @return the application ACL map given to the constructor */
+  public Map<ApplicationAccessType, String> getAppAcls() {
+    return this.appAcls;
+  }
+
+  /** @return the remote node log file path for the application */
+  public Path getRemoteNodeLogFileForApp() {
+    return this.remoteNodeLogFileForApp;
+  }
+
+  /** @return the remote node temporary log file path for the application */
+  public Path getRemoteNodeTmpLogFileForApp() {
+    return this.remoteNodeTmpLogFileForApp;
+  }
+
+  /** @return the rolling flag given to the constructor */
+  public boolean isLogAggregationInRolling() {
+    return this.logAggregationInRolling;
+  }
+
+  /** @return the rolling monitor interval given to the constructor */
+  public long getRollingMonitorInterval() {
+    return this.rollingMonitorInterval;
+  }
+
+  /** @return the flag last stored by {@link #setUploadedLogsInThisCycle} */
+  public boolean isUploadedLogsInThisCycle() {
+    return this.uploadedLogsInThisCycle;
+  }
+
+  /**
+   * Store the "uploaded logs in this cycle" flag.
+   *
+   * @param uploadedLogsInThisCycle the new flag value
+   */
+  public void setUploadedLogsInThisCycle(boolean uploadedLogsInThisCycle) {
+    this.uploadedLogsInThisCycle = uploadedLogsInThisCycle;
+  }
+
+  /** @return the timestamp last stored by {@link #setLogUploadTimeStamp} */
+  public long getLogUploadTimeStamp() {
+    return this.logUploadedTimeStamp;
+  }
+
+  /**
+   * Store the log upload timestamp.
+   *
+   * @param uploadTimeStamp the new timestamp
+   */
+  public void setLogUploadTimeStamp(long uploadTimeStamp) {
+    this.logUploadedTimeStamp = uploadTimeStamp;
+  }
+
+  /** Add one to the log aggregation counter. */
+  public void increLogAggregationTimes() {
+    this.logAggregationTimes += 1;
+  }
+
+  /** @return how often {@link #increLogAggregationTimes} has been called */
+  public int getLogAggregationTimes() {
+    return this.logAggregationTimes;
+  }
+
+  /** Add one to the old-log cleanup counter. */
+  public void increcleanupOldLogTimes() {
+    this.cleanOldLogsTimes += 1;
+  }
+
+  /** @return how often {@link #increcleanupOldLogTimes} has been called */
+  public int getCleanOldLogsTimes() {
+    return this.cleanOldLogsTimes;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerFactory.java
new file mode 100644
index 0000000..746bf5a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerFactory.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation.filecontroller;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+/**
+ * Use {@code LogAggregationFileControllerFactory} to get the correct
+ * {@link LogAggregationFileController} for write and read.
+ *
+ * <p>The controllers are created once, in the constructor, from the names
+ * listed under {@link YarnConfiguration#LOG_AGGREGATION_FILE_FORMATS}, and
+ * are kept in the order in which they are configured.
+ */
+@Private
+@Unstable
+public class LogAggregationFileControllerFactory {
+
+  private static final Log LOG = LogFactory.getLog(
+      LogAggregationFileControllerFactory.class);
+  // Compiled once: a controller name consists of letters, digits and
+  // underscores and must not start with a digit.
+  private static final Pattern CONTROLLER_NAME_PATTERN = Pattern.compile(
+      "^[A-Za-z_]+[A-Za-z0-9_]*$");
+  private LinkedList<LogAggregationFileController> controllers
+      = new LinkedList<>();
+  private Configuration conf;
+
+  /**
+   * Construct the LogAggregationFileControllerFactory object.
+   *
+   * <p>Every configured controller name is validated, its remote log
+   * directory and suffix are checked for conflicts with the controllers
+   * seen so far, and the controller is then instantiated and initialized.
+   *
+   * @param conf the Configuration
+   * @throws IllegalArgumentException if a configured controller name is
+   *           invalid
+   * @throws RuntimeException if two controllers share an explicitly
+   *           configured directory/suffix combination, or if a controller
+   *           class is missing or can not be instantiated
+   */
+  public LogAggregationFileControllerFactory(Configuration conf) {
+    this.conf = conf;
+    Collection<String> fileControllers = conf.getStringCollection(
+        YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS);
+
+    // Maps "<remote dir>-<suffix>" to the controller name(s) using it.
+    Map<String, String> controllerChecker = new HashMap<>();
+
+    for (String fileController : fileControllers) {
+      Preconditions.checkArgument(validateAggregatedFileControllerName(
+          fileController), "The FileControllerName: " + fileController
+          + " set in " + YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS
+          + " is invalid. The valid File Controller name should only "
+          + "contain a-zA-Z0-9_ and can not start with numbers");
+
+      // Controller-specific remote dir, falling back to the NM-wide one.
+      String remoteDirStr = String.format(
+          YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT,
+          fileController);
+      String remoteDir = conf.get(remoteDirStr);
+      boolean defaultRemoteDir = false;
+      if (remoteDir == null || remoteDir.isEmpty()) {
+        remoteDir = conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+            YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR);
+        defaultRemoteDir = true;
+      }
+
+      // Controller-specific suffix, falling back to the NM-wide one.
+      String suffixStr = String.format(
+          YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT,
+          fileController);
+      String suffix = conf.get(suffixStr);
+      boolean defaultSuffix = false;
+      if (suffix == null || suffix.isEmpty()) {
+        suffix = conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
+            YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
+        defaultSuffix = true;
+      }
+
+      String dirSuffix = remoteDir + "-" + suffix;
+      if (controllerChecker.containsKey(dirSuffix)) {
+        if (defaultRemoteDir && defaultSuffix) {
+          // Sharing the location is tolerated only when this controller
+          // uses both fallback values; remember every name that shares it.
+          String fileControllerStr = controllerChecker.get(dirSuffix);
+          List<String> controllersList = new ArrayList<>();
+          controllersList.add(fileControllerStr);
+          controllersList.add(fileController);
+          fileControllerStr = StringUtils.join(controllersList, ",");
+          controllerChecker.put(dirSuffix, fileControllerStr);
+        } else {
+          String conflictController = controllerChecker.get(dirSuffix);
+          throw new RuntimeException("The combined value of " + remoteDirStr
+              + " and " + suffixStr + " should not be the same as the value"
+              + " set for " + conflictController);
+        }
+      } else {
+        controllerChecker.put(dirSuffix, fileController);
+      }
+
+      controllers.add(createController(fileController));
+    }
+  }
+
+  /**
+   * Instantiate and initialize the controller configured for the given
+   * controller name.
+   *
+   * @param fileController the configured controller name
+   * @return the initialized controller instance
+   * @throws RuntimeException if no class is configured for the name or no
+   *           instance could be created
+   */
+  private LogAggregationFileController createController(
+      String fileController) {
+    String classKey = String.format(
+        YarnConfiguration.LOG_AGGREGATION_FILE_CONTROLLER_FMT,
+        fileController);
+    String className = conf.get(classKey);
+    if (className == null || className.isEmpty()) {
+      throw new RuntimeException("No class configured for "
+          + fileController);
+    }
+    Class<? extends LogAggregationFileController> sClass = conf.getClass(
+        classKey, null, LogAggregationFileController.class);
+    if (sClass == null) {
+      throw new RuntimeException("No class defined for " + fileController);
+    }
+    LogAggregationFileController s = ReflectionUtils.newInstance(
+        sClass, conf);
+    if (s == null) {
+      // Report the class that failed, not an accumulated list of names.
+      throw new RuntimeException("No object created for " + className);
+    }
+    s.initialize(conf, fileController);
+    return s;
+  }
+
+  /**
+   * Get {@link LogAggregationFileController} to write.
+   * This is the first controller that was configured.
+   * @return the LogAggregationFileController instance
+   * @throws java.util.NoSuchElementException if no controller is configured
+   */
+  public LogAggregationFileController getFileControllerForWrite() {
+    return controllers.getFirst();
+  }
+
+  /**
+   * Get {@link LogAggregationFileController} to read the aggregated logs
+   * for this application. The controllers are tried in configuration order
+   * and the first one whose remote application log directory contains at
+   * least one entry is returned.
+   * @param appId the ApplicationId
+   * @param appOwner the Application Owner
+   * @return the LogAggregationFileController instance
+   * @throws IOException if can not find any log aggregation file controller
+   */
+  public LogAggregationFileController getFileControllerForRead(
+      ApplicationId appId, String appOwner) throws IOException {
+    StringBuilder diagnosis = new StringBuilder();
+    for (LogAggregationFileController fileController : controllers) {
+      try {
+        Path remoteAppLogDir = fileController.getRemoteAppLogDir(
+            appId, appOwner);
+        Path qualifiedLogDir = FileContext.getFileContext(conf).makeQualified(
+            remoteAppLogDir);
+        RemoteIterator<FileStatus> nodeFiles = FileContext.getFileContext(
+            qualifiedLogDir.toUri(), conf).listStatus(remoteAppLogDir);
+        if (nodeFiles.hasNext()) {
+          return fileController;
+        }
+      } catch (Exception ex) {
+        // Best effort: remember why this controller failed, try the next.
+        diagnosis.append(ex.getMessage()).append('\n');
+      }
+    }
+    if (diagnosis.length() == 0) {
+      // No controller failed, but none had any log file either; do not
+      // surface an IOException with an empty message.
+      diagnosis.append("No aggregated log files found for application ")
+          .append(appId)
+          .append(" by any configured log aggregation file controller.");
+    }
+    throw new IOException(diagnosis.toString());
+  }
+
+  /**
+   * Check that a configured controller name is non-blank and matches
+   * {@link #CONTROLLER_NAME_PATTERN}.
+   *
+   * @param name the configured controller name
+   * @return true if the name is valid
+   */
+  private boolean validateAggregatedFileControllerName(String name) {
+    if (name == null || name.trim().isEmpty()) {
+      return false;
+    }
+    return CONTROLLER_NAME_PATTERN.matcher(name).matches();
+  }
+
+  /**
+   * Expose the configured controllers, in configuration order.
+   * The returned list is the internal one, not a copy.
+   * @return the list of configured controllers
+   */
+  @Private
+  @VisibleForTesting
+  public LinkedList<LogAggregationFileController>
+      getConfiguredLogAggregationFileControllerList() {
+    return this.controllers;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationTFileController.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationTFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationTFileController.java
new file mode 100644
index 0000000..9e0c66d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationTFileController.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation.filecontroller;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
+import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.util.Times;
+
+/**
+ * The TFile log aggregation file Controller implementation.
+ * Writing is delegated to an {@link LogWriter} that is created by
+ * {@link #initializeWriter} and released by {@link #closeWriter}.
+ */
+@Private
+@Unstable
+public class LogAggregationTFileController
+    extends LogAggregationFileController {
+
+  private static final Log LOG = LogFactory.getLog(
+      LogAggregationTFileController.class);
+
+  // Null until initializeWriter() is called and again after closeWriter().
+  private LogWriter writer;
+
+  public LogAggregationTFileController(){}
+
+  /**
+   * Read the remote root log directory and its suffix from the NM-wide
+   * configuration keys, using their defaults when unset.
+   *
+   * @param conf the Configuration
+   */
+  @Override
+  public void initInternal(Configuration conf) {
+    this.remoteRootLogDir = new Path(
+        conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+            YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
+    this.remoteRootLogDirSuffix =
+        conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
+            YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
+  }
+
+  /**
+   * Create a new writer on the context's temporary remote log file and
+   * write the application ACLs and owner to it.
+   *
+   * @param context the log aggregation context
+   * @throws IOException if the writer can not be initialized or written to
+   */
+  @Override
+  public void initializeWriter(LogAggregationFileControllerContext context)
+      throws IOException {
+    this.writer = new LogWriter();
+    writer.initialize(this.conf, context.getRemoteNodeTmpLogFileForApp(),
+        context.getUserUgi());
+    // Write ACLs once when the writer is created.
+    writer.writeApplicationACLs(context.getAppAcls());
+    writer.writeApplicationOwner(context.getUserUgi().getShortUserName());
+  }
+
+  /**
+   * Close the current writer, if any. Calling this before
+   * {@link #initializeWriter} or more than once is a no-op.
+   */
+  @Override
+  public void closeWriter() {
+    // Callers invoke this from finally blocks, so it may run even though
+    // no writer was ever created; avoid a NullPointerException there.
+    if (this.writer != null) {
+      this.writer.close();
+      this.writer = null;
+    }
+  }
+
+  /**
+   * Append one key/value pair to the current writer.
+   *
+   * @param logKey the log key
+   * @param logValue the log value
+   * @throws IOException if the append fails
+   */
+  @Override
+  public void write(LogKey logKey, LogValue logValue) throws IOException {
+    this.writer.append(logKey, logValue);
+  }
+
+  /**
+   * Finish one upload cycle: clean up old logs when rolling aggregation
+   * uploaded something, then either rename the temporary log file to its
+   * final name or delete it when nothing was uploaded.
+   *
+   * @param record the log aggregation context
+   * @throws Exception if the temporary file can not be renamed or deleted;
+   *           the underlying failure is attached as the cause
+   */
+  @Override
+  public void postWrite(final LogAggregationFileControllerContext record)
+      throws Exception {
+    // Before upload logs, make sure the number of existing logs
+    // is smaller than the configured NM log aggregation retention size.
+    if (record.isUploadedLogsInThisCycle() &&
+        record.isLogAggregationInRolling()) {
+      cleanOldLogs(record.getRemoteNodeLogFileForApp(), record.getNodeId(),
+          record.getUserUgi());
+      record.increcleanupOldLogTimes();
+    }
+
+    // With a positive rolling interval every upload gets its own file,
+    // named after the node file plus the upload timestamp.
+    final Path renamedPath = record.getRollingMonitorInterval() <= 0
+        ? record.getRemoteNodeLogFileForApp() : new Path(
+            record.getRemoteNodeLogFileForApp().getParent(),
+            record.getRemoteNodeLogFileForApp().getName() + "_"
+            + record.getLogUploadTimeStamp());
+    final boolean rename = record.isUploadedLogsInThisCycle();
+    try {
+      record.getUserUgi().doAs(new PrivilegedExceptionAction<Object>() {
+        @Override
+        public Object run() throws Exception {
+          FileSystem remoteFS = record.getRemoteNodeLogFileForApp()
+              .getFileSystem(conf);
+          if (rename) {
+            remoteFS.rename(record.getRemoteNodeTmpLogFileForApp(),
+                renamedPath);
+          } else {
+            // Nothing was uploaded; drop the temporary file.
+            remoteFS.delete(record.getRemoteNodeTmpLogFileForApp(), false);
+          }
+          return null;
+        }
+      });
+    } catch (Exception e) {
+      LOG.error(
+          "Failed to move temporary log file to final location: ["
+          + record.getRemoteNodeTmpLogFileForApp() + "] to ["
+          + renamedPath + "]", e);
+      // Keep the original failure as the cause so callers can diagnose it.
+      throw new Exception("Log uploaded failed for Application: "
+          + record.getAppId() + " in NodeManager: "
+          + LogAggregationUtils.getNodeString(record.getNodeId()) + " at "
+          + Times.format(record.getLogUploadTimeStamp()) + "\n", e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/package-info.java
new file mode 100644
index 0000000..cad238a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Public
+package org.apache.hadoop.yarn.logaggregation.filecontroller;
+import org.apache.hadoop.classification.InterfaceAudience;
+
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index f93de44..0823dfe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1167,6 +1167,25 @@
</property>
<property>
+ <description>Specify which log file controllers we will support. The first
+ file controller we add will be used to write the aggregated logs.
+ This comma separated configuration will work with the configuration:
+ yarn.log-aggregation.file-controller.%s.class which defines the supported
+ file controller's class. By default, the TFile controller would be used.
+ The user could override this configuration by adding more file controllers.
+ To support backward compatibility, make sure that the TFile
+ file controller is always included.</description>
+ <name>yarn.log-aggregation.file-formats</name>
+ <value>TFile</value>
+ </property>
+
+ <property>
+ <description>Class that supports TFile read and write operations.</description>
+ <name>yarn.log-aggregation.file-controller.TFile.class</name>
+ <value>org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationTFileController</value>
+ </property>
+
+ <property>
<description>
How long for ResourceManager to wait for NodeManager to report its
log aggregation status. If waiting time of which the log aggregation
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java
index 1e71b3c..3dd7de3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java
@@ -40,10 +40,14 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
import org.apache.hadoop.yarn.webapp.YarnWebParams;
import org.apache.hadoop.yarn.webapp.log.AggregatedLogsBlockForTest;
import org.apache.hadoop.yarn.webapp.view.BlockForTest;
@@ -249,7 +253,7 @@ public class TestAggregatedLogsBlock {
private Configuration getConfiguration() {
- Configuration configuration = new Configuration();
+ Configuration configuration = new YarnConfiguration();
configuration.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
configuration.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, "target/logs");
configuration.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
@@ -295,19 +299,25 @@ public class TestAggregatedLogsBlock {
List<String> rootLogDirs = Arrays.asList("target/logs/logs");
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
- try (AggregatedLogFormat.LogWriter writer =
- new AggregatedLogFormat.LogWriter()) {
- writer.initialize(configuration, new Path(path), ugi);
- writer.writeApplicationOwner(ugi.getUserName());
-
+ LogAggregationFileControllerFactory factory
+ = new LogAggregationFileControllerFactory(configuration);
+ LogAggregationFileController fileController = factory
+ .getFileControllerForWrite();
+ try {
Map<ApplicationAccessType, String> appAcls = new HashMap<>();
appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
- writer.writeApplicationACLs(appAcls);
-
- writer.append(
+ NodeId nodeId = NodeId.newInstance("localhost", 1234);
+ LogAggregationFileControllerContext context
+ = new LogAggregationFileControllerContext(
+ new Path(path), new Path(path), false, 3600,
+ appId, appAcls, nodeId, ugi);
+ fileController.initializeWriter(context);
+ fileController.write(
new AggregatedLogFormat.LogKey("container_0_0001_01_000001"),
new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
UserGroupInformation.getCurrentUser().getShortUserName()));
+ } finally {
+ fileController.closeWriter();
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java
index 8b665e0..a12e2a1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java
@@ -24,15 +24,21 @@ import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
/**
* This class contains several utility functions for log aggregation tests.
@@ -110,14 +116,25 @@ public final class TestContainerLogsUtils {
ContainerId containerId, Path appDir, FileSystem fs) throws IOException {
Path path =
new Path(appDir, LogAggregationUtils.getNodeString(nodeId));
- try (AggregatedLogFormat.LogWriter writer =
- new AggregatedLogFormat.LogWriter()) {
- writer.initialize(configuration, path, ugi);
- writer.writeApplicationOwner(ugi.getUserName());
-
- writer.append(new AggregatedLogFormat.LogKey(containerId),
+ LogAggregationFileControllerFactory factory
+ = new LogAggregationFileControllerFactory(configuration);
+ LogAggregationFileController fileController = factory
+ .getFileControllerForWrite();
+ try {
+ Map<ApplicationAccessType, String> appAcls = new HashMap<>();
+ appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
+ ApplicationId appId = containerId.getApplicationAttemptId()
+ .getApplicationId();
+ LogAggregationFileControllerContext context
+ = new LogAggregationFileControllerContext(
+ path, path, true, 1000,
+ appId, appAcls, nodeId, ugi);
+ fileController.initializeWriter(context);
+ fileController.write(new AggregatedLogFormat.LogKey(containerId),
new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
ugi.getShortUserName()));
+ } finally {
+ fileController.closeWriter();
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/TestLogAggregationFileControllerFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/TestLogAggregationFileControllerFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/TestLogAggregationFileControllerFactory.java
new file mode 100644
index 0000000..45f18c1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/TestLogAggregationFileControllerFactory.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation.filecontroller;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.Writer;
+import java.util.LinkedList;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationTFileController;
+import org.junit.Test;
+
+/**
+ * Test LogAggregationFileControllerFactory.
+ *
+ */
+public class TestLogAggregationFileControllerFactory {
+
+  /**
+   * Checks that the factory yields one TFile controller while the test sets
+   * no file formats, fails for a format without a controller class, and with
+   * "custom,TFile" configured returns the custom one for write and read.
+   */
+  @Test(timeout = 10000)
+  public void testLogAggregationFileControllerFactory() throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(
+        System.currentTimeMillis(), 1);
+    String appOwner = "test";
+    String remoteLogRootDir = "target/app-logs/";
+    Configuration conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
+    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogRootDir);
+    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, "log");
+    FileSystem fs = FileSystem.get(conf);
+
+    LogAggregationFileControllerFactory factory =
+        new LogAggregationFileControllerFactory(conf);
+    LinkedList<LogAggregationFileController> list = factory
+        .getConfiguredLogAggregationFileControllerList();
+    assertEquals(1, list.size());
+    assertTrue(list.getFirst() instanceof LogAggregationTFileController);
+    assertTrue(factory.getFileControllerForWrite()
+        instanceof LogAggregationTFileController);
+    Path logPath = list.getFirst().getRemoteAppLogDir(appId, appOwner);
+    assertTrue(readControllerFor(fs, factory, logPath, appId, appOwner)
+        instanceof LogAggregationTFileController);
+
+    conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS,
+        "TestLogAggregationFileController");
+    // No class is configured for this format, so construction must fail.
+    try {
+      factory = new LogAggregationFileControllerFactory(conf);
+      fail();
+    } catch (Exception ex) {
+      // should get exception
+    }
+
+    conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS,
+        "TestLogAggregationFileController,TFile");
+    conf.setClass(
+        "yarn.log-aggregation.file-controller.TestLogAggregationFileController"
+        + ".class", TestLogAggregationFileController.class,
+        LogAggregationFileController.class);
+
+    conf.set("yarn.log-aggregation.TestLogAggregationFileController"
+        + ".remote-app-log-dir", remoteLogRootDir);
+    conf.set("yarn.log-aggregation.TestLogAggregationFileController"
+        + ".remote-app-log-dir-suffix", "testLog");
+
+    factory = new LogAggregationFileControllerFactory(conf);
+    list = factory.getConfiguredLogAggregationFileControllerList();
+    assertEquals(2, list.size());
+    assertTrue(list.getFirst() instanceof TestLogAggregationFileController);
+    assertTrue(list.getLast() instanceof LogAggregationTFileController);
+    assertTrue(factory.getFileControllerForWrite()
+        instanceof TestLogAggregationFileController);
+
+    logPath = list.getFirst().getRemoteAppLogDir(appId, appOwner);
+    assertTrue(readControllerFor(fs, factory, logPath, appId, appOwner)
+        instanceof TestLogAggregationFileController);
+  }
+
+  /**
+   * Recreates {@code logPath} with one small file, looks up the read
+   * controller, and always deletes the directory afterwards.
+   */
+  private static LogAggregationFileController readControllerFor(
+      FileSystem fs, LogAggregationFileControllerFactory factory,
+      Path logPath, ApplicationId appId, String appOwner) throws Exception {
+    try {
+      if (fs.exists(logPath)) {
+        fs.delete(logPath, true);
+      }
+      assertTrue(fs.mkdirs(logPath));
+      try (Writer writer =
+          new FileWriter(new File(logPath.toString(), "testLog"))) {
+        writer.write("test");
+      }
+      return factory.getFileControllerForRead(appId, appOwner);
+    } finally {
+      fs.delete(logPath, true);
+    }
+  }
+
+  /** Stub controller: directories come from conf; writer calls do nothing. */
+  private static class TestLogAggregationFileController
+      extends LogAggregationFileController {
+
+    @Override
+    public void initInternal(Configuration conf) {
+      String remoteDirStr = String.format(
+          YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT,
+          this.fileControllerName);
+      this.remoteRootLogDir = new Path(conf.get(remoteDirStr));
+      String suffix = String.format(
+          YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT,
+          this.fileControllerName);
+      this.remoteRootLogDirSuffix = conf.get(suffix);
+    }
+
+    @Override
+    public void closeWriter() {
+      // Do Nothing
+    }
+
+    @Override
+    public void write(LogKey logKey, LogValue logValue) throws IOException {
+      // Do Nothing
+    }
+
+    @Override
+    public void postWrite(LogAggregationFileControllerContext record)
+        throws Exception {
+      // Do Nothing
+    }
+
+    @Override
+    public void initializeWriter(LogAggregationFileControllerContext context)
+        throws IOException {
+      // Do Nothing
+    }
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org