You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/04/04 20:22:24 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-723] Add support to the LogCopier for copying from multiple source paths

This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new dab5b8a  [GOBBLIN-723] Add support to the LogCopier for copying from multiple source paths
dab5b8a is described below

commit dab5b8a04209669ac8da728ae1f32bdf24154199
Author: Hung Tran <hu...@linkedin.com>
AuthorDate: Thu Apr 4 13:22:18 2019 -0700

    [GOBBLIN-723] Add support to the LogCopier for copying from multiple source paths
    
    Closes #2590 from htran1/yarn_fix_log_copier
---
 .../org/apache/gobblin/util/logs/LogCopier.java    | 40 ++++++++++++++++------
 .../apache/gobblin/yarn/GobblinYarnLogSource.java  | 18 +++++++---
 2 files changed, 43 insertions(+), 15 deletions(-)

diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/logs/LogCopier.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/logs/LogCopier.java
index 9e4b957..df626df 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/logs/LogCopier.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/logs/LogCopier.java
@@ -22,11 +22,13 @@ import java.io.BufferedWriter;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -116,7 +118,7 @@ public class LogCopier extends AbstractScheduledService {
 
   private final FileSystem srcFs;
   private final FileSystem destFs;
-  private final Path srcLogDir;
+  private final List<Path> srcLogDirs;
   private final Path destLogDir;
 
   private final long sourceLogFileMonitorInterval;
@@ -140,7 +142,7 @@ public class LogCopier extends AbstractScheduledService {
     this.srcFs = builder.srcFs;
     this.destFs = builder.destFs;
 
-    this.srcLogDir = this.srcFs.makeQualified(builder.srcLogDir);
+    this.srcLogDirs = builder.srcLogDirs.stream().map(d -> this.srcFs.makeQualified(d)).collect(Collectors.toList());
     this.destLogDir = this.destFs.makeQualified(builder.destLogDir);
 
     this.sourceLogFileMonitorInterval = builder.sourceLogFileMonitorInterval;
@@ -180,15 +182,19 @@ public class LogCopier extends AbstractScheduledService {
    * Perform a check on new source log files and submit copy tasks for new log files.
    */
   private void checkSrcLogFiles() throws IOException {
-    List<FileStatus> srcLogFiles = FileListUtils.listFilesRecursively(this.srcFs, this.srcLogDir, new PathFilter() {
-      @Override
-      public boolean accept(Path path) {
-        return LogCopier.this.logFileExtensions.contains(Files.getFileExtension(path.getName()));
-      }
-    });
+    List<FileStatus> srcLogFiles = new ArrayList<>();
+
+    for (Path logDirPath: this.srcLogDirs) {
+      srcLogFiles.addAll(FileListUtils.listFilesRecursively(this.srcFs, logDirPath, new PathFilter() {
+        @Override
+        public boolean accept(Path path) {
+          return LogCopier.this.logFileExtensions.contains(Files.getFileExtension(path.getName()));
+        }
+      }));
+    }
 
     if (srcLogFiles.isEmpty()) {
-      LOGGER.warn("No log file found under directory " + this.srcLogDir);
+      LOGGER.warn("No log file found under directories " + this.srcLogDirs);
       return;
     }
 
@@ -238,7 +244,7 @@ public class LogCopier extends AbstractScheduledService {
     private static final Splitter COMMA_SPLITTER = Splitter.on(',').omitEmptyStrings().trimResults();
 
     private FileSystem srcFs;
-    private Path srcLogDir;
+    private List<Path> srcLogDirs;
     private FileSystem destFs;
     private Path destLogDir;
 
@@ -388,7 +394,19 @@ public class LogCopier extends AbstractScheduledService {
      */
     public Builder readFrom(Path srcLogDir) {
       Preconditions.checkNotNull(srcLogDir);
-      this.srcLogDir = srcLogDir;
+      this.srcLogDirs = ImmutableList.of(srcLogDir);
+      return this;
+    }
+
+    /**
+     * Set the paths of the source log file directories to read from.
+     *
+     * @param srcLogDirs the paths of the source log file directories to read from
+     * @return this {@link LogCopier.Builder} instance
+     */
+    public Builder readFrom(List<Path> srcLogDirs) {
+      Preconditions.checkNotNull(srcLogDirs);
+      this.srcLogDirs = srcLogDirs;
       return this;
     }
 
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnLogSource.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnLogSource.java
index 7163804..c1c0e61 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnLogSource.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnLogSource.java
@@ -18,15 +18,18 @@
 package org.apache.gobblin.yarn;
 
 import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
 
-import com.typesafe.config.Config;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 
+import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableSet;
+import com.typesafe.config.Config;
 
 import org.apache.gobblin.util.logs.LogCopier;
 
@@ -42,6 +45,7 @@ import org.apache.gobblin.util.logs.LogCopier;
  * @author Yinan Li
  */
 class GobblinYarnLogSource {
+  private static final Splitter COMMA_SPLITTER = Splitter.on(',').omitEmptyStrings().trimResults();
 
   /**
    * Return if the log source is present or not.
@@ -67,7 +71,7 @@ class GobblinYarnLogSource {
     LogCopier.Builder builder = LogCopier.newBuilder()
             .useSrcFileSystem(FileSystem.getLocal(new Configuration()))
             .useDestFileSystem(destFs)
-            .readFrom(getLocalLogDir())
+            .readFrom(getLocalLogDirs())
             .writeTo(getHdfsLogDir(containerId, destFs, appWorkDir))
             .acceptsLogFileExtensions(ImmutableSet.of(ApplicationConstants.STDOUT, ApplicationConstants.STDERR))
             .useLogFileNamePrefix(containerId.toString());
@@ -80,8 +84,14 @@ class GobblinYarnLogSource {
     return builder.build();
   }
 
-  private Path getLocalLogDir() throws IOException {
-    return new Path(System.getenv(ApplicationConstants.Environment.LOG_DIRS.toString()));
+  /**
+   * The LOG_DIRS environment variable may list multiple comma-separated directories. Split the value on commas (trimming entries and dropping empty ones) and return a list of {@link Path}s.
+   * @return list of {@link Path}s to the log directories
+   * @throws IOException declared in the signature; nothing in the current body throws it
+   */
+  private List<Path> getLocalLogDirs() throws IOException {
+    String logDirs = System.getenv(ApplicationConstants.Environment.LOG_DIRS.toString());
+    return COMMA_SPLITTER.splitToList(logDirs).stream().map(e -> new Path(e)).collect(Collectors.toList());
   }
 
   private Path getHdfsLogDir(ContainerId containerId, FileSystem destFs, Path appWorkDir) throws IOException {