You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2016/06/22 00:26:08 UTC
[10/51] [abbrv] hadoop git commit: YARN-5237. Fix missing log files
issue in rolling log aggregation. Contributed by Xuan Gong.
YARN-5237. Fix missing log files issue in rolling log aggregation. Contributed by Xuan Gong.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5dfc38ff
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5dfc38ff
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5dfc38ff
Branch: refs/heads/YARN-2915
Commit: 5dfc38ff57669cba9078146e91ed990a1d25a3f0
Parents: 6f0aa75
Author: Junping Du <ju...@apache.org>
Authored: Wed Jun 15 16:17:54 2016 -0700
Committer: Junping Du <ju...@apache.org>
Committed: Wed Jun 15 16:17:54 2016 -0700
----------------------------------------------------------------------
.../logaggregation/AggregatedLogFormat.java | 48 +++++----
.../logaggregation/AppLogAggregatorImpl.java | 8 +-
.../TestLogAggregationService.java | 100 ++++++++++++++++++-
3 files changed, 134 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dfc38ff/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
index 8b213d5..2fb18ed 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
@@ -168,6 +168,7 @@ public class AggregatedLogFormat {
private final Set<String> alreadyUploadedLogFiles;
private Set<String> allExistingFileMeta = new HashSet<String>();
private final boolean appFinished;
+ private final boolean containerFinished;
/**
* The retention context to determine if log files are older than
@@ -186,13 +187,14 @@ public class AggregatedLogFormat {
public LogValue(List<String> rootLogDirs, ContainerId containerId,
String user) {
this(rootLogDirs, containerId, user, null, new HashSet<String>(),
- null, true);
+ null, true, true);
}
public LogValue(List<String> rootLogDirs, ContainerId containerId,
String user, LogAggregationContext logAggregationContext,
Set<String> alreadyUploadedLogFiles,
- LogRetentionContext retentionContext, boolean appFinished) {
+ LogRetentionContext retentionContext, boolean appFinished,
+ boolean containerFinished) {
this.rootLogDirs = new ArrayList<String>(rootLogDirs);
this.containerId = containerId;
this.user = user;
@@ -202,6 +204,7 @@ public class AggregatedLogFormat {
this.logAggregationContext = logAggregationContext;
this.alreadyUploadedLogFiles = alreadyUploadedLogFiles;
this.appFinished = appFinished;
+ this.containerFinished = containerFinished;
this.logRetentionContext = retentionContext;
}
@@ -318,29 +321,40 @@ public class AggregatedLogFormat {
return candidates;
}
+ Set<File> fileCandidates = new HashSet<File>(candidates);
if (this.logAggregationContext != null && candidates.size() > 0) {
- filterFiles(
- this.appFinished ? this.logAggregationContext.getIncludePattern()
+ fileCandidates = getFileCandidates(fileCandidates, this.appFinished);
+ if (!this.appFinished && this.containerFinished) {
+ Set<File> addition = new HashSet<File>(candidates);
+ addition = getFileCandidates(addition, true);
+ fileCandidates.addAll(addition);
+ }
+ }
+
+ return fileCandidates;
+ }
+
+ private Set<File> getFileCandidates(Set<File> candidates,
+ boolean useRegularPattern) {
+ filterFiles(
+ useRegularPattern ? this.logAggregationContext.getIncludePattern()
: this.logAggregationContext.getRolledLogsIncludePattern(),
candidates, false);
- filterFiles(
- this.appFinished ? this.logAggregationContext.getExcludePattern()
+ filterFiles(
+ useRegularPattern ? this.logAggregationContext.getExcludePattern()
: this.logAggregationContext.getRolledLogsExcludePattern(),
candidates, true);
- Iterable<File> mask =
- Iterables.filter(candidates, new Predicate<File>() {
- @Override
- public boolean apply(File next) {
- return !alreadyUploadedLogFiles
+ Iterable<File> mask =
+ Iterables.filter(candidates, new Predicate<File>() {
+ @Override
+ public boolean apply(File next) {
+ return !alreadyUploadedLogFiles
.contains(getLogFileMetaData(next));
- }
- });
- candidates = Sets.newHashSet(mask);
- }
-
- return candidates;
+ }
+ });
+ return Sets.newHashSet(mask);
}
private void filterFiles(String pattern, Set<File> candidates,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dfc38ff/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index c70fa5b..872b805 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -328,7 +328,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
containerLogAggregators.put(container, aggregator);
}
Set<Path> uploadedFilePathsInThisCycle =
- aggregator.doContainerLogAggregation(writer, appFinished);
+ aggregator.doContainerLogAggregation(writer, appFinished,
+ finishedContainers.contains(container));
if (uploadedFilePathsInThisCycle.size() > 0) {
uploadedLogsInThisCycle = true;
this.delService.delete(this.userUgi.getShortUserName(), null,
@@ -643,7 +644,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
}
public Set<Path> doContainerLogAggregation(LogWriter writer,
- boolean appFinished) {
+ boolean appFinished, boolean containerFinished) {
LOG.info("Uploading logs for container " + containerId
+ ". Current good log dirs are "
+ StringUtils.join(",", dirsHandler.getLogDirsForRead()));
@@ -651,7 +652,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
final LogValue logValue =
new LogValue(dirsHandler.getLogDirsForRead(), containerId,
userUgi.getShortUserName(), logAggregationContext,
- this.uploadedFileMeta, retentionContext, appFinished);
+ this.uploadedFileMeta, retentionContext, appFinished,
+ containerFinished);
try {
writer.append(logKey, logValue);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dfc38ff/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
index 92c6b80..c98e366 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
@@ -1575,6 +1575,102 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
"getApplicationID");
}
+ @SuppressWarnings("resource")
+ @Test (timeout = 50000)
+ public void testLogAggregationServiceWithPatternsAndIntervals()
+ throws Exception {
+ LogAggregationContext logAggregationContext =
+ Records.newRecord(LogAggregationContext.class);
+ // Set IncludePattern and RolledLogsIncludePattern.
+ // While the container is running, we only aggregate the log
+ // named stdout. Once the container (or the app) finishes,
+ // the log named std_final is aggregated.
+ logAggregationContext.setRolledLogsIncludePattern("stdout");
+ logAggregationContext.setIncludePattern("std_final");
+ this.conf.set(
+ YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
+ //configure YarnConfiguration.NM_REMOTE_APP_LOG_DIR to
+ //have fully qualified path
+ this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+ this.remoteRootLogDir.toURI().toString());
+ this.conf.setLong(
+ YarnConfiguration.NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS,
+ 3600);
+
+ this.conf.setLong(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600);
+
+ ApplicationId application =
+ BuilderUtils.newApplicationId(System.currentTimeMillis(), 1);
+ ApplicationAttemptId appAttemptId =
+ BuilderUtils.newApplicationAttemptId(application, 1);
+ ContainerId container = createContainer(appAttemptId, 1,
+ ContainerType.APPLICATION_MASTER);
+
+ ConcurrentMap<ApplicationId, Application> maps =
+ this.context.getApplications();
+ Application app = mock(Application.class);
+ maps.put(application, app);
+ when(app.getContainers()).thenReturn(this.context.getContainers());
+
+ LogAggregationService logAggregationService =
+ new LogAggregationService(dispatcher, context, this.delSrvc,
+ super.dirsHandler);
+
+ logAggregationService.init(this.conf);
+ logAggregationService.start();
+
+ // AppLogDir should be created
+ File appLogDir =
+ new File(localLogDir, ConverterUtils.toString(application));
+ appLogDir.mkdir();
+ logAggregationService.handle(new LogHandlerAppStartedEvent(application,
+ this.user, null, this.acls, logAggregationContext));
+
+ // Simulate log-file creation
+ // create std_final in log directory which will not be aggregated
+ // until the container (or the app) finishes.
+ String[] logFilesWithFinalLog =
+ new String[] {"stdout", "std_final"};
+ writeContainerLogs(appLogDir, container, logFilesWithFinalLog);
+
+ // Do log aggregation
+ AppLogAggregatorImpl aggregator =
+ (AppLogAggregatorImpl) logAggregationService.getAppLogAggregators()
+ .get(application);
+
+ aggregator.doLogAggregationOutOfBand();
+
+ Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
+ 50, 1, false, null));
+
+ String[] logFiles = new String[] { "stdout" };
+ verifyContainerLogs(logAggregationService, application,
+ new ContainerId[] {container}, logFiles, 1, true);
+
+ logAggregationService.handle(
+ new LogHandlerContainerFinishedEvent(container, 0));
+
+ dispatcher.await();
+
+ // Do the log aggregation after ContainerFinishedEvent but before
+ // AppFinishedEvent. std_final is expected to be aggregated this time:
+ // the app is still running, but its container has finished.
+ aggregator.doLogAggregationOutOfBand();
+
+ Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
+ 50, 2, false, null));
+
+ // This container finishes.
+ // The log "std_final" should be aggregated this time.
+ String[] logFinalLog = new String[] {"std_final"};
+ verifyContainerLogs(logAggregationService, application,
+ new ContainerId[] {container}, logFinalLog, 1, true);
+
+ logAggregationService.handle(new LogHandlerAppFinishedEvent(application));
+
+ logAggregationService.stop();
+ }
+
@Test (timeout = 50000)
@SuppressWarnings("unchecked")
public void testNoneContainerPolicy() throws Exception {
@@ -1583,14 +1679,14 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
LogAggregationService logAggregationService = createLogAggregationService(
appId, NoneContainerLogAggregationPolicy.class, null);
- String[] logFiles = new String[] { "stdout" };
+ String[] logFiles = new String[] {"stdout"};
ContainerId container1 = finishContainer(appId, logAggregationService,
ContainerType.APPLICATION_MASTER, 1, 0, logFiles);
finishApplication(appId, logAggregationService);
verifyContainerLogs(logAggregationService, appId,
- new ContainerId[] { container1 }, logFiles, 0, false);
+ new ContainerId[] {container1}, logFiles, 0, false);
verifyLogAggFinishEvent(appId);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org