You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/04 10:03:17 UTC

flink git commit: [FLINK-9707] Support concurrent directory creations in LocalFileSystem

Repository: flink
Updated Branches:
  refs/heads/master 82e6f7377 -> a3bc2bdc6


[FLINK-9707] Support concurrent directory creations in LocalFileSystem

Support concurrent directory creations by accepting directories which have been
created by a different thread/process in LocalFileSystem#mkdirs.

This closes #6243.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a3bc2bdc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a3bc2bdc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a3bc2bdc

Branch: refs/heads/master
Commit: a3bc2bdc6fcf73fb4e9d3b72aeb5226bc9042658
Parents: 82e6f73
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Jul 3 13:36:33 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jul 4 12:02:06 2018 +0200

----------------------------------------------------------------------
 .../flink/core/fs/local/LocalFileSystem.java    |  2 +-
 .../core/fs/local/LocalFileSystemTest.java      | 70 +++++++++++++++++++-
 2 files changed, 69 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a3bc2bdc/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
index 2d7fbd5..9ebec6e 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
@@ -254,7 +254,7 @@ public class LocalFileSystem extends FileSystem {
 		}
 		else {
 			File parent = file.getParentFile();
-			return (parent == null || mkdirsInternal(parent)) && file.mkdir();
+			return (parent == null || mkdirsInternal(parent)) && (file.mkdir() || file.isDirectory());
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a3bc2bdc/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java
index 2352404..12f1476 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java
@@ -25,7 +25,9 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.core.fs.FileSystemKind;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Assume;
 import org.junit.Rule;
@@ -36,19 +38,29 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.UUID;
-
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 /**
  * This class tests the functionality of the {@link LocalFileSystem} class in its components. In particular,
  * file/directory access, creation, deletion, read, write is tested.
  */
-public class LocalFileSystemTest {
+public class LocalFileSystemTest extends TestLogger {
 
 	@Rule
 	public final TemporaryFolder temporaryFolder = new TemporaryFolder();
@@ -319,4 +331,58 @@ public class LocalFileSystemTest {
 		final FileSystem fs = FileSystem.getLocalFileSystem();
 		assertEquals(FileSystemKind.FILE_SYSTEM, fs.getKind());
 	}
+
+	@Test
+	public void testConcurrentMkdirs() throws Exception {
+		final FileSystem fs = FileSystem.getLocalFileSystem();
+		final File root = temporaryFolder.getRoot();
+		final int directoryDepth = 10;
+		final int concurrentOperations = 10;
+
+		final Collection<File> targetDirectories = createTargetDirectories(root, directoryDepth, concurrentOperations);
+
+		final ExecutorService executor = Executors.newFixedThreadPool(concurrentOperations);
+		final CyclicBarrier cyclicBarrier = new CyclicBarrier(concurrentOperations);
+
+		try {
+			final Collection<CompletableFuture<Void>> mkdirsFutures = new ArrayList<>(concurrentOperations);
+			for (File targetDirectory : targetDirectories) {
+				final CompletableFuture<Void> mkdirsFuture = CompletableFuture.runAsync(
+					() -> {
+						try {
+							cyclicBarrier.await();
+							assertThat(fs.mkdirs(Path.fromLocalFile(targetDirectory)), is(true));
+						} catch (Exception e) {
+							throw new CompletionException(e);
+						}
+					}, executor);
+
+				mkdirsFutures.add(mkdirsFuture);
+			}
+
+			final CompletableFuture<Void> allFutures = CompletableFuture.allOf(
+				mkdirsFutures.toArray(new CompletableFuture[concurrentOperations]));
+
+			allFutures.get();
+		} finally {
+			final long timeout = 10000L;
+			ExecutorUtils.gracefulShutdown(timeout, TimeUnit.MILLISECONDS, executor);
+		}
+	}
+
+	private Collection<File> createTargetDirectories(File root, int directoryDepth, int numberDirectories) {
+		final StringBuilder stringBuilder = new StringBuilder();
+
+		for (int i = 0; i < directoryDepth; i++) {
+			stringBuilder.append('/').append(i);
+		}
+
+		final Collection<File> targetDirectories = new ArrayList<>(numberDirectories);
+
+		for (int i = 0; i < numberDirectories; i++) {
+			targetDirectories.add(new File(root, stringBuilder.toString() + '/' + i));
+		}
+
+		return targetDirectories;
+	}
 }