You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/08/13 23:00:03 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-846] Enhance LogCopier service to handle continuous YARN log …
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 778d05f [GOBBLIN-846] Enhance LogCopier service to handle continuous YARN log …
778d05f is described below
commit 778d05f965dde97db45fd4019a9985024a1fd02a
Author: sv2000 <su...@gmail.com>
AuthorDate: Tue Aug 13 15:59:52 2019 -0700
[GOBBLIN-846] Enhance LogCopier service to handle continuous YARN log …
Closes #2701 from sv2000/logCopier
---
.../gobblin/cluster/GobblinClusterManager.java | 2 +-
.../apache/gobblin/cluster/GobblinTaskRunner.java | 6 +-
.../org/apache/gobblin/util/logs/LogCopier.java | 269 ++++++++++-----------
.../apache/gobblin/util/logs/LogCopierTest.java | 113 +++++++++
.../gobblin/yarn/GobblinApplicationMaster.java | 5 +-
.../gobblin/yarn/GobblinYarnAppLauncher.java | 12 +-
.../gobblin/yarn/GobblinYarnConfigurationKeys.java | 20 +-
.../apache/gobblin/yarn/GobblinYarnLogSource.java | 48 ++--
.../apache/gobblin/yarn/GobblinYarnTaskRunner.java | 23 +-
.../java/org/apache/gobblin/yarn/YarnService.java | 4 +
10 files changed, 316 insertions(+), 186 deletions(-)
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
index bef556c..c241c9b 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
@@ -135,7 +135,7 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri
@Getter
private JobConfigurationManager jobConfigurationManager;
- private final String clusterName;
+ protected final String clusterName;
@Getter
protected final Config config;
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index 9f353ed..fd17216 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -131,7 +131,7 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
private final Optional<ContainerMetrics> containerMetrics;
- private final String taskRunnerId;
+ protected final String taskRunnerId;
private volatile boolean stopInProgress = false;
@@ -143,8 +143,8 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
protected final FileSystem fs;
private final List<Service> services = Lists.newArrayList();
- private final String applicationName;
- private final String applicationId;
+ protected final String applicationName;
+ protected final String applicationId;
private final Path appWorkPath;
private boolean isTaskDriver;
private boolean dedicatedTaskDriverCluster;
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/logs/LogCopier.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/logs/LogCopier.java
index df626df..1b72be9 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/logs/LogCopier.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/logs/LogCopier.java
@@ -24,8 +24,14 @@ import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -35,31 +41,26 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Function;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
-import com.google.common.base.Stopwatch;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.io.Closer;
import com.google.common.io.Files;
import com.google.common.util.concurrent.AbstractScheduledService;
+import lombok.Getter;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.util.concurrent.ScheduledTask;
-import org.apache.gobblin.util.concurrent.TaskScheduler;
-import org.apache.gobblin.util.concurrent.TaskSchedulerFactory;
import org.apache.gobblin.util.DatasetFilterUtils;
import org.apache.gobblin.util.FileListUtils;
-import org.apache.gobblin.util.HadoopUtils;
/**
@@ -85,10 +86,7 @@ import org.apache.gobblin.util.HadoopUtils;
* .useDestFileSystem(FileSystem.get(URI.create(destFsUri), new Configuration()))
* .readFrom(new Path(srcLogDir))
* .writeTo(new Path(destLogDir))
- * .useInitialDelay(60)
- * .useCopyInterval(60)
- * .useMaxMinutesPerLogFile(60)
- * .useMaxBytesPerLogFile(1024 * 1024)
+ * .useSourceLogFileMonitorInterval(60)
* .useTimeUnit(TimeUnit.SECONDS)
* .build();
*
@@ -110,24 +108,22 @@ public class LogCopier extends AbstractScheduledService {
private static final Logger LOGGER = LoggerFactory.getLogger(LogCopier.class);
private static final long DEFAULT_SOURCE_LOG_FILE_MONITOR_INTERVAL = 120;
- private static final long DEFAULT_LOG_COPY_INTERVAL_SECONDS = 60;
- private static final long DEFAULT_MAX_MINUTES_PER_LOG_FILE = Long.MAX_VALUE;
- private static final long DEFAULT_MAX_BYTES_PER_LOG_FILE = Long.MAX_VALUE;
private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.SECONDS;
private static final int DEFAULT_LINES_WRITTEN_BEFORE_FLUSH = 100;
+ private static final int DEFAULT_NUM_COPY_THREADS = 10;
+
private final FileSystem srcFs;
private final FileSystem destFs;
private final List<Path> srcLogDirs;
private final Path destLogDir;
private final long sourceLogFileMonitorInterval;
- private final long copyInterval;
- private final long maxMinutesPerLogFile;
- private final long maxBytesPerLogFile;
private final TimeUnit timeUnit;
private final Set<String> logFileExtensions;
+ private final int numCopyThreads;
+ private final String currentLogFileName;
private final Optional<List<Pattern>> includingRegexPatterns;
private final Optional<List<Pattern>> excludingRegexPatterns;
@@ -136,7 +132,11 @@ public class LogCopier extends AbstractScheduledService {
private final int linesWrittenBeforeFlush;
- private final TaskScheduler<Path, LogCopyTask> scheduler;
+ private final ExecutorService executorService;
+
+ @Getter
+ private final Set<String> copiedFileNames = Sets.newConcurrentHashSet();
+ private boolean shouldCopyCurrentLogFile;
private LogCopier(Builder builder) {
this.srcFs = builder.srcFs;
@@ -146,12 +146,11 @@ public class LogCopier extends AbstractScheduledService {
this.destLogDir = this.destFs.makeQualified(builder.destLogDir);
this.sourceLogFileMonitorInterval = builder.sourceLogFileMonitorInterval;
- this.copyInterval = builder.copyInterval;
- this.maxMinutesPerLogFile = builder.maxMinutesPerLogFile;
- this.maxBytesPerLogFile = builder.maxBytesPerLogFile;
this.timeUnit = builder.timeUnit;
this.logFileExtensions = builder.logFileExtensions;
+ this.currentLogFileName = builder.currentLogFileName;
+ this.shouldCopyCurrentLogFile = false;
this.includingRegexPatterns = Optional.fromNullable(builder.includingRegexPatterns);
this.excludingRegexPatterns = Optional.fromNullable(builder.excludingRegexPatterns);
@@ -159,13 +158,24 @@ public class LogCopier extends AbstractScheduledService {
this.logFileNamePrefix = Optional.fromNullable(builder.logFileNamePrefix);
this.linesWrittenBeforeFlush = builder.linesWrittenBeforeFlush;
+ this.numCopyThreads = builder.numCopyThreads;
- this.scheduler = TaskSchedulerFactory.get(builder.schedulerName, Optional.<String> absent());
+ this.executorService = Executors.newFixedThreadPool(numCopyThreads);
}
@Override
protected void shutDown() throws Exception {
- this.scheduler.close();
+ try {
+ //We need to copy the current log file as part of shutdown sequence.
+ shouldCopyCurrentLogFile = true;
+ runOneIteration();
+ //Close the Filesystem objects, since these were created with auto close disabled.
+ LOGGER.debug("Closing FileSystem objects...");
+ this.destFs.close();
+ this.srcFs.close();
+ } finally {
+ super.shutDown();
+ }
}
@Override
@@ -178,53 +188,91 @@ public class LogCopier extends AbstractScheduledService {
return Scheduler.newFixedRateSchedule(0, this.sourceLogFileMonitorInterval, this.timeUnit);
}
+ private boolean shouldIncludeLogFile(FileStatus logFile) {
+ Path logFilePath = logFile.getPath();
+
+ //Skip copy of current log file if current log file copy is disabled
+ if (currentLogFileName.equals(Files.getNameWithoutExtension(logFilePath.getName()))) {
+ return shouldCopyCurrentLogFile;
+ }
+
+ //Skip copy of log file if it has already been copied previously.
+ if (copiedFileNames.contains(logFilePath.getName())) {
+ return false;
+ }
+
+ //Special case to accept all log file extensions.
+ if (LogCopier.this.logFileExtensions.isEmpty()) {
+ return true;
+ }
+
+ return LogCopier.this.logFileExtensions.contains(Files.getFileExtension(logFilePath.getName()));
+
+ }
+
+ /**
+ * Prune the set of copied files by removing the set of files which have been already deleted from the source.
+ * This keeps the copiedFileNames from growing unboundedly and is useful when log rotation is enabled on the
+ * source dirs with maximum number of backups.
+ * @param srcLogFileNames
+ */
+ @VisibleForTesting
+ void pruneCopiedFileNames(Set<String> srcLogFileNames) {
+ Iterator<String> copiedFilesIterator = copiedFileNames.iterator();
+
+ while (copiedFilesIterator.hasNext()) {
+ String fileName = copiedFilesIterator.next();
+ if (!srcLogFileNames.contains(fileName)) {
+ copiedFilesIterator.remove();
+ }
+ }
+ }
+
/**
* Perform a check on new source log files and submit copy tasks for new log files.
*/
- private void checkSrcLogFiles() throws IOException {
+ @VisibleForTesting
+ void checkSrcLogFiles() throws IOException {
List<FileStatus> srcLogFiles = new ArrayList<>();
-
- for (Path logDirPath: this.srcLogDirs) {
- srcLogFiles.addAll(FileListUtils.listFilesRecursively(this.srcFs, logDirPath, new PathFilter() {
- @Override
- public boolean accept(Path path) {
- return LogCopier.this.logFileExtensions.contains(Files.getFileExtension(path.getName()));
+ Set<String> srcLogFileNames = new HashSet<>();
+ Set <Path> newLogFiles = new HashSet<>();
+ for (Path logDirPath: srcLogDirs) {
+ srcLogFiles.addAll(FileListUtils.listFilesRecursively(srcFs, logDirPath));
+ //Remove the already copied files from the list of files to copy
+ for (FileStatus srcLogFile: srcLogFiles) {
+ if (shouldIncludeLogFile(srcLogFile)) {
+ newLogFiles.add(srcLogFile.getPath());
}
- }));
+ srcLogFileNames.add(srcLogFile.getPath().getName());
+ }
}
- if (srcLogFiles.isEmpty()) {
+ if (newLogFiles.isEmpty()) {
LOGGER.warn("No log file found under directories " + this.srcLogDirs);
return;
}
- Set<Path> newLogFiles = Sets.newHashSet();
- for (FileStatus srcLogFile : srcLogFiles) {
- newLogFiles.add(srcLogFile.getPath());
- }
-
- HashSet<Path> deletedLogFiles = Sets.newHashSet(getSourceFiles());
- // Compute the set of deleted log files since the last check
- deletedLogFiles.removeAll(newLogFiles);
- // Compute the set of new log files since the last check
- newLogFiles.removeAll(getSourceFiles());
-
+ List<Future> futures = new ArrayList<>();
// Schedule a copy task for each new log file
for (final Path srcLogFile : newLogFiles) {
String destLogFileName = this.logFileNamePrefix.isPresent()
? this.logFileNamePrefix.get() + "." + srcLogFile.getName() : srcLogFile.getName();
final Path destLogFile = new Path(this.destLogDir, destLogFileName);
-
- this.scheduler.schedule(new LogCopyTask(srcLogFile, destLogFile), this.copyInterval, this.timeUnit);
+ futures.add(this.executorService.submit(new LogCopyTask(srcLogFile, destLogFile)));
}
- // Cancel the copy task for each deleted log file
- for (Path deletedLogFile : deletedLogFiles) {
- Optional<LogCopyTask> logCopyTask = this.scheduler.getScheduledTask(deletedLogFile);
- if (logCopyTask.isPresent()) {
- this.scheduler.cancel(logCopyTask.get());
+ //Wait for copy tasks to finish
+ for (Future future: futures) {
+ try {
+ future.get();
+ } catch (InterruptedException e) {
+ LOGGER.error("LogCopyTask was interrupted - {}", e);
+ } catch (ExecutionException e) {
+ LOGGER.error("Failed LogCopyTask - {}", e);
}
}
+
+ pruneCopiedFileNames(srcLogFileNames);
}
/**
@@ -249,12 +297,13 @@ public class LogCopier extends AbstractScheduledService {
private Path destLogDir;
private long sourceLogFileMonitorInterval = DEFAULT_SOURCE_LOG_FILE_MONITOR_INTERVAL;
- private long copyInterval = DEFAULT_LOG_COPY_INTERVAL_SECONDS;
- private long maxMinutesPerLogFile = DEFAULT_MAX_MINUTES_PER_LOG_FILE;
- private long maxBytesPerLogFile = DEFAULT_MAX_BYTES_PER_LOG_FILE;
+
+ private int numCopyThreads = DEFAULT_NUM_COPY_THREADS;
+
private TimeUnit timeUnit = DEFAULT_TIME_UNIT;
private Set<String> logFileExtensions;
+ private String currentLogFileName;
private List<Pattern> includingRegexPatterns;
private List<Pattern> excludingRegexPatterns;
@@ -263,8 +312,6 @@ public class LogCopier extends AbstractScheduledService {
private int linesWrittenBeforeFlush = DEFAULT_LINES_WRITTEN_BEFORE_FLUSH;
- private String schedulerName = null;
-
/**
* Set the interval between two checks for the source log file monitor.
*
@@ -279,45 +326,9 @@ public class LogCopier extends AbstractScheduledService {
}
/**
- * Set the copy interval between two iterations of copies.
- *
- * @param copyInterval the copy interval between two iterations of copies
- * @return this {@link LogCopier.Builder} instance
- */
- public Builder useCopyInterval(long copyInterval) {
- Preconditions.checkArgument(copyInterval > 0, "Copy interval must be positive");
- this.copyInterval = copyInterval;
- return this;
- }
-
- /**
- * Set the max minutes per log file.
- *
- * @param maxMinutesPerLogFile the maximum minutes of logs a log file contains
- * @return this {@link LogCopier.Builder} instance
- */
- public Builder useMaxMinutesPerLogFile(long maxMinutesPerLogFile) {
- Preconditions.checkArgument(maxMinutesPerLogFile > 0, "Max minutes per log file must be positive");
- this.maxMinutesPerLogFile = maxMinutesPerLogFile;
- return this;
- }
-
- /**
- * Set the max bytes per log file.
+ * Set the {@link TimeUnit} used for the source log file monitor interval.
*
- * @param maxBytesPerLogFile the maximum bytes of a log file
- * @return this {@link LogCopier.Builder} instance
- */
- public Builder useMaxBytesPerLogFile(long maxBytesPerLogFile) {
- Preconditions.checkArgument(maxBytesPerLogFile > 0, "Max bytes per log file must be positive");
- this.maxBytesPerLogFile = maxBytesPerLogFile;
- return this;
- }
-
- /**
- * Set the {@link TimeUnit} used for the source log file monitor interval, initial delay, copy interval.
- *
- * @param timeUnit the {@link TimeUnit} used for the initial delay and copy interval
+ * @param timeUnit the {@link TimeUnit} used for the log file monitor interval
* @return this {@link LogCopier.Builder} instance
*/
public Builder useTimeUnit(TimeUnit timeUnit) {
@@ -449,14 +460,19 @@ public class LogCopier extends AbstractScheduledService {
}
/**
- * Set the scheduler to use for scheduling copy tasks.
- *
- * @param schedulerName the name of the scheduler
- * @return this {@link LogCopier.Builder} instance
+ * Set the current log file name
*/
- public Builder useScheduler(String schedulerName) {
- Preconditions.checkArgument(!Strings.isNullOrEmpty(schedulerName), "Invalid scheduler name: " + schedulerName);
- this.schedulerName = schedulerName;
+ public Builder useCurrentLogFileName(String currentLogFileName) {
+ this.currentLogFileName = currentLogFileName;
+ return this;
+ }
+
+ /**
+ * Set the number of threads to use for copying container log files to dest FS.
+ * @param numCopyThreads
+ */
+ public Builder useNumCopyThreads(int numCopyThreads) {
+ this.numCopyThreads = numCopyThreads;
return this;
}
@@ -470,62 +486,31 @@ public class LogCopier extends AbstractScheduledService {
}
}
- private ImmutableList<Path> getSourceFiles() {
- return ImmutableList
- .copyOf(Iterables.transform(this.scheduler.getScheduledTasks(), new Function<LogCopyTask, Path>() {
- @Override
- public Path apply(LogCopyTask input) {
- return input.getKey();
- }
- }));
- }
-
- private class LogCopyTask implements ScheduledTask<Path> {
+ private class LogCopyTask implements Callable<Void> {
private final Path srcLogFile;
private final Path destLogFile;
- private final Stopwatch watch;
-
- // The task maintains the current source log file position itself
- private long currentPos = 0;
public LogCopyTask(Path srcLogFile, Path destLogFile) {
this.srcLogFile = srcLogFile;
this.destLogFile = destLogFile;
- this.watch = Stopwatch.createStarted();
- }
-
- @Override
- public Path getKey() {
- return this.srcLogFile;
}
@Override
- public void runOneIteration() {
+ public Void call() {
try {
- createNewLogFileIfNeeded();
- LOGGER.debug(String.format("Copying changes from %s to %s", this.srcLogFile, this.destLogFile));
copyChangesOfLogFile(LogCopier.this.srcFs.makeQualified(this.srcLogFile),
LogCopier.this.destFs.makeQualified(this.destLogFile));
} catch (IOException ioe) {
LOGGER.error(String.format("Failed while copying logs from %s to %s", this.srcLogFile, this.destLogFile), ioe);
}
- }
-
- private void createNewLogFileIfNeeded() throws IOException {
- if (LogCopier.this.destFs.exists(this.destLogFile)
- && (this.watch.elapsed(TimeUnit.MINUTES) > LogCopier.this.maxMinutesPerLogFile
- || LogCopier.this.destFs.getFileStatus(this.destLogFile).getLen() > LogCopier.this.maxBytesPerLogFile)) {
- HadoopUtils.renamePath(LogCopier.this.destFs, this.destLogFile,
- new Path(this.destLogFile.toString() + "." + System.currentTimeMillis()));
- this.watch.reset();
- this.watch.start();
- }
+ return null;
}
/**
* Copy changes for a single log file.
*/
private void copyChangesOfLogFile(Path srcFile, Path destFile) throws IOException {
+ LOGGER.info("Copying changes from {} to {}", srcFile.toString(), destFile.toString());
if (!LogCopier.this.srcFs.exists(srcFile)) {
LOGGER.warn("Source log file not found: " + srcFile);
return;
@@ -536,14 +521,10 @@ public class LogCopier extends AbstractScheduledService {
try (Closer closer = Closer.create()) {
fsDataInputStream = closer.register(LogCopier.this.srcFs.open(srcFile));
- // Seek to the the most recent position if it is available
- LOGGER.debug(String.format("Reading log file %s from position %d", srcFile, this.currentPos));
- fsDataInputStream.seek(this.currentPos);
BufferedReader srcLogFileReader = closer.register(
new BufferedReader(new InputStreamReader(fsDataInputStream, ConfigurationKeys.DEFAULT_CHARSET_ENCODING)));
- FSDataOutputStream outputStream = LogCopier.this.destFs.exists(destFile)
- ? LogCopier.this.destFs.append(destFile) : LogCopier.this.destFs.create(destFile);
+ FSDataOutputStream outputStream = LogCopier.this.destFs.create(destFile);
BufferedWriter destLogFileWriter = closer.register(
new BufferedWriter(new OutputStreamWriter(outputStream, ConfigurationKeys.DEFAULT_CHARSET_ENCODING)));
@@ -561,10 +542,8 @@ public class LogCopier extends AbstractScheduledService {
destLogFileWriter.flush();
}
}
- } finally {
- if (fsDataInputStream != null) {
- this.currentPos = fsDataInputStream.getPos();
- }
+ //Add the copied file to the list of files already copied to the destination.
+ LogCopier.this.copiedFileNames.add(srcFile.getName());
}
}
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/logs/LogCopierTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/logs/LogCopierTest.java
new file mode 100644
index 0000000..23ce0a3
--- /dev/null
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/logs/LogCopierTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.util.logs;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
+
+public class LogCopierTest {
+ private FileSystem srcFs;
+ private FileSystem destFs;
+ private Path srcLogDir;
+ private Path destLogDir;
+ private LogCopier logCopier;
+ private String testString = "Test Log line.";
+
+ @BeforeClass
+ public void setUp() throws IOException {
+ this.srcFs = FileSystem.getLocal(new Configuration());
+ this.destFs = FileSystem.getLocal(new Configuration());
+ this.srcLogDir = new Path("/tmp/LogCopierTest/srcLogDir");
+ if (!srcFs.exists(srcLogDir)) {
+ srcFs.mkdirs(srcLogDir);
+ }
+ this.destLogDir = new Path("/tmp/LogCopierTest/destLogDir");
+ if (!destFs.exists(destLogDir)) {
+ destFs.mkdirs(destLogDir);
+ }
+ this.logCopier = LogCopier.newBuilder()
+ .readFrom(this.srcLogDir)
+ .useCurrentLogFileName("testLog")
+ .useSrcFileSystem(srcFs)
+ .useDestFileSystem(destFs)
+ .writeTo(destLogDir)
+ .acceptsLogFileExtensions(ImmutableSet.of()).build();
+ }
+
+ private void createFileHelper(FileSystem fs, Path path) throws IOException {
+ FSDataOutputStream fsDataOutputStream = fs.create(path);
+ fsDataOutputStream.writeBytes(testString);
+ fsDataOutputStream.close();
+ }
+
+ @Test
+ public void testCheckSrcLogFiles() throws Exception {
+ //Create test log files on the srcFs
+ createFileHelper(srcFs, new Path(srcLogDir, "testLog.log"));
+ createFileHelper(srcFs, new Path(srcLogDir, "testLog.log.1"));
+ //Run one iteration of the LogCopier. 1st rolled log file should be copied over.
+ this.logCopier.runOneIteration();
+ FileStatus[] destLogFiles = this.destFs.listStatus(destLogDir);
+ Assert.assertEquals(destLogFiles.length, 1);
+ Assert.assertEquals(destLogFiles[0].getLen(), testString.length() + 1);
+
+ createFileHelper(srcFs, new Path(srcLogDir, "testLog.log.2"));
+ //Run the 2nd iteration of LogCopier. 2nd rolled log file should be copied over.
+ this.logCopier.runOneIteration();
+ destLogFiles = this.destFs.listStatus(destLogDir);
+ Assert.assertEquals(destLogFiles.length, 2);
+ Assert.assertEquals(destLogFiles[0].getLen(), testString.length() + 1);
+ Assert.assertEquals(destLogFiles[1].getLen(), testString.length() + 1);
+
+ //Shutdown the LogCopier. The current log file (i.e. testLog.log) should be copied over.
+ this.logCopier.shutDown();
+ destLogFiles = this.destFs.listStatus(destLogDir);
+ Assert.assertEquals(destLogFiles.length, 3);
+ Assert.assertEquals(destLogFiles[0].getLen(), testString.length() + 1);
+ Assert.assertEquals(destLogFiles[1].getLen(), testString.length() + 1);
+ Assert.assertEquals(destLogFiles[2].getLen(), testString.length() + 1);
+ }
+
+ @Test (dependsOnMethods = "testCheckSrcLogFiles")
+ public void testPruneCopiedFileNames() throws Exception {
+ Set<String> srcLogFileNames = Sets.newHashSet("testLog.log");
+
+ Assert.assertEquals(logCopier.getCopiedFileNames().size(), 3);
+ logCopier.pruneCopiedFileNames(srcLogFileNames);
+ Assert.assertEquals(logCopier.getCopiedFileNames().size(), 1);
+ }
+
+ @AfterClass
+ public void cleanUp() throws IOException {
+ this.srcFs.delete(srcLogDir, true);
+ this.destFs.delete(destLogDir, true);
+ }
+}
\ No newline at end of file
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
index ccb5419..acf31b1 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
@@ -56,6 +56,7 @@ import org.apache.gobblin.cluster.GobblinClusterManager;
import org.apache.gobblin.cluster.GobblinClusterUtils;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.JvmUtils;
+import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.logs.Log4jConfigurationHelper;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.apache.gobblin.yarn.event.DelegationTokenUpdatedEvent;
@@ -83,10 +84,12 @@ public class GobblinApplicationMaster extends GobblinClusterManager {
super(applicationName, containerId.getApplicationAttemptId().getApplicationId().toString(),
GobblinClusterUtils.addDynamicConfig(config), Optional.<Path>absent());
+ String containerLogDir = config.getString(GobblinYarnConfigurationKeys.LOGS_SINK_ROOT_DIR_KEY);
GobblinYarnLogSource gobblinYarnLogSource = new GobblinYarnLogSource();
if (gobblinYarnLogSource.isLogSourcePresent()) {
+ Path appWorkDir = PathUtils.combinePaths(containerLogDir, GobblinClusterUtils.getAppWorkDirPath(this.clusterName, this.applicationId), "AppMaster");
this.applicationLauncher
- .addService(gobblinYarnLogSource.buildLogCopier(this.config, containerId, this.fs, this.appWorkDir));
+ .addService(gobblinYarnLogSource.buildLogCopier(this.config, containerId.toString(), this.fs, appWorkDir));
}
this.yarnService = buildYarnService(this.config, applicationName, this.applicationId, yarnConfiguration, this.fs);
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index 2d93007..827c597 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -209,6 +209,8 @@ public class GobblinYarnAppLauncher {
private final int jvmMemoryOverheadMbs;
private final double jvmMemoryXmxRatio;
+ private final String containerTimezone;
+
public GobblinYarnAppLauncher(Config config, YarnConfiguration yarnConfiguration) throws IOException {
this.config = config;
@@ -267,6 +269,8 @@ public class GobblinYarnAppLauncher {
this.appViewAcl = ConfigUtils.getString(this.config, GobblinYarnConfigurationKeys.APP_VIEW_ACL,
GobblinYarnConfigurationKeys.DEFAULT_APP_VIEW_ACL);
+ this.containerTimezone = ConfigUtils.getString(this.config, GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_TIMEZONE,
+ GobblinYarnConfigurationKeys.DEFAULT_GOBBLIN_YARN_CONTAINER_TIMEZONE);
}
/**
@@ -525,7 +529,6 @@ public class GobblinYarnAppLauncher {
appSubmissionContext.setQueue(this.appQueueName);
appSubmissionContext.setPriority(Priority.newInstance(0));
appSubmissionContext.setAMContainerSpec(amContainerLaunchContext);
-
// Also setup container local resources by copying local jars and files the container need to HDFS
addContainerLocalResources(applicationId);
@@ -675,6 +678,7 @@ public class GobblinYarnAppLauncher {
return new StringBuilder()
.append(ApplicationConstants.Environment.JAVA_HOME.$()).append("/bin/java")
.append(" -Xmx").append((int) (memoryMbs * this.jvmMemoryXmxRatio) - this.jvmMemoryOverheadMbs).append("M")
+ .append(" -D").append(GobblinYarnConfigurationKeys.JVM_USER_TIMEZONE_CONFIG).append("=").append(this.containerTimezone)
.append(" -D").append(GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_LOG_DIR_NAME).append("=").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR)
.append(" -D").append(GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_LOG_FILE_NAME).append("=").append(appMasterClassName).append(".").append(ApplicationConstants.STDOUT)
.append(" ").append(JvmUtils.formatJvmArguments(this.appMasterJvmArgs))
@@ -735,12 +739,6 @@ public class GobblinYarnAppLauncher {
.readFrom(getHdfsLogDir(appWorkDir))
.writeTo(sinkLogDir)
.acceptsLogFileExtensions(ImmutableSet.of(ApplicationConstants.STDOUT, ApplicationConstants.STDERR));
- if (config.hasPath(GobblinYarnConfigurationKeys.LOG_COPIER_MAX_FILE_SIZE)) {
- builder.useMaxBytesPerLogFile(config.getBytes(GobblinYarnConfigurationKeys.LOG_COPIER_MAX_FILE_SIZE));
- }
- if (config.hasPath(GobblinYarnConfigurationKeys.LOG_COPIER_SCHEDULER)) {
- builder.useScheduler(config.getString(GobblinYarnConfigurationKeys.LOG_COPIER_SCHEDULER));
- }
return builder.build();
}
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
index 975b9e8..10ae50c 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
@@ -73,10 +73,6 @@ public class GobblinYarnConfigurationKeys {
public static final String CONTAINER_JVM_MEMORY_XMX_RATIO_KEY = GOBBLIN_YARN_PREFIX + "container.jvmMemoryXmxRatio";
public static final double DEFAULT_CONTAINER_JVM_MEMORY_XMX_RATIO = 1.0;
- //Container Log location properties
- public static final String GOBBLIN_YARN_CONTAINER_LOG_DIR_NAME = GobblinYarnConfigurationKeys.GOBBLIN_YARN_PREFIX + "app.container.log.dir";
- public static final String GOBBLIN_YARN_CONTAINER_LOG_FILE_NAME = GobblinYarnConfigurationKeys.GOBBLIN_YARN_PREFIX + "app.container.log.file";
-
// Helix configuration properties.
public static final String HELIX_INSTANCE_MAX_RETRIES = GOBBLIN_YARN_PREFIX + "helix.instance.max.retries";
@@ -90,15 +86,23 @@ public class GobblinYarnConfigurationKeys {
// Resource/dependencies configuration properties.
public static final String LIB_JARS_DIR_KEY = GOBBLIN_YARN_PREFIX + "lib.jars.dir";
- public static final String LOGS_SINK_ROOT_DIR_KEY = GOBBLIN_YARN_PREFIX + "logs.sink.root.dir";
public static final String LIB_JARS_DIR_NAME = "_libjars";
public static final String APP_JARS_DIR_NAME = "_appjars";
public static final String APP_FILES_DIR_NAME = "_appfiles";
public static final String APP_LOGS_DIR_NAME = "_applogs";
+ //Container Log location properties
+ public static final String GOBBLIN_YARN_CONTAINER_LOG_DIR_NAME = GobblinYarnConfigurationKeys.GOBBLIN_YARN_PREFIX + "app.container.log.dir";
+ public static final String GOBBLIN_YARN_CONTAINER_LOG_FILE_NAME = GobblinYarnConfigurationKeys.GOBBLIN_YARN_PREFIX + "app.container.log.file";
+
// Other misc configuration properties.
- public static final String LOG_COPIER_SCHEDULER = GOBBLIN_YARN_PREFIX + "log.copier.scheduler";
- public static final String LOG_COPIER_MAX_FILE_SIZE = GOBBLIN_YARN_PREFIX + "log.copier.max.file.size";
- public static final String GOBBLIN_YARN_LOG4J_CONFIGURATION_FILE = "log4j-yarn.properties";
+ public static final String LOGS_SINK_ROOT_DIR_KEY = GOBBLIN_YARN_PREFIX + "logs.sink.root.dir";
+ public static final String LOG_FILE_EXTENSIONS = GOBBLIN_YARN_PREFIX + "log.file.extensions" ;
public static final String LOG_COPIER_DISABLE_DRIVER_COPY = GOBBLIN_YARN_PREFIX + "log.copier.disable.driver.copy";
+ public static final String GOBBLIN_YARN_CONTAINER_TIMEZONE = GOBBLIN_YARN_PREFIX + "container.timezone" ;
+ public static final String DEFAULT_GOBBLIN_YARN_CONTAINER_TIMEZONE = "America/Los_Angeles" ;
+
+ //Constant definitions
+ public static final String GOBBLIN_YARN_LOG4J_CONFIGURATION_FILE = "log4j-yarn.properties";
+ public static final String JVM_USER_TIMEZONE_CONFIG = "user.timezone";
}
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnLogSource.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnLogSource.java
index c1c0e61..21858e0 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnLogSource.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnLogSource.java
@@ -18,6 +18,7 @@
package org.apache.gobblin.yarn;
import java.io.IOException;
+import java.net.URI;
import java.util.List;
import java.util.stream.Collectors;
@@ -29,8 +30,11 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableSet;
+import com.google.common.io.Files;
import com.typesafe.config.Config;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.logs.LogCopier;
@@ -46,6 +50,11 @@ import org.apache.gobblin.util.logs.LogCopier;
*/
class GobblinYarnLogSource {
private static final Splitter COMMA_SPLITTER = Splitter.on(',').omitEmptyStrings().trimResults();
+ private static final Configuration AUTO_CLOSE_CONFIG = new Configuration();
+
+ static {
+ AUTO_CLOSE_CONFIG.setBoolean("fs.automatic.close", false);
+ }
/**
* Return if the log source is present or not.
@@ -66,25 +75,35 @@ class GobblinYarnLogSource {
* @return a {@link LogCopier} instance
* @throws IOException if it fails on any IO operation
*/
- protected LogCopier buildLogCopier(Config config, ContainerId containerId, FileSystem destFs, Path appWorkDir)
+ protected LogCopier buildLogCopier(Config config, String containerId, FileSystem destFs, Path appWorkDir)
throws IOException {
LogCopier.Builder builder = LogCopier.newBuilder()
- .useSrcFileSystem(FileSystem.getLocal(new Configuration()))
- .useDestFileSystem(destFs)
+ .useSrcFileSystem(buildFileSystem(config, true))
+ .useDestFileSystem(buildFileSystem(config, false))
.readFrom(getLocalLogDirs())
.writeTo(getHdfsLogDir(containerId, destFs, appWorkDir))
- .acceptsLogFileExtensions(ImmutableSet.of(ApplicationConstants.STDOUT, ApplicationConstants.STDERR))
- .useLogFileNamePrefix(containerId.toString());
- if (config.hasPath(GobblinYarnConfigurationKeys.LOG_COPIER_MAX_FILE_SIZE)) {
- builder.useMaxBytesPerLogFile(config.getBytes(GobblinYarnConfigurationKeys.LOG_COPIER_MAX_FILE_SIZE));
- }
- if (config.hasPath(GobblinYarnConfigurationKeys.LOG_COPIER_SCHEDULER)) {
- builder.useScheduler(config.getString(GobblinYarnConfigurationKeys.LOG_COPIER_SCHEDULER));
- }
+ .useCurrentLogFileName(Files.getNameWithoutExtension(System.getProperty(GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_LOG_FILE_NAME)));
+
+ builder.acceptsLogFileExtensions(config.hasPath(GobblinYarnConfigurationKeys.LOG_FILE_EXTENSIONS) ? ImmutableSet
+ .copyOf(Splitter.on(",").splitToList(config.getString(GobblinYarnConfigurationKeys.LOG_FILE_EXTENSIONS)))
+ : ImmutableSet.of());
+
return builder.build();
}
/**
+ * Return a new (non-cached) {@link FileSystem} instance. The {@link FileSystem} instance
+ * returned by the method has automatic closing disabled. The user of the instance needs to handle closing of the
+ * instance, typically as part of its shutdown sequence.
+ */
+ private FileSystem buildFileSystem(Config config, boolean isLocal) throws IOException {
+ return isLocal ? FileSystem.newInstanceLocal(AUTO_CLOSE_CONFIG)
+ : config.hasPath(ConfigurationKeys.FS_URI_KEY) ? FileSystem
+ .newInstance(URI.create(config.getString(ConfigurationKeys.FS_URI_KEY)), AUTO_CLOSE_CONFIG)
+ : FileSystem.newInstance(AUTO_CLOSE_CONFIG);
+ }
+
+ /**
* Multiple directories may be specified in the LOG_DIRS string. Split them up and return a list of {@link Path}s.
* @return list of {@link Path}s to the log directories
* @throws IOException
@@ -94,12 +113,11 @@ class GobblinYarnLogSource {
return COMMA_SPLITTER.splitToList(logDirs).stream().map(e -> new Path(e)).collect(Collectors.toList());
}
- private Path getHdfsLogDir(ContainerId containerId, FileSystem destFs, Path appWorkDir) throws IOException {
- Path logRootDir = new Path(appWorkDir, GobblinYarnConfigurationKeys.APP_LOGS_DIR_NAME);
+ private Path getHdfsLogDir(String containerId, FileSystem destFs, Path appWorkDir) throws IOException {
+ Path logRootDir = PathUtils.combinePaths(appWorkDir.toString(), GobblinYarnConfigurationKeys.APP_LOGS_DIR_NAME, containerId);
if (!destFs.exists(logRootDir)) {
destFs.mkdirs(logRootDir);
}
-
- return new Path(logRootDir, containerId.toString());
+ return logRootDir;
}
}
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
index 77183fd..c90306c 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
@@ -25,31 +25,26 @@ import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
-
import org.apache.hadoop.fs.Path;
-
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.util.ConverterUtils;
-
import org.apache.helix.NotificationContext;
import org.apache.helix.messaging.handling.HelixTaskResult;
import org.apache.helix.messaging.handling.MessageHandler;
import org.apache.helix.messaging.handling.MessageHandlerFactory;
import org.apache.helix.model.Message;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.Service;
-
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
import org.apache.gobblin.cluster.GobblinClusterUtils;
import org.apache.gobblin.cluster.GobblinTaskRunner;
-import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
import org.apache.gobblin.util.JvmUtils;
import org.apache.gobblin.util.logs.Log4jConfigurationHelper;
import org.apache.gobblin.yarn.event.DelegationTokenUpdatedEvent;
@@ -58,11 +53,13 @@ import org.apache.gobblin.yarn.event.DelegationTokenUpdatedEvent;
public class GobblinYarnTaskRunner extends GobblinTaskRunner {
private static final Logger LOGGER = LoggerFactory.getLogger(GobblinTaskRunner.class);
+ private final ContainerId containerId;
public GobblinYarnTaskRunner(String applicationName, String helixInstanceName, ContainerId containerId, Config config,
Optional<Path> appWorkDirOptional) throws Exception {
super(applicationName, helixInstanceName, getApplicationId(containerId), getTaskRunnerId(containerId),
GobblinClusterUtils.addDynamicConfig(config), appWorkDirOptional);
+ this.containerId = containerId;
}
@Override
@@ -72,6 +69,20 @@ public class GobblinYarnTaskRunner extends GobblinTaskRunner {
LOGGER.info("Adding YarnContainerSecurityManager since login is keytab based");
services.add(new YarnContainerSecurityManager(this.config, this.fs, this.eventBus));
}
+
+ if (config.hasPath(GobblinYarnConfigurationKeys.LOGS_SINK_ROOT_DIR_KEY)) {
+ GobblinYarnLogSource gobblinYarnLogSource = new GobblinYarnLogSource();
+ String containerLogDir = config.getString(GobblinYarnConfigurationKeys.LOGS_SINK_ROOT_DIR_KEY);
+
+ if (gobblinYarnLogSource.isLogSourcePresent()) {
+ try {
+ services.add(gobblinYarnLogSource.buildLogCopier(this.config, this.taskRunnerId, this.fs,
+ new Path(containerLogDir, GobblinClusterUtils.getAppWorkDirPath(this.applicationName, this.applicationId))));
+ } catch (Exception e) {
+ LOGGER.warn("Cannot add LogCopier service to the service manager due to {}", e);
+ }
+ }
+ }
return services;
}
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index 512041e..1d67b9c 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -151,6 +151,7 @@ public class YarnService extends AbstractIdleService {
private final int helixInstanceMaxRetries;
private final Optional<String> containerJvmArgs;
+ private final String containerTimezone;
private volatile Optional<Resource> maxResourceCapacity = Optional.absent();
@@ -257,6 +258,8 @@ public class YarnService extends AbstractIdleService {
this.appViewAcl = ConfigUtils.getString(this.config, GobblinYarnConfigurationKeys.APP_VIEW_ACL,
GobblinYarnConfigurationKeys.DEFAULT_APP_VIEW_ACL);
+ this.containerTimezone = ConfigUtils.getString(this.config, GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_TIMEZONE,
+ GobblinYarnConfigurationKeys.DEFAULT_GOBBLIN_YARN_CONTAINER_TIMEZONE);
}
@SuppressWarnings("unused")
@@ -558,6 +561,7 @@ public class YarnService extends AbstractIdleService {
.append(ApplicationConstants.Environment.JAVA_HOME.$()).append("/bin/java")
.append(" -Xmx").append((int) (container.getResource().getMemory() * this.jvmMemoryXmxRatio) -
this.jvmMemoryOverheadMbs).append("M")
+ .append(" -D").append(GobblinYarnConfigurationKeys.JVM_USER_TIMEZONE_CONFIG).append("=").append(this.containerTimezone)
.append(" -D").append(GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_LOG_DIR_NAME).append("=").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR)
.append(" -D").append(GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_LOG_FILE_NAME).append("=").append(containerProcessName).append(".").append(ApplicationConstants.STDOUT)
.append(" ").append(JvmUtils.formatJvmArguments(this.containerJvmArgs))