You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ju...@apache.org on 2017/08/24 20:35:07 UTC
[1/2] hadoop git commit: YARN-6876. Create an abstract log writer for
extendability. Contributed by Xuan Gong.
Repository: hadoop
Updated Branches:
refs/heads/trunk 8196a07c3 -> c2cb7ea1e
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index 1601c3f..51c63c4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -19,11 +19,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -38,8 +34,6 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.security.Credentials;
@@ -57,7 +51,9 @@ import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
-import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationTFileController;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.server.api.ContainerLogAggregationPolicy;
import org.apache.hadoop.yarn.server.api.ContainerLogContext;
@@ -71,7 +67,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DeletionTask;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;
-import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.Times;
@@ -86,18 +81,6 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
private static final Logger LOG =
LoggerFactory.getLogger(AppLogAggregatorImpl.class);
private static final int THREAD_SLEEP_TIME = 1000;
- // This is temporary solution. The configuration will be deleted once
- // we find a more scalable method to only write a single log file per LRS.
- private static final String NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP
- = YarnConfiguration.NM_PREFIX + "log-aggregation.num-log-files-per-app";
- private static final int
- DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP = 30;
-
- // This configuration is for debug and test purpose. By setting
- // this configuration as true. We can break the lower bound of
- // NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS.
- private static final String NM_LOG_AGGREGATION_DEBUG_ENABLED
- = YarnConfiguration.NM_PREFIX + "log-aggregation.debug-enabled";
private final LocalDirsHandlerService dirsHandler;
private final Dispatcher dispatcher;
@@ -118,10 +101,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
private final FileContext lfs;
private final LogAggregationContext logAggregationContext;
private final Context context;
- private final int retentionSize;
- private final long rollingMonitorInterval;
- private final boolean logAggregationInRolling;
private final NodeId nodeId;
+ private final LogAggregationFileControllerContext logControllerContext;
// These variables are only for testing
private final AtomicBoolean waiting = new AtomicBoolean(false);
@@ -134,6 +115,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
new HashMap<ContainerId, ContainerLogAggregator>();
private final ContainerLogAggregationPolicy logAggPolicy;
+ private final LogAggregationFileController logAggregationFileController;
+
/**
* The value recovered from state store to determine the age of application
@@ -151,7 +134,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
FileContext lfs, long rollingMonitorInterval) {
this(dispatcher, deletionService, conf, appId, userUgi, nodeId,
dirsHandler, remoteNodeLogFileForApp, appAcls,
- logAggregationContext, context, lfs, rollingMonitorInterval, -1);
+ logAggregationContext, context, lfs, rollingMonitorInterval, -1, null);
}
public AppLogAggregatorImpl(Dispatcher dispatcher,
@@ -162,6 +145,21 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
LogAggregationContext logAggregationContext, Context context,
FileContext lfs, long rollingMonitorInterval,
long recoveredLogInitedTime) {
+ this(dispatcher, deletionService, conf, appId, userUgi, nodeId,
+ dirsHandler, remoteNodeLogFileForApp, appAcls,
+ logAggregationContext, context, lfs, rollingMonitorInterval,
+ recoveredLogInitedTime, null);
+ }
+
+ public AppLogAggregatorImpl(Dispatcher dispatcher,
+ DeletionService deletionService, Configuration conf,
+ ApplicationId appId, UserGroupInformation userUgi, NodeId nodeId,
+ LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp,
+ Map<ApplicationAccessType, String> appAcls,
+ LogAggregationContext logAggregationContext, Context context,
+ FileContext lfs, long rollingMonitorInterval,
+ long recoveredLogInitedTime,
+ LogAggregationFileController logAggregationFileController) {
this.dispatcher = dispatcher;
this.conf = conf;
this.delService = deletionService;
@@ -169,31 +167,41 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
this.applicationId = appId.toString();
this.userUgi = userUgi;
this.dirsHandler = dirsHandler;
- this.remoteNodeLogFileForApp = remoteNodeLogFileForApp;
- this.remoteNodeTmpLogFileForApp = getRemoteNodeTmpLogFileForApp();
this.pendingContainers = new LinkedBlockingQueue<ContainerId>();
this.appAcls = appAcls;
this.lfs = lfs;
this.logAggregationContext = logAggregationContext;
this.context = context;
this.nodeId = nodeId;
- int configuredRentionSize =
- conf.getInt(NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP,
- DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP);
- if (configuredRentionSize <= 0) {
- this.retentionSize =
- DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP;
+ this.logAggPolicy = getLogAggPolicy(conf);
+ this.recoveredLogInitedTime = recoveredLogInitedTime;
+ if (logAggregationFileController == null) {
+ // by default, use T-File Controller
+ this.logAggregationFileController = new LogAggregationTFileController();
+ this.logAggregationFileController.initialize(conf, "TFile");
+ this.logAggregationFileController.verifyAndCreateRemoteLogDir();
+ this.logAggregationFileController.createAppDir(
+ this.userUgi.getShortUserName(), appId, userUgi);
+ this.remoteNodeLogFileForApp = this.logAggregationFileController
+ .getRemoteNodeLogFileForApp(appId,
+ this.userUgi.getShortUserName(), nodeId);
+ this.remoteNodeTmpLogFileForApp = getRemoteNodeTmpLogFileForApp();
} else {
- this.retentionSize = configuredRentionSize;
+ this.logAggregationFileController = logAggregationFileController;
+ this.remoteNodeLogFileForApp = remoteNodeLogFileForApp;
+ this.remoteNodeTmpLogFileForApp = getRemoteNodeTmpLogFileForApp();
}
- this.rollingMonitorInterval = rollingMonitorInterval;
- this.logAggregationInRolling =
- this.rollingMonitorInterval <= 0 || this.logAggregationContext == null
+ boolean logAggregationInRolling =
+ rollingMonitorInterval <= 0 || this.logAggregationContext == null
|| this.logAggregationContext.getRolledLogsIncludePattern() == null
|| this.logAggregationContext.getRolledLogsIncludePattern()
- .isEmpty() ? false : true;
- this.logAggPolicy = getLogAggPolicy(conf);
- this.recoveredLogInitedTime = recoveredLogInitedTime;
+ .isEmpty() ? false : true;
+ logControllerContext = new LogAggregationFileControllerContext(
+ this.remoteNodeLogFileForApp,
+ this.remoteNodeTmpLogFileForApp,
+ logAggregationInRolling,
+ rollingMonitorInterval,
+ this.appId, this.appAcls, this.nodeId, this.userUgi);
}
private ContainerLogAggregationPolicy getLogAggPolicy(Configuration conf) {
@@ -293,14 +301,9 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
logAggregationTimes++;
String diagnosticMessage = "";
boolean logAggregationSucceedInThisCycle = true;
- try (LogWriter writer = createLogWriter()) {
+ try {
try {
- writer.initialize(this.conf, this.remoteNodeTmpLogFileForApp,
- this.userUgi);
- // Write ACLs once when the writer is created.
- writer.writeApplicationACLs(appAcls);
- writer.writeApplicationOwner(this.userUgi.getShortUserName());
-
+ logAggregationFileController.initializeWriter(logControllerContext);
} catch (IOException e1) {
logAggregationSucceedInThisCycle = false;
LOG.error("Cannot create writer for app " + this.applicationId
@@ -318,8 +321,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
containerLogAggregators.put(container, aggregator);
}
Set<Path> uploadedFilePathsInThisCycle =
- aggregator.doContainerLogAggregation(writer, appFinished,
- finishedContainers.contains(container));
+ aggregator.doContainerLogAggregation(logAggregationFileController,
+ appFinished, finishedContainers.contains(container));
if (uploadedFilePathsInThisCycle.size() > 0) {
uploadedLogsInThisCycle = true;
List<Path> uploadedFilePathsInThisCycleList = new ArrayList<>();
@@ -337,60 +340,28 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
}
}
- // Before upload logs, make sure the number of existing logs
- // is smaller than the configured NM log aggregation retention size.
- if (uploadedLogsInThisCycle && logAggregationInRolling) {
- cleanOldLogs();
- cleanupOldLogTimes++;
- }
-
- long currentTime = System.currentTimeMillis();
- final Path renamedPath = getRenamedPath(currentTime);
-
- final boolean rename = uploadedLogsInThisCycle;
+ logControllerContext.setUploadedLogsInThisCycle(uploadedLogsInThisCycle);
+ logControllerContext.setLogUploadTimeStamp(System.currentTimeMillis());
+ logControllerContext.increLogAggregationTimes();
try {
- userUgi.doAs(new PrivilegedExceptionAction<Object>() {
- @Override
- public Object run() throws Exception {
- FileSystem remoteFS = remoteNodeLogFileForApp.getFileSystem(conf);
- if (rename) {
- remoteFS.rename(remoteNodeTmpLogFileForApp, renamedPath);
- } else {
- remoteFS.delete(remoteNodeTmpLogFileForApp, false);
- }
- return null;
- }
- });
- diagnosticMessage =
- "Log uploaded successfully for Application: " + appId
- + " in NodeManager: "
- + LogAggregationUtils.getNodeString(nodeId) + " at "
- + Times.format(currentTime) + "\n";
+ this.logAggregationFileController.postWrite(logControllerContext);
+ diagnosticMessage = "Log uploaded successfully for Application: "
+ + appId + " in NodeManager: "
+ + LogAggregationUtils.getNodeString(nodeId) + " at "
+ + Times.format(logControllerContext.getLogUploadTimeStamp())
+ + "\n";
} catch (Exception e) {
- LOG.error(
- "Failed to move temporary log file to final location: ["
- + remoteNodeTmpLogFileForApp + "] to ["
- + renamedPath + "]", e);
- diagnosticMessage =
- "Log uploaded failed for Application: " + appId
- + " in NodeManager: "
- + LogAggregationUtils.getNodeString(nodeId) + " at "
- + Times.format(currentTime) + "\n";
+ diagnosticMessage = e.getMessage();
renameTemporaryLogFileFailed = true;
logAggregationSucceedInThisCycle = false;
}
} finally {
sendLogAggregationReport(logAggregationSucceedInThisCycle,
diagnosticMessage, appFinished);
+ logAggregationFileController.closeWriter();
}
}
- private Path getRenamedPath(long currentTime) {
- return this.rollingMonitorInterval <= 0 ? remoteNodeLogFileForApp
- : new Path(remoteNodeLogFileForApp.getParent(),
- remoteNodeLogFileForApp.getName() + "_" + currentTime);
- }
-
private void addCredentials() {
if (UserGroupInformation.isSecurityEnabled()) {
Credentials systemCredentials =
@@ -407,11 +378,6 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
}
}
- @VisibleForTesting
- protected LogWriter createLogWriter() {
- return new LogWriter();
- }
-
private void sendLogAggregationReport(
boolean logAggregationSucceedInThisCycle, String diagnosticMessage,
boolean appFinished) {
@@ -442,60 +408,6 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
this.context.getLogAggregationStatusForApps().add(report);
}
- private void cleanOldLogs() {
- try {
- final FileSystem remoteFS =
- this.remoteNodeLogFileForApp.getFileSystem(conf);
- Path appDir =
- this.remoteNodeLogFileForApp.getParent().makeQualified(
- remoteFS.getUri(), remoteFS.getWorkingDirectory());
- Set<FileStatus> status =
- new HashSet<FileStatus>(Arrays.asList(remoteFS.listStatus(appDir)));
-
- Iterable<FileStatus> mask =
- Iterables.filter(status, new Predicate<FileStatus>() {
- @Override
- public boolean apply(FileStatus next) {
- return next.getPath().getName()
- .contains(LogAggregationUtils.getNodeString(nodeId))
- && !next.getPath().getName().endsWith(
- LogAggregationUtils.TMP_FILE_SUFFIX);
- }
- });
- status = Sets.newHashSet(mask);
- // Normally, we just need to delete one oldest log
- // before we upload a new log.
- // If we can not delete the older logs in this cycle,
- // we will delete them in next cycle.
- if (status.size() >= this.retentionSize) {
- // sort by the lastModificationTime ascending
- List<FileStatus> statusList = new ArrayList<FileStatus>(status);
- Collections.sort(statusList, new Comparator<FileStatus>() {
- public int compare(FileStatus s1, FileStatus s2) {
- return s1.getModificationTime() < s2.getModificationTime() ? -1
- : s1.getModificationTime() > s2.getModificationTime() ? 1 : 0;
- }
- });
- for (int i = 0 ; i <= statusList.size() - this.retentionSize; i++) {
- final FileStatus remove = statusList.get(i);
- try {
- userUgi.doAs(new PrivilegedExceptionAction<Object>() {
- @Override
- public Object run() throws Exception {
- remoteFS.delete(remove.getPath(), false);
- return null;
- }
- });
- } catch (Exception e) {
- LOG.error("Failed to delete " + remove.getPath(), e);
- }
- }
- }
- } catch (Exception e) {
- LOG.error("Failed to clean old logs", e);
- }
- }
-
@SuppressWarnings("unchecked")
@Override
public void run() {
@@ -523,8 +435,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
synchronized(this) {
try {
waiting.set(true);
- if (logAggregationInRolling) {
- wait(this.rollingMonitorInterval * 1000);
+ if (logControllerContext.isLogAggregationInRolling()) {
+ wait(logControllerContext.getRollingMonitorInterval() * 1000);
if (this.appFinishing.get() || this.aborted.get()) {
break;
}
@@ -653,7 +565,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
recoveredLogInitedTime, logRetentionSecs * 1000);
}
- public Set<Path> doContainerLogAggregation(LogWriter writer,
+ public Set<Path> doContainerLogAggregation(
+ LogAggregationFileController logAggregationFileController,
boolean appFinished, boolean containerFinished) {
LOG.info("Uploading logs for container " + containerId
+ ". Current good log dirs are "
@@ -665,7 +578,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
this.uploadedFileMeta, retentionContext, appFinished,
containerFinished);
try {
- writer.append(logKey, logValue);
+ logAggregationFileController.write(logKey, logValue);
} catch (Exception e) {
LOG.error("Couldn't upload logs for " + containerId
+ ". Skipping this container.", e);
@@ -708,4 +621,15 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
int getCleanupOldLogTimes() {
return this.cleanupOldLogTimes;
}
+
+ @VisibleForTesting
+ public LogAggregationFileController getLogAggregationFileController() {
+ return this.logAggregationFileController;
+ }
+
+ @VisibleForTesting
+ public LogAggregationFileControllerContext
+ getLogAggregationFileControllerContext() {
+ return this.logControllerContext;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
index aafd7d8..1a59e45 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
@@ -18,9 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
-import java.io.FileNotFoundException;
import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -32,10 +30,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
@@ -48,7 +43,8 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
import org.apache.hadoop.yarn.server.api.ContainerLogContext;
import org.apache.hadoop.yarn.server.api.ContainerType;
import org.apache.hadoop.yarn.server.nodemanager.Context;
@@ -79,36 +75,14 @@ public class LogAggregationService extends AbstractService implements
= YarnConfiguration.NM_PREFIX + "log-aggregation.debug-enabled";
private long rollingMonitorInterval;
- /*
- * Expected deployment TLD will be 1777, owner=<NMOwner>, group=<NMGroup -
- * Group to which NMOwner belongs> App dirs will be created as 770,
- * owner=<AppOwner>, group=<NMGroup>: so that the owner and <NMOwner> can
- * access / modify the files.
- * <NMGroup> should obviously be a limited access group.
- */
- /**
- * Permissions for the top level directory under which app directories will be
- * created.
- */
- private static final FsPermission TLDIR_PERMISSIONS = FsPermission
- .createImmutable((short) 01777);
- /**
- * Permissions for the Application directory.
- */
- private static final FsPermission APP_DIR_PERMISSIONS = FsPermission
- .createImmutable((short) 0770);
-
private final Context context;
private final DeletionService deletionService;
private final Dispatcher dispatcher;
private LocalDirsHandlerService dirsHandler;
- Path remoteRootLogDir;
- String remoteRootLogDirSuffix;
private NodeId nodeId;
private final ConcurrentMap<ApplicationId, AppLogAggregator> appLogAggregators;
- private boolean logPermError = true;
@VisibleForTesting
ExecutorService threadPool;
@@ -125,12 +99,6 @@ public class LogAggregationService extends AbstractService implements
}
protected void serviceInit(Configuration conf) throws Exception {
- this.remoteRootLogDir =
- new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
- YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
- this.remoteRootLogDirSuffix =
- conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
- YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
int threadPoolSize = getAggregatorThreadPoolSize(conf);
this.threadPool = HadoopExecutors.newFixedThreadPool(threadPoolSize,
new ThreadFactoryBuilder()
@@ -218,158 +186,6 @@ public class LogAggregationService extends AbstractService implements
}
}
- protected FileSystem getFileSystem(Configuration conf) throws IOException {
- return this.remoteRootLogDir.getFileSystem(conf);
- }
-
- void verifyAndCreateRemoteLogDir(Configuration conf) {
- // Checking the existence of the TLD
- FileSystem remoteFS = null;
- try {
- remoteFS = getFileSystem(conf);
- } catch (IOException e) {
- throw new YarnRuntimeException("Unable to get Remote FileSystem instance", e);
- }
- boolean remoteExists = true;
- try {
- FsPermission perms =
- remoteFS.getFileStatus(this.remoteRootLogDir).getPermission();
- if (!perms.equals(TLDIR_PERMISSIONS) && logPermError) {
- LOG.warn("Remote Root Log Dir [" + this.remoteRootLogDir
- + "] already exist, but with incorrect permissions. "
- + "Expected: [" + TLDIR_PERMISSIONS + "], Found: [" + perms
- + "]." + " The cluster may have problems with multiple users.");
- logPermError = false;
- } else {
- logPermError = true;
- }
- } catch (FileNotFoundException e) {
- remoteExists = false;
- } catch (IOException e) {
- throw new YarnRuntimeException(
- "Failed to check permissions for dir ["
- + this.remoteRootLogDir + "]", e);
- }
- if (!remoteExists) {
- LOG.warn("Remote Root Log Dir [" + this.remoteRootLogDir
- + "] does not exist. Attempting to create it.");
- try {
- Path qualified =
- this.remoteRootLogDir.makeQualified(remoteFS.getUri(),
- remoteFS.getWorkingDirectory());
- remoteFS.mkdirs(qualified, new FsPermission(TLDIR_PERMISSIONS));
- remoteFS.setPermission(qualified, new FsPermission(TLDIR_PERMISSIONS));
-
- UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
- String primaryGroupName = null;
- try {
- primaryGroupName = loginUser.getPrimaryGroupName();
- } catch (IOException e) {
- LOG.warn("No primary group found. The remote root log directory" +
- " will be created with the HDFS superuser being its group " +
- "owner. JobHistoryServer may be unable to read the directory.");
- }
- // set owner on the remote directory only if the primary group exists
- if (primaryGroupName != null) {
- remoteFS.setOwner(qualified,
- loginUser.getShortUserName(), primaryGroupName);
- }
- } catch (IOException e) {
- throw new YarnRuntimeException("Failed to create remoteLogDir ["
- + this.remoteRootLogDir + "]", e);
- }
- }
- }
-
- Path getRemoteNodeLogFileForApp(ApplicationId appId, String user) {
- return LogAggregationUtils.getRemoteNodeLogFileForApp(
- this.remoteRootLogDir, appId, user, this.nodeId,
- this.remoteRootLogDirSuffix);
- }
-
- Path getRemoteAppLogDir(ApplicationId appId, String user) {
- return LogAggregationUtils.getRemoteAppLogDir(this.remoteRootLogDir, appId,
- user, this.remoteRootLogDirSuffix);
- }
-
- private void createDir(FileSystem fs, Path path, FsPermission fsPerm)
- throws IOException {
- FsPermission dirPerm = new FsPermission(fsPerm);
- fs.mkdirs(path, dirPerm);
- FsPermission umask = FsPermission.getUMask(fs.getConf());
- if (!dirPerm.equals(dirPerm.applyUMask(umask))) {
- fs.setPermission(path, new FsPermission(fsPerm));
- }
- }
-
- private boolean checkExists(FileSystem fs, Path path, FsPermission fsPerm)
- throws IOException {
- boolean exists = true;
- try {
- FileStatus appDirStatus = fs.getFileStatus(path);
- if (!APP_DIR_PERMISSIONS.equals(appDirStatus.getPermission())) {
- fs.setPermission(path, APP_DIR_PERMISSIONS);
- }
- } catch (FileNotFoundException fnfe) {
- exists = false;
- }
- return exists;
- }
-
- protected void createAppDir(final String user, final ApplicationId appId,
- UserGroupInformation userUgi) {
- try {
- userUgi.doAs(new PrivilegedExceptionAction<Object>() {
- @Override
- public Object run() throws Exception {
- try {
- // TODO: Reuse FS for user?
- FileSystem remoteFS = getFileSystem(getConfig());
-
- // Only creating directories if they are missing to avoid
- // unnecessary load on the filesystem from all of the nodes
- Path appDir = LogAggregationUtils.getRemoteAppLogDir(
- LogAggregationService.this.remoteRootLogDir, appId, user,
- LogAggregationService.this.remoteRootLogDirSuffix);
- appDir = appDir.makeQualified(remoteFS.getUri(),
- remoteFS.getWorkingDirectory());
-
- if (!checkExists(remoteFS, appDir, APP_DIR_PERMISSIONS)) {
- Path suffixDir = LogAggregationUtils.getRemoteLogSuffixedDir(
- LogAggregationService.this.remoteRootLogDir, user,
- LogAggregationService.this.remoteRootLogDirSuffix);
- suffixDir = suffixDir.makeQualified(remoteFS.getUri(),
- remoteFS.getWorkingDirectory());
-
- if (!checkExists(remoteFS, suffixDir, APP_DIR_PERMISSIONS)) {
- Path userDir = LogAggregationUtils.getRemoteLogUserDir(
- LogAggregationService.this.remoteRootLogDir, user);
- userDir = userDir.makeQualified(remoteFS.getUri(),
- remoteFS.getWorkingDirectory());
-
- if (!checkExists(remoteFS, userDir, APP_DIR_PERMISSIONS)) {
- createDir(remoteFS, userDir, APP_DIR_PERMISSIONS);
- }
-
- createDir(remoteFS, suffixDir, APP_DIR_PERMISSIONS);
- }
-
- createDir(remoteFS, appDir, APP_DIR_PERMISSIONS);
- }
-
- } catch (IOException e) {
- LOG.error("Failed to setup application log directory for "
- + appId, e);
- throw e;
- }
- return null;
- }
- });
- } catch (Exception e) {
- throw new YarnRuntimeException(e);
- }
- }
-
@SuppressWarnings("unchecked")
private void initApp(final ApplicationId appId, String user,
Credentials credentials, Map<ApplicationAccessType, String> appAcls,
@@ -377,7 +193,6 @@ public class LogAggregationService extends AbstractService implements
long recoveredLogInitedTime) {
ApplicationEvent eventResponse;
try {
- verifyAndCreateRemoteLogDir(getConfig());
initAppAggregator(appId, user, credentials, appAcls,
logAggregationContext, recoveredLogInitedTime);
eventResponse = new ApplicationEvent(appId,
@@ -410,14 +225,17 @@ public class LogAggregationService extends AbstractService implements
userUgi.addCredentials(credentials);
}
+ LogAggregationFileController logAggregationFileController
+ = getLogAggregationFileController(getConfig());
+ logAggregationFileController.verifyAndCreateRemoteLogDir();
// New application
final AppLogAggregator appLogAggregator =
new AppLogAggregatorImpl(this.dispatcher, this.deletionService,
getConfig(), appId, userUgi, this.nodeId, dirsHandler,
- getRemoteNodeLogFileForApp(appId, user),
- appAcls, logAggregationContext, this.context,
+ logAggregationFileController.getRemoteNodeLogFileForApp(appId,
+ user, nodeId), appAcls, logAggregationContext, this.context,
getLocalFileContext(getConfig()), this.rollingMonitorInterval,
- recoveredLogInitedTime);
+ recoveredLogInitedTime, logAggregationFileController);
if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) {
throw new YarnRuntimeException("Duplicate initApp for " + appId);
}
@@ -425,7 +243,7 @@ public class LogAggregationService extends AbstractService implements
YarnRuntimeException appDirException = null;
try {
// Create the app dir
- createAppDir(user, appId, userUgi);
+ logAggregationFileController.createAppDir(user, appId, userUgi);
} catch (Exception e) {
appLogAggregator.disableLogAggregation();
if (!(e instanceof YarnRuntimeException)) {
@@ -570,4 +388,14 @@ public class LogAggregationService extends AbstractService implements
}
return threadPoolSize;
}
+
+ @VisibleForTesting
+ public LogAggregationFileController getLogAggregationFileController(
+ Configuration conf) {
+ LogAggregationFileControllerFactory factory
+ = new LogAggregationFileControllerFactory(conf);
+ LogAggregationFileController logAggregationFileController = factory
+ .getFileControllerForWrite();
+ return logAggregationFileController;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java
index b4bd9d7..bedad33 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
-import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationTFileController;
import org.apache.hadoop.yarn.server.api.ContainerLogContext;
import org.apache.hadoop.yarn.server.api.ContainerType;
import org.apache.hadoop.yarn.server.nodemanager.Context;
@@ -241,8 +241,8 @@ public class TestAppLogAggregatorImpl {
// verify uploaded files
ArgumentCaptor<LogValue> logValCaptor =
ArgumentCaptor.forClass(LogValue.class);
- verify(appLogAggregator.logWriter).append(any(LogKey.class),
- logValCaptor.capture());
+ verify(appLogAggregator.getLogAggregationFileController()).write(
+ any(LogKey.class), logValCaptor.capture());
Set<String> filesUploaded = new HashSet<>();
LogValue logValue = logValCaptor.getValue();
for(File file: logValue.getPendingLogFilesToUploadForThisContainer()) {
@@ -287,11 +287,13 @@ public class TestAppLogAggregatorImpl {
final Context context = createContext(config);
final FileContext fakeLfs = mock(FileContext.class);
final Path remoteLogDirForApp = new Path(REMOTE_LOG_FILE.getAbsolutePath());
-
+ LogAggregationTFileController format = spy(
+ new LogAggregationTFileController());
+ format.initialize(config, "TFile");
return new AppLogAggregatorInTest(dispatcher, deletionService,
config, applicationId, ugi, nodeId, dirsService,
remoteLogDirForApp, appAcls, logAggregationContext,
- context, fakeLfs, recoveredLogInitedTimeMillis);
+ context, fakeLfs, recoveredLogInitedTimeMillis, format);
}
/**
@@ -402,7 +404,6 @@ public class TestAppLogAggregatorImpl {
final DeletionService deletionService;
final ApplicationId applicationId;
- final LogWriter logWriter;
final ArgumentCaptor<LogValue> logValue;
public AppLogAggregatorInTest(Dispatcher dispatcher,
@@ -411,19 +412,15 @@ public class TestAppLogAggregatorImpl {
LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp,
Map<ApplicationAccessType, String> appAcls,
LogAggregationContext logAggregationContext, Context context,
- FileContext lfs, long recoveredLogInitedTime) throws IOException {
+ FileContext lfs, long recoveredLogInitedTime,
+ LogAggregationTFileController format) throws IOException {
super(dispatcher, deletionService, conf, appId, ugi, nodeId,
dirsHandler, remoteNodeLogFileForApp, appAcls,
- logAggregationContext, context, lfs, -1, recoveredLogInitedTime);
+ logAggregationContext, context, lfs, -1, recoveredLogInitedTime,
+ format);
this.applicationId = appId;
this.deletionService = deletionService;
- this.logWriter = spy(new LogWriter());
this.logValue = ArgumentCaptor.forClass(LogValue.class);
}
-
- @Override
- protected LogWriter createLogWriter() {
- return this.logWriter;
- }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
index 6383e83..0fa012c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
@@ -103,6 +103,9 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationTFileController;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
@@ -161,11 +164,12 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
DrainDispatcher dispatcher;
EventHandler<Event> appEventHandler;
+ private NodeId nodeId = NodeId.newInstance("0.0.0.0", 5555);
+
@Override
@SuppressWarnings("unchecked")
public void setup() throws IOException {
super.setup();
- NodeId nodeId = NodeId.newInstance("0.0.0.0", 5555);
((NMContext)context).setNodeId(nodeId);
dispatcher = createDispatcher();
appEventHandler = mock(EventHandler.class);
@@ -246,9 +250,9 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
Assert.assertFalse("Directory [" + app1LogDir + "] was not deleted",
app1LogDir.exists());
- Path logFilePath =
- logAggregationService.getRemoteNodeLogFileForApp(application1,
- this.user);
+ Path logFilePath = logAggregationService
+ .getLogAggregationFileController(conf)
+ .getRemoteNodeLogFileForApp(application1, this.user, nodeId);
Assert.assertTrue("Log file [" + logFilePath + "] not found", new File(
logFilePath.toUri().getPath()).exists());
@@ -369,9 +373,10 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
logAggregationService.stop();
assertEquals(0, logAggregationService.getNumAggregators());
-
- Assert.assertFalse(new File(logAggregationService
- .getRemoteNodeLogFileForApp(application1, this.user).toUri().getPath())
+ LogAggregationFileController format1 =
+ logAggregationService.getLogAggregationFileController(conf);
+ Assert.assertFalse(new File(format1.getRemoteNodeLogFileForApp(
+ application1, this.user, this.nodeId).toUri().getPath())
.exists());
dispatcher.await();
@@ -541,26 +546,33 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
};
checkEvents(appEventHandler, expectedFinishedEvents, false, "getType", "getApplicationID");
}
-
+
@Test
public void testVerifyAndCreateRemoteDirsFailure()
throws Exception {
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
this.remoteRootLogDir.getAbsolutePath());
-
+ LogAggregationFileControllerFactory factory
+ = new LogAggregationFileControllerFactory(conf);
+ LogAggregationFileController logAggregationFileFormat = factory
+ .getFileControllerForWrite();
+ LogAggregationFileController spyLogAggregationFileFormat =
+ spy(logAggregationFileFormat);
+ YarnRuntimeException e = new YarnRuntimeException("KABOOM!");
+ doThrow(e).doNothing().when(spyLogAggregationFileFormat)
+ .verifyAndCreateRemoteLogDir();
LogAggregationService logAggregationService = spy(
new LogAggregationService(dispatcher, this.context, this.delSrvc,
- super.dirsHandler));
+ super.dirsHandler) {
+ @Override
+ public LogAggregationFileController getLogAggregationFileController(
+ Configuration conf) {
+ return spyLogAggregationFileFormat;
+ }
+ });
logAggregationService.init(this.conf);
-
- YarnRuntimeException e = new YarnRuntimeException("KABOOM!");
- doThrow(e)
- .when(logAggregationService).verifyAndCreateRemoteLogDir(
- any(Configuration.class));
-
logAggregationService.start();
-
// Now try to start an application
ApplicationId appId =
BuilderUtils.newApplicationId(System.currentTimeMillis(),
@@ -607,8 +619,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
logAggregationService.stop();
}
-
-
+
@Test
public void testVerifyAndCreateRemoteDirNonExistence()
throws Exception {
@@ -621,14 +632,24 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
new LogAggregationService(dispatcher, this.context, this.delSrvc,
super.dirsHandler));
logAggregationService.init(this.conf);
+ logAggregationService.start();
boolean existsBefore = aNewFile.exists();
assertTrue("The new file already exists!", !existsBefore);
- logAggregationService.verifyAndCreateRemoteLogDir(this.conf);
-
+ ApplicationId appId = ApplicationId.newInstance(
+ System.currentTimeMillis(), 1);
+ LogAggregationContext contextWithAMAndFailed =
+ Records.newRecord(LogAggregationContext.class);
+ contextWithAMAndFailed.setLogAggregationPolicyClassName(
+ AMOrFailedContainerLogAggregationPolicy.class.getName());
+ logAggregationService.handle(new LogHandlerAppStartedEvent(appId,
+ this.user, null, this.acls, contextWithAMAndFailed));
+ dispatcher.await();
+
boolean existsAfter = aNewFile.exists();
assertTrue("The new aggregate file is not successfully created", existsAfter);
aNewFile.delete(); //housekeeping
+ logAggregationService.stop();
}
@Test
@@ -641,7 +662,17 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
LogAggregationService logAggregationService = new LogAggregationService(
dispatcher, this.context, this.delSrvc, super.dirsHandler);
logAggregationService.init(this.conf);
- logAggregationService.verifyAndCreateRemoteLogDir(this.conf);
+ logAggregationService.start();
+
+ ApplicationId appId = ApplicationId.newInstance(
+ System.currentTimeMillis(), 1);
+ LogAggregationContext contextWithAMAndFailed =
+ Records.newRecord(LogAggregationContext.class);
+ contextWithAMAndFailed.setLogAggregationPolicyClassName(
+ AMOrFailedContainerLogAggregationPolicy.class.getName());
+ logAggregationService.handle(new LogHandlerAppStartedEvent(appId,
+ this.user, null, this.acls, contextWithAMAndFailed));
+ dispatcher.await();
String targetGroup =
UserGroupInformation.getLoginUser().getPrimaryGroupName();
@@ -651,6 +682,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
fileStatus.getGroup(), targetGroup);
fs.delete(aNewFile, true);
+ logAggregationService.stop();
}
@Test
@@ -669,14 +701,23 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
FileSystem fs = FileSystem.get(this.conf);
final FileSystem spyFs = spy(FileSystem.get(this.conf));
+ final LogAggregationTFileController spyFileFormat
+ = new LogAggregationTFileController() {
+ @Override
+ public FileSystem getFileSystem(Configuration conf)
+ throws IOException {
+ return spyFs;
+ }
+ };
+ spyFileFormat.initialize(conf, "TFile");
LogAggregationService aggSvc = new LogAggregationService(dispatcher,
this.context, this.delSrvc, super.dirsHandler) {
@Override
- protected FileSystem getFileSystem(Configuration conf) {
- return spyFs;
+ public LogAggregationFileController getLogAggregationFileController(
+ Configuration conf) {
+ return spyFileFormat;
}
};
-
aggSvc.init(this.conf);
aggSvc.start();
@@ -769,18 +810,36 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
@Test
public void testLogAggregationCreateDirsFailsWithoutKillingNM()
throws Exception {
-
- this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
+
+ this.conf.set(YarnConfiguration.NM_LOG_DIRS,
+ localLogDir.getAbsolutePath());
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
this.remoteRootLogDir.getAbsolutePath());
DeletionService spyDelSrvc = spy(this.delSrvc);
+ LogAggregationFileControllerFactory factory
+ = new LogAggregationFileControllerFactory(conf);
+ LogAggregationFileController logAggregationFileFormat = factory
+ .getFileControllerForWrite();
+ LogAggregationFileController spyLogAggregationFileFormat =
+ spy(logAggregationFileFormat);
+ Exception e = new RuntimeException("KABOOM!");
+ doThrow(e).when(spyLogAggregationFileFormat)
+ .createAppDir(any(String.class), any(ApplicationId.class),
+ any(UserGroupInformation.class));
LogAggregationService logAggregationService = spy(
new LogAggregationService(dispatcher, this.context, spyDelSrvc,
- super.dirsHandler));
+ super.dirsHandler){
+ @Override
+ public LogAggregationFileController getLogAggregationFileController(
+ Configuration conf) {
+ return spyLogAggregationFileFormat;
+ }
+ });
+
logAggregationService.init(this.conf);
logAggregationService.start();
-
+
ApplicationId appId =
BuilderUtils.newApplicationId(System.currentTimeMillis(),
(int) (Math.random() * 1000));
@@ -789,10 +848,6 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
new File(localLogDir, appId.toString());
appLogDir.mkdir();
- Exception e = new RuntimeException("KABOOM!");
- doThrow(e)
- .when(logAggregationService).createAppDir(any(String.class),
- any(ApplicationId.class), any(UserGroupInformation.class));
LogAggregationContext contextWithAMAndFailed =
Records.newRecord(LogAggregationContext.class);
contextWithAMAndFailed.setLogAggregationPolicyClassName(
@@ -867,7 +922,8 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
int minNumOfContainers, int maxNumOfContainers,
String[] logFiles, int numOfLogsPerContainer, boolean multiLogs)
throws IOException {
- Path appLogDir = logAggregationService.getRemoteAppLogDir(appId, this.user);
+ Path appLogDir = logAggregationService.getLogAggregationFileController(
+ conf).getRemoteAppLogDir(appId, this.user);
RemoteIterator<FileStatus> nodeFiles = null;
try {
Path qualifiedLogDir =
@@ -2108,7 +2164,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
}
logAggregationService.handle(new LogHandlerAppStartedEvent(appId,
this.user, null, this.acls, logAggContext));
-
+ dispatcher.await();
return logAggregationService;
}
@@ -2462,17 +2518,20 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
logAggregationService.stop();
assertEquals(expectedLogAggregationTimes,
- aggregator.getLogAggregationTimes());
+ aggregator.getLogAggregationFileControllerContext()
+ .getLogAggregationTimes());
assertEquals(expectedAggregationReportNum,
this.context.getLogAggregationStatusForApps().size());
assertEquals(expectedCleanupOldLogsTimes,
- aggregator.getCleanupOldLogTimes());
+ aggregator.getLogAggregationFileControllerContext()
+ .getCleanOldLogsTimes());
}
private int numOfLogsAvailable(LogAggregationService logAggregationService,
ApplicationId appId, boolean sizeLimited, String lastLogFile)
throws IOException {
- Path appLogDir = logAggregationService.getRemoteAppLogDir(appId, this.user);
+ Path appLogDir = logAggregationService.getLogAggregationFileController(
+ conf).getRemoteAppLogDir(appId, this.user);
RemoteIterator<FileStatus> nodeFiles = null;
try {
Path qualifiedLogDir =
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[2/2] hadoop git commit: YARN-6876. Create an abstract log writer for
extendability. Contributed by Xuan Gong.
Posted by ju...@apache.org.
YARN-6876. Create an abstract log writer for extendability. Contributed by Xuan Gong.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c2cb7ea1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c2cb7ea1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c2cb7ea1
Branch: refs/heads/trunk
Commit: c2cb7ea1ef6532020b69031dbd18b0f9b8369f0f
Parents: 8196a07
Author: Junping Du <ju...@apache.org>
Authored: Thu Aug 24 13:36:49 2017 -0700
Committer: Junping Du <ju...@apache.org>
Committed: Thu Aug 24 13:36:49 2017 -0700
----------------------------------------------------------------------
.../hadoop/yarn/conf/YarnConfiguration.java | 12 +-
.../yarn/conf/TestYarnConfigurationFields.java | 2 +
.../hadoop/yarn/client/cli/TestLogsCLI.java | 65 +--
.../logaggregation/AggregatedLogFormat.java | 22 +-
.../logaggregation/LogAggregationUtils.java | 41 ++
.../LogAggregationFileController.java | 404 +++++++++++++++++++
.../LogAggregationFileControllerContext.java | 130 ++++++
.../LogAggregationFileControllerFactory.java | 195 +++++++++
.../LogAggregationTFileController.java | 127 ++++++
.../filecontroller/package-info.java | 21 +
.../src/main/resources/yarn-default.xml | 19 +
.../logaggregation/TestAggregatedLogsBlock.java | 28 +-
.../logaggregation/TestContainerLogsUtils.java | 29 +-
...TestLogAggregationFileControllerFactory.java | 171 ++++++++
.../logaggregation/AppLogAggregatorImpl.java | 232 ++++-------
.../logaggregation/LogAggregationService.java | 210 +---------
.../TestAppLogAggregatorImpl.java | 25 +-
.../TestLogAggregationService.java | 135 +++++--
18 files changed, 1419 insertions(+), 449 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 86f45b8..16bd73a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1064,7 +1064,17 @@ public class YarnConfiguration extends Configuration {
public static final String LOG_AGGREGATION_ENABLED = YARN_PREFIX
+ "log-aggregation-enable";
public static final boolean DEFAULT_LOG_AGGREGATION_ENABLED = false;
-
+
+ public static final String LOG_AGGREGATION_FILE_FORMATS = YARN_PREFIX
+ + "log-aggregation.file-formats";
+ public static final String LOG_AGGREGATION_FILE_CONTROLLER_FMT =
+ YARN_PREFIX + "log-aggregation.file-controller.%s.class";
+
+ public static final String LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT
+ = YARN_PREFIX + "log-aggregation.%s.remote-app-log-dir";
+ public static final String LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT
+ = YARN_PREFIX + "log-aggregation.%s.remote-app-log-dir-suffix";
+
/**
* How long to wait before deleting aggregated logs, -1 disables.
* Be careful set this too small and you will spam the name node.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
index c40c2c5..153a35a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
@@ -184,6 +184,8 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
// Currently defined in RegistryConstants/core-site.xml
xmlPrefixToSkipCompare.add("hadoop.registry");
+ xmlPrefixToSkipCompare.add(
+ "yarn.log-aggregation.file-controller.TFile.class");
// Add the filters used for checking for collision of default values.
initDefaultValueCollisionCheck();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java
index c054209..26e0319 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java
@@ -36,7 +36,6 @@ import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.ClientResponse.Status;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter;
@@ -78,6 +77,9 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.logaggregation.LogCLIHelpers;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
import org.codehaus.jettison.json.JSONObject;
import org.junit.Assert;
import org.junit.Before;
@@ -1345,42 +1347,55 @@ public class TestLogsCLI {
Path path =
new Path(appDir, LogAggregationUtils.getNodeString(nodeId)
+ System.currentTimeMillis());
- try (AggregatedLogFormat.LogWriter writer =
- new AggregatedLogFormat.LogWriter()) {
- writer.initialize(configuration, path, ugi);
- writer.writeApplicationOwner(ugi.getUserName());
-
+ LogAggregationFileControllerFactory factory
+ = new LogAggregationFileControllerFactory(configuration);
+ LogAggregationFileController fileFormat = factory
+ .getFileControllerForWrite();
+ try {
Map<ApplicationAccessType, String> appAcls = new HashMap<>();
appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
- writer.writeApplicationACLs(appAcls);
- writer.append(new AggregatedLogFormat.LogKey(containerId),
+ LogAggregationFileControllerContext context
+ = new LogAggregationFileControllerContext(
+ path, path, true, 1000,
+ containerId.getApplicationAttemptId().getApplicationId(),
+ appAcls, nodeId, ugi);
+ fileFormat.initializeWriter(context);
+ fileFormat.write(new AggregatedLogFormat.LogKey(containerId),
new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
UserGroupInformation.getCurrentUser().getShortUserName()));
+ } finally {
+ fileFormat.closeWriter();
}
}
+ @SuppressWarnings("static-access")
private static void uploadEmptyContainerLogIntoRemoteDir(UserGroupInformation ugi,
Configuration configuration, List<String> rootLogDirs, NodeId nodeId,
ContainerId containerId, Path appDir, FileSystem fs) throws Exception {
- Path path =
- new Path(appDir, LogAggregationUtils.getNodeString(nodeId)
- + System.currentTimeMillis());
- try (AggregatedLogFormat.LogWriter writer =
- new AggregatedLogFormat.LogWriter()) {
- writer.initialize(configuration, path, ugi);
- writer.writeApplicationOwner(ugi.getUserName());
-
+ LogAggregationFileControllerFactory factory
+ = new LogAggregationFileControllerFactory(configuration);
+ LogAggregationFileController fileFormat = factory
+ .getFileControllerForWrite();
+ try {
Map<ApplicationAccessType, String> appAcls = new HashMap<>();
appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
- writer.writeApplicationACLs(appAcls);
- DataOutputStream out = writer.getWriter().prepareAppendKey(-1);
- new AggregatedLogFormat.LogKey(containerId).write(out);
- out.close();
- out = writer.getWriter().prepareAppendValue(-1);
- new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
- UserGroupInformation.getCurrentUser().getShortUserName()).write(out,
- new HashSet<>());
- out.close();
+ ApplicationId appId = containerId.getApplicationAttemptId()
+ .getApplicationId();
+ Path path = fileFormat.getRemoteNodeLogFileForApp(
+ appId, ugi.getCurrentUser().getShortUserName(), nodeId);
+ LogAggregationFileControllerContext context
+ = new LogAggregationFileControllerContext(
+ path, path, true, 1000,
+ appId, appAcls, nodeId, ugi);
+ fileFormat.initializeWriter(context);
+ AggregatedLogFormat.LogKey key = new AggregatedLogFormat.LogKey(
+ containerId);
+ AggregatedLogFormat.LogValue value = new AggregatedLogFormat.LogValue(
+ rootLogDirs, containerId, UserGroupInformation.getCurrentUser()
+ .getShortUserName());
+ fileFormat.write(key, value);
+ } finally {
+ fileFormat.closeWriter();
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
index d806b12..3c1dcdc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
@@ -44,7 +44,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.regex.Pattern;
-
+import org.apache.commons.io.IOUtils;
import org.apache.commons.io.input.BoundedInputStream;
import org.apache.commons.io.output.WriterOutputStream;
import org.apache.commons.logging.Log;
@@ -61,7 +61,6 @@ import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SecureIOUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.file.tfile.TFile;
@@ -71,7 +70,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Times;
import com.google.common.annotations.VisibleForTesting;
@@ -249,7 +247,7 @@ public class AggregatedLogFormat {
in = secureOpenFile(logFile);
} catch (IOException e) {
logErrorMessage(logFile, e);
- IOUtils.cleanup(LOG, in);
+ IOUtils.closeQuietly(in);
continue;
}
@@ -287,7 +285,7 @@ public class AggregatedLogFormat {
String message = logErrorMessage(logFile, e);
out.write(message.getBytes(Charset.forName("UTF-8")));
} finally {
- IOUtils.cleanup(LOG, in);
+ IOUtils.closeQuietly(in);
}
}
}
@@ -557,7 +555,7 @@ public class AggregatedLogFormat {
} catch (Exception e) {
LOG.warn("Exception closing writer", e);
} finally {
- IOUtils.closeStream(this.fsDataOStream);
+ IOUtils.closeQuietly(this.fsDataOStream);
}
}
}
@@ -605,7 +603,7 @@ public class AggregatedLogFormat {
}
return null;
} finally {
- IOUtils.cleanup(LOG, ownerScanner);
+ IOUtils.closeQuietly(ownerScanner);
}
}
@@ -651,7 +649,7 @@ public class AggregatedLogFormat {
}
return acls;
} finally {
- IOUtils.cleanup(LOG, aclScanner);
+ IOUtils.closeQuietly(aclScanner);
}
}
@@ -775,8 +773,8 @@ public class AggregatedLogFormat {
}
}
} finally {
- IOUtils.cleanup(LOG, ps);
- IOUtils.cleanup(LOG, os);
+ IOUtils.closeQuietly(ps);
+ IOUtils.closeQuietly(os);
}
}
@@ -1001,7 +999,9 @@ public class AggregatedLogFormat {
}
public void close() {
- IOUtils.cleanup(LOG, scanner, reader, fsDataIStream);
+ IOUtils.closeQuietly(scanner);
+ IOUtils.closeQuietly(reader);
+ IOUtils.closeQuietly(fsDataIStream);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java
index 24baaab..e8a28de 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java
@@ -133,6 +133,23 @@ public class LogAggregationUtils {
new org.apache.hadoop.fs.Path(conf.get(
YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
+ return getRemoteAppLogDir(conf, appId, appOwner, remoteRootLogDir, suffix);
+ }
+
+ /**
+ * Return the remote application log directory.
+ * @param conf the configuration
+ * @param appId the application
+ * @param appOwner the application owner
+ * @param remoteRootLogDir the remote root log directory
+ * @param suffix the log directory suffix
+ * @return the remote application log directory path
+ * @throws IOException if the remote application log directory cannot be found
+ */
+ public static org.apache.hadoop.fs.Path getRemoteAppLogDir(
+ Configuration conf, ApplicationId appId, String appOwner,
+ org.apache.hadoop.fs.Path remoteRootLogDir, String suffix)
+ throws IOException {
org.apache.hadoop.fs.Path remoteAppDir = null;
if (appOwner == null) {
org.apache.hadoop.fs.Path qualifiedRemoteRootLogDir =
@@ -159,6 +176,30 @@ public class LogAggregationUtils {
* @param conf the configuration
* @param appId the applicationId
* @param appOwner the application owner
+ * @param remoteRootLogDir the remote root log directory
+ * @param suffix the log directory suffix
+ * @return the iterator of available log files
+ * @throws IOException if there is no log file available
+ */
+ public static RemoteIterator<FileStatus> getRemoteNodeFileDir(
+ Configuration conf, ApplicationId appId, String appOwner,
+ org.apache.hadoop.fs.Path remoteRootLogDir, String suffix)
+ throws IOException {
+ Path remoteAppLogDir = getRemoteAppLogDir(conf, appId, appOwner,
+ remoteRootLogDir, suffix);
+ RemoteIterator<FileStatus> nodeFiles = null;
+ Path qualifiedLogDir =
+ FileContext.getFileContext(conf).makeQualified(remoteAppLogDir);
+ nodeFiles = FileContext.getFileContext(qualifiedLogDir.toUri(),
+ conf).listStatus(remoteAppLogDir);
+ return nodeFiles;
+ }
+
+ /**
+ * Get all available log files under remote app log directory.
+ * @param conf the configuration
+ * @param appId the applicationId
+ * @param appOwner the application owner
* @return the iterator of available log files
* @throws IOException if there is no log file available
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
new file mode 100644
index 0000000..5503f8f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
@@ -0,0 +1,404 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation.filecontroller;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
+
+/**
+ * Base class to implement Log Aggregation File Controller.
+ */
+@Private
+@Unstable
+public abstract class LogAggregationFileController {
+
+ private static final Log LOG = LogFactory.getLog(
+ LogAggregationFileController.class);
+
+ /*
+ * Expected deployment TLD will be 1777, owner=<NMOwner>, group=<NMGroup -
+ * Group to which NMOwner belongs> App dirs will be created as 770,
+ * owner=<AppOwner>, group=<NMGroup>: so that the owner and <NMOwner> can
+ * access / modify the files.
+ * <NMGroup> should obviously be a limited access group.
+ */
+ /**
+ * Permissions for the top level directory under which app directories will be
+ * created.
+ */
+ protected static final FsPermission TLDIR_PERMISSIONS = FsPermission
+ .createImmutable((short) 01777);
+
+ /**
+ * Permissions for the Application directory.
+ */
+ protected static final FsPermission APP_DIR_PERMISSIONS = FsPermission
+ .createImmutable((short) 0770);
+
+ // This is a temporary solution. The configuration will be deleted once
+ // we find a more scalable method to only write a single log file per LRS.
+ private static final String NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP
+ = YarnConfiguration.NM_PREFIX + "log-aggregation.num-log-files-per-app";
+ private static final int
+ DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP = 30;
+
+ protected Configuration conf;
+ protected Path remoteRootLogDir;
+ protected String remoteRootLogDirSuffix;
+ protected int retentionSize;
+ protected String fileControllerName;
+
+ public LogAggregationFileController() {}
+
+ /**
+  * Initialize the log file controller.
+  * <p>
+  * Stores the configuration and controller name, resolves the retention
+  * size (a non-positive configured value falls back to the default), and
+  * then delegates to {@link #initInternal(Configuration)}.
+  *
+  * @param conf the Configuration
+  * @param controllerName the log controller class name
+  */
+ public void initialize(Configuration conf, String controllerName) {
+   this.conf = conf;
+   final int configuredRetentionSize = conf.getInt(
+       NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP,
+       DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP);
+   this.retentionSize = configuredRetentionSize > 0
+       ? configuredRetentionSize
+       : DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP;
+   this.fileControllerName = controllerName;
+   initInternal(conf);
+ }
+
+ /**
+ * Derived classes initialize themselves using this method.
+ * @param conf the Configuration
+ */
+ protected abstract void initInternal(Configuration conf);
+
+ /**
+  * Get the remote root log directory.
+  *
+  * @return the current value of the {@code remoteRootLogDir} field
+  */
+ public Path getRemoteRootLogDir() {
+   return remoteRootLogDir;
+ }
+
+ /**
+  * Get the log aggregation directory suffix.
+  *
+  * @return the current value of the {@code remoteRootLogDirSuffix} field
+  */
+ public String getRemoteRootLogDirSuffix() {
+   return remoteRootLogDirSuffix;
+ }
+
+ /**
+ * Initialize the writer.
+ * @param context the {@link LogAggregationFileControllerContext}
+ * @throws IOException if fails to initialize the writer
+ */
+ public abstract void initializeWriter(
+ LogAggregationFileControllerContext context) throws IOException;
+
+ /**
+ * Close the writer.
+ */
+ public abstract void closeWriter();
+
+ /**
+ * Write the log content.
+ * @param logKey the log key
+ * @param logValue the log content
+ * @throws IOException if fails to write the logs
+ */
+ public abstract void write(LogKey logKey, LogValue logValue)
+ throws IOException;
+
+ /**
+ * Perform the operations needed after writing the log content.
+ * @param record the {@link LogAggregationFileControllerContext}
+ * @throws Exception if anything fails
+ */
+ public abstract void postWrite(LogAggregationFileControllerContext record)
+ throws Exception;
+
+ /**
+  * Verify and create the remote log directory.
+  * <p>
+  * If the remote root log directory exists, its permissions are compared
+  * with {@link #TLDIR_PERMISSIONS} and a warning is logged on a mismatch;
+  * the permissions are not modified in that case. If the directory does
+  * not exist, it is created with {@link #TLDIR_PERMISSIONS}, and its owner
+  * and group are set to the login user and that user's primary group when
+  * a primary group can be determined.
+  *
+  * @throws YarnRuntimeException if the remote file system cannot be
+  *         obtained, the directory status cannot be read, or the directory
+  *         cannot be created
+  */
+ public void verifyAndCreateRemoteLogDir() {
+   // Checking the existence of the TLD
+   final FileSystem remoteFS;
+   try {
+     remoteFS = getFileSystem(conf);
+   } catch (IOException e) {
+     throw new YarnRuntimeException(
+         "Unable to get Remote FileSystem instance", e);
+   }
+   boolean remoteExists = true;
+   Path remoteRootLogDir = getRemoteRootLogDir();
+   try {
+     FsPermission perms =
+         remoteFS.getFileStatus(remoteRootLogDir).getPermission();
+     if (!perms.equals(TLDIR_PERMISSIONS)) {
+       LOG.warn("Remote Root Log Dir [" + remoteRootLogDir
+           + "] already exist, but with incorrect permissions. "
+           + "Expected: [" + TLDIR_PERMISSIONS + "], Found: [" + perms
+           + "]." + " The cluster may have problems with multiple users.");
+     }
+   } catch (FileNotFoundException e) {
+     remoteExists = false;
+   } catch (IOException e) {
+     throw new YarnRuntimeException(
+         "Failed to check permissions for dir ["
+             + remoteRootLogDir + "]", e);
+   }
+   if (!remoteExists) {
+     LOG.warn("Remote Root Log Dir [" + remoteRootLogDir
+         + "] does not exist. Attempting to create it.");
+     try {
+       Path qualified =
+           remoteRootLogDir.makeQualified(remoteFS.getUri(),
+               remoteFS.getWorkingDirectory());
+       remoteFS.mkdirs(qualified, new FsPermission(TLDIR_PERMISSIONS));
+       // Set the permission explicitly after mkdirs as well.
+       remoteFS.setPermission(qualified, new FsPermission(TLDIR_PERMISSIONS));
+
+       UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
+       String primaryGroupName = null;
+       try {
+         primaryGroupName = loginUser.getPrimaryGroupName();
+       } catch (IOException e) {
+         LOG.warn("No primary group found. The remote root log directory" +
+             " will be created with the HDFS superuser being its group " +
+             "owner. JobHistoryServer may be unable to read the directory.");
+       }
+       // set owner on the remote directory only if the primary group exists
+       if (primaryGroupName != null) {
+         remoteFS.setOwner(qualified,
+             loginUser.getShortUserName(), primaryGroupName);
+       }
+     } catch (IOException e) {
+       throw new YarnRuntimeException("Failed to create remoteLogDir ["
+           + remoteRootLogDir + "]", e);
+     }
+   }
+ }
+
+ /**
+  * Create remote Application directory for log aggregation.
+  * <p>
+  * Runs as {@code userUgi}. Directories are checked from the application
+  * directory upwards (application, suffix, user) and only the missing ones
+  * are created, each with {@link #APP_DIR_PERMISSIONS}.
+  *
+  * @param user the user
+  * @param appId the application ID
+  * @param userUgi the UGI
+  * @throws YarnRuntimeException wrapping any failure raised while the
+  *         directories are checked or created
+  */
+ public void createAppDir(final String user, final ApplicationId appId,
+     UserGroupInformation userUgi) {
+   final Path rootLogDir = getRemoteRootLogDir();
+   final String rootLogDirSuffix = getRemoteRootLogDirSuffix();
+   final PrivilegedExceptionAction<Object> ensureDirs =
+       new PrivilegedExceptionAction<Object>() {
+         @Override
+         public Object run() throws Exception {
+           try {
+             // TODO: Reuse FS for user?
+             final FileSystem fs = getFileSystem(conf);
+
+             // Only creating directories if they are missing to avoid
+             // unnecessary load on the filesystem from all of the nodes
+             final Path appDir = LogAggregationUtils.getRemoteAppLogDir(
+                 rootLogDir, appId, user, rootLogDirSuffix)
+                 .makeQualified(fs.getUri(), fs.getWorkingDirectory());
+             if (checkExists(fs, appDir, APP_DIR_PERMISSIONS)) {
+               return null;
+             }
+
+             final Path suffixDir =
+                 LogAggregationUtils.getRemoteLogSuffixedDir(
+                     rootLogDir, user, rootLogDirSuffix)
+                 .makeQualified(fs.getUri(), fs.getWorkingDirectory());
+             final boolean suffixDirPresent =
+                 checkExists(fs, suffixDir, APP_DIR_PERMISSIONS);
+             if (!suffixDirPresent) {
+               final Path userDir = LogAggregationUtils.getRemoteLogUserDir(
+                   rootLogDir, user)
+                   .makeQualified(fs.getUri(), fs.getWorkingDirectory());
+               final boolean userDirPresent =
+                   checkExists(fs, userDir, APP_DIR_PERMISSIONS);
+               if (!userDirPresent) {
+                 createDir(fs, userDir, APP_DIR_PERMISSIONS);
+               }
+               createDir(fs, suffixDir, APP_DIR_PERMISSIONS);
+             }
+
+             createDir(fs, appDir, APP_DIR_PERMISSIONS);
+             return null;
+           } catch (IOException e) {
+             LOG.error("Failed to setup application log directory for "
+                 + appId, e);
+             throw e;
+           }
+         }
+       };
+   try {
+     userUgi.doAs(ensureDirs);
+   } catch (Exception e) {
+     throw new YarnRuntimeException(e);
+   }
+ }
+
+ /**
+  * Get the file system that hosts the remote root log directory.
+  *
+  * @param conf the Configuration used to resolve the file system
+  * @return the file system of the remote root log directory
+  * @throws IOException if the file system cannot be obtained
+  */
+ @VisibleForTesting
+ protected FileSystem getFileSystem(Configuration conf) throws IOException {
+   final Path rootLogDir = getRemoteRootLogDir();
+   return rootLogDir.getFileSystem(conf);
+ }
+
+ /**
+  * Create a directory with the given permission.
+  * <p>
+  * After {@code mkdirs}, the permission is set explicitly when applying
+  * the umask from the file system's configuration to the requested
+  * permission would yield a different permission.
+  *
+  * @param fs the file system to create the directory on
+  * @param path the directory to create
+  * @param fsPerm the permission the directory should have
+  * @throws IOException if the directory cannot be created or its
+  *         permission cannot be set
+  */
+ protected void createDir(FileSystem fs, Path path, FsPermission fsPerm)
+     throws IOException {
+   final FsPermission requested = new FsPermission(fsPerm);
+   fs.mkdirs(path, requested);
+   final FsPermission afterUmask =
+       requested.applyUMask(FsPermission.getUMask(fs.getConf()));
+   if (requested.equals(afterUmask)) {
+     return;
+   }
+   fs.setPermission(path, new FsPermission(fsPerm));
+ }
+
+ /**
+  * Check whether a path exists and, if it does, make sure it carries the
+  * expected permission.
+  * <p>
+  * When the path exists with a different permission, the permission is
+  * reset to the expected one.
+  *
+  * @param fs the file system to query
+  * @param path the path to check
+  * @param fsPerm the permission the path is expected to have; when
+  *        {@code null}, {@link #APP_DIR_PERMISSIONS} is used
+  * @return {@code true} if the path exists, {@code false} if the file
+  *         system reported it as not found
+  * @throws IOException if the status cannot be read or the permission
+  *         cannot be set
+  */
+ protected boolean checkExists(FileSystem fs, Path path, FsPermission fsPerm)
+     throws IOException {
+   // Honour the caller-supplied permission instead of always assuming the
+   // application-directory permission; keep the old value as the fallback.
+   final FsPermission expected =
+       fsPerm != null ? fsPerm : APP_DIR_PERMISSIONS;
+   try {
+     FileStatus dirStatus = fs.getFileStatus(path);
+     if (!expected.equals(dirStatus.getPermission())) {
+       fs.setPermission(path, expected);
+     }
+     return true;
+   } catch (FileNotFoundException fnfe) {
+     return false;
+   }
+ }
+
+ /**
+  * Get the remote aggregated log path.
+  * <p>
+  * Delegates to {@link LogAggregationUtils}, using this controller's
+  * remote root log directory and suffix.
+  *
+  * @param appId the ApplicationId
+  * @param user the Application Owner
+  * @param nodeId the NodeManager Id
+  * @return the remote aggregated log path
+  */
+ public Path getRemoteNodeLogFileForApp(ApplicationId appId, String user,
+     NodeId nodeId) {
+   final Path rootLogDir = getRemoteRootLogDir();
+   final String suffix = getRemoteRootLogDirSuffix();
+   return LogAggregationUtils.getRemoteNodeLogFileForApp(
+       rootLogDir, appId, user, nodeId, suffix);
+ }
+
+ /**
+  * Get the remote application directory for log aggregation.
+  * <p>
+  * Delegates to {@link LogAggregationUtils}, using this controller's
+  * configuration, remote root log directory and suffix.
+  *
+  * @param appId the Application ID
+  * @param appOwner the Application Owner
+  * @return the remote application directory
+  * @throws IOException if the remote application directory cannot be found
+  */
+ public Path getRemoteAppLogDir(ApplicationId appId, String appOwner)
+     throws IOException {
+   final Path appLogDir = LogAggregationUtils.getRemoteAppLogDir(
+       conf, appId, appOwner, remoteRootLogDir, remoteRootLogDirSuffix);
+   return appLogDir;
+ }
+
+ /**
+  * Delete the oldest aggregated log files of one node from an
+  * application's remote log directory once the number of such files has
+  * reached the retention size.
+  * <p>
+  * Only files whose name contains the node string and does not end with
+  * the temporary-file suffix are considered. Failures are logged and not
+  * propagated to the caller.
+  *
+  * @param remoteNodeLogFileForApp the remote log file of this node for the
+  *        application; its parent directory is the one that is scanned
+  * @param nodeId the NodeManager Id whose log files are cleaned
+  * @param userUgi the UGI used to perform the deletions
+  */
+ protected void cleanOldLogs(Path remoteNodeLogFileForApp,
+     final NodeId nodeId, UserGroupInformation userUgi) {
+   try {
+     final FileSystem remoteFS = remoteNodeLogFileForApp.getFileSystem(conf);
+     final Path appDir = remoteNodeLogFileForApp.getParent().makeQualified(
+         remoteFS.getUri(), remoteFS.getWorkingDirectory());
+     final Set<FileStatus> allFiles =
+         new HashSet<FileStatus>(Arrays.asList(remoteFS.listStatus(appDir)));
+
+     // Keep only this node's finished (non-temporary) log files.
+     final Predicate<FileStatus> isFinishedNodeLog =
+         new Predicate<FileStatus>() {
+           @Override
+           public boolean apply(FileStatus candidate) {
+             final String fileName = candidate.getPath().getName();
+             if (!fileName.contains(
+                 LogAggregationUtils.getNodeString(nodeId))) {
+               return false;
+             }
+             return !fileName.endsWith(LogAggregationUtils.TMP_FILE_SUFFIX);
+           }
+         };
+     final Set<FileStatus> nodeLogs =
+         Sets.newHashSet(Iterables.filter(allFiles, isFinishedNodeLog));
+
+     // Normally, we just need to delete one oldest log
+     // before we upload a new log.
+     // If we can not delete the older logs in this cycle,
+     // we will delete them in next cycle.
+     if (nodeLogs.size() < this.retentionSize) {
+       return;
+     }
+
+     // sort by the lastModificationTime ascending
+     final List<FileStatus> oldestFirst = new ArrayList<FileStatus>(nodeLogs);
+     Collections.sort(oldestFirst, new Comparator<FileStatus>() {
+       @Override
+       public int compare(FileStatus s1, FileStatus s2) {
+         return Long.compare(
+             s1.getModificationTime(), s2.getModificationTime());
+       }
+     });
+
+     // Delete from the oldest up to and including this index, which leaves
+     // one slot below the retention size for the log about to be uploaded.
+     final int lastIndexToDelete = oldestFirst.size() - this.retentionSize;
+     for (int i = 0; i <= lastIndexToDelete; i++) {
+       final FileStatus stale = oldestFirst.get(i);
+       try {
+         userUgi.doAs(new PrivilegedExceptionAction<Object>() {
+           @Override
+           public Object run() throws Exception {
+             remoteFS.delete(stale.getPath(), false);
+             return null;
+           }
+         });
+       } catch (Exception e) {
+         LOG.error("Failed to delete " + stale.getPath(), e);
+       }
+     }
+   } catch (Exception e) {
+     LOG.error("Failed to clean old logs", e);
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerContext.java
new file mode 100644
index 0000000..32128bc
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerContext.java
@@ -0,0 +1,130 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.logaggregation.filecontroller;
+
+import java.util.Map;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+/**
+ * {@code LogAggregationFileControllerContext} is a record used in
+ * the log aggregation process.
+ * <p>
+ * The values passed to the constructor are fixed for the lifetime of the
+ * instance; the per-cycle upload flag, the upload timestamp and the two
+ * counters are mutable.
+ */
+@Private
+@Unstable
+public class LogAggregationFileControllerContext {
+  // Fixed at construction time.
+  private final boolean logAggregationInRolling;
+  private final long rollingMonitorInterval;
+  private final Path remoteNodeLogFileForApp;
+  private final NodeId nodeId;
+  private final UserGroupInformation userUgi;
+  private final ApplicationId appId;
+  private final Path remoteNodeTmpLogFileForApp;
+  private final Map<ApplicationAccessType, String> appAcls;
+
+  // Counters, both starting at zero.
+  private int logAggregationTimes;
+  private int cleanOldLogsTimes;
+
+  // State updated through the setters below.
+  private boolean uploadedLogsInThisCycle;
+  private long logUploadedTimeStamp;
+
+  /**
+   * Create a context holding the given values.
+   *
+   * @param remoteNodeLogFileForApp the remote log file path for the app
+   * @param remoteNodeTmpLogFileForApp the remote temporary log file path
+   * @param logAggregationInRolling whether log aggregation is rolling
+   * @param rollingMonitorInterval the rolling monitor interval
+   * @param appId the application ID
+   * @param appAcls the application ACLs
+   * @param nodeId the NodeManager ID
+   * @param userUgi the UGI of the user
+   */
+  public LogAggregationFileControllerContext(Path remoteNodeLogFileForApp,
+      Path remoteNodeTmpLogFileForApp,
+      boolean logAggregationInRolling,
+      long rollingMonitorInterval,
+      ApplicationId appId,
+      Map<ApplicationAccessType, String> appAcls,
+      NodeId nodeId, UserGroupInformation userUgi) {
+    this.remoteNodeLogFileForApp = remoteNodeLogFileForApp;
+    this.remoteNodeTmpLogFileForApp = remoteNodeTmpLogFileForApp;
+    this.logAggregationInRolling = logAggregationInRolling;
+    this.rollingMonitorInterval = rollingMonitorInterval;
+    this.appId = appId;
+    this.appAcls = appAcls;
+    this.nodeId = nodeId;
+    this.userUgi = userUgi;
+  }
+
+  /** @return the flag last set by {@link #setUploadedLogsInThisCycle} */
+  public boolean isUploadedLogsInThisCycle() {
+    return this.uploadedLogsInThisCycle;
+  }
+
+  /** @param uploadedLogsInThisCycle the new value of the upload flag */
+  public void setUploadedLogsInThisCycle(boolean uploadedLogsInThisCycle) {
+    this.uploadedLogsInThisCycle = uploadedLogsInThisCycle;
+  }
+
+  /** @return the remote log file path given at construction */
+  public Path getRemoteNodeLogFileForApp() {
+    return this.remoteNodeLogFileForApp;
+  }
+
+  /** @return the rolling monitor interval given at construction */
+  public long getRollingMonitorInterval() {
+    return this.rollingMonitorInterval;
+  }
+
+  /** @return the rolling-aggregation flag given at construction */
+  public boolean isLogAggregationInRolling() {
+    return this.logAggregationInRolling;
+  }
+
+  /** @return the timestamp last set by {@link #setLogUploadTimeStamp} */
+  public long getLogUploadTimeStamp() {
+    return this.logUploadedTimeStamp;
+  }
+
+  /** @param uploadTimeStamp the new log upload timestamp */
+  public void setLogUploadTimeStamp(long uploadTimeStamp) {
+    this.logUploadedTimeStamp = uploadTimeStamp;
+  }
+
+  /** @return the NodeManager ID given at construction */
+  public NodeId getNodeId() {
+    return this.nodeId;
+  }
+
+  /** @return the UGI given at construction */
+  public UserGroupInformation getUserUgi() {
+    return this.userUgi;
+  }
+
+  /** @return the application ID given at construction */
+  public ApplicationId getAppId() {
+    return this.appId;
+  }
+
+  /** @return the remote temporary log file path given at construction */
+  public Path getRemoteNodeTmpLogFileForApp() {
+    return this.remoteNodeTmpLogFileForApp;
+  }
+
+  /** Increment the log aggregation counter by one. */
+  public void increLogAggregationTimes() {
+    this.logAggregationTimes += 1;
+  }
+
+  /** Increment the old-log cleanup counter by one. */
+  public void increcleanupOldLogTimes() {
+    this.cleanOldLogsTimes += 1;
+  }
+
+  /** @return the current value of the log aggregation counter */
+  public int getLogAggregationTimes() {
+    return this.logAggregationTimes;
+  }
+
+  /** @return the current value of the old-log cleanup counter */
+  public int getCleanOldLogsTimes() {
+    return this.cleanOldLogsTimes;
+  }
+
+  /** @return the application ACL map given at construction */
+  public Map<ApplicationAccessType, String> getAppAcls() {
+    return this.appAcls;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerFactory.java
new file mode 100644
index 0000000..746bf5a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerFactory.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation.filecontroller;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+/**
+ * Use {@code LogAggregationFileControllerFactory} to get the correct
+ * {@link LogAggregationFileController} for write and read.
+ *
+ */
+@Private
+@Unstable
+public class LogAggregationFileControllerFactory {
+
+ private static final Log LOG = LogFactory.getLog(
+ LogAggregationFileControllerFactory.class);
+ private final Pattern p = Pattern.compile(
+ "^[A-Za-z_]+[A-Za-z0-9_]*$");
+ private LinkedList<LogAggregationFileController> controllers
+ = new LinkedList<>();
+ private Configuration conf;
+
+ /**
+  * Construct the LogAggregationFileControllerFactory object.
+  * <p>
+  * For every controller name listed under
+  * {@code YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS} this validates
+  * the name, resolves the controller's remote log directory and suffix
+  * (falling back to the NodeManager-wide settings when no
+  * controller-specific value is set), rejects a directory/suffix
+  * combination that is already used by another controller unless both
+  * values come from the fallback settings, and finally instantiates and
+  * initializes the configured controller class. Controllers are kept in
+  * configuration order.
+  *
+  * @param conf the Configuration
+  * @throws IllegalArgumentException if a configured controller name is
+  *         invalid
+  * @throws RuntimeException if two controllers are explicitly configured
+  *         with the same directory/suffix combination, or if no class is
+  *         configured or can be instantiated for a controller
+  */
+ public LogAggregationFileControllerFactory(Configuration conf) {
+   this.conf = conf;
+   Collection<String> fileControllers = conf.getStringCollection(
+       YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS);
+
+   // Maps "<remoteDir>-<suffix>" to the controller name(s) already using it.
+   Map<String, String> controllerChecker = new HashMap<>();
+
+   for (String fileController : fileControllers) {
+     Preconditions.checkArgument(validateAggregatedFileControllerName(
+         fileController), "The FileControllerName: " + fileController
+         + " set in " + YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS
+         + " is invalid. The valid File Controller name should only "
+         + "contain a-zA-Z0-9_ and can not start with numbers");
+
+     String remoteDirStr = String.format(
+         YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT,
+         fileController);
+     String remoteDir = conf.get(remoteDirStr);
+     boolean defaultRemoteDir = false;
+     if (remoteDir == null || remoteDir.isEmpty()) {
+       remoteDir = conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+           YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR);
+       defaultRemoteDir = true;
+     }
+     String suffixStr = String.format(
+         YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT,
+         fileController);
+     String suffix = conf.get(suffixStr);
+     boolean defaultSuffix = false;
+     if (suffix == null || suffix.isEmpty()) {
+       suffix = conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
+           YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
+       defaultSuffix = true;
+     }
+     String dirSuffix = remoteDir + "-" + suffix;
+     if (controllerChecker.containsKey(dirSuffix)) {
+       if (defaultRemoteDir && defaultSuffix) {
+         // Sharing the fallback location is allowed; remember every
+         // controller that uses it for later conflict messages.
+         String fileControllerStr = controllerChecker.get(dirSuffix);
+         List<String> controllersList = new ArrayList<>();
+         controllersList.add(fileControllerStr);
+         controllersList.add(fileController);
+         fileControllerStr = StringUtils.join(controllersList, ",");
+         controllerChecker.put(dirSuffix, fileControllerStr);
+       } else {
+         String conflictController = controllerChecker.get(dirSuffix);
+         throw new RuntimeException("The combined value of " + remoteDirStr
+             + " and " + suffixStr + " should not be the same as the value"
+             + " set for " + conflictController);
+       }
+     } else {
+       controllerChecker.put(dirSuffix, fileController);
+     }
+     String classKey = String.format(
+         YarnConfiguration.LOG_AGGREGATION_FILE_CONTROLLER_FMT,
+         fileController);
+     String className = conf.get(classKey);
+     if (className == null || className.isEmpty()) {
+       throw new RuntimeException("No class configured for "
+           + fileController);
+     }
+     Class<? extends LogAggregationFileController> sClass = conf.getClass(
+         classKey, null, LogAggregationFileController.class);
+     if (sClass == null) {
+       throw new RuntimeException("No class defined for " + fileController);
+     }
+     LogAggregationFileController s = ReflectionUtils.newInstance(
+         sClass, conf);
+     if (s == null) {
+       // Report the class of this controller only, not every class name
+       // seen so far.
+       throw new RuntimeException("No object created for " + className);
+     }
+     s.initialize(conf, fileController);
+     controllers.add(s);
+   }
+ }
+
+ /**
+  * Get {@link LogAggregationFileController} to write.
+  * <p>
+  * This is the first controller in the list of configured controllers.
+  *
+  * @return the LogAggregationFileController instance
+  * @throws java.util.NoSuchElementException if no controller is configured
+  */
+ public LogAggregationFileController getFileControllerForWrite() {
+   final LogAggregationFileController writeController =
+       controllers.getFirst();
+   return writeController;
+ }
+
+ /**
+  * Get {@link LogAggregationFileController} to read the aggregated logs
+  * for this application.
+  * <p>
+  * The configured controllers are probed in order; the first one whose
+  * remote application log directory lists at least one entry is returned.
+  * A controller whose probe fails is skipped and its failure is recorded.
+  *
+  * @param appId the ApplicationId
+  * @param appOwner the Application Owner
+  * @return the LogAggregationFileController instance
+  * @throws IOException if no log aggregation file controller can be
+  *         found; the message holds the messages of the recorded failures
+  *         (or a description when nothing failed), and the failures
+  *         themselves are attached as suppressed exceptions
+  */
+ public LogAggregationFileController getFileControllerForRead(
+     ApplicationId appId, String appOwner) throws IOException {
+   StringBuilder diagnosis = new StringBuilder();
+   List<Exception> failures = new ArrayList<>();
+   for (LogAggregationFileController fileController : controllers) {
+     try {
+       Path remoteAppLogDir = fileController.getRemoteAppLogDir(
+           appId, appOwner);
+       Path qualifiedLogDir = FileContext.getFileContext(conf).makeQualified(
+           remoteAppLogDir);
+       RemoteIterator<FileStatus> nodeFiles = FileContext.getFileContext(
+           qualifiedLogDir.toUri(), conf).listStatus(remoteAppLogDir);
+       if (nodeFiles.hasNext()) {
+         return fileController;
+       }
+     } catch (Exception ex) {
+       diagnosis.append(ex.getMessage()).append("\n");
+       failures.add(ex);
+     }
+   }
+   if (diagnosis.length() == 0) {
+     // Every probe succeeded but found nothing (or no controller is
+     // configured): do not throw an exception with an empty message.
+     diagnosis.append("No aggregated log files were found for application ")
+         .append(appId).append(" owned by ").append(appOwner)
+         .append(" in any configured log aggregation file controller.");
+   }
+   IOException notFound = new IOException(diagnosis.toString());
+   for (Exception failure : failures) {
+     // Keep the stack traces of the individual probe failures.
+     notFound.addSuppressed(failure);
+   }
+   throw notFound;
+ }
+
+ /**
+  * Check that a file controller name is non-blank and matches the allowed
+  * name pattern.
+  *
+  * @param name the file controller name to check, may be {@code null}
+  * @return {@code true} if the name is valid
+  */
+ private boolean validateAggregatedFileControllerName(String name) {
+   final boolean blank = name == null || name.trim().isEmpty();
+   return !blank && p.matcher(name).matches();
+ }
+
+ /**
+  * Get the list of configured controllers.
+  *
+  * @return the internal controller list itself, not a copy
+  */
+ @Private
+ @VisibleForTesting
+ public LinkedList<LogAggregationFileController>
+     getConfiguredLogAggregationFileControllerList() {
+   return controllers;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationTFileController.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationTFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationTFileController.java
new file mode 100644
index 0000000..9e0c66d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationTFileController.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation.filecontroller;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
+import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.util.Times;
+
+/**
+ * The TFile log aggregation file Controller implementation.
+ */
+@Private
+@Unstable
+public class LogAggregationTFileController
+ extends LogAggregationFileController {
+
+ private static final Log LOG = LogFactory.getLog(
+ LogAggregationTFileController.class);
+
+ private LogWriter writer;
+
+ public LogAggregationTFileController(){}
+
+ /**
+  * Read the remote root log directory and its suffix from the
+  * NodeManager-wide settings, using their defaults when unset.
+  *
+  * @param conf the Configuration
+  */
+ @Override
+ public void initInternal(Configuration conf) {
+   final String rootLogDir = conf.get(
+       YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+       YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR);
+   this.remoteRootLogDir = new Path(rootLogDir);
+   final String rootLogDirSuffix = conf.get(
+       YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
+       YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
+   this.remoteRootLogDirSuffix = rootLogDirSuffix;
+ }
+
+ /**
+  * Create a new {@link LogWriter} on the context's temporary remote log
+  * file and write the application ACLs and owner to it.
+  *
+  * @param context the {@link LogAggregationFileControllerContext}
+  * @throws IOException if the writer cannot be initialized or written to
+  */
+ @Override
+ public void initializeWriter(LogAggregationFileControllerContext context)
+     throws IOException {
+   this.writer = new LogWriter();
+   final Path tmpLogFile = context.getRemoteNodeTmpLogFileForApp();
+   this.writer.initialize(this.conf, tmpLogFile, context.getUserUgi());
+   // Write ACLs once when the writer is created.
+   this.writer.writeApplicationACLs(context.getAppAcls());
+   final String ownerName = context.getUserUgi().getShortUserName();
+   this.writer.writeApplicationOwner(ownerName);
+ }
+
+ /**
+  * Close the writer, if one is open.
+  * <p>
+  * Does nothing when no writer has been initialized. The reference is
+  * cleared afterwards so that a repeated call does not close the same
+  * writer twice.
+  */
+ @Override
+ public void closeWriter() {
+   if (this.writer != null) {
+     this.writer.close();
+     this.writer = null;
+   }
+ }
+
+ /**
+  * Append one log entry to the current writer.
+  *
+  * @param logKey the log key
+  * @param logValue the log content
+  * @throws IOException if the entry cannot be written
+  */
+ @Override
+ public void write(LogKey logKey, LogValue logValue) throws IOException {
+   writer.append(logKey, logValue);
+ }
+
+ /**
+  * Finish an upload cycle: clean old logs if needed, then either move the
+  * temporary log file to its final name or delete it.
+  * <p>
+  * When logs were uploaded in this cycle, the temporary file is renamed.
+  * The final name is the remote node log file itself when the rolling
+  * monitor interval is not positive; otherwise the upload timestamp is
+  * appended to it. When nothing was uploaded, the temporary file is
+  * deleted.
+  *
+  * @param record the {@link LogAggregationFileControllerContext}
+  * @throws Exception if the rename or delete fails; the original failure
+  *         is attached as the cause
+  */
+ @Override
+ public void postWrite(final LogAggregationFileControllerContext record)
+     throws Exception {
+   // Before upload logs, make sure the number of existing logs
+   // is smaller than the configured NM log aggregation retention size.
+   if (record.isUploadedLogsInThisCycle() &&
+       record.isLogAggregationInRolling()) {
+     cleanOldLogs(record.getRemoteNodeLogFileForApp(), record.getNodeId(),
+         record.getUserUgi());
+     record.increcleanupOldLogTimes();
+   }
+
+   final Path renamedPath = record.getRollingMonitorInterval() <= 0
+       ? record.getRemoteNodeLogFileForApp() : new Path(
+           record.getRemoteNodeLogFileForApp().getParent(),
+           record.getRemoteNodeLogFileForApp().getName() + "_"
+               + record.getLogUploadTimeStamp());
+   final boolean rename = record.isUploadedLogsInThisCycle();
+   try {
+     record.getUserUgi().doAs(new PrivilegedExceptionAction<Object>() {
+       @Override
+       public Object run() throws Exception {
+         FileSystem remoteFS = record.getRemoteNodeLogFileForApp()
+             .getFileSystem(conf);
+         if (rename) {
+           remoteFS.rename(record.getRemoteNodeTmpLogFileForApp(),
+               renamedPath);
+         } else {
+           remoteFS.delete(record.getRemoteNodeTmpLogFileForApp(), false);
+         }
+         return null;
+       }
+     });
+   } catch (Exception e) {
+     LOG.error(
+         "Failed to move temporary log file to final location: ["
+             + record.getRemoteNodeTmpLogFileForApp() + "] to ["
+             + renamedPath + "]", e);
+     // Keep the underlying failure as the cause so callers can diagnose it.
+     throw new Exception("Log uploaded failed for Application: "
+         + record.getAppId() + " in NodeManager: "
+         + LogAggregationUtils.getNodeString(record.getNodeId()) + " at "
+         + Times.format(record.getLogUploadTimeStamp()) + "\n", e);
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/package-info.java
new file mode 100644
index 0000000..cad238a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Public
+package org.apache.hadoop.yarn.logaggregation.filecontroller;
+import org.apache.hadoop.classification.InterfaceAudience;
+
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index f93de44..0823dfe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1167,6 +1167,25 @@
</property>
<property>
+ <description>Specify which log file controllers we will support. The first
+ file controller we add will be used to write the aggregated logs.
+ This comma-separated configuration works together with the configuration
+ yarn.log-aggregation.file-controller.%s.class, which defines the supported
+ file controller's class. By default, the TFile controller is used.
+ The user can override this configuration by adding more file controllers.
+ To support backward compatibility, make sure that the TFile file
+ controller is always included.</description>
+ <name>yarn.log-aggregation.file-formats</name>
+ <value>TFile</value>
+ </property>
+
+ <property>
+ <description>Class that supports TFile read and write operations.</description>
+ <name>yarn.log-aggregation.file-controller.TFile.class</name>
+ <value>org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationTFileController</value>
+ </property>
+
+ <property>
<description>
How long for ResourceManager to wait for NodeManager to report its
log aggregation status. If waiting time of which the log aggregation
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java
index 1e71b3c..3dd7de3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java
@@ -40,10 +40,14 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
import org.apache.hadoop.yarn.webapp.YarnWebParams;
import org.apache.hadoop.yarn.webapp.log.AggregatedLogsBlockForTest;
import org.apache.hadoop.yarn.webapp.view.BlockForTest;
@@ -249,7 +253,7 @@ public class TestAggregatedLogsBlock {
private Configuration getConfiguration() {
- Configuration configuration = new Configuration();
+ Configuration configuration = new YarnConfiguration();
configuration.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
configuration.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, "target/logs");
configuration.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
@@ -295,19 +299,25 @@ public class TestAggregatedLogsBlock {
List<String> rootLogDirs = Arrays.asList("target/logs/logs");
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
- try (AggregatedLogFormat.LogWriter writer =
- new AggregatedLogFormat.LogWriter()) {
- writer.initialize(configuration, new Path(path), ugi);
- writer.writeApplicationOwner(ugi.getUserName());
-
+ LogAggregationFileControllerFactory factory
+ = new LogAggregationFileControllerFactory(configuration);
+ LogAggregationFileController fileController = factory
+ .getFileControllerForWrite();
+ try {
Map<ApplicationAccessType, String> appAcls = new HashMap<>();
appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
- writer.writeApplicationACLs(appAcls);
-
- writer.append(
+ NodeId nodeId = NodeId.newInstance("localhost", 1234);
+ LogAggregationFileControllerContext context
+ = new LogAggregationFileControllerContext(
+ new Path(path), new Path(path), false, 3600,
+ appId, appAcls, nodeId, ugi);
+ fileController.initializeWriter(context);
+ fileController.write(
new AggregatedLogFormat.LogKey("container_0_0001_01_000001"),
new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
UserGroupInformation.getCurrentUser().getShortUserName()));
+ } finally {
+ fileController.closeWriter();
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java
index 8b665e0..a12e2a1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java
@@ -24,15 +24,21 @@ import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
/**
* This class contains several utility functions for log aggregation tests.
@@ -110,14 +116,25 @@ public final class TestContainerLogsUtils {
ContainerId containerId, Path appDir, FileSystem fs) throws IOException {
Path path =
new Path(appDir, LogAggregationUtils.getNodeString(nodeId));
- try (AggregatedLogFormat.LogWriter writer =
- new AggregatedLogFormat.LogWriter()) {
- writer.initialize(configuration, path, ugi);
- writer.writeApplicationOwner(ugi.getUserName());
-
- writer.append(new AggregatedLogFormat.LogKey(containerId),
+ LogAggregationFileControllerFactory factory
+ = new LogAggregationFileControllerFactory(configuration);
+ LogAggregationFileController fileController = factory
+ .getFileControllerForWrite();
+ try {
+ Map<ApplicationAccessType, String> appAcls = new HashMap<>();
+ appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
+ ApplicationId appId = containerId.getApplicationAttemptId()
+ .getApplicationId();
+ LogAggregationFileControllerContext context
+ = new LogAggregationFileControllerContext(
+ path, path, true, 1000,
+ appId, appAcls, nodeId, ugi);
+ fileController.initializeWriter(context);
+ fileController.write(new AggregatedLogFormat.LogKey(containerId),
new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
ugi.getShortUserName()));
+ } finally {
+ fileController.closeWriter();
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2cb7ea1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/TestLogAggregationFileControllerFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/TestLogAggregationFileControllerFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/TestLogAggregationFileControllerFactory.java
new file mode 100644
index 0000000..45f18c1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/TestLogAggregationFileControllerFactory.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation.filecontroller;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.Writer;
+import java.util.LinkedList;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationTFileController;
+import org.junit.Test;
+
+/**
+ * Test LogAggregationFileControllerFactory.
+ *
+ */
+public class TestLogAggregationFileControllerFactory {
+
+ @Test(timeout = 10000)
+ public void testLogAggregationFileControllerFactory() throws Exception {
+ ApplicationId appId = ApplicationId.newInstance(
+ System.currentTimeMillis(), 1);
+ String appOwner = "test";
+ String remoteLogRootDir = "target/app-logs/";
+ Configuration conf = new YarnConfiguration();
+ conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
+ conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogRootDir);
+ conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, "log");
+ FileSystem fs = FileSystem.get(conf);
+
+ LogAggregationFileControllerFactory factory =
+ new LogAggregationFileControllerFactory(conf);
+ LinkedList<LogAggregationFileController> list = factory
+ .getConfiguredLogAggregationFileControllerList();
+ assertTrue(list.size() == 1);
+ assertTrue(list.getFirst() instanceof LogAggregationTFileController);
+ assertTrue(factory.getFileControllerForWrite()
+ instanceof LogAggregationTFileController);
+ Path logPath = list.getFirst().getRemoteAppLogDir(appId, appOwner);
+ try {
+ if (fs.exists(logPath)) {
+ fs.delete(logPath, true);
+ }
+ assertTrue(fs.mkdirs(logPath));
+ Writer writer =
+ new FileWriter(new File(logPath.toString(), "testLog"));
+ writer.write("test");
+ writer.close();
+ assertTrue(factory.getFileControllerForRead(appId, appOwner)
+ instanceof LogAggregationTFileController);
+ } finally {
+ fs.delete(logPath, true);
+ }
+
+ conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS,
+ "TestLogAggregationFileController");
+ // Did not set class for TestLogAggregationFileController,
+ // should get the exception.
+ try {
+ factory =
+ new LogAggregationFileControllerFactory(conf);
+ fail();
+ } catch (Exception ex) {
+ // should get exception
+ }
+
+ conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS,
+ "TestLogAggregationFileController,TFile");
+ conf.setClass(
+ "yarn.log-aggregation.file-controller.TestLogAggregationFileController"
+ + ".class", TestLogAggregationFileController.class,
+ LogAggregationFileController.class);
+
+ conf.set(
+ "yarn.log-aggregation.TestLogAggregationFileController"
+ + ".remote-app-log-dir", remoteLogRootDir);
+ conf.set(
+ "yarn.log-aggregation.TestLogAggregationFileController"
+ + ".remote-app-log-dir-suffix", "testLog");
+
+ factory = new LogAggregationFileControllerFactory(conf);
+ list = factory.getConfiguredLogAggregationFileControllerList();
+ assertTrue(list.size() == 2);
+ assertTrue(list.getFirst() instanceof TestLogAggregationFileController);
+ assertTrue(list.getLast() instanceof LogAggregationTFileController);
+ assertTrue(factory.getFileControllerForWrite()
+ instanceof TestLogAggregationFileController);
+
+ logPath = list.getFirst().getRemoteAppLogDir(appId, appOwner);
+ try {
+ if (fs.exists(logPath)) {
+ fs.delete(logPath, true);
+ }
+ assertTrue(fs.mkdirs(logPath));
+ Writer writer =
+ new FileWriter(new File(logPath.toString(), "testLog"));
+ writer.write("test");
+ writer.close();
+ assertTrue(factory.getFileControllerForRead(appId, appOwner)
+ instanceof TestLogAggregationFileController);
+ } finally {
+ fs.delete(logPath, true);
+ }
+ }
+
+ private static class TestLogAggregationFileController
+ extends LogAggregationFileController {
+
+ @Override
+ public void initInternal(Configuration conf) {
+ String remoteDirStr = String.format(
+ YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT,
+ this.fileControllerName);
+ this.remoteRootLogDir = new Path(conf.get(remoteDirStr));
+ String suffix = String.format(
+ YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT,
+ this.fileControllerName);
+ this.remoteRootLogDirSuffix = conf.get(suffix);
+ }
+
+ @Override
+ public void closeWriter() {
+ // Do Nothing
+ }
+
+ @Override
+ public void write(LogKey logKey, LogValue logValue) throws IOException {
+ // Do Nothing
+ }
+
+ @Override
+ public void postWrite(LogAggregationFileControllerContext record)
+ throws Exception {
+ // Do Nothing
+ }
+
+ @Override
+ public void initializeWriter(LogAggregationFileControllerContext context)
+ throws IOException {
+ // Do Nothing
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org