Posted to commits@flink.apache.org by se...@apache.org on 2018/09/19 07:30:10 UTC

[flink] branch release-1.6 updated (e564dd2 -> a58fa98)

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a change to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from e564dd2  [FLINK-10363] [s3] Only log config keys in config loader, to avoid exposing secret values
     new 2b7ea9e  [FLINK-9061] [core] Introduce WriteOptions to Flink FileSystem abstraction
     new cf09494  [FLINK-9061] [state] Creation of state payload files passes optional entropy option to File System
     new a58fa98  [FLINK-9061] [s3 presto] Add entropy injection to S3 file system

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/org/apache/flink/core/fs/FileSystem.java  |  44 ++-
 .../org/apache/flink/core/fs/WriteOptions.java     | 144 ++++++++++
 .../java/org/apache/flink/util/StringUtils.java    |  32 +++
 .../org/apache/flink/util/StringUtilsTest.java     |  14 +
 .../flink/runtime/fs/hdfs/HadoopFileSystem.java    |  25 ++
 .../flink/fs/s3presto/S3FileSystemFactory.java     | 100 ++++++-
 .../flink/fs/s3presto/S3PrestoFileSystem.java      | 301 +++++++++++++++++++++
 .../fs/s3presto/PrestoS3FileSystemEntropyTest.java | 133 +++++++++
 .../flink/fs/s3presto/PrestoS3FileSystemTest.java  |  26 +-
 .../state/filesystem/FsCheckpointStorage.java      |  17 +-
 .../filesystem/FsCheckpointStreamFactory.java      |   7 +-
 .../FsCheckpointStateOutputStreamTest.java         |  12 +-
 .../filesystem/FsStateBackendEntropyTest.java      |  99 +++++++
 13 files changed, 906 insertions(+), 48 deletions(-)
 create mode 100644 flink-core/src/main/java/org/apache/flink/core/fs/WriteOptions.java
 create mode 100644 flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3PrestoFileSystem.java
 create mode 100644 flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemEntropyTest.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java


[flink] 01/03: [FLINK-9061] [core] Introduce WriteOptions to Flink FileSystem abstraction

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2b7ea9efe51f2049d628cf1c7ecd922d11ee1420
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu Aug 23 14:32:14 2018 +0200

    [FLINK-9061] [core] Introduce WriteOptions to Flink FileSystem abstraction
---
 .../java/org/apache/flink/core/fs/FileSystem.java  |  44 +++----
 .../org/apache/flink/core/fs/WriteOptions.java     | 144 +++++++++++++++++++++
 .../flink/runtime/fs/hdfs/HadoopFileSystem.java    |  25 ++++
 3 files changed, 190 insertions(+), 23 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index d451109..ba2113a 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -576,29 +576,8 @@ public abstract class FileSystem {
 	/**
 	 * Opens an FSDataOutputStream at the indicated Path.
 	 *
-	 * <p>This method is deprecated, because most of its parameters are ignored by most file systems.
-	 * To control for example the replication factor and block size in the Hadoop Distributed File system,
-	 * make sure that the respective Hadoop configuration file is either linked from the Flink configuration,
-	 * or in the classpath of either Flink or the user code.
-	 *
-	 * @param f
-	 *        the file name to open
-	 * @param overwrite
-	 *        if a file with this name already exists, then if true,
-	 *        the file will be overwritten, and if false an error will be thrown.
-	 * @param bufferSize
-	 *        the size of the buffer to be used.
-	 * @param replication
-	 *        required block replication for the file.
-	 * @param blockSize
-	 *        the size of the file blocks
-	 *
-	 * @throws IOException Thrown, if the stream could not be opened because of an I/O, or because
-	 *                     a file already exists at that path and the write mode indicates to not
-	 *                     overwrite the file.
-	 *
-	 * @deprecated Deprecated because not well supported across types of file systems.
-	 *             Control the behavior of specific file systems via configurations instead.
+	 * @deprecated Deprecated in favor of {@link #create(Path, WriteOptions)}, which offers better extensibility
+	 *             for options that are supported by only some file system implementations.
 	 */
 	@Deprecated
 	public FSDataOutputStream create(
@@ -649,6 +628,25 @@ public abstract class FileSystem {
 	public abstract FSDataOutputStream create(Path f, WriteMode overwriteMode) throws IOException;
 
 	/**
+	 * Creates a new file at the given path and opens an FSDataOutputStream to that new file.
+	 *
+	 * <p>This method takes various options, some of which are not supported by all file systems
+	 * (such as controlling block size).
+	 *
+	 * <p>Implementation note: This method should be abstract, but currently is not, in order
+	 * to preserve backwards compatibility of this class with earlier Flink versions.
+	 *
+	 * @param f The path for the new file.
+	 * @param options The options to parametrize the file and stream creation.
+	 * @return The stream to the new file at the target path.
+	 *
+	 * @throws IOException Thrown if an error occurs while creating the file or opening the stream.
+	 */
+	public FSDataOutputStream create(Path f, WriteOptions options) throws IOException {
+		return create(f, options.getOverwrite());
+	}
+
+	/**
 	 * Renames the file/directory src to dst.
 	 *
 	 * @param src
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/WriteOptions.java b/flink-core/src/main/java/org/apache/flink/core/fs/WriteOptions.java
new file mode 100644
index 0000000..70f4973
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/WriteOptions.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+
+import javax.annotation.Nullable;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Write options that can be passed to the methods that write files.
+ */
+@Public
+public class WriteOptions {
+
+	private WriteMode overwrite = WriteMode.NO_OVERWRITE;
+
+	@Nullable
+	private BlockOptions blockSettings;
+
+	private boolean injectEntropy;
+
+	// ------------------------------------------------------------------------
+	//  getters & setters
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Gets the overwrite option.
+	 */
+	public WriteMode getOverwrite() {
+		return overwrite;
+	}
+
+	/**
+	 * Sets the overwrite option.
+	 *
+	 * <p>Method returns this object for fluent function call chaining.
+	 */
+	public WriteOptions setOverwrite(WriteMode overwrite) {
+		this.overwrite = checkNotNull(overwrite);
+		return this;
+	}
+
+	/**
+	 * Gets the block writing settings, like size and replication factor.
+	 * Returns null if no settings are defined.
+	 */
+	@Nullable
+	public BlockOptions getBlockSettings() {
+		return blockSettings;
+	}
+
+	/**
+	 * Sets the block settings, for file systems that work with block replication
+	 * and expose those settings.
+	 *
+	 * <p>Method returns this object for fluent function call chaining.
+	 */
+	public WriteOptions setBlockSettings(@Nullable BlockOptions blockSettings) {
+		this.blockSettings = blockSettings;
+		return this;
+	}
+
+	/**
+	 * Gets whether to inject entropy into the path.
+	 */
+	public boolean isInjectEntropy() {
+		return injectEntropy;
+	}
+
+	/**
+	 * Sets whether to inject entropy into the path.
+	 *
+	 * <p>Entropy injection is only supported by select file systems, such as S3, to
+	 * overcome scalability issues in their sharding. For this option to have any effect, the
+	 * file system must be configured to replace an entropy key with entropy, and the
+	 * path that is written to must contain the entropy key.
+	 *
+	 * <p>Method returns this object for fluent function call chaining.
+	 */
+	public WriteOptions setInjectEntropy(boolean injectEntropy) {
+		this.injectEntropy = injectEntropy;
+		return this;
+	}
+
+	// ------------------------------------------------------------------------
+	//  nested options classes
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Settings for block replication. Interpreted only by file systems that
+	 * expose block replication settings.
+	 */
+	@Public
+	public static class BlockOptions {
+
+		/** The size of the blocks, in bytes. */
+		private long blockSize;
+
+		/** The number of times the block should be replicated. */
+		private int replicationFactor;
+
+		public BlockOptions(long blockSize, int replicationFactor) {
+			checkArgument(blockSize > 0, "blockSize must be >0");
+			checkArgument(replicationFactor > 0, "replicationFactor must be >=1");
+
+			this.blockSize = blockSize;
+			this.replicationFactor = replicationFactor;
+		}
+
+		/**
+		 * Gets the block size, in bytes.
+		 */
+		public long getBlockSize() {
+			return blockSize;
+		}
+
+		/**
+		 * Gets the number of times the block should be replicated.
+		 */
+		public int getReplicationFactor() {
+			return replicationFactor;
+		}
+	}
+}
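
A hedged usage sketch of the new options object (not part of the commit; "fs",
"targetPath", and "data" are assumed to exist, with the relevant
org.apache.flink.core.fs imports in place). This mirrors how the checkpointing
code in commit 02/03 uses the API:

    // Build the options fluently; entropy injection is only honored by
    // file systems that support it (see commit 03/03).
    WriteOptions options = new WriteOptions()
            .setOverwrite(FileSystem.WriteMode.NO_OVERWRITE)
            .setInjectEntropy(true);

    // create() returns a stream to the new file at the target path.
    try (FSDataOutputStream out = fs.create(targetPath, options)) {
        out.write(data);
    }
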
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
index 065ba5a..bceed5e 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
@@ -19,16 +19,20 @@
 package org.apache.flink.runtime.fs.hdfs;
 
 import org.apache.flink.core.fs.BlockLocation;
+import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.FileSystemKind;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.core.fs.WriteOptions;
+import org.apache.flink.core.fs.WriteOptions.BlockOptions;
 
 import java.io.IOException;
 import java.net.URI;
 import java.util.Locale;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -36,6 +40,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class HadoopFileSystem extends FileSystem {
 
+	/** The write buffer size used by default. */
+	public static final int DEFAULT_WRITE_BUFFER_SIZE = 4096;
+
 	/** The wrapped Hadoop File System. */
 	private final org.apache.hadoop.fs.FileSystem fs;
 
@@ -143,6 +150,24 @@ public class HadoopFileSystem extends FileSystem {
 	}
 
 	@Override
+	public FSDataOutputStream create(Path f, WriteOptions options) throws IOException {
+		BlockOptions blockSettings = options.getBlockSettings();
+		if (blockSettings == null) {
+			return create(f, options.getOverwrite());
+		}
+		else {
+			checkArgument(blockSettings.getReplicationFactor() <= Short.MAX_VALUE,
+					"block replication factor out of bounds");
+
+			return create(f,
+					options.getOverwrite() == WriteMode.OVERWRITE,
+					DEFAULT_WRITE_BUFFER_SIZE,
+					(short) blockSettings.getReplicationFactor(),
+					blockSettings.getBlockSize());
+		}
+	}
+
+	@Override
 	public boolean delete(final Path f, final boolean recursive) throws IOException {
 		return this.fs.delete(toHadoopPath(f), recursive);
 	}
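
For Hadoop-backed file systems, block settings are honored directly: the
override above forwards them (via the deprecated five-argument create) to the
Hadoop client, together with the default 4 KiB write buffer
(DEFAULT_WRITE_BUFFER_SIZE). A hedged caller-side sketch, with an illustrative
HDFS URI and sizes; "hadoopFs" is an assumed, already-initialized
HadoopFileSystem instance:

    // Request 64 MiB blocks, replicated 3 times. Without block settings,
    // the call would fall back to create(Path, WriteMode).
    FSDataOutputStream out = hadoopFs.create(
            new Path("hdfs:///tmp/example"),
            new WriteOptions()
                    .setOverwrite(FileSystem.WriteMode.OVERWRITE)
                    .setBlockSettings(new WriteOptions.BlockOptions(64 * 1024 * 1024, 3)));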


[flink] 02/03: [FLINK-9061] [state] Creation of state payload files passes optional entropy option to File System

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cf09494cbda127bbb88e75ed25d4d342b47b8fc7
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu Aug 23 20:43:33 2018 +0200

    [FLINK-9061] [state] Creation of state payload files passes optional entropy option to File System
---
 .../state/filesystem/FsCheckpointStorage.java      | 17 +++-
 .../filesystem/FsCheckpointStreamFactory.java      |  7 +-
 .../FsCheckpointStateOutputStreamTest.java         | 12 ++-
 .../filesystem/FsStateBackendEntropyTest.java      | 99 ++++++++++++++++++++++
 4 files changed, 126 insertions(+), 9 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
index af80af7..1549e01 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
@@ -32,6 +32,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * An implementation of durable checkpoint storage to file systems.
@@ -54,11 +55,25 @@ public class FsCheckpointStorage extends AbstractFsCheckpointStorage {
 			JobID jobId,
 			int fileSizeThreshold) throws IOException {
 
+		this(checkpointBaseDirectory.getFileSystem(),
+				checkpointBaseDirectory,
+				defaultSavepointDirectory,
+				jobId,
+				fileSizeThreshold);
+	}
+
+	public FsCheckpointStorage(
+			FileSystem fs,
+			Path checkpointBaseDirectory,
+			@Nullable Path defaultSavepointDirectory,
+			JobID jobId,
+			int fileSizeThreshold) throws IOException {
+
 		super(jobId, defaultSavepointDirectory);
 
 		checkArgument(fileSizeThreshold >= 0);
 
-		this.fileSystem = checkpointBaseDirectory.getFileSystem();
+		this.fileSystem = checkNotNull(fs);
 		this.checkpointsDirectory = getCheckpointDirectoryForJob(checkpointBaseDirectory, jobId);
 		this.sharedStateDirectory = new Path(checkpointsDirectory, CHECKPOINT_SHARED_STATE_DIR);
 		this.taskOwnedStateDirectory = new Path(checkpointsDirectory, CHECKPOINT_TASK_OWNED_STATE_DIR);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
index 609ef69..228c5b4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.state.filesystem;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.WriteOptions;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
 import org.apache.flink.runtime.state.StreamStateHandle;
@@ -345,7 +346,11 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 			for (int attempt = 0; attempt < 10; attempt++) {
 				try {
 					Path statePath = createStatePath();
-					FSDataOutputStream outStream = fs.create(statePath, FileSystem.WriteMode.NO_OVERWRITE);
+					FSDataOutputStream outStream = fs.create(
+							statePath,
+							new WriteOptions()
+									.setOverwrite(FileSystem.WriteMode.NO_OVERWRITE)
+									.setInjectEntropy(true));
 
 					// success, managed to open the stream
 					this.statePath = statePath;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
index 8bafdf7..c962146 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.WriteOptions;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.StreamStateHandle;
@@ -138,13 +139,13 @@ public class FsCheckpointStateOutputStreamTest {
 	 */
 	@Test
 	public void testCleanupWhenClosingStream() throws IOException {
-
 		final FileSystem fs = mock(FileSystem.class);
 		final FSDataOutputStream outputStream = mock(FSDataOutputStream.class);
 
 		final ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
 
 		when(fs.create(pathCaptor.capture(), any(FileSystem.WriteMode.class))).thenReturn(outputStream);
+		when(fs.create(pathCaptor.capture(), any(WriteOptions.class))).thenReturn(outputStream);
 
 		CheckpointStreamFactory.CheckpointStateOutputStream stream = new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
 			Path.fromLocalFile(tempDir.newFolder()),
@@ -154,9 +155,6 @@ public class FsCheckpointStateOutputStreamTest {
 
 		// this should create the underlying file stream
 		stream.write(new byte[] {1, 2, 3, 4, 5});
-
-		verify(fs).create(any(Path.class), any(FileSystem.WriteMode.class));
-
 		stream.close();
 
 		verify(fs).delete(eq(pathCaptor.getValue()), anyBoolean());
@@ -170,9 +168,11 @@ public class FsCheckpointStateOutputStreamTest {
 		final FileSystem fs = mock(FileSystem.class);
 		final FSDataOutputStream outputStream = mock(FSDataOutputStream.class);
 
-		final ArgumentCaptor<Path>  pathCaptor = ArgumentCaptor.forClass(Path.class);
+		final ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
 
 		when(fs.create(pathCaptor.capture(), any(FileSystem.WriteMode.class))).thenReturn(outputStream);
+		when(fs.create(pathCaptor.capture(), any(WriteOptions.class))).thenReturn(outputStream);
+
 		doThrow(new IOException("Test IOException.")).when(outputStream).close();
 
 		CheckpointStreamFactory.CheckpointStateOutputStream stream = new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
@@ -184,8 +184,6 @@ public class FsCheckpointStateOutputStreamTest {
 		// this should create the underlying file stream
 		stream.write(new byte[] {1, 2, 3, 4, 5});
 
-		verify(fs).create(any(Path.class), any(FileSystem.WriteMode.class));
-
 		try {
 			stream.closeAndGetHandle();
 			fail("Expected IOException");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java
new file mode 100644
index 0000000..d1aa118
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.WriteOptions;
+import org.apache.flink.core.fs.local.LocalFileSystem;
+import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
+import org.apache.flink.runtime.state.CheckpointStorageLocation;
+import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.ArgumentCaptor;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests verifying that the FsStateBackend passes the entropy injection option
+ * to the FileSystem for state payload files, but not for metadata files.
+ */
+public class FsStateBackendEntropyTest {
+
+	@Rule
+	public final TemporaryFolder tmp = new TemporaryFolder();
+
+	@After
+	public void resetFileSystems() throws Exception {
+		FileSystem.initialize(new Configuration());
+	}
+
+	@Test
+	public void testInjection() throws Exception {
+		FileSystem fs = spy(LocalFileSystem.getSharedInstance());
+		ArgumentCaptor<WriteOptions> optionsCaptor = ArgumentCaptor.forClass(WriteOptions.class);
+
+		Path checkpointDir = Path.fromLocalFile(tmp.newFolder());
+
+		FsCheckpointStorage storage = new FsCheckpointStorage(
+				fs, checkpointDir, null, new JobID(), 1024);
+
+		CheckpointStorageLocation location = storage.initializeLocationForCheckpoint(96562);
+
+		// check entropy in task-owned state
+		try (CheckpointStateOutputStream stream = storage.createTaskOwnedStateStream()) {
+			stream.flush();
+		}
+
+		verify(fs, times(1)).create(any(Path.class), optionsCaptor.capture());
+		assertTrue(optionsCaptor.getValue().isInjectEntropy());
+		reset(fs);
+
+		// check entropy in the exclusive/shared state
+		try (CheckpointStateOutputStream stream =
+				location.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE)) {
+
+			stream.flush();
+		}
+
+		verify(fs, times(1)).create(any(Path.class), optionsCaptor.capture());
+		assertTrue(optionsCaptor.getValue().isInjectEntropy());
+		reset(fs);
+
+		// check that there is no entropy in the metadata
+		try (CheckpointMetadataOutputStream stream = location.createMetadataOutputStream()) {
+			stream.flush();
+		}
+
+		verify(fs, times(0)).create(any(Path.class), any(WriteOptions.class));
+	}
+}
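
Taken together with commit 01/03, this means a user only has to place the
configured entropy key in the checkpoint path; state payload files then request
entropy injection automatically, while checkpoint metadata files do not. A
hedged sketch of the user-facing side (assuming a StreamExecutionEnvironment
named "env" and the S3 entropy configuration introduced in commit 03/03; the
bucket and key are illustrative):

    // Payload files written under this path get "_entropy_" replaced with
    // random characters; the metadata file path has the key filtered out.
    env.setStateBackend(new FsStateBackend("s3://bucket/checkpoints/_entropy_/myjob"));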


[flink] 03/03: [FLINK-9061] [s3 presto] Add entropy injection to S3 file system

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a58fa985c4944dd0cf39fa622ec0aa4b35f21f44
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu Aug 23 15:43:41 2018 +0200

    [FLINK-9061] [s3 presto] Add entropy injection to S3 file system
---
 .../java/org/apache/flink/util/StringUtils.java    |  32 +++
 .../org/apache/flink/util/StringUtilsTest.java     |  14 +
 .../flink/fs/s3presto/S3FileSystemFactory.java     | 100 ++++++-
 .../flink/fs/s3presto/S3PrestoFileSystem.java      | 301 +++++++++++++++++++++
 .../fs/s3presto/PrestoS3FileSystemEntropyTest.java | 133 +++++++++
 .../flink/fs/s3presto/PrestoS3FileSystemTest.java  |  26 +-
 6 files changed, 590 insertions(+), 16 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
index 208a301..c3b3808 100644
--- a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
@@ -30,6 +30,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Random;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -247,6 +248,37 @@ public final class StringUtils {
 	}
 
 	/**
+	 * Appends a random alphanumeric string of given length to the given string buffer.
+	 *
+	 * @param rnd The random number generator to use.
+	 * @param buffer The buffer to append to.
+	 * @param length The number of alphanumeric characters to append.
+	 */
+	public static void appendRandomAlphanumericString(Random rnd, StringBuilder buffer, int length) {
+		checkNotNull(rnd);
+		checkArgument(length >= 0);
+
+		for (int i = 0; i < length; i++) {
+			buffer.append(nextAlphanumericChar(rnd));
+		}
+	}
+
+	private static char nextAlphanumericChar(Random rnd) {
+		int which = rnd.nextInt(62);
+		char c;
+		if (which < 10) {
+			c = (char) ('0' + which);
+		}
+		else if (which < 36) {
+			c = (char) ('A' - 10 + which);
+		}
+		else {
+			c = (char) ('a' - 36 + which);
+		}
+		return c;
+	}
+
+	/**
 	 * Writes a String to the given output.
 	 * The written string can be read with {@link #readString(DataInputView)}.
 	 *
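
A minimal sketch of the new helper (assuming the relevant imports; the prefix
and length are illustrative). ThreadLocalRandom extends java.util.Random, so it
can be passed directly, as the S3 file system in this commit does:

    StringBuilder buffer = new StringBuilder("checkpoints/");
    StringUtils.appendRandomAlphanumericString(ThreadLocalRandom.current(), buffer, 4);
    String path = buffer.toString();  // e.g. "checkpoints/x7Qz"
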
diff --git a/flink-core/src/test/java/org/apache/flink/util/StringUtilsTest.java b/flink-core/src/test/java/org/apache/flink/util/StringUtilsTest.java
index 5f705b4..1c9abf2 100644
--- a/flink-core/src/test/java/org/apache/flink/util/StringUtilsTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/StringUtilsTest.java
@@ -20,8 +20,11 @@ package org.apache.flink.util;
 
 import org.junit.Test;
 
+import java.util.Random;
+
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 /**
  * Tests for the {@link StringUtils}.
@@ -56,4 +59,15 @@ public class StringUtilsTest extends TestLogger {
 		String hex = StringUtils.byteToHexString(byteArray);
 		assertEquals("019f314a", hex);
 	}
+
+	@Test
+	public void testAppendAlphanumeric() {
+		StringBuilder bld = new StringBuilder();
+		StringUtils.appendRandomAlphanumericString(new Random(), bld, 256);
+		String str = bld.toString();
+
+		if (!str.matches("[a-zA-Z0-9]+")) {
+			fail("Not alphanumeric: " + str);
+		}
+	}
 }
diff --git a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
index a04f9c9..230d18b 100644
--- a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
@@ -19,12 +19,20 @@
 package org.apache.flink.fs.s3presto;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.runtime.fs.hdfs.AbstractFileSystemFactory;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
 import org.apache.flink.runtime.fs.hdfs.HadoopConfigLoader;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import com.facebook.presto.hive.PrestoS3FileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Collections;
@@ -34,7 +42,31 @@ import java.util.Set;
 /**
  * Simple factory for the S3 file system.
  */
-public class S3FileSystemFactory extends AbstractFileSystemFactory {
+public class S3FileSystemFactory implements FileSystemFactory {
+
+	/**
+	 * The substring to be replaced by random entropy in checkpoint paths.
+	 */
+	public static final ConfigOption<String> ENTROPY_INJECT_KEY_OPTION = ConfigOptions
+			.key("s3.entropy.key")
+			.noDefaultValue()
+			.withDescription(
+					"This option can be used to improve performance due to sharding issues on Amazon S3. " +
+					"For file creations with entropy injection, this key will be replaced by random " +
+					"alphanumeric characters. For other file creations, the key will be filtered out.");
+
+	/**
+	 * The number of entropy characters, in case entropy injection is configured.
+	 */
+	public static final ConfigOption<Integer> ENTROPY_INJECT_LENGTH_OPTION = ConfigOptions
+			.key("s3.entropy.length")
+			.defaultValue(4)
+			.withDescription(
+					"When '" + ENTROPY_INJECT_KEY_OPTION.key() + "' is set, this option defines the number of " +
+					"random characters to replace the entropy key with.");
+
+	private static final Logger LOG = LoggerFactory.getLogger(S3FileSystemFactory.class);
+
 	private static final Set<String> PACKAGE_PREFIXES_TO_SHADE =
 		new HashSet<>(Collections.singletonList("com.amazonaws."));
 
@@ -50,8 +82,55 @@ public class S3FileSystemFactory extends AbstractFileSystemFactory {
 			{ "presto.s3.secret.key", "presto.s3.secret-key" }
 	};
 
-	public S3FileSystemFactory() {
-		super("Presto S3 File System", createHadoopConfigLoader());
+	private static final String INVALID_ENTROPY_KEY_CHARS = "^.*[~#@*+%{}<>\\[\\]|\"\\\\].*$";
+
+	private final HadoopConfigLoader hadoopConfigLoader = createHadoopConfigLoader();
+
+	private Configuration flinkConfig;
+
+	@Override
+	public void configure(Configuration config) {
+		flinkConfig = config;
+		hadoopConfigLoader.setFlinkConfig(config);
+	}
+
+	@Override
+	public FileSystem create(URI fsUri) throws IOException {
+		LOG.debug("Creating S3 FileSystem backed by Presto S3 FileSystem");
+		LOG.debug("Loading Hadoop configuration for Presto S3 File System");
+
+		try {
+			// instantiate the presto file system
+			org.apache.hadoop.conf.Configuration hadoopConfig = hadoopConfigLoader.getOrLoadHadoopConfig();
+			org.apache.hadoop.fs.FileSystem fs = new PrestoS3FileSystem();
+			fs.initialize(getInitURI(fsUri, hadoopConfig), hadoopConfig);
+
+			// load the entropy injection settings
+			String entropyInjectionKey = flinkConfig.getString(ENTROPY_INJECT_KEY_OPTION);
+			int numEntropyChars = -1;
+
+			if (entropyInjectionKey != null) {
+				if (entropyInjectionKey.matches(INVALID_ENTROPY_KEY_CHARS)) {
+					throw new IllegalConfigurationException("Invalid character in value for " +
+							ENTROPY_INJECT_KEY_OPTION.key() + " : " + entropyInjectionKey);
+				}
+
+				numEntropyChars = flinkConfig.getInteger(ENTROPY_INJECT_LENGTH_OPTION);
+
+				if (numEntropyChars <= 0) {
+					throw new IllegalConfigurationException(
+							ENTROPY_INJECT_LENGTH_OPTION.key() + " must be set to a value > 0");
+				}
+			}
+
+			return new S3PrestoFileSystem(fs, entropyInjectionKey, numEntropyChars);
+		}
+		catch (IOException e) {
+			throw e;
+		}
+		catch (Exception e) {
+			throw new IOException(e.getMessage(), e);
+		}
 	}
 
 	@Override
@@ -65,13 +144,7 @@ public class S3FileSystemFactory extends AbstractFileSystemFactory {
 			"presto.s3.", PACKAGE_PREFIXES_TO_SHADE, CONFIG_KEYS_TO_SHADE, FLINK_SHADING_PREFIX);
 	}
 
-	@Override
-	protected org.apache.hadoop.fs.FileSystem createHadoopFileSystem() {
-		return new PrestoS3FileSystem();
-	}
-
-	@Override
-	protected URI getInitURI(URI fsUri, org.apache.hadoop.conf.Configuration hadoopConfig) {
+	static URI getInitURI(URI fsUri, org.apache.hadoop.conf.Configuration hadoopConfig) {
 		final String scheme = fsUri.getScheme();
 		final String authority = fsUri.getAuthority();
 		final URI initUri;
@@ -88,10 +161,11 @@ public class S3FileSystemFactory extends AbstractFileSystemFactory {
 		return initUri;
 	}
 
-	private URI createURI(String str) {
+	static URI createURI(String str) {
 		try {
 			return new URI(str);
-		} catch (URISyntaxException e) {
+		}
+		catch (URISyntaxException e) {
 			throw new FlinkRuntimeException("Error in s3 aws URI - " + str, e);
 		}
 	}
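
A hedged configuration sketch for the two new options (the key and length
values are illustrative; the same calls appear in the updated
PrestoS3FileSystemTest further below):

    Configuration conf = new Configuration();
    conf.setString("s3.entropy.key", "_entropy_");  // substring to replace in paths
    conf.setInteger("s3.entropy.length", 8);        // defaults to 4 if not set
    FileSystem.initialize(conf);
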
diff --git a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3PrestoFileSystem.java b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3PrestoFileSystem.java
new file mode 100644
index 0000000..e6a6ae4
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3PrestoFileSystem.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3presto;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.BlockLocation;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemKind;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.WriteOptions;
+import org.apache.flink.runtime.fs.hdfs.HadoopBlockLocation;
+import org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream;
+import org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileStatus;
+import org.apache.flink.util.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A Flink FileSystem against S3, wrapping the Presto Hadoop S3 File System implementation.
+ *
+ * <p>This class is based heavily on the {@link org.apache.flink.runtime.fs.hdfs.HadoopFileSystem} class.
+ * Code is copied here for the sake of minimal changes to the original class within a minor release.
+ */
+class S3PrestoFileSystem extends FileSystem {
+
+	/** The wrapped Hadoop File System. */
+	private final org.apache.hadoop.fs.FileSystem fs;
+
+	@Nullable
+	private final String entropyInjectionKey;
+
+	private final int entropyLength;
+
+	/**
+	 * Wraps the given Hadoop File System object as a Flink File System object.
+	 * The given Hadoop file system object is expected to be initialized already.
+	 *
+	 * @param hadoopFileSystem The Hadoop FileSystem that will be used under the hood.
+	 */
+	public S3PrestoFileSystem(org.apache.hadoop.fs.FileSystem hadoopFileSystem) {
+		this(hadoopFileSystem, null, -1);
+	}
+
+	/**
+	 * Wraps the given Hadoop File System object as a Flink File System object.
+	 * The given Hadoop file system object is expected to be initialized already.
+	 *
+	 * <p>This constructor additionally configures the entropy injection for the file system.
+	 *
+	 * @param hadoopFileSystem The Hadoop FileSystem that will be used under the hood.
+	 * @param entropyInjectionKey The substring that will be replaced by entropy or removed.
+	 * @param entropyLength The number of random alphanumeric characters to inject as entropy.
+	 */
+	public S3PrestoFileSystem(
+			org.apache.hadoop.fs.FileSystem hadoopFileSystem,
+			@Nullable String entropyInjectionKey,
+			int entropyLength) {
+
+		if (entropyInjectionKey != null && entropyLength <= 0) {
+			throw new IllegalArgumentException("Entropy length must be > 0 when an entropy injection key is set");
+		}
+
+		this.fs = checkNotNull(hadoopFileSystem, "hadoopFileSystem");
+		this.entropyInjectionKey = entropyInjectionKey;
+		this.entropyLength = entropyLength;
+	}
+
+	// ------------------------------------------------------------------------
+	//  properties
+	// ------------------------------------------------------------------------
+
+	public org.apache.hadoop.fs.FileSystem getHadoopFileSystem() {
+		return fs;
+	}
+
+	@Nullable
+	public String getEntropyInjectionKey() {
+		return entropyInjectionKey;
+	}
+
+	public int getEntropyLength() {
+		return entropyLength;
+	}
+
+	// ------------------------------------------------------------------------
+	//  file system methods
+	// ------------------------------------------------------------------------
+
+	@Override
+	public Path getWorkingDirectory() {
+		return new Path(fs.getWorkingDirectory().toUri());
+	}
+
+	public Path getHomeDirectory() {
+		return new Path(fs.getHomeDirectory().toUri());
+	}
+
+	@Override
+	public URI getUri() {
+		return fs.getUri();
+	}
+
+	@Override
+	public FileStatus getFileStatus(final Path f) throws IOException {
+		org.apache.hadoop.fs.FileStatus status = fs.getFileStatus(toHadoopPath(f));
+		return new HadoopFileStatus(status);
+	}
+
+	@Override
+	public BlockLocation[] getFileBlockLocations(
+			final FileStatus file,
+			final long start,
+			final long len) throws IOException {
+
+		if (!(file instanceof HadoopFileStatus)) {
+			throw new IOException("file is not an instance of HadoopFileStatus");
+		}
+
+		final HadoopFileStatus f = (HadoopFileStatus) file;
+
+		final org.apache.hadoop.fs.BlockLocation[] blkLocations = fs.getFileBlockLocations(f.getInternalFileStatus(),
+				start, len);
+
+		// Wrap up HDFS specific block location objects
+		final HadoopBlockLocation[] distBlkLocations = new HadoopBlockLocation[blkLocations.length];
+		for (int i = 0; i < distBlkLocations.length; i++) {
+			distBlkLocations[i] = new HadoopBlockLocation(blkLocations[i]);
+		}
+
+		return distBlkLocations;
+	}
+
+	@Override
+	public HadoopDataInputStream open(final Path f, final int bufferSize) throws IOException {
+		final org.apache.hadoop.fs.Path path = toHadoopPath(f);
+		final org.apache.hadoop.fs.FSDataInputStream fdis = this.fs.open(path, bufferSize);
+		return new HadoopDataInputStream(fdis);
+	}
+
+	@Override
+	public HadoopDataInputStream open(final Path f) throws IOException {
+		final org.apache.hadoop.fs.Path path = toHadoopPath(f);
+		final org.apache.hadoop.fs.FSDataInputStream fdis = fs.open(path);
+		return new HadoopDataInputStream(fdis);
+	}
+
+	@Override
+	public HadoopDataOutputStream create(final Path f, final WriteMode overwrite) throws IOException {
+		final org.apache.hadoop.fs.FSDataOutputStream fsDataOutputStream =
+				fs.create(toHadoopPath(f), overwrite == WriteMode.OVERWRITE);
+		return new HadoopDataOutputStream(fsDataOutputStream);
+	}
+
+	@Override
+	public FSDataOutputStream create(final Path f, final WriteOptions options) throws IOException {
+		final org.apache.hadoop.fs.Path path = options.isInjectEntropy()
+				? toHadoopPathInjectEntropy(f)
+				: toHadoopPath(f);
+
+		final org.apache.hadoop.fs.FSDataOutputStream fsDataOutputStream = fs.create(
+				path, options.getOverwrite() == WriteMode.OVERWRITE);
+
+		return new HadoopDataOutputStream(fsDataOutputStream);
+	}
+
+	@Override
+	public boolean delete(final Path f, final boolean recursive) throws IOException {
+		return fs.delete(toHadoopPath(f), recursive);
+	}
+
+	@Override
+	public boolean exists(Path f) throws IOException {
+		return fs.exists(toHadoopPath(f));
+	}
+
+	@Override
+	public FileStatus[] listStatus(final Path f) throws IOException {
+		final org.apache.hadoop.fs.FileStatus[] hadoopFiles = fs.listStatus(toHadoopPath(f));
+		final FileStatus[] files = new FileStatus[hadoopFiles.length];
+
+		// Convert types
+		for (int i = 0; i < files.length; i++) {
+			files[i] = new HadoopFileStatus(hadoopFiles[i]);
+		}
+
+		return files;
+	}
+
+	@Override
+	public boolean mkdirs(final Path f) throws IOException {
+		return fs.mkdirs(toHadoopPath(f));
+	}
+
+	@Override
+	public boolean rename(final Path src, final Path dst) throws IOException {
+		return fs.rename(toHadoopPath(src), toHadoopPath(dst));
+	}
+
+	@SuppressWarnings("deprecation")
+	@Override
+	public long getDefaultBlockSize() {
+		return fs.getDefaultBlockSize();
+	}
+
+	@Override
+	public boolean isDistributedFS() {
+		return true;
+	}
+
+	@Override
+	public FileSystemKind getKind() {
+		return FileSystemKind.OBJECT_STORE;
+	}
+
+	// ------------------------------------------------------------------------
+	//  entropy utilities
+	// ------------------------------------------------------------------------
+
+	@VisibleForTesting
+	org.apache.hadoop.fs.Path toHadoopPath(Path path) throws IOException {
+		return rewritePathForEntropyKey(path, false);
+	}
+
+	@VisibleForTesting
+	org.apache.hadoop.fs.Path toHadoopPathInjectEntropy(Path path) throws IOException {
+		return rewritePathForEntropyKey(path, true);
+	}
+
+	private org.apache.hadoop.fs.Path rewritePathForEntropyKey(Path path, boolean addEntropy) throws IOException {
+		if (entropyInjectionKey == null) {
+			return convertToHadoopPath(path);
+		}
+		else {
+			final URI originalUri = path.toUri();
+			final String checkpointPath = originalUri.getPath();
+
+			final int indexOfKey = checkpointPath.indexOf(entropyInjectionKey);
+			if (indexOfKey == -1) {
+				return convertToHadoopPath(path);
+			}
+			else {
+				final StringBuilder buffer = new StringBuilder(checkpointPath.length());
+				buffer.append(checkpointPath, 0, indexOfKey);
+
+				if (addEntropy) {
+					StringUtils.appendRandomAlphanumericString(ThreadLocalRandom.current(), buffer, entropyLength);
+				}
+
+				buffer.append(checkpointPath, indexOfKey + entropyInjectionKey.length(), checkpointPath.length());
+
+				final String rewrittenPath = buffer.toString();
+				try {
+					return convertToHadoopPath(new URI(
+							originalUri.getScheme(),
+							originalUri.getAuthority(),
+							rewrittenPath,
+							originalUri.getQuery(),
+							originalUri.getFragment()));
+				}
+				catch (URISyntaxException e) {
+					// this should actually never happen, because the URI was valid before
+					throw new IOException("URI format error while processing path for entropy injection", e);
+				}
+			}
+		}
+	}
+
+	private static org.apache.hadoop.fs.Path convertToHadoopPath(URI uri) {
+		return new org.apache.hadoop.fs.Path(uri);
+	}
+
+	private static org.apache.hadoop.fs.Path convertToHadoopPath(Path path) {
+		return convertToHadoopPath(path.toUri());
+	}
+}
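
To make the rewrite concrete, a hedged example (the two conversion methods are
package-private and @VisibleForTesting, so callable from tests in this package;
"prestoHadoopFs", the key, and the length are illustrative):

    S3PrestoFileSystem fs = new S3PrestoFileSystem(prestoHadoopFs, "_entropy_", 4);
    Path path = new Path("s3://bucket/chk-_entropy_/state-file");

    // With injection: the key is replaced by 4 random alphanumeric characters,
    // e.g. s3://bucket/chk-x7Qz/state-file
    fs.toHadoopPathInjectEntropy(path);

    // Without injection: the key is filtered out of the path,
    // i.e. s3://bucket/chk-/state-file
    fs.toHadoopPath(path);
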
diff --git a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemEntropyTest.java b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemEntropyTest.java
new file mode 100644
index 0000000..587b02e
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemEntropyTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3presto;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.WriteOptions;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the entropy injection in the {@link S3PrestoFileSystem}.
+ */
+public class PrestoS3FileSystemEntropyTest {
+
+	@Test
+	public void testEmptyPath() throws Exception {
+		Path path = new Path("hdfs://localhost:12345");
+		S3PrestoFileSystem fs = createFs("test", 4);
+
+		assertEquals(toHadoopPath(path), fs.toHadoopPathInjectEntropy(path));
+		assertEquals(toHadoopPath(path), fs.toHadoopPath(path));
+	}
+
+	@Test
+	public void testFullUriNonMatching() throws Exception {
+		Path path = new Path("s3://hugo@myawesomehost:55522/path/to/the/file");
+		S3PrestoFileSystem fs = createFs("_entropy_key_", 4);
+
+		assertEquals(toHadoopPath(path), fs.toHadoopPathInjectEntropy(path));
+		assertEquals(toHadoopPath(path), fs.toHadoopPath(path));
+	}
+
+	@Test
+	public void testFullUriMatching() throws Exception {
+		Path path = new Path("s3://hugo@myawesomehost:55522/path/s0mek3y/the/file");
+		S3PrestoFileSystem fs = createFs("s0mek3y", 8);
+
+		org.apache.hadoop.fs.Path withEntropy = fs.toHadoopPathInjectEntropy(path);
+		org.apache.hadoop.fs.Path withoutEntropy = fs.toHadoopPath(path);
+
+		validateMatches(withEntropy, "s3://hugo@myawesomehost:55522/path/[a-zA-Z0-9]{8}/the/file");
+		assertEquals(new org.apache.hadoop.fs.Path("s3://hugo@myawesomehost:55522/path/the/file"), withoutEntropy);
+	}
+
+	@Test
+	public void testPathOnlyNonMatching() throws Exception {
+		Path path = new Path("/path/file");
+		S3PrestoFileSystem fs = createFs("_entropy_key_", 4);
+
+		assertEquals(toHadoopPath(path), fs.toHadoopPathInjectEntropy(path));
+		assertEquals(toHadoopPath(path), fs.toHadoopPath(path));
+	}
+
+	@Test
+	public void testPathOnlyMatching() throws Exception {
+		Path path = new Path("/path/_entropy_key_/file");
+		S3PrestoFileSystem fs = createFs("_entropy_key_", 4);
+
+		org.apache.hadoop.fs.Path withEntropy = fs.toHadoopPathInjectEntropy(path);
+		org.apache.hadoop.fs.Path withoutEntropy = fs.toHadoopPath(path);
+
+		validateMatches(withEntropy, "/path/[a-zA-Z0-9]{4}/file");
+		assertEquals(new org.apache.hadoop.fs.Path("/path/file"), withoutEntropy);
+	}
+
+	@Test
+	public void testEntropyNotFullSegment() throws Exception {
+		Path path = new Path("s3://myhost:122/entropy-_entropy_key_-suffix/file");
+		S3PrestoFileSystem fs = createFs("_entropy_key_", 3);
+
+		org.apache.hadoop.fs.Path withEntropy = fs.toHadoopPathInjectEntropy(path);
+		org.apache.hadoop.fs.Path withoutEntropy = fs.toHadoopPath(path);
+
+		validateMatches(withEntropy, "s3://myhost:122/entropy-[a-zA-Z0-9]{3}-suffix/file");
+		assertEquals(new org.apache.hadoop.fs.Path("s3://myhost:122/entropy--suffix/file"), withoutEntropy);
+	}
+
+	@Test
+	public void testWriteOptionWithEntropy() throws Exception {
+		FileSystem underlyingFs = mock(FileSystem.class);
+		when(underlyingFs.create(any(org.apache.hadoop.fs.Path.class), anyBoolean())).thenReturn(mock(FSDataOutputStream.class));
+		ArgumentCaptor<org.apache.hadoop.fs.Path> pathCaptor = ArgumentCaptor.forClass(org.apache.hadoop.fs.Path.class);
+
+		Path path = new Path("s3://hugo@myawesomehost:55522/path/s0mek3y/the/file");
+		S3PrestoFileSystem fs = new S3PrestoFileSystem(underlyingFs, "s0mek3y", 11);
+
+		fs.create(path, new WriteOptions().setInjectEntropy(true));
+		verify(underlyingFs).create(pathCaptor.capture(), anyBoolean());
+
+		validateMatches(pathCaptor.getValue(), "s3://hugo@myawesomehost:55522/path/[a-zA-Z0-9]{11}/the/file");
+	}
+
+	private static void validateMatches(org.apache.hadoop.fs.Path path, String pattern) {
+		if (!path.toString().matches(pattern)) {
+			fail("Path " + path + " does not match " + pattern);
+		}
+	}
+
+	private static S3PrestoFileSystem createFs(String entropyKey, int entropyLen) {
+		return new S3PrestoFileSystem(mock(FileSystem.class), entropyKey, entropyLen);
+	}
+
+	private org.apache.hadoop.fs.Path toHadoopPath(Path path) {
+		return new org.apache.hadoop.fs.Path(path.toUri());
+	}
+}
diff --git a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java
index 4eeb2d4..9afad57 100644
--- a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java
+++ b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java
@@ -21,12 +21,12 @@ package org.apache.flink.fs.s3presto;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.runtime.fs.hdfs.HadoopConfigLoader;
-import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
 
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.auth.AWSStaticCredentialsProvider;
 import com.amazonaws.services.s3.AmazonS3Client;
 import com.facebook.presto.hive.PrestoS3FileSystem;
+import org.junit.After;
 import org.junit.Test;
 
 import java.lang.reflect.Field;
@@ -41,6 +41,11 @@ import static org.junit.Assert.assertTrue;
  */
 public class PrestoS3FileSystemTest {
 
+	@After
+	public void resetFileSystemConfig() throws Exception {
+		FileSystem.initialize(new Configuration());
+	}
+
 	@Test
 	public void testConfigPropagation() throws Exception{
 		final Configuration conf = new Configuration();
@@ -90,14 +95,29 @@ public class PrestoS3FileSystemTest {
 			hadoopConfig.get("presto.s3.credentials-provider"));
 	}
 
+	@Test
+	public void testEntropyInjectionConfig() throws Exception {
+		final Configuration conf = new Configuration();
+		conf.setString("s3.entropy.key", "__entropy__");
+		conf.setInteger("s3.entropy.length", 7);
+
+		FileSystem.initialize(conf);
+
+		FileSystem fs = FileSystem.get(new URI("s3://test"));
+		S3PrestoFileSystem s3fs = (S3PrestoFileSystem) fs;
+
+		assertEquals("__entropy__", s3fs.getEntropyInjectionKey());
+		assertEquals(7, s3fs.getEntropyLength());
+	}
+
 	// ------------------------------------------------------------------------
 	//  utilities
 	// ------------------------------------------------------------------------
 
 	private static void validateBasicCredentials(FileSystem fs) throws Exception {
-		assertTrue(fs instanceof HadoopFileSystem);
+		assertTrue(fs instanceof S3PrestoFileSystem);
 
-		org.apache.hadoop.fs.FileSystem hadoopFs = ((HadoopFileSystem) fs).getHadoopFileSystem();
+		org.apache.hadoop.fs.FileSystem hadoopFs = ((S3PrestoFileSystem) fs).getHadoopFileSystem();
 		assertTrue(hadoopFs instanceof PrestoS3FileSystem);
 
 		try (PrestoS3FileSystem prestoFs = (PrestoS3FileSystem) hadoopFs) {