You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/04/21 19:21:08 UTC
[gobblin] branch master updated: [GOBBLIN-1619] WriterUtils.mkdirsWithRecursivePermission contains race condition and puts unnecessary load on filesystem (#3477)
This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 167d2104c [GOBBLIN-1619] WriterUtils.mkdirsWithRecursivePermission contains race condition and puts unnecessary load on filesystem (#3477)
167d2104c is described below
commit 167d2104cba64cebe44340021d71e383c14e8fa3
Author: Matthew Ho <ho...@gmail.com>
AuthorDate: Thu Apr 21 12:21:02 2022 -0700
[GOBBLIN-1619] WriterUtils.mkdirsWithRecursivePermission contains race condition and puts unnecessary load on filesystem (#3477)
* [GOBBLIN-1619] Fix race cond. in writerutil mkdirs
* WriterUtils mkdirs previously had a race condition when multiple processes
tried to create the same parent directory. This caused a spurious
FileNotFoundException.
* new implementation does not change the behavior
* Test coverage for retry config
* Wait for the file to exist (via the retry config) before setting permissions
---
.../java/org/apache/gobblin/util/WriterUtils.java | 53 ++++++++++++++--------
.../org/apache/gobblin/util/WriterUtilsTest.java | 38 ++++++++++++++++
2 files changed, 72 insertions(+), 19 deletions(-)
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/WriterUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/WriterUtils.java
index b9e64e3e1..fe2c78f39 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/WriterUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/WriterUtils.java
@@ -19,8 +19,12 @@ package org.apache.gobblin.util;
import java.io.IOException;
import java.net.URI;
+import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.ExecutionException;
+import java.util.function.BiConsumer;
+import lombok.SneakyThrows;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileConstants;
import org.apache.hadoop.conf.Configuration;
@@ -284,34 +288,45 @@ public class WriterUtils {
return;
}
- if (path.getParent() != null && !fs.exists(path.getParent())) {
- mkdirsWithRecursivePermissionWithRetry(fs, path.getParent(), perm, retrierConfig);
+ Set<Path> pathsThatDidntExistBefore = new HashSet<>();
+ for (Path p = path; p != null && !fs.exists(p); p = p.getParent()) {
+ pathsThatDidntExistBefore.add(p);
}
if (!fs.mkdirs(path, perm)) {
throw new IOException(String.format("Unable to mkdir %s with permission %s", path, perm));
}
- if (retrierConfig != NO_RETRY_CONFIG) {
- //Wait until file is not there as it can happen the file fail to exist right away on eventual consistent fs like Amazon S3
- Retryer<Void> retryer = RetryerFactory.newInstance(retrierConfig);
+ BiConsumer<FileSystem, Path> waitPolicy = getWaitPolicy(retrierConfig);
+ for (Path p : pathsThatDidntExistBefore) {
+ waitPolicy.accept(fs, path);
+ fs.setPermission(p, perm);
+ }
+ }
- try {
- retryer.call(() -> {
- if (!fs.exists(path)) {
- throw new IOException("Path " + path + " does not exist however it should. Will wait more.");
+ // define behavior for waiting until the file exists before proceeding
+ private static BiConsumer<FileSystem, Path> getWaitPolicy(Config retrierConfig) {
+ return new BiConsumer<FileSystem, Path>() {
+ @SneakyThrows
+ @Override
+ public void accept(FileSystem fs, Path path) {
+ if (retrierConfig != NO_RETRY_CONFIG) {
+ // Wait until the file exists, since it may not be visible right away on an eventually consistent fs like Amazon S3
+ Retryer<Void> retryer = RetryerFactory.newInstance(retrierConfig);
+
+ try {
+ retryer.call(() -> {
+ if (!fs.exists(path)) {
+ throw new IOException("Path " + path + " does not exist however it should. Will wait more.");
+ }
+ return null;
+ });
+ } catch (Exception e) {
+ throw new IOException("Path " + path + "does not exist however it should. Giving up..."+ e);
}
- return null;
- });
- } catch (Exception e) {
- throw new IOException("Path " + path + "does not exist however it should. Giving up..."+ e);
+ }
}
- }
-
- // Double check permission, since fs.mkdirs() may not guarantee to set the permission correctly
- if (!fs.getFileStatus(path).getPermission().equals(perm)) {
- fs.setPermission(path, perm);
- }
+ };
}
public static URI getWriterFsUri(State state, int numBranches, int branchId) {
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/WriterUtilsTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/WriterUtilsTest.java
index e14972f4f..5a66177d8 100644
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/WriterUtilsTest.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/WriterUtilsTest.java
@@ -18,6 +18,9 @@
package org.apache.gobblin.util;
import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import java.io.IOException;
import org.apache.avro.file.CodecFactory;
import org.apache.hadoop.fs.Path;
import org.testng.Assert;
@@ -30,6 +33,11 @@ import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.source.workunit.Extract;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.source.workunit.Extract.TableType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+
/**
* Tests for {@link WriterUtils}.
@@ -125,6 +133,36 @@ public class WriterUtilsTest {
ConfigurationKeys.DEFAULT_FORK_BRANCH_NAME + "0"));
}
+ @Test
+ public void testMkdirsPermissionsSet() throws IOException {
+ Path testRoot = new Path("/tmp");
+ Path testParent = new Path(testRoot, "mkdirs-1");
+ Path testChild = new Path(testParent, "mkdirs-2/mkdirs-3/mkdirs-4");
+
+ Config retryConfig = ConfigFactory.empty();
+ FileSystem fs = FileSystem.getLocal(new Configuration());
+ try {
+ fs.delete(testParent, true);
+
+ FsPermission all = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
+ WriterUtils.mkdirsWithRecursivePermissionWithRetry(fs, testParent, all, retryConfig);
+ Assert.assertTrue(fs.exists(testParent));
+ Assert.assertEquals(fs.getFileStatus(testParent).getPermission(), all);
+
+ FsPermission restricted = new FsPermission(FsAction.ALL, FsAction.READ_EXECUTE, FsAction.EXECUTE);
+ WriterUtils.mkdirsWithRecursivePermissionWithRetry(fs, testChild, restricted, retryConfig);
+ Assert.assertTrue(fs.exists(testChild));
+
+ // the pre-existing parent's permission remains unchanged, but the newly created parents and the new dir are set to restricted
+ Assert.assertEquals(fs.getFileStatus(testParent).getPermission(), all);
+ Assert.assertEquals(fs.getFileStatus(testChild.getParent().getParent()).getPermission(), restricted);
+ Assert.assertEquals(fs.getFileStatus(testChild.getParent()).getPermission(), restricted);
+ Assert.assertEquals(fs.getFileStatus(testChild).getPermission(), restricted);
+ } finally {
+ fs.delete(testParent, true);
+ }
+ }
+
@Test
public void testGetDefaultWriterFilePathWithWorkUnitState() {
String namespace = "gobblin.test";