Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/04/21 19:21:08 UTC

[gobblin] branch master updated: [GOBBLIN-1619] WriterUtils.mkdirsWithRecursivePermission contains race condition and puts unnecessary load on filesystem (#3477)

This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 167d2104c [GOBBLIN-1619] WriterUtils.mkdirsWithRecursivePermission contains race condition and puts unnecessary load on filesystem (#3477)
167d2104c is described below

commit 167d2104cba64cebe44340021d71e383c14e8fa3
Author: Matthew Ho <ho...@gmail.com>
AuthorDate: Thu Apr 21 12:21:02 2022 -0700

    [GOBBLIN-1619] WriterUtils.mkdirsWithRecursivePermission contains race condition and puts unnecessary load on filesystem (#3477)
    
    * [GOBBLIN-1619] Fix race condition in WriterUtils mkdirs
    
    * WriterUtils mkdirs previously had a race condition when multiple
    processes tried to create the same parent directory, causing a
    spurious FileNotFoundException
    * the new implementation does not change the existing behavior
    
    * Test coverage for retry config
    
    * Wait for each path to exist via the retry config before setting permissions
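    
    For illustration, a minimal sketch that drives the fixed method from
    two concurrent threads (MkdirsRaceSketch and the /tmp demo path are
    made up for this example; the method signature and the empty retry
    Config mirror the test added below):
    
        import java.io.IOException;
    
        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.FileSystem;
        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.fs.permission.FsAction;
        import org.apache.hadoop.fs.permission.FsPermission;
    
        import com.typesafe.config.ConfigFactory;
    
        import org.apache.gobblin.util.WriterUtils;
    
        public class MkdirsRaceSketch {
          public static void main(String[] args) throws Exception {
            FileSystem fs = FileSystem.getLocal(new Configuration());
            Path target = new Path("/tmp/mkdirs-race/a/b/c");
            FsPermission perm =
                new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
    
            // Two callers race to create the same nested path. With the
            // old per-level exists()/mkdirs() logic the loser could see a
            // spurious FileNotFoundException; both calls should now
            // succeed and leave the permission applied.
            Runnable writer = () -> {
              try {
                WriterUtils.mkdirsWithRecursivePermissionWithRetry(
                    fs, target, perm, ConfigFactory.empty());
              } catch (IOException e) {
                throw new RuntimeException(e);
              }
            };
            Thread t1 = new Thread(writer);
            Thread t2 = new Thread(writer);
            t1.start();
            t2.start();
            t1.join();
            t2.join();
            System.out.println("created: " + fs.exists(target));
          }
        }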
---
 .../java/org/apache/gobblin/util/WriterUtils.java  | 53 ++++++++++++++--------
 .../org/apache/gobblin/util/WriterUtilsTest.java   | 38 ++++++++++++++++
 2 files changed, 72 insertions(+), 19 deletions(-)

diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/WriterUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/WriterUtils.java
index b9e64e3e1..fe2c78f39 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/WriterUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/WriterUtils.java
@@ -19,8 +19,12 @@ package org.apache.gobblin.util;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 
+import java.util.function.BiConsumer;
+import lombok.SneakyThrows;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileConstants;
 import org.apache.hadoop.conf.Configuration;
@@ -284,34 +288,45 @@ public class WriterUtils {
       return;
     }
 
-    if (path.getParent() != null && !fs.exists(path.getParent())) {
-      mkdirsWithRecursivePermissionWithRetry(fs, path.getParent(), perm, retrierConfig);
+    Set<Path> pathsThatDidntExistBefore = new HashSet<>();
+    for (Path p = path; p != null && !fs.exists(p); p = p.getParent()) {
+      pathsThatDidntExistBefore.add(p);
     }
 
     if (!fs.mkdirs(path, perm)) {
       throw new IOException(String.format("Unable to mkdir %s with permission %s", path, perm));
     }
 
-    if (retrierConfig != NO_RETRY_CONFIG) {
-      //Wait until file is not there as it can happen the file fail to exist right away on eventual consistent fs like Amazon S3
-      Retryer<Void> retryer = RetryerFactory.newInstance(retrierConfig);
+    BiConsumer<FileSystem, Path> waitPolicy = getWaitPolicy(retrierConfig);
+    for (Path p : pathsThatDidntExistBefore) {
+      waitPolicy.accept(fs, p);
+      fs.setPermission(p, perm);
+    }
+  }
 
-      try {
-        retryer.call(() -> {
-          if (!fs.exists(path)) {
-            throw new IOException("Path " + path + " does not exist however it should. Will wait more.");
+  // Defines the wait policy: block until the given path exists before proceeding
+  private static BiConsumer<FileSystem, Path> getWaitPolicy(Config retrierConfig) {
+    return new BiConsumer<FileSystem, Path>() {
+      @SneakyThrows
+      @Override
+      public void accept(FileSystem fs, Path path) {
+        if (retrierConfig != NO_RETRY_CONFIG) {
+          // Wait until the path exists: on an eventually consistent filesystem such as Amazon S3, a newly created path may not be visible right away
+          Retryer<Void> retryer = RetryerFactory.newInstance(retrierConfig);
+
+          try {
+            retryer.call(() -> {
+              if (!fs.exists(path)) {
+                throw new IOException("Path " + path + " does not exist however it should. Will wait more.");
+              }
+              return null;
+            });
+          } catch (Exception e) {
+            throw new IOException("Path " + path + "does not exist however it should. Giving up..."+ e);
           }
-          return null;
-        });
-      } catch (Exception e) {
-        throw new IOException("Path " + path + "does not exist however it should. Giving up..."+ e);
+        }
       }
-    }
-
-    // Double check permission, since fs.mkdirs() may not guarantee to set the permission correctly
-    if (!fs.getFileStatus(path).getPermission().equals(perm)) {
-      fs.setPermission(path, perm);
-    }
+    };
   }
 
   public static URI getWriterFsUri(State state, int numBranches, int branchId) {
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/WriterUtilsTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/WriterUtilsTest.java
index e14972f4f..5a66177d8 100644
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/WriterUtilsTest.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/WriterUtilsTest.java
@@ -18,6 +18,9 @@
 package org.apache.gobblin.util;
 
 import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import java.io.IOException;
 import org.apache.avro.file.CodecFactory;
 import org.apache.hadoop.fs.Path;
 import org.testng.Assert;
@@ -30,6 +33,11 @@ import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.source.workunit.Extract;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.source.workunit.Extract.TableType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+
 
 /**
  * Tests for {@link WriterUtils}.
@@ -125,6 +133,36 @@ public class WriterUtilsTest {
         ConfigurationKeys.DEFAULT_FORK_BRANCH_NAME + "0"));
   }
 
+  @Test
+  public void testMkdirsPermissionsSet() throws IOException {
+    Path testRoot = new Path("/tmp");
+    Path testParent = new Path(testRoot, "mkdirs-1");
+    Path testChild = new Path(testParent, "mkdirs-2/mkdirs-3/mkdirs-4");
+
+    Config retryConfig = ConfigFactory.empty();
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+    try {
+      fs.delete(testParent, true);
+
+      FsPermission all = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
+      WriterUtils.mkdirsWithRecursivePermissionWithRetry(fs, testParent, all, retryConfig);
+      Assert.assertTrue(fs.exists(testParent));
+      Assert.assertEquals(fs.getFileStatus(testParent).getPermission(), all);
+
+      FsPermission restricted = new FsPermission(FsAction.ALL, FsAction.READ_EXECUTE, FsAction.EXECUTE);
+      WriterUtils.mkdirsWithRecursivePermissionWithRetry(fs, testChild, restricted, retryConfig);
+      Assert.assertTrue(fs.exists(testChild));
+
+      // the pre-existing parent keeps its original permission; newly created ancestors and the new dir are set to restricted
+      Assert.assertEquals(fs.getFileStatus(testParent).getPermission(), all);
+      Assert.assertEquals(fs.getFileStatus(testChild.getParent().getParent()).getPermission(), restricted);
+      Assert.assertEquals(fs.getFileStatus(testChild.getParent()).getPermission(), restricted);
+      Assert.assertEquals(fs.getFileStatus(testChild).getPermission(), restricted);
+    } finally {
+      fs.delete(testParent, true);
+    }
+  }
+
   @Test
   public void testGetDefaultWriterFilePathWithWorkUnitState() {
     String namespace = "gobblin.test";