You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text copy; see the original mailing-list archive page.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/08/13 23:00:03 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-846] Enhance LogCopier service to handle continuous YARN log …

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 778d05f  [GOBBLIN-846] Enhance LogCopier service to handle continuous YARN log …
778d05f is described below

commit 778d05f965dde97db45fd4019a9985024a1fd02a
Author: sv2000 <su...@gmail.com>
AuthorDate: Tue Aug 13 15:59:52 2019 -0700

    [GOBBLIN-846] Enhance LogCopier service to handle continuous YARN log …
    
    Closes #2701 from sv2000/logCopier
---
 .../gobblin/cluster/GobblinClusterManager.java     |   2 +-
 .../apache/gobblin/cluster/GobblinTaskRunner.java  |   6 +-
 .../org/apache/gobblin/util/logs/LogCopier.java    | 269 ++++++++++-----------
 .../apache/gobblin/util/logs/LogCopierTest.java    | 113 +++++++++
 .../gobblin/yarn/GobblinApplicationMaster.java     |   5 +-
 .../gobblin/yarn/GobblinYarnAppLauncher.java       |  12 +-
 .../gobblin/yarn/GobblinYarnConfigurationKeys.java |  20 +-
 .../apache/gobblin/yarn/GobblinYarnLogSource.java  |  48 ++--
 .../apache/gobblin/yarn/GobblinYarnTaskRunner.java |  23 +-
 .../java/org/apache/gobblin/yarn/YarnService.java  |   4 +
 10 files changed, 316 insertions(+), 186 deletions(-)

diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
index bef556c..c241c9b 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
@@ -135,7 +135,7 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri
   @Getter
   private JobConfigurationManager jobConfigurationManager;
 
-  private final String clusterName;
+  protected final String clusterName;
   @Getter
   protected final Config config;
 
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index 9f353ed..fd17216 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -131,7 +131,7 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
 
   private final Optional<ContainerMetrics> containerMetrics;
 
-  private final String taskRunnerId;
+  protected final String taskRunnerId;
 
   private volatile boolean stopInProgress = false;
 
@@ -143,8 +143,8 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
 
   protected final FileSystem fs;
   private final List<Service> services = Lists.newArrayList();
-  private final String applicationName;
-  private final String applicationId;
+  protected final String applicationName;
+  protected final String applicationId;
   private final Path appWorkPath;
   private boolean isTaskDriver;
   private boolean dedicatedTaskDriverCluster;
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/logs/LogCopier.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/logs/LogCopier.java
index df626df..1b72be9 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/logs/LogCopier.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/logs/LogCopier.java
@@ -24,8 +24,14 @@ import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
 import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -35,31 +41,26 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Function;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
-import com.google.common.base.Stopwatch;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import com.google.common.io.Closer;
 import com.google.common.io.Files;
 import com.google.common.util.concurrent.AbstractScheduledService;
 
+import lombok.Getter;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.util.concurrent.ScheduledTask;
-import org.apache.gobblin.util.concurrent.TaskScheduler;
-import org.apache.gobblin.util.concurrent.TaskSchedulerFactory;
 import org.apache.gobblin.util.DatasetFilterUtils;
 import org.apache.gobblin.util.FileListUtils;
