You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/04/04 20:22:24 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-723] Add support to the LogCopier for copying from multiple source paths
This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new dab5b8a [GOBBLIN-723] Add support to the LogCopier for copying from multiple source paths
dab5b8a is described below
commit dab5b8a04209669ac8da728ae1f32bdf24154199
Author: Hung Tran <hu...@linkedin.com>
AuthorDate: Thu Apr 4 13:22:18 2019 -0700
[GOBBLIN-723] Add support to the LogCopier for copying from multiple source paths
Closes #2590 from htran1/yarn_fix_log_copier
---
.../org/apache/gobblin/util/logs/LogCopier.java | 40 ++++++++++++++++------
.../apache/gobblin/yarn/GobblinYarnLogSource.java | 18 +++++++---
2 files changed, 43 insertions(+), 15 deletions(-)
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/logs/LogCopier.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/logs/LogCopier.java
index 9e4b957..df626df 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/logs/LogCopier.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/logs/LogCopier.java
@@ -22,11 +22,13 @@ import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
+import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -116,7 +118,7 @@ public class LogCopier extends AbstractScheduledService {
private final FileSystem srcFs;
private final FileSystem destFs;
- private final Path srcLogDir;
+ private final List<Path> srcLogDirs;
private final Path destLogDir;
private final long sourceLogFileMonitorInterval;
@@ -140,7 +142,7 @@ public class LogCopier extends AbstractScheduledService {
this.srcFs = builder.srcFs;
this.destFs = builder.destFs;
- this.srcLogDir = this.srcFs.makeQualified(builder.srcLogDir);
+ this.srcLogDirs = builder.srcLogDirs.stream().map(d -> this.srcFs.makeQualified(d)).collect(Collectors.toList());
this.destLogDir = this.destFs.makeQualified(builder.destLogDir);
this.sourceLogFileMonitorInterval = builder.sourceLogFileMonitorInterval;
@@ -180,15 +182,19 @@ public class LogCopier extends AbstractScheduledService {
* Perform a check on new source log files and submit copy tasks for new log files.
*/
private void checkSrcLogFiles() throws IOException {
- List<FileStatus> srcLogFiles = FileListUtils.listFilesRecursively(this.srcFs, this.srcLogDir, new PathFilter() {
- @Override
- public boolean accept(Path path) {
- return LogCopier.this.logFileExtensions.contains(Files.getFileExtension(path.getName()));
- }
- });
+ List<FileStatus> srcLogFiles = new ArrayList<>();
+
+ for (Path logDirPath: this.srcLogDirs) {
+ srcLogFiles.addAll(FileListUtils.listFilesRecursively(this.srcFs, logDirPath, new PathFilter() {
+ @Override
+ public boolean accept(Path path) {
+ return LogCopier.this.logFileExtensions.contains(Files.getFileExtension(path.getName()));
+ }
+ }));
+ }
if (srcLogFiles.isEmpty()) {
- LOGGER.warn("No log file found under directory " + this.srcLogDir);
+ LOGGER.warn("No log file found under directories " + this.srcLogDirs);
return;
}
@@ -238,7 +244,7 @@ public class LogCopier extends AbstractScheduledService {
private static final Splitter COMMA_SPLITTER = Splitter.on(',').omitEmptyStrings().trimResults();
private FileSystem srcFs;
- private Path srcLogDir;
+ private List<Path> srcLogDirs;
private FileSystem destFs;
private Path destLogDir;
@@ -388,7 +394,19 @@ public class LogCopier extends AbstractScheduledService {
*/
public Builder readFrom(Path srcLogDir) {
Preconditions.checkNotNull(srcLogDir);
- this.srcLogDir = srcLogDir;
+ this.srcLogDirs = ImmutableList.of(srcLogDir);
+ return this;
+ }
+
+ /**
+ * Set the paths of the source log file directories to read from, replacing any previously configured directories.
+ *
+ * @param srcLogDirs the paths of the source log file directories to read from; neither the list nor its elements may be null
+ * @return this {@link LogCopier.Builder} instance
+ */
+ public Builder readFrom(List<Path> srcLogDirs) {
+ Preconditions.checkNotNull(srcLogDirs);
+ this.srcLogDirs = ImmutableList.copyOf(srcLogDirs); // defensive copy: later caller mutations must not affect the copier
+ return this;
}
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnLogSource.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnLogSource.java
index 7163804..c1c0e61 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnLogSource.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnLogSource.java
@@ -18,15 +18,18 @@
package org.apache.gobblin.yarn;
import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
-import com.typesafe.config.Config;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableSet;
+import com.typesafe.config.Config;
import org.apache.gobblin.util.logs.LogCopier;
@@ -42,6 +45,7 @@ import org.apache.gobblin.util.logs.LogCopier;
* @author Yinan Li
*/
class GobblinYarnLogSource {
+ private static final Splitter COMMA_SPLITTER = Splitter.on(',').omitEmptyStrings().trimResults();
/**
* Return if the log source is present or not.
@@ -67,7 +71,7 @@ class GobblinYarnLogSource {
LogCopier.Builder builder = LogCopier.newBuilder()
.useSrcFileSystem(FileSystem.getLocal(new Configuration()))
.useDestFileSystem(destFs)
- .readFrom(getLocalLogDir())
+ .readFrom(getLocalLogDirs())
.writeTo(getHdfsLogDir(containerId, destFs, appWorkDir))
.acceptsLogFileExtensions(ImmutableSet.of(ApplicationConstants.STDOUT, ApplicationConstants.STDERR))
.useLogFileNamePrefix(containerId.toString());
@@ -80,8 +84,14 @@ class GobblinYarnLogSource {
return builder.build();
}
- private Path getLocalLogDir() throws IOException {
- return new Path(System.getenv(ApplicationConstants.Environment.LOG_DIRS.toString()));
+ /** Splits the comma-separated LOG_DIRS environment variable into one {@link Path} per log directory.
+ * @return the trimmed, non-empty LOG_DIRS entries as {@link Path}s, in the order they appear
+ * @throws IOException if the LOG_DIRS environment variable is not set
+ */
+ private List<Path> getLocalLogDirs() throws IOException {
+ String logDirs = System.getenv(ApplicationConstants.Environment.LOG_DIRS.toString());
+ if (logDirs == null) { throw new IOException("Environment variable " + ApplicationConstants.Environment.LOG_DIRS + " is not set"); }
+ return COMMA_SPLITTER.splitToList(logDirs).stream().map(Path::new).collect(Collectors.toList());
}
private Path getHdfsLogDir(ContainerId containerId, FileSystem destFs, Path appWorkDir) throws IOException {