You are viewing a plain text version of this content. The canonical link for it was provided as a hyperlink, which is not preserved in this plain-text version.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/03/07 19:49:38 UTC
[gobblin] branch master updated: [GOBBLIN-1617] pass configurations to some HadoopUtils APIs (#3475)
This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 8ffe72b [GOBBLIN-1617] pass configurations to some HadoopUtils APIs (#3475)
8ffe72b is described below
commit 8ffe72bcb9911710ba0fb9a345c605692f270493
Author: Arjun Singh Bora <ab...@linkedin.com>
AuthorDate: Mon Mar 7 11:49:29 2022 -0800
[GOBBLIN-1617] pass configurations to some HadoopUtils APIs (#3475)
* pass configurations to some HadoopUtils APIs
* address review comments
---
.../org/apache/gobblin/writer/FsDataWriter.java | 5 ++--
.../java/org/apache/gobblin/util/HadoopUtils.java | 28 ++++++++++++++++++++--
2 files changed, 29 insertions(+), 4 deletions(-)
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
index 5b6d54a..7ed2d1e 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
@@ -64,6 +64,7 @@ public abstract class FsDataWriter<D> implements DataWriter<D>, FinalState, Meta
public static final String FS_WRITER_METRICS_KEY = "fs_writer_metrics";
protected final State properties;
+ protected final Configuration conf;
protected final String id;
protected final int numBranches;
protected final int branchId;
@@ -96,7 +97,7 @@ public abstract class FsDataWriter<D> implements DataWriter<D>, FinalState, Meta
this.writerAttemptIdOptional = Optional.fromNullable(builder.getWriterAttemptId());
this.encoders = builder.getEncoders();
- Configuration conf = new Configuration();
+ this.conf = new Configuration();
// Add all job configuration properties so they are picked up by Hadoop
JobConfigurationUtils.putStateIntoConfiguration(properties, conf);
this.fs = WriterUtils.getWriterFS(properties, this.numBranches, this.branchId);
@@ -266,7 +267,7 @@ public abstract class FsDataWriter<D> implements DataWriter<D>, FinalState, Meta
LOG.info(String.format("Moving data from %s to %s", this.stagingFile, this.outputFile));
// For the same reason as deleting the staging file if it already exists, overwrite
// the output file if it already exists to prevent task retry from being blocked.
- HadoopUtils.renamePath(this.fs, this.stagingFile, this.outputFile, true);
+ HadoopUtils.renamePath(this.fs, this.stagingFile, this.outputFile, true, this.conf);
this.properties.appendToSetProp(this.allOutputFilesPropName, this.outputFile.toString());
FsWriterMetrics metrics = new FsWriterMetrics(
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java
index ee34292..0a08461 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java
@@ -210,9 +210,21 @@ public class HadoopUtils {
* @throws IOException
*/
public static void moveToTrash(FileSystem fs, Path path) throws IOException {
- Trash trash = new Trash(fs, new Configuration());
+ moveToTrash(fs, path, new Configuration());
+ }
+
+ /**
+ * Moves the object to the filesystem trash according to the file system policy.
+ * @param fs FileSystem object
+ * @param path Path to the object to be moved to trash.
+ * @param conf Configurations
+ * @throws IOException
+ */
+ public static void moveToTrash(FileSystem fs, Path path, Configuration conf) throws IOException {
+ Trash trash = new Trash(fs, conf);
trash.moveToTrash(path);
}
+
/**
* Renames a src {@link Path} on fs {@link FileSystem} to a dst {@link Path}. If fs is a {@link LocalFileSystem} and
* src is a directory then {@link File#renameTo} is called directly to avoid a directory rename race condition where
@@ -265,8 +277,20 @@ public class HadoopUtils {
/**
* A wrapper around {@link FileSystem#rename(Path, Path)} which throws {@link IOException} if
* {@link FileSystem#rename(Path, Path)} returns False.
+ * @param fs FileSystem object
+ * @param oldName old name of the path
+ * @param newName new name of the path
+ * @throws IOException
*/
public static void renamePath(FileSystem fs, Path oldName, Path newName, boolean overwrite) throws IOException {
+ renamePath(fs, oldName, newName, overwrite, new Configuration());
+ }
+
+ /**
+ * A wrapper around {@link FileSystem#rename(Path, Path)} which throws {@link IOException} if
+ * {@link FileSystem#rename(Path, Path)} returns False.
+ */
+ public static void renamePath(FileSystem fs, Path oldName, Path newName, boolean overwrite, Configuration conf) throws IOException {
//In default implementation of rename with rewrite option in FileSystem, if the parent dir of dst does not exist, it will throw exception,
//Which will fail some of our job unintentionally. So we only call that method when fs is an instance of DistributedFileSystem to avoid inconsistency problem
if(fs instanceof DistributedFileSystem) {
@@ -278,7 +302,7 @@ public class HadoopUtils {
}
if (fs.exists(newName)) {
if (overwrite) {
- HadoopUtils.moveToTrash(fs, newName);
+ HadoopUtils.moveToTrash(fs, newName, conf);
} else {
throw new FileAlreadyExistsException(String.format("Failed to rename %s to %s: dst already exists", oldName, newName));
}