-import org.apache.gobblin.util.HadoopUtils;
 
 
 /**
@@ -85,10 +86,7 @@ import org.apache.gobblin.util.HadoopUtils;
  *           .useDestFileSystem(FileSystem.get(URI.create(destFsUri), new Configuration()))
  *           .readFrom(new Path(srcLogDir))
  *           .writeTo(new Path(destLogDir))
- *           .useInitialDelay(60)
- *           .useCopyInterval(60)
- *           .useMaxMinutesPerLogFile(60)
- *           .useMaxBytesPerLogFile(1024 * 1024)
+ *           .useSourceLogFileMonitorInterval(60)
  *           .useTimeUnit(TimeUnit.SECONDS)
  *           .build();
  *
@@ -110,24 +108,22 @@ public class LogCopier extends AbstractScheduledService {
   private static final Logger LOGGER = LoggerFactory.getLogger(LogCopier.class);
 
   private static final long DEFAULT_SOURCE_LOG_FILE_MONITOR_INTERVAL = 120;
-  private static final long DEFAULT_LOG_COPY_INTERVAL_SECONDS = 60;
-  private static final long DEFAULT_MAX_MINUTES_PER_LOG_FILE = Long.MAX_VALUE;
-  private static final long DEFAULT_MAX_BYTES_PER_LOG_FILE = Long.MAX_VALUE;
   private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.SECONDS;
   private static final int DEFAULT_LINES_WRITTEN_BEFORE_FLUSH = 100;
 
+  private static final int DEFAULT_NUM_COPY_THREADS = 10;
+
   private final FileSystem srcFs;
   private final FileSystem destFs;
   private final List<Path> srcLogDirs;
   private final Path destLogDir;
 
   private final long sourceLogFileMonitorInterval;
-  private final long copyInterval;
-  private final long maxMinutesPerLogFile;
-  private final long maxBytesPerLogFile;
   private final TimeUnit timeUnit;
 
   private final Set<String> logFileExtensions;
+  private final int numCopyThreads;
+  private final String currentLogFileName;
 
   private final Optional<List<Pattern>> includingRegexPatterns;
   private final Optional<List<Pattern>> excludingRegexPatterns;
@@ -136,7 +132,11 @@ public class LogCopier extends AbstractScheduledService {
 
   private final int linesWrittenBeforeFlush;
 
-  private final TaskScheduler<Path, LogCopyTask> scheduler;
+  private final ExecutorService executorService;
+
+  @Getter
+  private final Set<String> copiedFileNames = Sets.newConcurrentHashSet();
+  private boolean shouldCopyCurrentLogFile;
 
   private LogCopier(Builder builder) {
     this.srcFs = builder.srcFs;
@@ -146,12 +146,11 @@ public class LogCopier extends AbstractScheduledService {
     this.destLogDir = this.destFs.makeQualified(builder.destLogDir);
 
     this.sourceLogFileMonitorInterval = builder.sourceLogFileMonitorInterval;
-    this.copyInterval = builder.copyInterval;
-    this.maxMinutesPerLogFile = builder.maxMinutesPerLogFile;
-    this.maxBytesPerLogFile = builder.maxBytesPerLogFile;
     this.timeUnit = builder.timeUnit;
 
     this.logFileExtensions = builder.logFileExtensions;
+    this.currentLogFileName = builder.currentLogFileName;
+    this.shouldCopyCurrentLogFile = false;
 
     this.includingRegexPatterns = Optional.fromNullable(builder.includingRegexPatterns);
     this.excludingRegexPatterns = Optional.fromNullable(builder.excludingRegexPatterns);
@@ -159,13 +158,24 @@ public class LogCopier extends AbstractScheduledService {
     this.logFileNamePrefix = Optional.fromNullable(builder.logFileNamePrefix);
 
     this.linesWrittenBeforeFlush = builder.linesWrittenBeforeFlush;
+    this.numCopyThreads = builder.numCopyThreads;
 
-    this.scheduler = TaskSchedulerFactory.get(builder.schedulerName, Optional.<String> absent());
+    this.executorService = Executors.newFixedThreadPool(numCopyThreads);
   }
 
   @Override
   protected void shutDown() throws Exception {
-    this.scheduler.close();
+    try {
+      //We need to copy the current log file as part of shutdown sequence.
+      shouldCopyCurrentLogFile = true;
+      runOneIteration();
+      //Close the Filesystem objects, since these were created with auto close disabled.
+      LOGGER.debug("Closing FileSystem objects...");
+      this.destFs.close();
+      this.srcFs.close();
+    } finally {
+      super.shutDown();
+    }
   }
 
   @Override
@@ -178,53 +188,91 @@ public class LogCopier extends AbstractScheduledService {
     return Scheduler.newFixedRateSchedule(0, this.sourceLogFileMonitorInterval, this.timeUnit);
   }
 
+  private boolean shouldIncludeLogFile(FileStatus logFile) {
+    Path logFilePath = logFile.getPath();
+
+    //Skip copy of current log file if current log file copy is disabled
+    if (currentLogFileName.equals(Files.getNameWithoutExtension(logFilePath.getName()))) {
+      return shouldCopyCurrentLogFile;
+    }
+
+    //Skip copy of log file if it has already been copied previously.
+    if (copiedFileNames.contains(logFilePath.getName())) {
+      return false;
+    }
+
+    //Special case to accept all log file extensions.
+    if (LogCopier.this.logFileExtensions.isEmpty()) {
+      return true;
+    }
+
+    return LogCopier.this.logFileExtensions.contains(Files.getFileExtension(logFilePath.getName()));
+
+  }
+
+  /**
+   * Prune the set of copied files by removing the set of files which have been already deleted from the source.
+   * This keeps the copiedFileNames from growing unboundedly and is useful when log rotation is enabled on the
+   * source dirs with maximum number of backups.
+   * @param srcLogFileNames
+   */
+  @VisibleForTesting
+  void pruneCopiedFileNames(Set<String> srcLogFileNames) {
+    Iterator<String> copiedFilesIterator = copiedFileNames.iterator();
+
+    while (copiedFilesIterator.hasNext()) {
+      String fileName = copiedFilesIterator.next();
+      if (!srcLogFileNames.contains(fileName)) {
+        copiedFilesIterator.remove();
+      }
+    }
+  }
+
   /**
    * Perform a check on new source log files and submit copy tasks for new log files.
    */
