You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/05/01 20:24:03 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1136] Make LogCopier be able to refresh FileSystem for long running job use cases

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f67aac  [GOBBLIN-1136] Make LogCopier be able to refresh FileSystem for long running job use cases
8f67aac is described below

commit 8f67aac08e223039c0bf233eb6d2b332ec86bed1
Author: Zihan Li <zi...@zihli-mn1.linkedin.biz>
AuthorDate: Fri May 1 13:23:52 2020 -0700

    [GOBBLIN-1136] Make LogCopier be able to refresh FileSystem for long running job use cases
    
    Closes #2975 from ZihanLi58/GOBBLIN-1136
---
 .../java/org/apache/gobblin/util/AvroUtils.java    |   6 +-
 .../util/filesystem/FileSystemSupplier.java        |  32 ++---
 .../org/apache/gobblin/util/logs/LogCopier.java    | 143 ++++++++++++++-------
 .../gobblin/yarn/GobblinApplicationMaster.java     |   7 +-
 .../apache/gobblin/yarn/GobblinYarnLogSource.java  |  55 ++++----
 .../apache/gobblin/yarn/GobblinYarnTaskRunner.java |  16 ++-
 .../gobblin/yarn/YarnAppMasterSecurityManager.java |   5 +-
 .../gobblin/yarn/YarnContainerSecurityManager.java |  31 +++--
 8 files changed, 180 insertions(+), 115 deletions(-)

diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
index bf4dc76..65d3279 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
@@ -476,7 +476,7 @@ public class AvroUtils {
   }
 
   public static void writeSchemaToFile(Schema schema, Path filePath, FileSystem fs, boolean overwrite, FsPermission perm)
