You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/03/07 19:49:38 UTC

[gobblin] branch master updated: [GOBBLIN-1617] pass configurations to some HadoopUtils APIs (#3475)

This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ffe72b  [GOBBLIN-1617] pass configurations to some HadoopUtils APIs (#3475)
8ffe72b is described below

commit 8ffe72bcb9911710ba0fb9a345c605692f270493
Author: Arjun Singh Bora <ab...@linkedin.com>
AuthorDate: Mon Mar 7 11:49:29 2022 -0800

    [GOBBLIN-1617] pass configurations to some HadoopUtils APIs (#3475)
    
    * pass configurations to some HadoopUtils APIs
    
    * address review comments
---
 .../org/apache/gobblin/writer/FsDataWriter.java    |  5 ++--
 .../java/org/apache/gobblin/util/HadoopUtils.java  | 28 ++++++++++++++++++++--
 2 files changed, 29 insertions(+), 4 deletions(-)

diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
index 5b6d54a..7ed2d1e 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
@@ -64,6 +64,7 @@ public abstract class FsDataWriter<D> implements DataWriter<D>, FinalState, Meta
   public static final String FS_WRITER_METRICS_KEY = "fs_writer_metrics";
 
   protected final State properties;
+  protected final Configuration conf;
   protected final String id;
   protected final int numBranches;
   protected final int branchId;
@@ -96,7 +97,7 @@ public abstract class FsDataWriter<D> implements DataWriter<D>, FinalState, Meta
     this.writerAttemptIdOptional = Optional.fromNullable(builder.getWriterAttemptId());
     this.encoders = builder.getEncoders();
 
-    Configuration conf = new Configuration();
+    this.conf = new Configuration();
     // Add all job configuration properties so they are picked up by Hadoop
     JobConfigurationUtils.putStateIntoConfiguration(properties, conf);
     this.fs = WriterUtils.getWriterFS(properties, this.numBranches, this.branchId);
@@ -266,7 +267,7 @@ public abstract class FsDataWriter<D> implements DataWriter<D>, FinalState, Meta
     LOG.info(String.format("Moving data from %s to %s", this.stagingFile, this.outputFile));
     // For the same reason as deleting the staging file if it already exists, overwrite
     // the output file if it already exists to prevent task retry from being blocked.
-    HadoopUtils.renamePath(this.fs, this.stagingFile, this.outputFile, true);
+    HadoopUtils.renamePath(this.fs, this.stagingFile, this.outputFile, true, this.conf);
     this.properties.appendToSetProp(this.allOutputFilesPropName, this.outputFile.toString());
 
     FsWriterMetrics metrics = new FsWriterMetrics(
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java
index ee34292..0a08461 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java
@@ -210,9 +210,21 @@ public class HadoopUtils {
    * @throws IOException
    */
   public static void moveToTrash(FileSystem fs, Path path) throws IOException {
-    Trash trash = new Trash(fs, new Configuration());
+    moveToTrash(fs, path, new Configuration());
+  }
+
+  /**
+   * Moves the object to the filesystem trash according to the file system policy.
+   * @param fs FileSystem object
+   * @param path Path to the object to be moved to trash.
+   * @param conf Configurations
+   * @throws IOException
+   */
+  public static void moveToTrash(FileSystem fs, Path path, Configuration conf) throws IOException {
+    Trash trash = new Trash(fs, conf);
     trash.moveToTrash(path);
   }
+
   /**
    * Renames a src {@link Path} on fs {@link FileSystem} to a dst {@link Path}. If fs is a {@link LocalFileSystem} and
    * src is a directory then {@link File#renameTo} is called directly to avoid a directory rename race condition where
@@ -265,8 +277,20 @@ public class HadoopUtils {
   /**
    * A wrapper around {@link FileSystem#rename(Path, Path)} which throws {@link IOException} if
    * {@link FileSystem#rename(Path, Path)} returns False.
+   * @param fs FileSystem object
+   * @param oldName old name of the path
+   * @param newName new name of the path
+   * @throws IOException
    */
   public static void renamePath(FileSystem fs, Path oldName, Path newName, boolean overwrite) throws IOException {
+    renamePath(fs, oldName, newName, overwrite, new Configuration());
+  }
+
+  /**
+   * A wrapper around {@link FileSystem#rename(Path, Path)} which throws {@link IOException} if
+   * {@link FileSystem#rename(Path, Path)} returns False.
+   */
+  public static void renamePath(FileSystem fs, Path oldName, Path newName, boolean overwrite, Configuration conf) throws IOException {
     //In default implementation of rename with rewrite option in FileSystem, if the parent dir of dst does not exist, it will throw exception,
     //Which will fail some of our job unintentionally. So we only call that method when fs is an instance of DistributedFileSystem to avoid inconsistency problem
     if(fs instanceof DistributedFileSystem) {
@@ -278,7 +302,7 @@ public class HadoopUtils {
       }
       if (fs.exists(newName)) {
         if (overwrite) {
-          HadoopUtils.moveToTrash(fs, newName);
+          HadoopUtils.moveToTrash(fs, newName, conf);
         } else {
           throw new FileAlreadyExistsException(String.format("Failed to rename %s to %s: dst already exists", oldName, newName));
         }