-  private void checkSrcLogFiles() throws IOException {
+  @VisibleForTesting
+  void checkSrcLogFiles() throws IOException {
     List<FileStatus> srcLogFiles = new ArrayList<>();
-
-    for (Path logDirPath: this.srcLogDirs) {
-      srcLogFiles.addAll(FileListUtils.listFilesRecursively(this.srcFs, logDirPath, new PathFilter() {
-        @Override
-        public boolean accept(Path path) {
-          return LogCopier.this.logFileExtensions.contains(Files.getFileExtension(path.getName()));
+    Set<String> srcLogFileNames = new HashSet<>();
+    Set <Path> newLogFiles = new HashSet<>();
+    for (Path logDirPath: srcLogDirs) {
+      srcLogFiles.addAll(FileListUtils.listFilesRecursively(srcFs, logDirPath));
+      //Remove the already copied files from the list of files to copy
+      for (FileStatus srcLogFile: srcLogFiles) {
+        if (shouldIncludeLogFile(srcLogFile)) {
+          newLogFiles.add(srcLogFile.getPath());
         }
-      }));
+        srcLogFileNames.add(srcLogFile.getPath().getName());
+      }
     }
 
-    if (srcLogFiles.isEmpty()) {
+    if (newLogFiles.isEmpty()) {
       LOGGER.warn("No log file found under directories " + this.srcLogDirs);
       return;
     }
 
-    Set<Path> newLogFiles = Sets.newHashSet();
-    for (FileStatus srcLogFile : srcLogFiles) {
-      newLogFiles.add(srcLogFile.getPath());
-    }
-
-    HashSet<Path> deletedLogFiles = Sets.newHashSet(getSourceFiles());
-    // Compute the set of deleted log files since the last check
-    deletedLogFiles.removeAll(newLogFiles);
-    // Compute the set of new log files since the last check
-    newLogFiles.removeAll(getSourceFiles());
-
+    List<Future> futures = new ArrayList<>();
     // Schedule a copy task for each new log file
     for (final Path srcLogFile : newLogFiles) {
       String destLogFileName = this.logFileNamePrefix.isPresent()
           ? this.logFileNamePrefix.get() + "." + srcLogFile.getName() : srcLogFile.getName();
       final Path destLogFile = new Path(this.destLogDir, destLogFileName);
-
-      this.scheduler.schedule(new LogCopyTask(srcLogFile, destLogFile), this.copyInterval, this.timeUnit);
+      futures.add(this.executorService.submit(new LogCopyTask(srcLogFile, destLogFile)));
     }
 
-    // Cancel the copy task for each deleted log file
-    for (Path deletedLogFile : deletedLogFiles) {
-      Optional<LogCopyTask> logCopyTask = this.scheduler.getScheduledTask(deletedLogFile);
-      if (logCopyTask.isPresent()) {
-        this.scheduler.cancel(logCopyTask.get());
+    //Wait for copy tasks to finish
+    for (Future future: futures) {
+      try {
+        future.get();
+      } catch (InterruptedException e) {
+        LOGGER.error("LogCopyTask was interrupted - {}", e);
+      } catch (ExecutionException e) {
+        LOGGER.error("Failed LogCopyTask - {}", e);
       }
     }
+
+    pruneCopiedFileNames(srcLogFileNames);
   }
 
   /**
@@ -249,12 +297,13 @@ public class LogCopier extends AbstractScheduledService {
     private Path destLogDir;
 
     private long sourceLogFileMonitorInterval = DEFAULT_SOURCE_LOG_FILE_MONITOR_INTERVAL;
-    private long copyInterval = DEFAULT_LOG_COPY_INTERVAL_SECONDS;
-    private long maxMinutesPerLogFile = DEFAULT_MAX_MINUTES_PER_LOG_FILE;
-    private long maxBytesPerLogFile = DEFAULT_MAX_BYTES_PER_LOG_FILE;
+
+    private int numCopyThreads = DEFAULT_NUM_COPY_THREADS;
+
     private TimeUnit timeUnit = DEFAULT_TIME_UNIT;
 
     private Set<String> logFileExtensions;
+    private String currentLogFileName;
 
     private List<Pattern> includingRegexPatterns;
     private List<Pattern> excludingRegexPatterns;
@@ -263,8 +312,6 @@ public class LogCopier extends AbstractScheduledService {
 
     private int linesWrittenBeforeFlush = DEFAULT_LINES_WRITTEN_BEFORE_FLUSH;
 
-    private String schedulerName = null;
-
     /**
     * Set the interval between two checks for the source log file monitor.
     *
@@ -279,45 +326,9 @@ public class LogCopier extends AbstractScheduledService {
     }
 
     /**
-     * Set the copy interval between two iterations of copies.
-     *
-     * @param copyInterval the copy interval between two iterations of copies
-     * @return this {@link LogCopier.Builder} instance
-     */
-    public Builder useCopyInterval(long copyInterval) {
-      Preconditions.checkArgument(copyInterval > 0, "Copy interval must be positive");
-      this.copyInterval = copyInterval;
-      return this;
-    }
-
-    /**
-     * Set the max minutes per log file.
-     *
-     * @param maxMinutesPerLogFile the maximum minutes of logs a log file contains
-     * @return this {@link LogCopier.Builder} instance
-     */
-    public Builder useMaxMinutesPerLogFile(long maxMinutesPerLogFile) {
-      Preconditions.checkArgument(maxMinutesPerLogFile > 0, "Max minutes per log file must be positive");
-      this.maxMinutesPerLogFile = maxMinutesPerLogFile;
-      return this;
-    }
-
-    /**
-     * Set the max bytes per log file.
+     * Set the {@link TimeUnit} used for the source log file monitor interval.
      *
-     * @param maxBytesPerLogFile the maximum bytes of a log file
-     * @return this {@link LogCopier.Builder} instance
-     */
-    public Builder useMaxBytesPerLogFile(long maxBytesPerLogFile) {
-      Preconditions.checkArgument(maxBytesPerLogFile > 0, "Max bytes per log file must be positive");
-      this.maxBytesPerLogFile = maxBytesPerLogFile;
-      return this;
-    }
-
-    /**
-     * Set the {@link TimeUnit} used for the source log file monitor interval, initial delay, copy interval.
-     *
-     * @param timeUnit the {@link TimeUnit} used for the initial delay and copy interval
+     * @param timeUnit the {@link TimeUnit} used for the log file monitor interval
      * @return this {@link LogCopier.Builder} instance
      */
     public Builder useTimeUnit(TimeUnit timeUnit) {
@@ -449,14 +460,19 @@ public class LogCopier extends AbstractScheduledService {
     }
 
     /**
-     * Set the scheduler to use for scheduling copy tasks.
-     *
-     * @param schedulerName the name of the scheduler
-     * @return this {@link LogCopier.Builder} instance
+     * Set the current log file name
      */
-    public Builder useScheduler(String schedulerName) {
-      Preconditions.checkArgument(!Strings.isNullOrEmpty(schedulerName), "Invalid scheduler name: " + schedulerName);
-      this.schedulerName = schedulerName;
+    public Builder useCurrentLogFileName(String currentLogFileName) {
+      this.currentLogFileName = currentLogFileName;
+      return this;
+    }
+
+    /**
+     * Set the number of threads to use for copying container log files to dest FS.
+     * @param numCopyThreads
+     */
+    public Builder useNumCopyThreads(int numCopyThreads) {
+      this.numCopyThreads = numCopyThreads;
       return this;
     }
 
@@ -470,62 +486,31 @@ public class LogCopier extends AbstractScheduledService {
     }
   }
 
-  private ImmutableList<Path> getSourceFiles() {
-    return ImmutableList
-        .copyOf(Iterables.transform(this.scheduler.getScheduledTasks(), new Function<LogCopyTask, Path>() {
-          @Override
-          public Path apply(LogCopyTask input) {
-            return input.getKey();
-          }
-        }));
-  }
-
-  private class LogCopyTask implements ScheduledTask<Path> {
+  private class LogCopyTask implements Callable<Void> {
     private final Path srcLogFile;
     private final Path destLogFile;
-    private final Stopwatch watch;
-
-    // The task maintains the current source log file position itself
-    private long currentPos = 0;
 
     public LogCopyTask(Path srcLogFile, Path destLogFile) {
       this.srcLogFile = srcLogFile;
       this.destLogFile = destLogFile;
-      this.watch = Stopwatch.createStarted();
-    }
-
-    @Override
-    public Path getKey() {
-      return this.srcLogFile;
     }
 
     @Override
-    public void runOneIteration() {
+    public Void call() {
       try {
-        createNewLogFileIfNeeded();
-        LOGGER.debug(String.format("Copying changes from %s to %s", this.srcLogFile, this.destLogFile));
         copyChangesOfLogFile(LogCopier.this.srcFs.makeQualified(this.srcLogFile),
             LogCopier.this.destFs.makeQualified(this.destLogFile));
       } catch (IOException ioe) {
         LOGGER.error(String.format("Failed while copying logs from %s to %s", this.srcLogFile, this.destLogFile), ioe);
       }
-    }
-
-    private void createNewLogFileIfNeeded() throws IOException {
-      if (LogCopier.this.destFs.exists(this.destLogFile)
-          && (this.watch.elapsed(TimeUnit.MINUTES) > LogCopier.this.maxMinutesPerLogFile
-              || LogCopier.this.destFs.getFileStatus(this.destLogFile).getLen() > LogCopier.this.maxBytesPerLogFile)) {
-        HadoopUtils.renamePath(LogCopier.this.destFs, this.destLogFile,
-            new Path(this.destLogFile.toString() + "." + System.currentTimeMillis()));
-        this.watch.reset();
-        this.watch.start();
-      }
+      return null;
     }
 
     /**
      * Copy changes for a single log file.
      */
     private void copyChangesOfLogFile(Path srcFile, Path destFile) throws IOException {
+      LOGGER.info("Copying changes from {} to {}", srcFile.toString(), destFile.toString());
       if (!LogCopier.this.srcFs.exists(srcFile)) {
         LOGGER.warn("Source log file not found: " + srcFile);
         return;
@@ -536,14 +521,10 @@ public class LogCopier extends AbstractScheduledService {
 
       try (Closer closer = Closer.create()) {
         fsDataInputStream = closer.register(LogCopier.this.srcFs.open(srcFile));
-        // Seek to the the most recent position if it is available
-        LOGGER.debug(String.format("Reading log file %s from position %d", srcFile, this.currentPos));
-        fsDataInputStream.seek(this.currentPos);
         BufferedReader srcLogFileReader = closer.register(
             new BufferedReader(new InputStreamReader(fsDataInputStream, ConfigurationKeys.DEFAULT_CHARSET_ENCODING)));
 
-        FSDataOutputStream outputStream = LogCopier.this.destFs.exists(destFile)
-            ? LogCopier.this.destFs.append(destFile) : LogCopier.this.destFs.create(destFile);
+        FSDataOutputStream outputStream = LogCopier.this.destFs.create(destFile);
         BufferedWriter destLogFileWriter = closer.register(
             new BufferedWriter(new OutputStreamWriter(outputStream, ConfigurationKeys.DEFAULT_CHARSET_ENCODING)));
 
@@ -561,10 +542,8 @@ public class LogCopier extends AbstractScheduledService {
             destLogFileWriter.flush();
           }
         }
-      } finally {
-        if (fsDataInputStream != null) {
-          this.currentPos = fsDataInputStream.getPos();
-        }
+        //Add the copied file to the list of files already copied to the destination.
+        LogCopier.this.copiedFileNames.add(srcFile.getName());
       }
     }
 
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/logs/LogCopierTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/logs/LogCopierTest.java
new file mode 100644
index 0000000..23ce0a3
--- /dev/null
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/logs/LogCopierTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.util.logs;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
+
+public class LogCopierTest {
+  private FileSystem srcFs;
+  private FileSystem destFs;
+  private Path srcLogDir;
+  private Path destLogDir;
+  private LogCopier logCopier;
+  private String testString = "Test Log line.";
+
+  @BeforeClass
+  public void setUp() throws IOException {
+    this.srcFs = FileSystem.getLocal(new Configuration());
+    this.destFs = FileSystem.getLocal(new Configuration());
+    this.srcLogDir = new Path("/tmp/LogCopierTest/srcLogDir");
+    if (!srcFs.exists(srcLogDir)) {
+      srcFs.mkdirs(srcLogDir);
+    }
+    this.destLogDir = new Path("/tmp/LogCopierTest/destLogDir");
+    if (!destFs.exists(destLogDir)) {
+      destFs.mkdirs(destLogDir);
+    }
+    this.logCopier = LogCopier.newBuilder()
+        .readFrom(this.srcLogDir)
+        .useCurrentLogFileName("testLog")
+        .useSrcFileSystem(srcFs)
+        .useDestFileSystem(destFs)
+        .writeTo(destLogDir)
+        .acceptsLogFileExtensions(ImmutableSet.of()).build();
+  }
+
+  private void createFileHelper(FileSystem fs, Path path) throws IOException {
+    FSDataOutputStream fsDataOutputStream = fs.create(path);
+    fsDataOutputStream.writeBytes(testString);
+    fsDataOutputStream.close();
+  }
+
+  @Test
+  public void testCheckSrcLogFiles() throws Exception {
+    //Create test log files on the srcFs
+    createFileHelper(srcFs, new Path(srcLogDir, "testLog.log"));
+    createFileHelper(srcFs, new Path(srcLogDir, "testLog.log.1"));
+    //Run one iteration of the LogCopier. 1st rolled log file should be copied over.
+    this.logCopier.runOneIteration();
+    FileStatus[] destLogFiles = this.destFs.listStatus(destLogDir);
+    Assert.assertEquals(destLogFiles.length, 1);
+    Assert.assertEquals(destLogFiles[0].getLen(), testString.length() + 1);
+
+    createFileHelper(srcFs, new Path(srcLogDir, "testLog.log.2"));
+    //Run the 2nd iteration of LogCopier. 2nd rolled log file should be copied over.
+    this.logCopier.runOneIteration();
+    destLogFiles = this.destFs.listStatus(destLogDir);
+    Assert.assertEquals(destLogFiles.length, 2);
+    Assert.assertEquals(destLogFiles[0].getLen(), testString.length() + 1);
+    Assert.assertEquals(destLogFiles[1].getLen(), testString.length() + 1);
+
+    //Shutdown the LogCopier. The current log file (i.e. testLog.log) should be copied over.
+    this.logCopier.shutDown();
+    destLogFiles = this.destFs.listStatus(destLogDir);
+    Assert.assertEquals(destLogFiles.length, 3);
+    Assert.assertEquals(destLogFiles[0].getLen(), testString.length() + 1);
+    Assert.assertEquals(destLogFiles[1].getLen(), testString.length() + 1);
+    Assert.assertEquals(destLogFiles[2].getLen(), testString.length() + 1);
+  }
+
+  @Test (dependsOnMethods = "testCheckSrcLogFiles")
+  public void testPruneCopiedFileNames() throws Exception {
+    Set<String> srcLogFileNames = Sets.newHashSet("testLog.log");
+
+    Assert.assertEquals(logCopier.getCopiedFileNames().size(), 3);
+    logCopier.pruneCopiedFileNames(srcLogFileNames);
+    Assert.assertEquals(logCopier.getCopiedFileNames().size(), 1);
+  }
+
+  @AfterClass
+  public void cleanUp() throws IOException {
+    this.srcFs.delete(srcLogDir, true);
+    this.destFs.delete(destLogDir, true);
+  }
+}
\ No newline at end of file
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
index ccb5419..acf31b1 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
@@ -56,6 +56,7 @@ import org.apache.gobblin.cluster.GobblinClusterManager;
 import org.apache.gobblin.cluster.GobblinClusterUtils;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.JvmUtils;
+import org.apache.gobblin.util.PathUtils;
 import org.apache.gobblin.util.logs.Log4jConfigurationHelper;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 import org.apache.gobblin.yarn.event.DelegationTokenUpdatedEvent;
@@ -83,10 +84,12 @@ public class GobblinApplicationMaster extends GobblinClusterManager {
     super(applicationName, containerId.getApplicationAttemptId().getApplicationId().toString(),
         GobblinClusterUtils.addDynamicConfig(config), Optional.<Path>absent());
 
+    String containerLogDir = config.getString(GobblinYarnConfigurationKeys.LOGS_SINK_ROOT_DIR_KEY);
     GobblinYarnLogSource gobblinYarnLogSource = new GobblinYarnLogSource();
     if (gobblinYarnLogSource.isLogSourcePresent()) {
+      Path appWorkDir = PathUtils.combinePaths(containerLogDir, GobblinClusterUtils.getAppWorkDirPath(this.clusterName, this.applicationId), "AppMaster");
       this.applicationLauncher
-          .addService(gobblinYarnLogSource.buildLogCopier(this.config, containerId, this.fs, this.appWorkDir));
+          .addService(gobblinYarnLogSource.buildLogCopier(this.config, containerId.toString(), this.fs, appWorkDir));
     }
 
     this.yarnService = buildYarnService(this.config, applicationName, this.applicationId, yarnConfiguration, this.fs);
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index 2d93007..827c597 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -209,6 +209,8 @@ public class GobblinYarnAppLauncher {
   private final int jvmMemoryOverheadMbs;
   private final double jvmMemoryXmxRatio;
 
+  private final String containerTimezone;
+
   public GobblinYarnAppLauncher(Config config, YarnConfiguration yarnConfiguration) throws IOException {
     this.config = config;
 
@@ -267,6 +269,8 @@ public class GobblinYarnAppLauncher {
 
     this.appViewAcl = ConfigUtils.getString(this.config, GobblinYarnConfigurationKeys.APP_VIEW_ACL,
         GobblinYarnConfigurationKeys.DEFAULT_APP_VIEW_ACL);
+    this.containerTimezone = ConfigUtils.getString(this.config, GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_TIMEZONE,
+        GobblinYarnConfigurationKeys.DEFAULT_GOBBLIN_YARN_CONTAINER_TIMEZONE);
   }
 
   /**
@@ -525,7 +529,6 @@ public class GobblinYarnAppLauncher {
     appSubmissionContext.setQueue(this.appQueueName);
     appSubmissionContext.setPriority(Priority.newInstance(0));
     appSubmissionContext.setAMContainerSpec(amContainerLaunchContext);
-
     // Also setup container local resources by copying local jars and files the container need to HDFS
     addContainerLocalResources(applicationId);
 
@@ -675,6 +678,7 @@ public class GobblinYarnAppLauncher {
     return new StringBuilder()
         .append(ApplicationConstants.Environment.JAVA_HOME.$()).append("/bin/java")
         .append(" -Xmx").append((int) (memoryMbs * this.jvmMemoryXmxRatio) - this.jvmMemoryOverheadMbs).append("M")
+        .append(" -D").append(GobblinYarnConfigurationKeys.JVM_USER_TIMEZONE_CONFIG).append("=").append(this.containerTimezone)
         .append(" -D").append(GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_LOG_DIR_NAME).append("=").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR)
         .append(" -D").append(GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_LOG_FILE_NAME).append("=").append(appMasterClassName).append(".").append(ApplicationConstants.STDOUT)
         .append(" ").append(JvmUtils.formatJvmArguments(this.appMasterJvmArgs))
@@ -735,12 +739,6 @@ public class GobblinYarnAppLauncher {
             .readFrom(getHdfsLogDir(appWorkDir))
             .writeTo(sinkLogDir)
             .acceptsLogFileExtensions(ImmutableSet.of(ApplicationConstants.STDOUT, ApplicationConstants.STDERR));
-    if (config.hasPath(GobblinYarnConfigurationKeys.LOG_COPIER_MAX_FILE_SIZE)) {
-      builder.useMaxBytesPerLogFile(config.getBytes(GobblinYarnConfigurationKeys.LOG_COPIER_MAX_FILE_SIZE));
-    }
-    if (config.hasPath(GobblinYarnConfigurationKeys.LOG_COPIER_SCHEDULER)) {
-      builder.useScheduler(config.getString(GobblinYarnConfigurationKeys.LOG_COPIER_SCHEDULER));
-    }
     return builder.build();
   }
 
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
index 975b9e8..10ae50c 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
@@ -73,10 +73,6 @@ public class GobblinYarnConfigurationKeys {
   public static final String CONTAINER_JVM_MEMORY_XMX_RATIO_KEY = GOBBLIN_YARN_PREFIX + "container.jvmMemoryXmxRatio";
   public static final double DEFAULT_CONTAINER_JVM_MEMORY_XMX_RATIO = 1.0;
 
-  //Container Log location properties
-  public static final String GOBBLIN_YARN_CONTAINER_LOG_DIR_NAME = GobblinYarnConfigurationKeys.GOBBLIN_YARN_PREFIX + "app.container.log.dir";
-  public static final String GOBBLIN_YARN_CONTAINER_LOG_FILE_NAME = GobblinYarnConfigurationKeys.GOBBLIN_YARN_PREFIX + "app.container.log.file";
-
   // Helix configuration properties.
   public static final String HELIX_INSTANCE_MAX_RETRIES = GOBBLIN_YARN_PREFIX + "helix.instance.max.retries";
 
@@ -90,15 +86,23 @@ public class GobblinYarnConfigurationKeys {
   // Resource/dependencies configuration properties.
   public static final String LIB_JARS_DIR_KEY = GOBBLIN_YARN_PREFIX + "lib.jars.dir";
 
-  public static final String LOGS_SINK_ROOT_DIR_KEY = GOBBLIN_YARN_PREFIX + "logs.sink.root.dir";
   public static final String LIB_JARS_DIR_NAME = "_libjars";
   public static final String APP_JARS_DIR_NAME = "_appjars";
   public static final String APP_FILES_DIR_NAME = "_appfiles";
   public static final String APP_LOGS_DIR_NAME = "_applogs";
 
+  // Container log location properties
+  public static final String GOBBLIN_YARN_CONTAINER_LOG_DIR_NAME = GOBBLIN_YARN_PREFIX + "app.container.log.dir";
+  public static final String GOBBLIN_YARN_CONTAINER_LOG_FILE_NAME = GOBBLIN_YARN_PREFIX + "app.container.log.file";
+
   // Other misc configuration properties.
-  public static final String LOG_COPIER_SCHEDULER = GOBBLIN_YARN_PREFIX + "log.copier.scheduler";
-  public static final String LOG_COPIER_MAX_FILE_SIZE = GOBBLIN_YARN_PREFIX + "log.copier.max.file.size";
-  public static final String GOBBLIN_YARN_LOG4J_CONFIGURATION_FILE = "log4j-yarn.properties";
+  public static final String LOGS_SINK_ROOT_DIR_KEY = GOBBLIN_YARN_PREFIX + "logs.sink.root.dir";
+  public static final String LOG_FILE_EXTENSIONS = GOBBLIN_YARN_PREFIX + "log.file.extensions"; // comma-separated list
   public static final String LOG_COPIER_DISABLE_DRIVER_COPY = GOBBLIN_YARN_PREFIX + "log.copier.disable.driver.copy";
+  public static final String GOBBLIN_YARN_CONTAINER_TIMEZONE = GOBBLIN_YARN_PREFIX + "container.timezone"; // -> -Duser.timezone
+  public static final String DEFAULT_GOBBLIN_YARN_CONTAINER_TIMEZONE = "America/Los_Angeles";
+
+  // Constant definitions
+  public static final String GOBBLIN_YARN_LOG4J_CONFIGURATION_FILE = "log4j-yarn.properties";
+  public static final String JVM_USER_TIMEZONE_CONFIG = "user.timezone";
 }
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnLogSource.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnLogSource.java
index c1c0e61..21858e0 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnLogSource.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnLogSource.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.yarn;
 
 import java.io.IOException;
+import java.net.URI;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -29,8 +30,11 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 
 import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.io.Files;
 import com.typesafe.config.Config;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.util.PathUtils;
 import org.apache.gobblin.util.logs.LogCopier;
 
 
@@ -46,6 +50,11 @@ import org.apache.gobblin.util.logs.LogCopier;
  */
 class GobblinYarnLogSource {
   private static final Splitter COMMA_SPLITTER = Splitter.on(',').omitEmptyStrings().trimResults();
+  private static final Configuration AUTO_CLOSE_CONFIG; // NOTE: despite its name, automatic closing is DISABLED
+  static {
+    AUTO_CLOSE_CONFIG = new Configuration();
+    AUTO_CLOSE_CONFIG.setBoolean("fs.automatic.close", false); // no shutdown-hook close; owners close their FileSystems
+  }
 
   /**
    * Return if the log source is present or not.
@@ -66,25 +75,35 @@ class GobblinYarnLogSource {
    * @return a {@link LogCopier} instance
    * @throws IOException if it fails on any IO operation
    */
-  protected LogCopier buildLogCopier(Config config, ContainerId containerId, FileSystem destFs, Path appWorkDir)
+  protected LogCopier buildLogCopier(Config config, String containerId, FileSystem destFs, Path appWorkDir)
       throws IOException {
     LogCopier.Builder builder = LogCopier.newBuilder()
-            .useSrcFileSystem(FileSystem.getLocal(new Configuration()))
-            .useDestFileSystem(destFs)
+            .useSrcFileSystem(buildFileSystem(config, true))
+            .useDestFileSystem(buildFileSystem(config, false))
             .readFrom(getLocalLogDirs())
             .writeTo(getHdfsLogDir(containerId, destFs, appWorkDir))
-            .acceptsLogFileExtensions(ImmutableSet.of(ApplicationConstants.STDOUT, ApplicationConstants.STDERR))
-            .useLogFileNamePrefix(containerId.toString());
-    if (config.hasPath(GobblinYarnConfigurationKeys.LOG_COPIER_MAX_FILE_SIZE)) {
-      builder.useMaxBytesPerLogFile(config.getBytes(GobblinYarnConfigurationKeys.LOG_COPIER_MAX_FILE_SIZE));
-    }
-    if (config.hasPath(GobblinYarnConfigurationKeys.LOG_COPIER_SCHEDULER)) {
-      builder.useScheduler(config.getString(GobblinYarnConfigurationKeys.LOG_COPIER_SCHEDULER));
-    }
+            .useCurrentLogFileName(Files.getNameWithoutExtension(System.getProperty(GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_LOG_FILE_NAME)));
+    // Comma-separated extensions; COMMA_SPLITTER trims entries and drops empty ones (e.g. "stdout, stderr,").
+    builder.acceptsLogFileExtensions(config.hasPath(GobblinYarnConfigurationKeys.LOG_FILE_EXTENSIONS)
+        ? ImmutableSet.copyOf(COMMA_SPLITTER.splitToList(config.getString(GobblinYarnConfigurationKeys.LOG_FILE_EXTENSIONS)))
+        : ImmutableSet.of());
+
     return builder.build();
   }
 
   /**
+   * Return a new (non-cached) {@link FileSystem} instance. The {@link FileSystem} instance
+   * returned by the method has automatic closing disabled. The user of the instance needs to handle closing of the
+   * instance, typically as part of its shutdown sequence.
+   */
+  private FileSystem buildFileSystem(Config config, boolean isLocal) throws IOException {
+    if (isLocal) { return FileSystem.newInstanceLocal(AUTO_CLOSE_CONFIG); }
+    // Otherwise use the URI configured under FS_URI_KEY, falling back to the Configuration's default file system.
+    return !config.hasPath(ConfigurationKeys.FS_URI_KEY) ? FileSystem.newInstance(AUTO_CLOSE_CONFIG)
+        : FileSystem.newInstance(URI.create(config.getString(ConfigurationKeys.FS_URI_KEY)), AUTO_CLOSE_CONFIG);
+  }
+
+  /**
    * Multiple directories may be specified in the LOG_DIRS string. Split them up and return a list of {@link Path}s.
    * @return list of {@link Path}s to the log directories
    * @throws IOException
@@ -94,12 +113,11 @@ class GobblinYarnLogSource {
     return COMMA_SPLITTER.splitToList(logDirs).stream().map(e -> new Path(e)).collect(Collectors.toList());
   }
 
-  private Path getHdfsLogDir(ContainerId containerId, FileSystem destFs, Path appWorkDir) throws IOException {
-    Path logRootDir = new Path(appWorkDir, GobblinYarnConfigurationKeys.APP_LOGS_DIR_NAME);
-    if (!destFs.exists(logRootDir)) {
-      destFs.mkdirs(logRootDir);
-    }
-
-    return new Path(logRootDir, containerId.toString());
+  private Path getHdfsLogDir(String containerId, FileSystem destFs, Path appWorkDir) throws IOException {
+    Path containerLogDir = PathUtils.combinePaths(appWorkDir.toString(), GobblinYarnConfigurationKeys.APP_LOGS_DIR_NAME, containerId);
+    if (!destFs.exists(containerLogDir) && !destFs.mkdirs(containerLogDir)) {
+      throw new IOException("Failed to create log directory " + containerLogDir); // mkdirs signals failure via false
+    }
+    return containerLogDir; // <appWorkDir>/_applogs/<containerId>, guaranteed to exist at this point
   }
 }
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
index 77183fd..c90306c 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
@@ -25,31 +25,26 @@ import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.DefaultParser;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
-
 import org.apache.hadoop.fs.Path;
-
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.util.ConverterUtils;
-
 import org.apache.helix.NotificationContext;
 import org.apache.helix.messaging.handling.HelixTaskResult;
 import org.apache.helix.messaging.handling.MessageHandler;
 import org.apache.helix.messaging.handling.MessageHandlerFactory;
 import org.apache.helix.model.Message;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.Service;
-
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
 import org.apache.gobblin.cluster.GobblinClusterUtils;
 import org.apache.gobblin.cluster.GobblinTaskRunner;
-import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
 import org.apache.gobblin.util.JvmUtils;
 import org.apache.gobblin.util.logs.Log4jConfigurationHelper;
 import org.apache.gobblin.yarn.event.DelegationTokenUpdatedEvent;
@@ -58,11 +53,13 @@ import org.apache.gobblin.yarn.event.DelegationTokenUpdatedEvent;
 public class GobblinYarnTaskRunner extends GobblinTaskRunner {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(GobblinTaskRunner.class);
+  private final ContainerId containerId;
 
   public GobblinYarnTaskRunner(String applicationName, String helixInstanceName, ContainerId containerId, Config config,
       Optional<Path> appWorkDirOptional) throws Exception {
     super(applicationName, helixInstanceName, getApplicationId(containerId), getTaskRunnerId(containerId),
         GobblinClusterUtils.addDynamicConfig(config), appWorkDirOptional);
+    this.containerId = containerId;
   }
 
   @Override
@@ -72,6 +69,20 @@ public class GobblinYarnTaskRunner extends GobblinTaskRunner {
       LOGGER.info("Adding YarnContainerSecurityManager since login is keytab based");
       services.add(new YarnContainerSecurityManager(this.config, this.fs, this.eventBus));
     }
+
+    if (config.hasPath(GobblinYarnConfigurationKeys.LOGS_SINK_ROOT_DIR_KEY)) {
+      GobblinYarnLogSource gobblinYarnLogSource = new GobblinYarnLogSource();
+      String logsSinkRootDir = config.getString(GobblinYarnConfigurationKeys.LOGS_SINK_ROOT_DIR_KEY);
+      // Best effort: failures building the LogCopier are logged (with stack trace) rather than propagated.
+      if (gobblinYarnLogSource.isLogSourcePresent()) {
+        try {
+          services.add(gobblinYarnLogSource.buildLogCopier(this.config, this.taskRunnerId, this.fs,
+              new Path(logsSinkRootDir, GobblinClusterUtils.getAppWorkDirPath(this.applicationName, this.applicationId))));
+        } catch (Exception e) {
+          LOGGER.warn("Cannot add LogCopier service to the service manager", e);
+        }
+      }
+    }
     return services;
   }
 
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index 512041e..1d67b9c 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -151,6 +151,7 @@ public class YarnService extends AbstractIdleService {
   private final int helixInstanceMaxRetries;
 
   private final Optional<String> containerJvmArgs;
+  private final String containerTimezone; // passed to each container JVM as -Duser.timezone
 
   private volatile Optional<Resource> maxResourceCapacity = Optional.absent();
 
@@ -257,6 +258,8 @@ public class YarnService extends AbstractIdleService {
 
     this.appViewAcl = ConfigUtils.getString(this.config, GobblinYarnConfigurationKeys.APP_VIEW_ACL,
         GobblinYarnConfigurationKeys.DEFAULT_APP_VIEW_ACL);
+    this.containerTimezone = ConfigUtils.getString(this.config, GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_TIMEZONE,
+        GobblinYarnConfigurationKeys.DEFAULT_GOBBLIN_YARN_CONTAINER_TIMEZONE);
   }
 
   @SuppressWarnings("unused")
@@ -558,6 +561,7 @@ public class YarnService extends AbstractIdleService {
         .append(ApplicationConstants.Environment.JAVA_HOME.$()).append("/bin/java")
         .append(" -Xmx").append((int) (container.getResource().getMemory() * this.jvmMemoryXmxRatio) -
             this.jvmMemoryOverheadMbs).append("M")
+        .append(" -D").append(GobblinYarnConfigurationKeys.JVM_USER_TIMEZONE_CONFIG).append("=").append(this.containerTimezone)
         .append(" -D").append(GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_LOG_DIR_NAME).append("=").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR)
         .append(" -D").append(GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_LOG_FILE_NAME).append("=").append(containerProcessName).append(".").append(ApplicationConstants.STDOUT)
         .append(" ").append(JvmUtils.formatJvmArguments(this.containerJvmArgs))