You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this copy.)
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/05/01 20:24:03 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1136] Make
LogCopier be able to refresh FileSystem for long running job use cases
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 8f67aac [GOBBLIN-1136] Make LogCopier be able to refresh FileSystem for long running job use cases
8f67aac is described below
commit 8f67aac08e223039c0bf233eb6d2b332ec86bed1
Author: Zihan Li <zi...@zihli-mn1.linkedin.biz>
AuthorDate: Fri May 1 13:23:52 2020 -0700
[GOBBLIN-1136] Make LogCopier be able to refresh FileSystem for long running job use cases
Closes #2975 from ZihanLi58/GOBBLIN-1136
---
.../java/org/apache/gobblin/util/AvroUtils.java | 6 +-
.../util/filesystem/FileSystemSupplier.java | 32 ++---
.../org/apache/gobblin/util/logs/LogCopier.java | 143 ++++++++++++++-------
.../gobblin/yarn/GobblinApplicationMaster.java | 7 +-
.../apache/gobblin/yarn/GobblinYarnLogSource.java | 55 ++++----
.../apache/gobblin/yarn/GobblinYarnTaskRunner.java | 16 ++-
.../gobblin/yarn/YarnAppMasterSecurityManager.java | 5 +-
.../gobblin/yarn/YarnContainerSecurityManager.java | 31 +++--
8 files changed, 180 insertions(+), 115 deletions(-)
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
index bf4dc76..65d3279 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
@@ -476,7 +476,7 @@ public class AvroUtils {
}
public static void writeSchemaToFile(Schema schema, Path filePath, FileSystem fs, boolean overwrite, FsPermission perm)
- throws IOException {
+ throws IOException {
writeSchemaToFile(schema, filePath, null, fs, overwrite, perm);
}
@@ -914,7 +914,7 @@ public class AvroUtils {
List<Field> newOutputFields = Stream.concat(outputFields.stream(), fieldList.stream()).collect(Collectors.toList());
Schema outputSchema = Schema.createRecord(inputSchema.getName(), inputSchema.getDoc(),
- inputSchema.getNamespace(), inputSchema.isError());
+ inputSchema.getNamespace(), inputSchema.isError());
outputSchema.setFields(newOutputFields);
copyProperties(inputSchema, outputSchema);
return outputSchema;
@@ -932,7 +932,7 @@ public class AvroUtils {
* @return an outputRecord that contains a union of the fields in the inputRecord and the field-values in the fieldMap
*/
public static GenericRecord decorateRecord(GenericRecord inputRecord, @Nonnull Map<String, Object> fieldMap,
- Schema outputSchema) {
+ Schema outputSchema) {
GenericRecord outputRecord = new GenericData.Record(outputSchema);
inputRecord.getSchema().getFields().forEach(f -> outputRecord.put(f.name(), inputRecord.get(f.name())));
fieldMap.forEach((key, value) -> outputRecord.put(key, value));
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAppMasterSecurityManager.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/FileSystemSupplier.java
similarity index 50%
copy from gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAppMasterSecurityManager.java
copy to gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/FileSystemSupplier.java
index 7c3be1c..23a9725 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAppMasterSecurityManager.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/FileSystemSupplier.java
@@ -15,32 +15,20 @@
* limitations under the License.
*/
-package org.apache.gobblin.yarn;
+package org.apache.gobblin.util.filesystem;
-import com.google.common.base.Throwables;
-import com.google.common.eventbus.EventBus;
-import com.typesafe.config.Config;
import java.io.IOException;
-import org.apache.gobblin.yarn.event.DelegationTokenUpdatedEvent;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-public class YarnAppMasterSecurityManager extends YarnContainerSecurityManager{
-
- private YarnService yarnService;
- public YarnAppMasterSecurityManager(Config config, FileSystem fs, EventBus eventBus, YarnService yarnService) {
- super(config, fs, eventBus);
- this.yarnService = yarnService;
- }
+/**
+ * A supplier of {@link FileSystem} instances, allowing callers to (re)create a FileSystem on demand.
+ */
- @Override
- public void handleTokenFileUpdatedEvent(DelegationTokenUpdatedEvent delegationTokenUpdatedEvent) {
- super.handleTokenFileUpdatedEvent(delegationTokenUpdatedEvent);
- try {
- yarnService.updateToken();
- } catch (IOException ioe) {
- throw Throwables.propagate(ioe);
- }
- }
+public interface FileSystemSupplier {
+ /**
+ * Get a {@link FileSystem} instance from this supplier.
+ * @return a newly obtained FileSystem for the caller to use (and eventually close)
+ */
+ public FileSystem getFileSystem() throws IOException;
}
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/logs/LogCopier.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/logs/LogCopier.java
index d516c80..fa54b1b 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/logs/LogCopier.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/logs/LogCopier.java
@@ -17,6 +17,17 @@
package org.apache.gobblin.util.logs;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import com.google.common.io.Closer;
+import com.google.common.io.Files;
+import com.google.common.util.concurrent.AbstractScheduledService;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
@@ -35,7 +46,12 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
-
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.util.DatasetFilterUtils;
+import org.apache.gobblin.util.FileListUtils;
+import org.apache.gobblin.util.filesystem.FileSystemSupplier;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -44,24 +60,6 @@ import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
-import com.google.common.io.Closer;
-import com.google.common.io.Files;
-import com.google.common.util.concurrent.AbstractScheduledService;
-
-import lombok.Getter;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.util.DatasetFilterUtils;
-import org.apache.gobblin.util.FileListUtils;
-
/**
* A utility service that periodically reads log files in a source log file directory for changes
@@ -113,13 +111,15 @@ public class LogCopier extends AbstractScheduledService {
private static final int DEFAULT_NUM_COPY_THREADS = 10;
- private final FileSystem srcFs;
- private final FileSystem destFs;
+ private FileSystem srcFs;
+ private FileSystem destFs;
private final List<Path> srcLogDirs;
private final Path destLogDir;
private final long sourceLogFileMonitorInterval;
private final TimeUnit timeUnit;
+ private final FileSystemSupplier destFsSupplier;
+ private final FileSystemSupplier srcFsSupplier;
private final Set<String> logFileExtensions;
private final int numCopyThreads;
@@ -134,14 +134,21 @@ public class LogCopier extends AbstractScheduledService {
private final ExecutorService executorService;
+ @Setter
+ private boolean needToUpdateDestFs;
+ @Setter
+ private boolean needToUpdateSrcFs;
@Getter
private final Set<String> copiedFileNames = Sets.newConcurrentHashSet();
private boolean shouldCopyCurrentLogFile;
- private LogCopier(Builder builder) {
- this.srcFs = builder.srcFs;
- this.destFs = builder.destFs;
-
+ private LogCopier(Builder builder) throws IOException {
+ this.destFsSupplier = builder.destFsSupplier;
+ this.srcFsSupplier = builder.srcFsSupplier;
+ this.srcFs = this.srcFsSupplier != null ? this.srcFsSupplier.getFileSystem() : builder.srcFs;
+ Preconditions.checkArgument(this.srcFs != null, "srcFs or srcFsSupplier has not been set");
+ this.destFs = this.destFsSupplier != null ? this.destFsSupplier.getFileSystem() : builder.destFs;
+ Preconditions.checkArgument(this.destFs != null, "destFs or destFsSupplier has not been set");
this.srcLogDirs = builder.srcLogDirs.stream().map(d -> this.srcFs.makeQualified(d)).collect(Collectors.toList());
this.destLogDir = this.destFs.makeQualified(builder.destLogDir);
@@ -151,6 +158,8 @@ public class LogCopier extends AbstractScheduledService {
this.logFileExtensions = builder.logFileExtensions;
this.currentLogFileName = builder.currentLogFileName;
this.shouldCopyCurrentLogFile = false;
+ this.needToUpdateDestFs = false;
+ this.needToUpdateSrcFs = false;
this.includingRegexPatterns = Optional.fromNullable(builder.includingRegexPatterns);
this.excludingRegexPatterns = Optional.fromNullable(builder.excludingRegexPatterns);
@@ -207,7 +216,6 @@ public class LogCopier extends AbstractScheduledService {
}
return LogCopier.this.logFileExtensions.contains(Files.getFileExtension(logFilePath.getName()));
-
}
/**
@@ -235,11 +243,11 @@ public class LogCopier extends AbstractScheduledService {
void checkSrcLogFiles() throws IOException {
List<FileStatus> srcLogFiles = new ArrayList<>();
Set<String> srcLogFileNames = new HashSet<>();
- Set <Path> newLogFiles = new HashSet<>();
- for (Path logDirPath: srcLogDirs) {
+ Set<Path> newLogFiles = new HashSet<>();
+ for (Path logDirPath : srcLogDirs) {
srcLogFiles.addAll(FileListUtils.listFilesRecursively(srcFs, logDirPath));
//Remove the already copied files from the list of files to copy
- for (FileStatus srcLogFile: srcLogFiles) {
+ for (FileStatus srcLogFile : srcLogFiles) {
if (shouldIncludeLogFile(srcLogFile)) {
newLogFiles.add(srcLogFile.getPath());
}
@@ -255,14 +263,15 @@ public class LogCopier extends AbstractScheduledService {
List<Future> futures = new ArrayList<>();
// Schedule a copy task for each new log file
for (final Path srcLogFile : newLogFiles) {
- String destLogFileName = this.logFileNamePrefix.isPresent()
- ? this.logFileNamePrefix.get() + "." + srcLogFile.getName() : srcLogFile.getName();
+ String destLogFileName =
+ this.logFileNamePrefix.isPresent() ? this.logFileNamePrefix.get() + "." + srcLogFile.getName()
+ : srcLogFile.getName();
final Path destLogFile = new Path(this.destLogDir, destLogFileName);
futures.add(this.executorService.submit(new LogCopyTask(srcLogFile, destLogFile)));
}
//Wait for copy tasks to finish
- for (Future future: futures) {
+ for (Future future : futures) {
try {
future.get();
} catch (InterruptedException e) {
@@ -271,7 +280,24 @@ public class LogCopier extends AbstractScheduledService {
LOGGER.error("Failed LogCopyTask - {}", e);
}
}
-
+ if (needToUpdateDestFs) {
+ if (destFsSupplier == null) {
+ throw new IOException("Try to update dest fileSystem but destFsSupplier has not been set");
+ }
+ this.destFs.close();
+ this.destFs = destFsSupplier.getFileSystem();
+ LOGGER.info("Dest fs updated" + destFs.toString());
+ needToUpdateDestFs = false;
+ }
+ if (needToUpdateSrcFs) {
+ if (srcFsSupplier == null) {
+ throw new IOException("Try to update source fileSystem but srcFsSupplier has not been set");
+ }
+ this.srcFs.close();
+ this.srcFs = srcFsSupplier.getFileSystem();
+ LOGGER.info("Src fs updated" + srcFs.toString());
+ needToUpdateSrcFs = false;
+ }
pruneCopiedFileNames(srcLogFileNames);
}
@@ -291,10 +317,12 @@ public class LogCopier extends AbstractScheduledService {
private static final Splitter COMMA_SPLITTER = Splitter.on(',').omitEmptyStrings().trimResults();
- private FileSystem srcFs;
+ private FileSystem srcFs = null;
private List<Path> srcLogDirs;
- private FileSystem destFs;
+ private FileSystem destFs = null;
private Path destLogDir;
+ private FileSystemSupplier destFsSupplier = null;
+ private FileSystemSupplier srcFsSupplier = null;
private long sourceLogFileMonitorInterval = DEFAULT_SOURCE_LOG_FILE_MONITOR_INTERVAL;
@@ -313,11 +341,11 @@ public class LogCopier extends AbstractScheduledService {
private int linesWrittenBeforeFlush = DEFAULT_LINES_WRITTEN_BEFORE_FLUSH;
/**
- * Set the interval between two checks for the source log file monitor.
- *
- * @param sourceLogFileMonitorInterval the interval between two checks for the source log file monitor
- * @return this {@link LogCopier.Builder} instance
- */
+ * Set the interval between two checks for the source log file monitor.
+ *
+ * @param sourceLogFileMonitorInterval the interval between two checks for the source log file monitor
+ * @return this {@link LogCopier.Builder} instance
+ */
public Builder useSourceLogFileMonitorInterval(long sourceLogFileMonitorInterval) {
Preconditions.checkArgument(sourceLogFileMonitorInterval > 0,
"Source log file monitor interval must be positive");
@@ -338,6 +366,30 @@ public class LogCopier extends AbstractScheduledService {
}
/**
+ * Set the {@link FileSystemSupplier} used to create the destination FileSystem, and to re-create it after the token has been updated.
+ *
+ * @param supplier the {@link FileSystemSupplier} used to create the destination FileSystem
+ * @return this {@link LogCopier.Builder} instance
+ */
+ public Builder useDestFsSupplier(FileSystemSupplier supplier) {
+ Preconditions.checkNotNull(supplier);
+ this.destFsSupplier = supplier;
+ return this;
+ }
+
+ /**
+ * Set the {@link FileSystemSupplier} used to create the source FileSystem, and to re-create it after the token has been updated.
+ *
+ * @param supplier the {@link FileSystemSupplier} used to create the source FileSystem
+ * @return this {@link LogCopier.Builder} instance
+ */
+ public Builder useSrcFsSupplier(FileSystemSupplier supplier) {
+ Preconditions.checkNotNull(supplier);
+ this.srcFsSupplier = supplier;
+ return this;
+ }
+
+ /**
* Set the set of acceptable log file extensions.
*
* @param logFileExtensions the set of acceptable log file extensions
@@ -481,7 +533,7 @@ public class LogCopier extends AbstractScheduledService {
*
* @return a new {@link LogCopier} instance
*/
- public LogCopier build() {
+ public LogCopier build() throws IOException {
return new LogCopier(this);
}
}
@@ -565,10 +617,11 @@ public class LogCopier extends AbstractScheduledService {
* </p>
*/
private boolean shouldCopyLine(String line) {
- boolean including = !LogCopier.this.includingRegexPatterns.isPresent()
- || DatasetFilterUtils.stringInPatterns(line, LogCopier.this.includingRegexPatterns.get());
- boolean excluding = LogCopier.this.excludingRegexPatterns.isPresent()
- && DatasetFilterUtils.stringInPatterns(line, LogCopier.this.excludingRegexPatterns.get());
+ boolean including =
+ !LogCopier.this.includingRegexPatterns.isPresent() || DatasetFilterUtils.stringInPatterns(line,
+ LogCopier.this.includingRegexPatterns.get());
+ boolean excluding = LogCopier.this.excludingRegexPatterns.isPresent() && DatasetFilterUtils.stringInPatterns(line,
+ LogCopier.this.excludingRegexPatterns.get());
return !excluding && including;
}
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
index c47032e..c1f7cc0 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
@@ -26,6 +26,7 @@ import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
+import org.apache.gobblin.util.logs.LogCopier;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
@@ -79,6 +80,7 @@ public class GobblinApplicationMaster extends GobblinClusterManager {
@Getter
private final YarnService yarnService;
+ private LogCopier logCopier;
public GobblinApplicationMaster(String applicationName, String applicationId, ContainerId containerId, Config config,
YarnConfiguration yarnConfiguration) throws Exception {
@@ -91,8 +93,9 @@ public class GobblinApplicationMaster extends GobblinClusterManager {
GobblinYarnLogSource gobblinYarnLogSource = new GobblinYarnLogSource();
if (gobblinYarnLogSource.isLogSourcePresent()) {
Path appWorkDir = PathUtils.combinePaths(containerLogDir, GobblinClusterUtils.getAppWorkDirPath(this.clusterName, this.applicationId), "AppMaster");
+ logCopier = gobblinYarnLogSource.buildLogCopier(this.config, containerId.toString(), this.fs, appWorkDir);
this.applicationLauncher
- .addService(gobblinYarnLogSource.buildLogCopier(this.config, containerId.toString(), this.fs, appWorkDir));
+ .addService(logCopier);
}
this.yarnService = buildYarnService(this.config, applicationName, this.applicationId, yarnConfiguration, this.fs);
@@ -127,7 +130,7 @@ public class GobblinApplicationMaster extends GobblinClusterManager {
* Build the {@link YarnAppMasterSecurityManager} for the Application Master.
*/
private YarnContainerSecurityManager buildYarnContainerSecurityManager(Config config, FileSystem fs) {
- return new YarnAppMasterSecurityManager(config, fs, this.eventBus, this.yarnService);
+ return new YarnAppMasterSecurityManager(config, fs, this.eventBus, this.logCopier, this.yarnService);
}
@Override
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnLogSource.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnLogSource.java
index 64127ac..49a2d12 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnLogSource.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnLogSource.java
@@ -17,26 +17,24 @@
package org.apache.gobblin.yarn;
+import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.io.Files;
+import com.typesafe.config.Config;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.stream.Collectors;
-
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.util.PathUtils;
+import org.apache.gobblin.util.filesystem.FileSystemSupplier;
+import org.apache.gobblin.util.logs.LogCopier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import com.google.common.base.Splitter;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.io.Files;
-import com.typesafe.config.Config;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.util.PathUtils;
-import org.apache.gobblin.util.logs.LogCopier;
-
/**
* A base class for container processes that are sources of Gobblin Yarn application logs.
@@ -74,15 +72,27 @@ class GobblinYarnLogSource {
protected LogCopier buildLogCopier(Config config, String containerId, FileSystem destFs, Path appWorkDir)
throws IOException {
LogCopier.Builder builder = LogCopier.newBuilder()
- .useSrcFileSystem(buildFileSystem(config, true))
- .useDestFileSystem(buildFileSystem(config, false))
- .readFrom(getLocalLogDirs())
- .writeTo(getHdfsLogDir(containerId, destFs, appWorkDir))
- .useCurrentLogFileName(Files.getNameWithoutExtension(System.getProperty(GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_LOG_FILE_NAME)));
+ .useDestFsSupplier(new FileSystemSupplier() {
+ @Override
+ public FileSystem getFileSystem() throws IOException {
+ return buildFileSystem(config, false);
+ }
+ })
+ .useSrcFsSupplier(new FileSystemSupplier() {
+ @Override
+ public FileSystem getFileSystem() throws IOException {
+ return buildFileSystem(config, true);
+ }
+ })
+ .readFrom(getLocalLogDirs())
+ .writeTo(getHdfsLogDir(containerId, destFs, appWorkDir))
+ .useCurrentLogFileName(Files.getNameWithoutExtension(
+ System.getProperty(GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_LOG_FILE_NAME)));
- builder.acceptsLogFileExtensions(config.hasPath(GobblinYarnConfigurationKeys.LOG_FILE_EXTENSIONS) ? ImmutableSet
- .copyOf(Splitter.on(",").splitToList(config.getString(GobblinYarnConfigurationKeys.LOG_FILE_EXTENSIONS)))
- : ImmutableSet.of());
+ builder.acceptsLogFileExtensions(
+ config.hasPath(GobblinYarnConfigurationKeys.LOG_FILE_EXTENSIONS) ? ImmutableSet.copyOf(
+ Splitter.on(",").splitToList(config.getString(GobblinYarnConfigurationKeys.LOG_FILE_EXTENSIONS)))
+ : ImmutableSet.of());
return builder.build();
}
@@ -92,10 +102,10 @@ class GobblinYarnLogSource {
* returned by the method has automatic closing disabled. The user of the instance needs to handle closing of the
* instance, typically as part of its shutdown sequence.
*/
- private FileSystem buildFileSystem(Config config, boolean isLocal) throws IOException {
+ public static FileSystem buildFileSystem(Config config, boolean isLocal) throws IOException {
return isLocal ? FileSystem.newInstanceLocal(AUTO_CLOSE_CONFIG)
- : config.hasPath(ConfigurationKeys.FS_URI_KEY) ? FileSystem
- .newInstance(URI.create(config.getString(ConfigurationKeys.FS_URI_KEY)), AUTO_CLOSE_CONFIG)
+ : config.hasPath(ConfigurationKeys.FS_URI_KEY) ? FileSystem.newInstance(
+ URI.create(config.getString(ConfigurationKeys.FS_URI_KEY)), AUTO_CLOSE_CONFIG)
: FileSystem.newInstance(AUTO_CLOSE_CONFIG);
}
@@ -110,7 +120,8 @@ class GobblinYarnLogSource {
}
private Path getHdfsLogDir(String containerId, FileSystem destFs, Path appWorkDir) throws IOException {
- Path logRootDir = PathUtils.combinePaths(appWorkDir.toString(), GobblinYarnConfigurationKeys.APP_LOGS_DIR_NAME, containerId);
+ Path logRootDir =
+ PathUtils.combinePaths(appWorkDir.toString(), GobblinYarnConfigurationKeys.APP_LOGS_DIR_NAME, containerId);
if (!destFs.exists(logRootDir)) {
destFs.mkdirs(logRootDir);
}
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
index 8e88b2e..f79a5bb 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
@@ -25,6 +25,7 @@ import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
+import org.apache.gobblin.util.logs.LogCopier;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -70,24 +71,25 @@ public class GobblinYarnTaskRunner extends GobblinTaskRunner {
public List<Service> getServices() {
List<Service> services = new ArrayList<>();
services.addAll(super.getServices());
- if (UserGroupInformation.isSecurityEnabled()) {
- LOGGER.info("Adding YarnContainerSecurityManager since security is enabled");
- services.add(new YarnContainerSecurityManager(this.clusterConfig, this.fs, this.eventBus));
- }
-
+ LogCopier logCopier = null;
if (clusterConfig.hasPath(GobblinYarnConfigurationKeys.LOGS_SINK_ROOT_DIR_KEY)) {
GobblinYarnLogSource gobblinYarnLogSource = new GobblinYarnLogSource();
String containerLogDir = clusterConfig.getString(GobblinYarnConfigurationKeys.LOGS_SINK_ROOT_DIR_KEY);
if (gobblinYarnLogSource.isLogSourcePresent()) {
try {
- services.add(gobblinYarnLogSource.buildLogCopier(this.clusterConfig, this.taskRunnerId, this.fs,
- new Path(containerLogDir, GobblinClusterUtils.getAppWorkDirPath(this.applicationName, this.applicationId))));
+ logCopier = gobblinYarnLogSource.buildLogCopier(this.clusterConfig, this.taskRunnerId, this.fs,
+ new Path(containerLogDir, GobblinClusterUtils.getAppWorkDirPath(this.applicationName, this.applicationId)));
+ services.add(logCopier);
} catch (Exception e) {
LOGGER.warn("Cannot add LogCopier service to the service manager due to", e);
}
}
}
+ if (UserGroupInformation.isSecurityEnabled()) {
+ LOGGER.info("Adding YarnContainerSecurityManager since security is enabled");
+ services.add(new YarnContainerSecurityManager(this.clusterConfig, this.fs, this.eventBus, logCopier));
+ }
return services;
}
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAppMasterSecurityManager.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAppMasterSecurityManager.java
index 7c3be1c..ba6754a 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAppMasterSecurityManager.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAppMasterSecurityManager.java
@@ -21,6 +21,7 @@ import com.google.common.base.Throwables;
import com.google.common.eventbus.EventBus;
import com.typesafe.config.Config;
import java.io.IOException;
+import org.apache.gobblin.util.logs.LogCopier;
import org.apache.gobblin.yarn.event.DelegationTokenUpdatedEvent;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -29,8 +30,8 @@ import org.apache.hadoop.fs.Path;
public class YarnAppMasterSecurityManager extends YarnContainerSecurityManager{
private YarnService yarnService;
- public YarnAppMasterSecurityManager(Config config, FileSystem fs, EventBus eventBus, YarnService yarnService) {
- super(config, fs, eventBus);
+ public YarnAppMasterSecurityManager(Config config, FileSystem fs, EventBus eventBus, LogCopier logCopier, YarnService yarnService) {
+ super(config, fs, eventBus, logCopier);
this.yarnService = yarnService;
}
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnContainerSecurityManager.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnContainerSecurityManager.java
index 060da6a..f98f7d9 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnContainerSecurityManager.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnContainerSecurityManager.java
@@ -17,8 +17,15 @@
package org.apache.gobblin.yarn;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Throwables;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
import java.io.IOException;
-
+import org.apache.gobblin.util.logs.LogCopier;
+import org.apache.gobblin.yarn.event.DelegationTokenUpdatedEvent;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.Credentials;
@@ -28,16 +35,6 @@ import org.apache.hadoop.security.token.TokenIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.typesafe.config.Config;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Throwables;
-import com.google.common.eventbus.EventBus;
-import com.google.common.eventbus.Subscribe;
-import com.google.common.util.concurrent.AbstractIdleService;
-
-import org.apache.gobblin.yarn.event.DelegationTokenUpdatedEvent;
-
/**
* A class for managing token renewing in the containers including the container for the
@@ -58,13 +55,19 @@ public class YarnContainerSecurityManager extends AbstractIdleService {
private final FileSystem fs;
private final Path tokenFilePath;
private final EventBus eventBus;
+ private final LogCopier logCopier;
public YarnContainerSecurityManager(Config config, FileSystem fs, EventBus eventBus) {
+ this(config, fs, eventBus, null);
+ }
+
+ public YarnContainerSecurityManager(Config config, FileSystem fs, EventBus eventBus, LogCopier logCopier) {
this.fs = fs;
this.tokenFilePath = new Path(this.fs.getHomeDirectory(),
config.getString(GobblinYarnConfigurationKeys.APPLICATION_NAME_KEY) + Path.SEPARATOR
+ GobblinYarnConfigurationKeys.TOKEN_FILE_NAME);
this.eventBus = eventBus;
+ this.logCopier = logCopier;
}
@SuppressWarnings("unused")
@@ -72,6 +75,10 @@ public class YarnContainerSecurityManager extends AbstractIdleService {
public void handleTokenFileUpdatedEvent(DelegationTokenUpdatedEvent delegationTokenUpdatedEvent) {
try {
addCredentials(readCredentials(this.tokenFilePath));
+ if (this.logCopier != null) {
+ this.logCopier.setNeedToUpdateDestFs(true);
+ this.logCopier.setNeedToUpdateSrcFs(true);
+ }
} catch (IOException ioe) {
throw Throwables.propagate(ioe);
}
@@ -99,7 +106,7 @@ public class YarnContainerSecurityManager extends AbstractIdleService {
@VisibleForTesting
void addCredentials(Credentials credentials) throws IOException {
for (Token<? extends TokenIdentifier> token : credentials.getAllTokens()) {
- LOGGER.info("updating "+token.toString());
+ LOGGER.info("updating " + token.toString());
}
UserGroupInformation.getCurrentUser().addCredentials(credentials);
}