-    throws IOException {
+      throws IOException {
     writeSchemaToFile(schema, filePath, null, fs, overwrite, perm);
   }
 
@@ -914,7 +914,7 @@ public class AvroUtils {
     List<Field> newOutputFields = Stream.concat(outputFields.stream(), fieldList.stream()).collect(Collectors.toList());
 
     Schema outputSchema = Schema.createRecord(inputSchema.getName(), inputSchema.getDoc(),
-            inputSchema.getNamespace(), inputSchema.isError());
+        inputSchema.getNamespace(), inputSchema.isError());
     outputSchema.setFields(newOutputFields);
     copyProperties(inputSchema, outputSchema);
     return outputSchema;
@@ -932,7 +932,7 @@ public class AvroUtils {
    * @return an outputRecord that contains a union of the fields in the inputRecord and the field-values in the fieldMap
    */
   public static GenericRecord decorateRecord(GenericRecord inputRecord, @Nonnull Map<String, Object> fieldMap,
-          Schema outputSchema) {
+      Schema outputSchema) {
     GenericRecord outputRecord = new GenericData.Record(outputSchema);
     inputRecord.getSchema().getFields().forEach(f -> outputRecord.put(f.name(), inputRecord.get(f.name())));
     fieldMap.forEach((key, value) -> outputRecord.put(key, value));
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAppMasterSecurityManager.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/FileSystemSupplier.java
similarity index 50%
copy from gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAppMasterSecurityManager.java
copy to gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/FileSystemSupplier.java
index 7c3be1c..23a9725 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAppMasterSecurityManager.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/FileSystemSupplier.java
@@ -15,32 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.yarn;
+package org.apache.gobblin.util.filesystem;
 
-import com.google.common.base.Throwables;
-import com.google.common.eventbus.EventBus;
-import com.typesafe.config.Config;
 import java.io.IOException;
-import org.apache.gobblin.yarn.event.DelegationTokenUpdatedEvent;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 
 
-public class YarnAppMasterSecurityManager extends YarnContainerSecurityManager{
-
-  private YarnService yarnService;
-  public YarnAppMasterSecurityManager(Config config, FileSystem fs, EventBus eventBus, YarnService yarnService) {
-    super(config, fs, eventBus);
-    this.yarnService = yarnService;
-  }
+/**
+ * A supplier interface for obtaining a {@link FileSystem}.
+ */
 
-  @Override
-  public void handleTokenFileUpdatedEvent(DelegationTokenUpdatedEvent delegationTokenUpdatedEvent) {
-    super.handleTokenFileUpdatedEvent(delegationTokenUpdatedEvent);
-    try {
-      yarnService.updateToken();
-    } catch (IOException ioe) {
-      throw Throwables.propagate(ioe);
-    }
-  }
+public interface FileSystemSupplier {
+  /**
+   * Gets the FileSystem.
+   * @return the new FileSystem to use
+   */
+  public FileSystem getFileSystem() throws IOException;
 }
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/logs/LogCopier.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/logs/LogCopier.java
index d516c80..fa54b1b 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/logs/LogCopier.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/logs/LogCopier.java
@@ -17,6 +17,17 @@
 
 package org.apache.gobblin.util.logs;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import com.google.common.io.Closer;
+import com.google.common.io.Files;
+import com.google.common.util.concurrent.AbstractScheduledService;
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.IOException;
@@ -35,7 +46,12 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
-
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.util.DatasetFilterUtils;
+import org.apache.gobblin.util.FileListUtils;
+import org.apache.gobblin.util.filesystem.FileSystemSupplier;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -44,24 +60,6 @@ import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
-import com.google.common.io.Closer;
-import com.google.common.io.Files;
-import com.google.common.util.concurrent.AbstractScheduledService;
-
-import lombok.Getter;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.util.DatasetFilterUtils;
-import org.apache.gobblin.util.FileListUtils;
-
 
 /**
  * A utility service that periodically reads log files in a source log file directory for changes
@@ -113,13 +111,15 @@ public class LogCopier extends AbstractScheduledService {
 
   private static final int DEFAULT_NUM_COPY_THREADS = 10;
 
-  private final FileSystem srcFs;
-  private final FileSystem destFs;
+  private FileSystem srcFs;
+  private FileSystem destFs;
   private final List<Path> srcLogDirs;
   private final Path destLogDir;
 
   private final long sourceLogFileMonitorInterval;
   private final TimeUnit timeUnit;
+  private final FileSystemSupplier destFsSupplier;
+  private final FileSystemSupplier srcFsSupplier;
 
   private final Set<String> logFileExtensions;
   private final int numCopyThreads;
@@ -134,14 +134,21 @@ public class LogCopier extends AbstractScheduledService {
 
   private final ExecutorService executorService;
 
+  @Setter
+  private boolean needToUpdateDestFs;
+  @Setter
+  private boolean needToUpdateSrcFs;
   @Getter
   private final Set<String> copiedFileNames = Sets.newConcurrentHashSet();
   private boolean shouldCopyCurrentLogFile;
 
-  private LogCopier(Builder builder) {
-    this.srcFs = builder.srcFs;
-    this.destFs = builder.destFs;
-
+  private LogCopier(Builder builder) throws IOException {
+    this.destFsSupplier = builder.destFsSupplier;
+    this.srcFsSupplier = builder.srcFsSupplier;
+    this.srcFs = this.srcFsSupplier != null ? this.srcFsSupplier.getFileSystem() : builder.srcFs;
+    Preconditions.checkArgument(this.srcFs != null, "srcFs or srcFsSupplier has not been set");
+    this.destFs = this.destFsSupplier != null ? this.destFsSupplier.getFileSystem() : builder.destFs;
+    Preconditions.checkArgument(this.destFs != null, "destFs or destFsSupplier has not been set");
     this.srcLogDirs = builder.srcLogDirs.stream().map(d -> this.srcFs.makeQualified(d)).collect(Collectors.toList());
     this.destLogDir = this.destFs.makeQualified(builder.destLogDir);
 
@@ -151,6 +158,8 @@ public class LogCopier extends AbstractScheduledService {
     this.logFileExtensions = builder.logFileExtensions;
     this.currentLogFileName = builder.currentLogFileName;
     this.shouldCopyCurrentLogFile = false;
+    this.needToUpdateDestFs = false;
+    this.needToUpdateSrcFs = false;
 
     this.includingRegexPatterns = Optional.fromNullable(builder.includingRegexPatterns);
     this.excludingRegexPatterns = Optional.fromNullable(builder.excludingRegexPatterns);
@@ -207,7 +216,6 @@ public class LogCopier extends AbstractScheduledService {
     }
 
     return LogCopier.this.logFileExtensions.contains(Files.getFileExtension(logFilePath.getName()));
-
   }
 
   /**
@@ -235,11 +243,11 @@ public class LogCopier extends AbstractScheduledService {
   void checkSrcLogFiles() throws IOException {
     List<FileStatus> srcLogFiles = new ArrayList<>();
     Set<String> srcLogFileNames = new HashSet<>();
-    Set <Path> newLogFiles = new HashSet<>();
-    for (Path logDirPath: srcLogDirs) {
+    Set<Path> newLogFiles = new HashSet<>();
+    for (Path logDirPath : srcLogDirs) {
       srcLogFiles.addAll(FileListUtils.listFilesRecursively(srcFs, logDirPath));
       //Remove the already copied files from the list of files to copy
-      for (FileStatus srcLogFile: srcLogFiles) {
+      for (FileStatus srcLogFile : srcLogFiles) {
         if (shouldIncludeLogFile(srcLogFile)) {
           newLogFiles.add(srcLogFile.getPath());
         }
@@ -255,14 +263,15 @@ public class LogCopier extends AbstractScheduledService {
     List<Future> futures = new ArrayList<>();
     // Schedule a copy task for each new log file
     for (final Path srcLogFile : newLogFiles) {
-      String destLogFileName = this.logFileNamePrefix.isPresent()
-          ? this.logFileNamePrefix.get() + "." + srcLogFile.getName() : srcLogFile.getName();
+      String destLogFileName =
+          this.logFileNamePrefix.isPresent() ? this.logFileNamePrefix.get() + "." + srcLogFile.getName()
+              : srcLogFile.getName();
       final Path destLogFile = new Path(this.destLogDir, destLogFileName);
       futures.add(this.executorService.submit(new LogCopyTask(srcLogFile, destLogFile)));
     }
 
     //Wait for copy tasks to finish
-    for (Future future: futures) {
+    for (Future future : futures) {
       try {
         future.get();
       } catch (InterruptedException e) {
@@ -271,7 +280,24 @@ public class LogCopier extends AbstractScheduledService {
         LOGGER.error("Failed LogCopyTask - {}", e);
       }
     }
-
+    if (needToUpdateDestFs) {
+      if (destFsSupplier == null) {
+        throw new IOException("Try to update dest fileSystem but destFsSupplier has not been set");
+      }
+      this.destFs.close();
+      this.destFs = destFsSupplier.getFileSystem();
+      LOGGER.info("Dest fs updated" + destFs.toString());
+      needToUpdateDestFs = false;
+    }
+    if (needToUpdateSrcFs) {
+      if (srcFsSupplier == null) {
+        throw new IOException("Try to update source fileSystem but srcFsSupplier has not been set");
+      }
+      this.srcFs.close();
+      this.srcFs = srcFsSupplier.getFileSystem();
+      LOGGER.info("Src fs updated" + srcFs.toString());
+      needToUpdateSrcFs = false;
+    }
     pruneCopiedFileNames(srcLogFileNames);
   }
 
@@ -291,10 +317,12 @@ public class LogCopier extends AbstractScheduledService {
 
     private static final Splitter COMMA_SPLITTER = Splitter.on(',').omitEmptyStrings().trimResults();
 
-    private FileSystem srcFs;
+    private FileSystem srcFs = null;
     private List<Path> srcLogDirs;
-    private FileSystem destFs;
+    private FileSystem destFs = null;
     private Path destLogDir;
+    private FileSystemSupplier destFsSupplier = null;
+    private FileSystemSupplier srcFsSupplier = null;
 
     private long sourceLogFileMonitorInterval = DEFAULT_SOURCE_LOG_FILE_MONITOR_INTERVAL;
 
@@ -313,11 +341,11 @@ public class LogCopier extends AbstractScheduledService {
     private int linesWrittenBeforeFlush = DEFAULT_LINES_WRITTEN_BEFORE_FLUSH;
 
     /**
-    * Set the interval between two checks for the source log file monitor.
-    *
-    * @param sourceLogFileMonitorInterval the interval between two checks for the source log file monitor
-    * @return this {@link LogCopier.Builder} instance
-    */
+     * Set the interval between two checks for the source log file monitor.
+     *
+     * @param sourceLogFileMonitorInterval the interval between two checks for the source log file monitor
+     * @return this {@link LogCopier.Builder} instance
+     */
     public Builder useSourceLogFileMonitorInterval(long sourceLogFileMonitorInterval) {
       Preconditions.checkArgument(sourceLogFileMonitorInterval > 0,
           "Source log file monitor interval must be positive");
@@ -338,6 +366,30 @@ public class LogCopier extends AbstractScheduledService {
     }
 
     /**
+     * Set the {@link FileSystemSupplier} used for generating a new dest FileSystem later when the token has been updated.
+     *
+     * @param supplier the {@link FileSystemSupplier} used for generating a new dest FileSystem
+     * @return this {@link LogCopier.Builder} instance
+     */
+    public Builder useDestFsSupplier(FileSystemSupplier supplier) {
+      Preconditions.checkNotNull(supplier);
+      this.destFsSupplier = supplier;
+      return this;
+    }
+
+    /**
+     * Set the {@link FileSystemSupplier} used for generating a new source FileSystem later when the token has been updated.
+     *
+     * @param supplier the {@link FileSystemSupplier} used for generating a new source FileSystem
+     * @return this {@link LogCopier.Builder} instance
+     */
+    public Builder useSrcFsSupplier(FileSystemSupplier supplier) {
+      Preconditions.checkNotNull(supplier);
+      this.srcFsSupplier = supplier;
+      return this;
+    }
+
+    /**
      * Set the set of acceptable log file extensions.
      *
      * @param logFileExtensions the set of acceptable log file extensions
@@ -481,7 +533,7 @@ public class LogCopier extends AbstractScheduledService {
      *
      * @return a new {@link LogCopier} instance
      */
-    public LogCopier build() {
+    public LogCopier build() throws IOException {
       return new LogCopier(this);
     }
   }
@@ -565,10 +617,11 @@ public class LogCopier extends AbstractScheduledService {
      * </p>
      */
     private boolean shouldCopyLine(String line) {
-      boolean including = !LogCopier.this.includingRegexPatterns.isPresent()
-          || DatasetFilterUtils.stringInPatterns(line, LogCopier.this.includingRegexPatterns.get());
-      boolean excluding = LogCopier.this.excludingRegexPatterns.isPresent()
-          && DatasetFilterUtils.stringInPatterns(line, LogCopier.this.excludingRegexPatterns.get());
+      boolean including =
+          !LogCopier.this.includingRegexPatterns.isPresent() || DatasetFilterUtils.stringInPatterns(line,
+              LogCopier.this.includingRegexPatterns.get());
+      boolean excluding = LogCopier.this.excludingRegexPatterns.isPresent() && DatasetFilterUtils.stringInPatterns(line,
+          LogCopier.this.excludingRegexPatterns.get());
 
       return !excluding && including;
     }
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
index c47032e..c1f7cc0 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
@@ -26,6 +26,7 @@ import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 
+import org.apache.gobblin.util.logs.LogCopier;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -79,6 +80,7 @@ public class GobblinApplicationMaster extends GobblinClusterManager {
 
   @Getter
   private final YarnService yarnService;
+  private LogCopier logCopier;
 
   public GobblinApplicationMaster(String applicationName, String applicationId, ContainerId containerId, Config config,
       YarnConfiguration yarnConfiguration) throws Exception {
@@ -91,8 +93,9 @@ public class GobblinApplicationMaster extends GobblinClusterManager {
     GobblinYarnLogSource gobblinYarnLogSource = new GobblinYarnLogSource();
     if (gobblinYarnLogSource.isLogSourcePresent()) {
       Path appWorkDir = PathUtils.combinePaths(containerLogDir, GobblinClusterUtils.getAppWorkDirPath(this.clusterName, this.applicationId), "AppMaster");
+      logCopier = gobblinYarnLogSource.buildLogCopier(this.config, containerId.toString(), this.fs, appWorkDir);
       this.applicationLauncher
-          .addService(gobblinYarnLogSource.buildLogCopier(this.config, containerId.toString(), this.fs, appWorkDir));
+          .addService(logCopier);
     }
 
     this.yarnService = buildYarnService(this.config, applicationName, this.applicationId, yarnConfiguration, this.fs);
@@ -127,7 +130,7 @@ public class GobblinApplicationMaster extends GobblinClusterManager {
    * Build the {@link YarnAppMasterSecurityManager} for the Application Master.
    */
   private YarnContainerSecurityManager buildYarnContainerSecurityManager(Config config, FileSystem fs) {
-    return new YarnAppMasterSecurityManager(config, fs, this.eventBus, this.yarnService);
+    return new YarnAppMasterSecurityManager(config, fs, this.eventBus, this.logCopier, this.yarnService);
   }
 
   @Override
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnLogSource.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnLogSource.java
index 64127ac..49a2d12 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnLogSource.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnLogSource.java
@@ -17,26 +17,24 @@
 
 package org.apache.gobblin.yarn;
 
+import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.io.Files;
+import com.typesafe.config.Config;
 import java.io.IOException;
 import java.net.URI;
 import java.util.List;
 import java.util.stream.Collectors;
-
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.util.PathUtils;
+import org.apache.gobblin.util.filesystem.FileSystemSupplier;
+import org.apache.gobblin.util.logs.LogCopier;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 
-import com.google.common.base.Splitter;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.io.Files;
-import com.typesafe.config.Config;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.util.PathUtils;
-import org.apache.gobblin.util.logs.LogCopier;
-
 
 /**
  * A base class for container processes that are sources of Gobblin Yarn application logs.
@@ -74,15 +72,27 @@ class GobblinYarnLogSource {
   protected LogCopier buildLogCopier(Config config, String containerId, FileSystem destFs, Path appWorkDir)
       throws IOException {
     LogCopier.Builder builder = LogCopier.newBuilder()
-            .useSrcFileSystem(buildFileSystem(config, true))
-            .useDestFileSystem(buildFileSystem(config, false))
-            .readFrom(getLocalLogDirs())
-            .writeTo(getHdfsLogDir(containerId, destFs, appWorkDir))
-            .useCurrentLogFileName(Files.getNameWithoutExtension(System.getProperty(GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_LOG_FILE_NAME)));
+        .useDestFsSupplier(new FileSystemSupplier() {
+          @Override
+          public FileSystem getFileSystem() throws IOException {
+            return buildFileSystem(config, false);
+          }
+        })
+        .useSrcFsSupplier(new FileSystemSupplier() {
+          @Override
+          public FileSystem getFileSystem() throws IOException {
+            return buildFileSystem(config, true);
+          }
+        })
+        .readFrom(getLocalLogDirs())
+        .writeTo(getHdfsLogDir(containerId, destFs, appWorkDir))
+        .useCurrentLogFileName(Files.getNameWithoutExtension(
+            System.getProperty(GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_LOG_FILE_NAME)));
 
-    builder.acceptsLogFileExtensions(config.hasPath(GobblinYarnConfigurationKeys.LOG_FILE_EXTENSIONS) ? ImmutableSet
-        .copyOf(Splitter.on(",").splitToList(config.getString(GobblinYarnConfigurationKeys.LOG_FILE_EXTENSIONS)))
-        : ImmutableSet.of());
+    builder.acceptsLogFileExtensions(
+        config.hasPath(GobblinYarnConfigurationKeys.LOG_FILE_EXTENSIONS) ? ImmutableSet.copyOf(
+            Splitter.on(",").splitToList(config.getString(GobblinYarnConfigurationKeys.LOG_FILE_EXTENSIONS)))
+            : ImmutableSet.of());
 
     return builder.build();
   }
@@ -92,10 +102,10 @@ class GobblinYarnLogSource {
    * returned by the method has automatic closing disabled. The user of the instance needs to handle closing of the
    * instance, typically as part of its shutdown sequence.
    */
-  private FileSystem buildFileSystem(Config config, boolean isLocal) throws IOException {
+  public static FileSystem buildFileSystem(Config config, boolean isLocal) throws IOException {
     return isLocal ? FileSystem.newInstanceLocal(AUTO_CLOSE_CONFIG)
-        : config.hasPath(ConfigurationKeys.FS_URI_KEY) ? FileSystem
-            .newInstance(URI.create(config.getString(ConfigurationKeys.FS_URI_KEY)), AUTO_CLOSE_CONFIG)
+        : config.hasPath(ConfigurationKeys.FS_URI_KEY) ? FileSystem.newInstance(
+            URI.create(config.getString(ConfigurationKeys.FS_URI_KEY)), AUTO_CLOSE_CONFIG)
             : FileSystem.newInstance(AUTO_CLOSE_CONFIG);
   }
 
@@ -110,7 +120,8 @@ class GobblinYarnLogSource {
   }
 
   private Path getHdfsLogDir(String containerId, FileSystem destFs, Path appWorkDir) throws IOException {
-    Path logRootDir = PathUtils.combinePaths(appWorkDir.toString(), GobblinYarnConfigurationKeys.APP_LOGS_DIR_NAME, containerId);
+    Path logRootDir =
+        PathUtils.combinePaths(appWorkDir.toString(), GobblinYarnConfigurationKeys.APP_LOGS_DIR_NAME, containerId);
     if (!destFs.exists(logRootDir)) {
       destFs.mkdirs(logRootDir);
     }
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
index 8e88b2e..f79a5bb 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
@@ -25,6 +25,7 @@ import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.DefaultParser;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
+import org.apache.gobblin.util.logs.LogCopier;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -70,24 +71,25 @@ public class GobblinYarnTaskRunner extends GobblinTaskRunner {
   public List<Service> getServices() {
     List<Service> services = new ArrayList<>();
     services.addAll(super.getServices());
-    if (UserGroupInformation.isSecurityEnabled()) {
-      LOGGER.info("Adding YarnContainerSecurityManager since security is enabled");
-      services.add(new YarnContainerSecurityManager(this.clusterConfig, this.fs, this.eventBus));
-    }
-
+    LogCopier logCopier = null;
     if (clusterConfig.hasPath(GobblinYarnConfigurationKeys.LOGS_SINK_ROOT_DIR_KEY)) {
       GobblinYarnLogSource gobblinYarnLogSource = new GobblinYarnLogSource();
       String containerLogDir = clusterConfig.getString(GobblinYarnConfigurationKeys.LOGS_SINK_ROOT_DIR_KEY);
 
       if (gobblinYarnLogSource.isLogSourcePresent()) {
         try {
-            services.add(gobblinYarnLogSource.buildLogCopier(this.clusterConfig, this.taskRunnerId, this.fs,
-                new Path(containerLogDir, GobblinClusterUtils.getAppWorkDirPath(this.applicationName, this.applicationId))));
+          logCopier = gobblinYarnLogSource.buildLogCopier(this.clusterConfig, this.taskRunnerId, this.fs,
+              new Path(containerLogDir, GobblinClusterUtils.getAppWorkDirPath(this.applicationName, this.applicationId)));
+            services.add(logCopier);
         } catch (Exception e) {
           LOGGER.warn("Cannot add LogCopier service to the service manager due to", e);
         }
       }
     }
+    if (UserGroupInformation.isSecurityEnabled()) {
+      LOGGER.info("Adding YarnContainerSecurityManager since security is enabled");
+      services.add(new YarnContainerSecurityManager(this.clusterConfig, this.fs, this.eventBus, logCopier));
+    }
     return services;
   }
 
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAppMasterSecurityManager.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAppMasterSecurityManager.java
index 7c3be1c..ba6754a 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAppMasterSecurityManager.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAppMasterSecurityManager.java
@@ -21,6 +21,7 @@ import com.google.common.base.Throwables;
 import com.google.common.eventbus.EventBus;
 import com.typesafe.config.Config;
 import java.io.IOException;
+import org.apache.gobblin.util.logs.LogCopier;
 import org.apache.gobblin.yarn.event.DelegationTokenUpdatedEvent;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -29,8 +30,8 @@ import org.apache.hadoop.fs.Path;
 public class YarnAppMasterSecurityManager extends YarnContainerSecurityManager{
 
   private YarnService yarnService;
-  public YarnAppMasterSecurityManager(Config config, FileSystem fs, EventBus eventBus, YarnService yarnService) {
-    super(config, fs, eventBus);
+  public YarnAppMasterSecurityManager(Config config, FileSystem fs, EventBus eventBus, LogCopier logCopier, YarnService yarnService) {
+    super(config, fs, eventBus, logCopier);
     this.yarnService = yarnService;
   }
 
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnContainerSecurityManager.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnContainerSecurityManager.java
index 060da6a..f98f7d9 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnContainerSecurityManager.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnContainerSecurityManager.java
@@ -17,8 +17,15 @@
 
 package org.apache.gobblin.yarn;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Throwables;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
 import java.io.IOException;
-
+import org.apache.gobblin.util.logs.LogCopier;
+import org.apache.gobblin.yarn.event.DelegationTokenUpdatedEvent;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.Credentials;
@@ -28,16 +35,6 @@ import org.apache.hadoop.security.token.TokenIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.typesafe.config.Config;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Throwables;
-import com.google.common.eventbus.EventBus;
-import com.google.common.eventbus.Subscribe;
-import com.google.common.util.concurrent.AbstractIdleService;
-
-import org.apache.gobblin.yarn.event.DelegationTokenUpdatedEvent;
-
 
 /**
  * A class for managing token renewing in the containers including the container for the
@@ -58,13 +55,19 @@ public class YarnContainerSecurityManager extends AbstractIdleService {
   private final FileSystem fs;
   private final Path tokenFilePath;
   private final EventBus eventBus;
+  private final LogCopier logCopier;
 
   public YarnContainerSecurityManager(Config config, FileSystem fs, EventBus eventBus) {
+    this(config, fs, eventBus, null);
+  }
+
+  public YarnContainerSecurityManager(Config config, FileSystem fs, EventBus eventBus, LogCopier logCopier) {
     this.fs = fs;
     this.tokenFilePath = new Path(this.fs.getHomeDirectory(),
         config.getString(GobblinYarnConfigurationKeys.APPLICATION_NAME_KEY) + Path.SEPARATOR
             + GobblinYarnConfigurationKeys.TOKEN_FILE_NAME);
     this.eventBus = eventBus;
+    this.logCopier = logCopier;
   }
 
   @SuppressWarnings("unused")
@@ -72,6 +75,10 @@ public class YarnContainerSecurityManager extends AbstractIdleService {
   public void handleTokenFileUpdatedEvent(DelegationTokenUpdatedEvent delegationTokenUpdatedEvent) {
     try {
       addCredentials(readCredentials(this.tokenFilePath));
+      if (this.logCopier != null) {
+        this.logCopier.setNeedToUpdateDestFs(true);
+        this.logCopier.setNeedToUpdateSrcFs(true);
+      }
     } catch (IOException ioe) {
       throw Throwables.propagate(ioe);
     }
@@ -99,7 +106,7 @@ public class YarnContainerSecurityManager extends AbstractIdleService {
   @VisibleForTesting
   void addCredentials(Credentials credentials) throws IOException {
     for (Token<? extends TokenIdentifier> token : credentials.getAllTokens()) {
-      LOGGER.info("updating "+token.toString());
+      LOGGER.info("updating " + token.toString());
     }
     UserGroupInformation.getCurrentUser().addCredentials(credentials);
   }