Posted to commits@flink.apache.org by se...@apache.org on 2018/09/19 07:30:13 UTC

[flink] 03/03: [FLINK-9061] [s3 presto] Add entropy injection to S3 file system

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a58fa985c4944dd0cf39fa622ec0aa4b35f21f44
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu Aug 23 15:43:41 2018 +0200

    [FLINK-9061] [s3 presto] Add entropy injection to S3 file system
---
 .../java/org/apache/flink/util/StringUtils.java    |  32 +++
 .../org/apache/flink/util/StringUtilsTest.java     |  14 +
 .../flink/fs/s3presto/S3FileSystemFactory.java     | 100 ++++++-
 .../flink/fs/s3presto/S3PrestoFileSystem.java      | 301 +++++++++++++++++++++
 .../fs/s3presto/PrestoS3FileSystemEntropyTest.java | 133 +++++++++
 .../flink/fs/s3presto/PrestoS3FileSystemTest.java  |  26 +-
 6 files changed, 590 insertions(+), 16 deletions(-)
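
For context: entropy injection replaces a configured marker substring in checkpoint paths
with random alphanumeric characters, spreading checkpoint keys across the S3 key space.
A minimal sketch of enabling it through the Flink configuration, using the option keys
added by this commit (bucket, marker, and checkpoint path are illustrative, and the class
name is hypothetical):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.fs.FileSystem;

    public class EntropyInjectionSetup {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // marker substring to be replaced by random characters
            conf.setString("s3.entropy.key", "_entropy_");
            // number of random alphanumeric characters (the option defaults to 4)
            conf.setInteger("s3.entropy.length", 4);
            FileSystem.initialize(conf);

            // For file creations that request entropy injection, a path such as
            //     s3://my-bucket/_entropy_/checkpoints/chk-17
            // is then created as, e.g.,
            //     s3://my-bucket/fX3q/checkpoints/chk-17
            // For all other file creations, the marker is simply filtered out.
        }
    }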

diff --git a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
index 208a301..c3b3808 100644
--- a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
@@ -30,6 +30,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Random;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -247,6 +248,37 @@ public final class StringUtils {
 	}
 
 	/**
+	 * Appends a random alphanumeric string of given length to the given string buffer.
+	 *
+	 * @param rnd The random number generator to use.
+	 * @param buffer The buffer to append to.
+	 * @param length The number of alphanumeric characters to append.
+	 */
+	public static void appendRandomAlphanumericString(Random rnd, StringBuilder buffer, int length) {
+		checkNotNull(rnd);
+		checkArgument(length >= 0);
+
+		for (int i = 0; i < length; i++) {
+			buffer.append(nextAlphanumericChar(rnd));
+		}
+	}
+
+	private static char nextAlphanumericChar(Random rnd) {
+		int which = rnd.nextInt(62);
+		char c;
+		if (which < 10) {
+			c = (char) ('0' + which);
+		}
+		else if (which < 36) {
+			c = (char) ('A' - 10 + which);
+		}
+		else {
+			c = (char) ('a' - 36 + which);
+		}
+		return c;
+	}
+
+	/**
 	 * Writes a String to the given output.
 	 * The written string can be read with {@link #readString(DataInputView)}.
 	 *
diff --git a/flink-core/src/test/java/org/apache/flink/util/StringUtilsTest.java b/flink-core/src/test/java/org/apache/flink/util/StringUtilsTest.java
index 5f705b4..1c9abf2 100644
--- a/flink-core/src/test/java/org/apache/flink/util/StringUtilsTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/StringUtilsTest.java
@@ -20,8 +20,11 @@ package org.apache.flink.util;
 
 import org.junit.Test;
 
+import java.util.Random;
+
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 /**
  * Tests for the {@link StringUtils}.
@@ -56,4 +59,15 @@ public class StringUtilsTest extends TestLogger {
 		String hex = StringUtils.byteToHexString(byteArray);
 		assertEquals("019f314a", hex);
 	}
+
+	@Test
+	public void testAppendAlphanumeric() {
+		StringBuilder bld = new StringBuilder();
+		StringUtils.appendRandomAlphanumericString(new Random(), bld, 256);
+		String str = bld.toString();
+
+		if (!str.matches("[a-zA-Z0-9]+")) {
+			fail("Not alphanumeric: " + str);
+		}
+	}
 }
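
A minimal usage sketch of the new StringUtils helper (the prefix, length, and class name
are illustrative; characters are drawn from [0-9A-Za-z]):

    import java.util.Random;
    import org.apache.flink.util.StringUtils;

    public class AppendEntropyExample {
        public static void main(String[] args) {
            StringBuilder buffer = new StringBuilder("checkpoints/");
            // appends 8 random alphanumeric characters to the buffer
            StringUtils.appendRandomAlphanumericString(new Random(), buffer, 8);
            System.out.println(buffer); // e.g. checkpoints/a3Zk09Qx
        }
    }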
diff --git a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
index a04f9c9..230d18b 100644
--- a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
@@ -19,12 +19,20 @@
 package org.apache.flink.fs.s3presto;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.runtime.fs.hdfs.AbstractFileSystemFactory;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
 import org.apache.flink.runtime.fs.hdfs.HadoopConfigLoader;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import com.facebook.presto.hive.PrestoS3FileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Collections;
@@ -34,7 +42,31 @@ import java.util.Set;
 /**
  * Simple factory for the S3 file system.
  */
-public class S3FileSystemFactory extends AbstractFileSystemFactory {
+public class S3FileSystemFactory implements FileSystemFactory {
+
+	/**
+	 * The substring to be replaced by random entropy in checkpoint paths.
+	 */
+	public static final ConfigOption<String> ENTROPY_INJECT_KEY_OPTION = ConfigOptions
+			.key("s3.entropy.key")
+			.noDefaultValue()
+			.withDescription(
+					"This option can be used to improve performance by mitigating key-prefix sharding issues on Amazon S3. " +
+					"For file creations with entropy injection, this key will be replaced by random " +
+					"alphanumeric characters. For other file creations, the key will be filtered out.");
+
+	/**
+	 * The number of entropy characters, in case entropy injection is configured.
+	 */
+	public static final ConfigOption<Integer> ENTROPY_INJECT_LENGTH_OPTION = ConfigOptions
+			.key("s3.entropy.length")
+			.defaultValue(4)
+			.withDescription(
+					"When '" + ENTROPY_INJECT_KEY_OPTION.key() + "' is set, this option defines the number of " +
+					"random characters to replace the entropy key with.");
+
+	private static final Logger LOG = LoggerFactory.getLogger(S3FileSystemFactory.class);
+
 	private static final Set<String> PACKAGE_PREFIXES_TO_SHADE =
 		new HashSet<>(Collections.singletonList("com.amazonaws."));
 
@@ -50,8 +82,55 @@ public class S3FileSystemFactory extends AbstractFileSystemFactory {
 			{ "presto.s3.secret.key", "presto.s3.secret-key" }
 	};
 
-	public S3FileSystemFactory() {
-		super("Presto S3 File System", createHadoopConfigLoader());
+	private static final String INVALID_ENTROPY_KEY_CHARS = "^.*[~#@*+%{}<>\\[\\]|\"\\\\].*$";
+
+	private final HadoopConfigLoader hadoopConfigLoader = createHadoopConfigLoader();
+
+	private Configuration flinkConfig;
+
+	@Override
+	public void configure(Configuration config) {
+		flinkConfig = config;
+		hadoopConfigLoader.setFlinkConfig(config);
+	}
+
+	@Override
+	public FileSystem create(URI fsUri) throws IOException {
+		LOG.debug("Creating S3 FileSystem backed by Presto S3 FileSystem");
+		LOG.debug("Loading Hadoop configuration for Presto S3 File System");
+
+		try {
+			// instantiate the presto file system
+			org.apache.hadoop.conf.Configuration hadoopConfig = hadoopConfigLoader.getOrLoadHadoopConfig();
+			org.apache.hadoop.fs.FileSystem fs = new PrestoS3FileSystem();
+			fs.initialize(getInitURI(fsUri, hadoopConfig), hadoopConfig);
+
+			// load the entropy injection settings
+			String entropyInjectionKey = flinkConfig.getString(ENTROPY_INJECT_KEY_OPTION);
+			int numEntropyChars = -1;
+
+			if (entropyInjectionKey != null) {
+				if (entropyInjectionKey.matches(INVALID_ENTROPY_KEY_CHARS)) {
+					throw new IllegalConfigurationException("Invalid character in value for " +
+							ENTROPY_INJECT_KEY_OPTION.key() + " : " + entropyInjectionKey);
+				}
+
+				numEntropyChars = flinkConfig.getInteger(ENTROPY_INJECT_LENGTH_OPTION);
+
+				if (numEntropyChars <= 0) {
+					throw new IllegalConfigurationException(
+							ENTROPY_INJECT_LENGTH_OPTION.key() + " must be configured with a value > 0");
+				}
+			}
+
+			return new S3PrestoFileSystem(fs, entropyInjectionKey, numEntropyChars);
+		}
+		catch (IOException e) {
+			throw e;
+		}
+		catch (Exception e) {
+			throw new IOException(e.getMessage(), e);
+		}
 	}
 
 	@Override
@@ -65,13 +144,7 @@ public class S3FileSystemFactory extends AbstractFileSystemFactory {
 			"presto.s3.", PACKAGE_PREFIXES_TO_SHADE, CONFIG_KEYS_TO_SHADE, FLINK_SHADING_PREFIX);
 	}
 
-	@Override
-	protected org.apache.hadoop.fs.FileSystem createHadoopFileSystem() {
-		return new PrestoS3FileSystem();
-	}
-
-	@Override
-	protected URI getInitURI(URI fsUri, org.apache.hadoop.conf.Configuration hadoopConfig) {
+	static URI getInitURI(URI fsUri, org.apache.hadoop.conf.Configuration hadoopConfig) {
 		final String scheme = fsUri.getScheme();
 		final String authority = fsUri.getAuthority();
 		final URI initUri;
@@ -88,10 +161,11 @@ public class S3FileSystemFactory extends AbstractFileSystemFactory {
 		return initUri;
 	}
 
-	private URI createURI(String str) {
+	static URI createURI(String str) {
 		try {
 			return new URI(str);
-		} catch (URISyntaxException e) {
+		}
+		catch (URISyntaxException e) {
 			throw new FlinkRuntimeException("Error in s3 aws URI - " + str, e);
 		}
 	}
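
The factory rejects entropy keys containing characters that are problematic in URIs and
object-store paths. A small standalone sketch of that validation, reusing the pattern
from INVALID_ENTROPY_KEY_CHARS above (the sample keys and class name are made up):

    public class EntropyKeyValidation {
        private static final String INVALID_ENTROPY_KEY_CHARS =
                "^.*[~#@*+%{}<>\\[\\]|\"\\\\].*$";

        public static void main(String[] args) {
            // contains none of the forbidden characters -> accepted
            System.out.println("_entropy_".matches(INVALID_ENTROPY_KEY_CHARS));  // false
            // contains '*' -> rejected with an IllegalConfigurationException
            System.out.println("_entro*py_".matches(INVALID_ENTROPY_KEY_CHARS)); // true
        }
    }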
diff --git a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3PrestoFileSystem.java b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3PrestoFileSystem.java
new file mode 100644
index 0000000..e6a6ae4
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3PrestoFileSystem.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3presto;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.BlockLocation;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemKind;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.WriteOptions;
+import org.apache.flink.runtime.fs.hdfs.HadoopBlockLocation;
+import org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream;
+import org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileStatus;
+import org.apache.flink.util.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A Flink FileSystem against S3, wrapping the Presto Hadoop S3 File System implementation.
+ *
+ * <p>This class is based heavily on the {@link org.apache.flink.runtime.fs.hdfs.HadoopFileSystem} class.
+ * The code is copied here to keep changes to the original class minimal within a minor release.
+ */
+class S3PrestoFileSystem extends FileSystem {
+
+	/** The wrapped Hadoop File System. */
+	private final org.apache.hadoop.fs.FileSystem fs;
+
+	@Nullable
+	private final String entropyInjectionKey;
+
+	private final int entropyLength;
+
+	/**
+	 * Wraps the given Hadoop File System object as a Flink File System object.
+	 * The given Hadoop file system object is expected to be initialized already.
+	 *
+	 * @param hadoopFileSystem The Hadoop FileSystem that will be used under the hood.
+	 */
+	public S3PrestoFileSystem(org.apache.hadoop.fs.FileSystem hadoopFileSystem) {
+		this(hadoopFileSystem, null, -1);
+	}
+
+	/**
+	 * Wraps the given Hadoop File System object as a Flink File System object.
+	 * The given Hadoop file system object is expected to be initialized already.
+	 *
+	 * <p>This constructor additionally configures the entropy injection for the file system.
+	 *
+	 * @param hadoopFileSystem The Hadoop FileSystem that will be used under the hood.
+	 * @param entropyInjectionKey The substring that will be replaced by entropy or removed.
+	 * @param entropyLength The number of random alphanumeric characters to inject as entropy.
+	 */
+	public S3PrestoFileSystem(
+			org.apache.hadoop.fs.FileSystem hadoopFileSystem,
+			@Nullable String entropyInjectionKey,
+			int entropyLength) {
+
+		if (entropyInjectionKey != null && entropyLength <= 0) {
+			throw new IllegalArgumentException("Entropy length must be > 0 when entropy injection key is set");
+		}
+
+		this.fs = checkNotNull(hadoopFileSystem, "hadoopFileSystem");
+		this.entropyInjectionKey = entropyInjectionKey;
+		this.entropyLength = entropyLength;
+	}
+
+	// ------------------------------------------------------------------------
+	//  properties
+	// ------------------------------------------------------------------------
+
+	public org.apache.hadoop.fs.FileSystem getHadoopFileSystem() {
+		return fs;
+	}
+
+	@Nullable
+	public String getEntropyInjectionKey() {
+		return entropyInjectionKey;
+	}
+
+	public int getEntropyLength() {
+		return entropyLength;
+	}
+
+	// ------------------------------------------------------------------------
+	//  file system methods
+	// ------------------------------------------------------------------------
+
+	@Override
+	public Path getWorkingDirectory() {
+		return new Path(fs.getWorkingDirectory().toUri());
+	}
+
+	public Path getHomeDirectory() {
+		return new Path(fs.getHomeDirectory().toUri());
+	}
+
+	@Override
+	public URI getUri() {
+		return fs.getUri();
+	}
+
+	@Override
+	public FileStatus getFileStatus(final Path f) throws IOException {
+		org.apache.hadoop.fs.FileStatus status = fs.getFileStatus(toHadoopPath(f));
+		return new HadoopFileStatus(status);
+	}
+
+	@Override
+	public BlockLocation[] getFileBlockLocations(
+			final FileStatus file,
+			final long start,
+			final long len) throws IOException {
+
+		if (!(file instanceof HadoopFileStatus)) {
+			throw new IOException("file is not an instance of HadoopFileStatus");
+		}
+
+		final HadoopFileStatus f = (HadoopFileStatus) file;
+
+		final org.apache.hadoop.fs.BlockLocation[] blkLocations = fs.getFileBlockLocations(f.getInternalFileStatus(),
+				start, len);
+
+		// Wrap up HDFS specific block location objects
+		final HadoopBlockLocation[] distBlkLocations = new HadoopBlockLocation[blkLocations.length];
+		for (int i = 0; i < distBlkLocations.length; i++) {
+			distBlkLocations[i] = new HadoopBlockLocation(blkLocations[i]);
+		}
+
+		return distBlkLocations;
+	}
+
+	@Override
+	public HadoopDataInputStream open(final Path f, final int bufferSize) throws IOException {
+		final org.apache.hadoop.fs.Path path = toHadoopPath(f);
+		final org.apache.hadoop.fs.FSDataInputStream fdis = this.fs.open(path, bufferSize);
+		return new HadoopDataInputStream(fdis);
+	}
+
+	@Override
+	public HadoopDataInputStream open(final Path f) throws IOException {
+		final org.apache.hadoop.fs.Path path = toHadoopPath(f);
+		final org.apache.hadoop.fs.FSDataInputStream fdis = fs.open(path);
+		return new HadoopDataInputStream(fdis);
+	}
+
+	@Override
+	public HadoopDataOutputStream create(final Path f, final WriteMode overwrite) throws IOException {
+		final org.apache.hadoop.fs.FSDataOutputStream fsDataOutputStream =
+				fs.create(toHadoopPath(f), overwrite == WriteMode.OVERWRITE);
+		return new HadoopDataOutputStream(fsDataOutputStream);
+	}
+
+	@Override
+	public FSDataOutputStream create(final Path f, final WriteOptions options) throws IOException {
+		final org.apache.hadoop.fs.Path path = options.isInjectEntropy()
+				? toHadoopPathInjectEntropy(f)
+				: toHadoopPath(f);
+
+		final org.apache.hadoop.fs.FSDataOutputStream fsDataOutputStream = fs.create(
+				path, options.getOverwrite() == WriteMode.OVERWRITE);
+
+		return new HadoopDataOutputStream(fsDataOutputStream);
+	}
+
+	@Override
+	public boolean delete(final Path f, final boolean recursive) throws IOException {
+		return fs.delete(toHadoopPath(f), recursive);
+	}
+
+	@Override
+	public boolean exists(Path f) throws IOException {
+		return fs.exists(toHadoopPath(f));
+	}
+
+	@Override
+	public FileStatus[] listStatus(final Path f) throws IOException {
+		final org.apache.hadoop.fs.FileStatus[] hadoopFiles = fs.listStatus(toHadoopPath(f));
+		final FileStatus[] files = new FileStatus[hadoopFiles.length];
+
+		// Convert types
+		for (int i = 0; i < files.length; i++) {
+			files[i] = new HadoopFileStatus(hadoopFiles[i]);
+		}
+
+		return files;
+	}
+
+	@Override
+	public boolean mkdirs(final Path f) throws IOException {
+		return fs.mkdirs(toHadoopPath(f));
+	}
+
+	@Override
+	public boolean rename(final Path src, final Path dst) throws IOException {
+		return fs.rename(toHadoopPath(src), toHadoopPath(dst));
+	}
+
+	@SuppressWarnings("deprecation")
+	@Override
+	public long getDefaultBlockSize() {
+		return fs.getDefaultBlockSize();
+	}
+
+	@Override
+	public boolean isDistributedFS() {
+		return true;
+	}
+
+	@Override
+	public FileSystemKind getKind() {
+		return FileSystemKind.OBJECT_STORE;
+	}
+
+	// ------------------------------------------------------------------------
+	//  entropy utilities
+	// ------------------------------------------------------------------------
+
+	@VisibleForTesting
+	org.apache.hadoop.fs.Path toHadoopPath(Path path) throws IOException {
+		return rewritePathForEntropyKey(path, false);
+	}
+
+	@VisibleForTesting
+	org.apache.hadoop.fs.Path toHadoopPathInjectEntropy(Path path) throws IOException {
+		return rewritePathForEntropyKey(path, true);
+	}
+
+	private org.apache.hadoop.fs.Path rewritePathForEntropyKey(Path path, boolean addEntropy) throws IOException {
+		if (entropyInjectionKey == null) {
+			return convertToHadoopPath(path);
+		}
+		else {
+			final URI originalUri = path.toUri();
+			final String checkpointPath = originalUri.getPath();
+
+			final int indexOfKey = checkpointPath.indexOf(entropyInjectionKey);
+			if (indexOfKey == -1) {
+				return convertToHadoopPath(path);
+			}
+			else {
+				final StringBuilder buffer = new StringBuilder(checkpointPath.length());
+				buffer.append(checkpointPath, 0, indexOfKey);
+
+				if (addEntropy) {
+					StringUtils.appendRandomAlphanumericString(ThreadLocalRandom.current(), buffer, entropyLength);
+				}
+
+				buffer.append(checkpointPath, indexOfKey + entropyInjectionKey.length(), checkpointPath.length());
+
+				final String rewrittenPath = buffer.toString();
+				try {
+					return convertToHadoopPath(new URI(
+							originalUri.getScheme(),
+							originalUri.getAuthority(),
+							rewrittenPath,
+							originalUri.getQuery(),
+							originalUri.getFragment()));
+				}
+				catch (URISyntaxException e) {
+					// this should actually never happen, because the URI was valid before
+					throw new IOException("URI format error while processing path for entropy injection", e);
+				}
+			}
+		}
+	}
+
+	private static org.apache.hadoop.fs.Path convertToHadoopPath(URI uri) {
+		return new org.apache.hadoop.fs.Path(uri);
+	}
+
+	private static org.apache.hadoop.fs.Path convertToHadoopPath(Path path) {
+		return convertToHadoopPath(path.toUri());
+	}
+}
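
To illustrate rewritePathForEntropyKey, a condensed standalone restatement of its string
substitution step (paths, key, and class name are illustrative; note that the raw
filtered string may contain a double slash, which Hadoop's Path later normalizes away,
as the entropy tests below rely on):

    import java.util.concurrent.ThreadLocalRandom;
    import org.apache.flink.util.StringUtils;

    public class RewriteSketch {
        static String rewrite(String path, String key, int len, boolean inject) {
            int idx = path.indexOf(key);
            if (idx < 0) {
                return path; // no marker in the path -> use it unchanged
            }
            StringBuilder buf = new StringBuilder(path.length());
            buf.append(path, 0, idx);
            if (inject) {
                // replace the marker with 'len' random alphanumeric characters
                StringUtils.appendRandomAlphanumericString(ThreadLocalRandom.current(), buf, len);
            }
            buf.append(path, idx + key.length(), path.length());
            return buf.toString();
        }

        public static void main(String[] args) {
            String p = "/checkpoints/_entropy_key_/chk-1";
            System.out.println(rewrite(p, "_entropy_key_", 4, true));  // e.g. /checkpoints/fX3q/chk-1
            System.out.println(rewrite(p, "_entropy_key_", 4, false)); // /checkpoints//chk-1
        }
    }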
diff --git a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemEntropyTest.java b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemEntropyTest.java
new file mode 100644
index 0000000..587b02e
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemEntropyTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3presto;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.WriteOptions;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the entropy injection in the {@link S3PrestoFileSystem}.
+ */
+public class PrestoS3FileSystemEntropyTest {
+
+	@Test
+	public void testEmptyPath() throws Exception {
+		Path path = new Path("hdfs://localhost:12345");
+		S3PrestoFileSystem fs = createFs("test", 4);
+
+		assertEquals(toHadoopPath(path), fs.toHadoopPathInjectEntropy(path));
+		assertEquals(toHadoopPath(path), fs.toHadoopPath(path));
+	}
+
+	@Test
+	public void testFullUriNonMatching() throws Exception {
+		Path path = new Path("s3://hugo@myawesomehost:55522/path/to/the/file");
+		S3PrestoFileSystem fs = createFs("_entropy_key_", 4);
+
+		assertEquals(toHadoopPath(path), fs.toHadoopPathInjectEntropy(path));
+		assertEquals(toHadoopPath(path), fs.toHadoopPath(path));
+	}
+
+	@Test
+	public void testFullUriMatching() throws Exception {
+		Path path = new Path("s3://hugo@myawesomehost:55522/path/s0mek3y/the/file");
+		S3PrestoFileSystem fs = createFs("s0mek3y", 8);
+
+		org.apache.hadoop.fs.Path withEntropy = fs.toHadoopPathInjectEntropy(path);
+		org.apache.hadoop.fs.Path withoutEntropy = fs.toHadoopPath(path);
+
+		validateMatches(withEntropy, "s3://hugo@myawesomehost:55522/path/[a-zA-Z0-9]{8}/the/file");
+		assertEquals(new org.apache.hadoop.fs.Path("s3://hugo@myawesomehost:55522/path/the/file"), withoutEntropy);
+	}
+
+	@Test
+	public void testPathOnlyNonMatching() throws Exception {
+		Path path = new Path("/path/file");
+		S3PrestoFileSystem fs = createFs("_entropy_key_", 4);
+
+		assertEquals(toHadoopPath(path), fs.toHadoopPathInjectEntropy(path));
+		assertEquals(toHadoopPath(path), fs.toHadoopPath(path));
+	}
+
+	@Test
+	public void testPathOnlyMatching() throws Exception {
+		Path path = new Path("/path/_entropy_key_/file");
+		S3PrestoFileSystem fs = createFs("_entropy_key_", 4);
+
+		org.apache.hadoop.fs.Path withEntropy = fs.toHadoopPathInjectEntropy(path);
+		org.apache.hadoop.fs.Path withoutEntropy = fs.toHadoopPath(path);
+
+		validateMatches(withEntropy, "/path/[a-zA-Z0-9]{4}/file");
+		assertEquals(new org.apache.hadoop.fs.Path("/path/file"), withoutEntropy);
+	}
+
+	@Test
+	public void testEntropyNotFullSegment() throws Exception {
+		Path path = new Path("s3://myhost:122/entropy-_entropy_key_-suffix/file");
+		S3PrestoFileSystem fs = createFs("_entropy_key_", 3);
+
+		org.apache.hadoop.fs.Path withEntropy = fs.toHadoopPathInjectEntropy(path);
+		org.apache.hadoop.fs.Path withoutEntropy = fs.toHadoopPath(path);
+
+		validateMatches(withEntropy, "s3://myhost:122/entropy-[a-zA-Z0-9]{3}-suffix/file");
+		assertEquals(new org.apache.hadoop.fs.Path("s3://myhost:122/entropy--suffix/file"), withoutEntropy);
+	}
+
+	@Test
+	public void testWriteOptionWithEntropy() throws Exception {
+		FileSystem underlyingFs = mock(FileSystem.class);
+		when(underlyingFs.create(any(org.apache.hadoop.fs.Path.class), anyBoolean())).thenReturn(mock(FSDataOutputStream.class));
+		ArgumentCaptor<org.apache.hadoop.fs.Path> pathCaptor = ArgumentCaptor.forClass(org.apache.hadoop.fs.Path.class);
+
+		Path path = new Path("s3://hugo@myawesomehost:55522/path/s0mek3y/the/file");
+		S3PrestoFileSystem fs = new S3PrestoFileSystem(underlyingFs, "s0mek3y", 11);
+
+		fs.create(path, new WriteOptions().setInjectEntropy(true));
+		verify(underlyingFs).create(pathCaptor.capture(), anyBoolean());
+
+		validateMatches(pathCaptor.getValue(), "s3://hugo@myawesomehost:55522/path/[a-zA-Z0-9]{11}/the/file");
+	}
+
+	private static void validateMatches(org.apache.hadoop.fs.Path path, String pattern) {
+		if (!path.toString().matches(pattern)) {
+			fail("Path " + path + " does not match " + pattern);
+		}
+	}
+
+	private static S3PrestoFileSystem createFs(String entropyKey, int entropyLen) {
+		return new S3PrestoFileSystem(mock(FileSystem.class), entropyKey, entropyLen);
+	}
+
+	private org.apache.hadoop.fs.Path toHadoopPath(Path path) {
+		return new org.apache.hadoop.fs.Path(path.toUri());
+	}
+}
diff --git a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java
index 4eeb2d4..9afad57 100644
--- a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java
+++ b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java
@@ -21,12 +21,12 @@ package org.apache.flink.fs.s3presto;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.runtime.fs.hdfs.HadoopConfigLoader;
-import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
 
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.auth.AWSStaticCredentialsProvider;
 import com.amazonaws.services.s3.AmazonS3Client;
 import com.facebook.presto.hive.PrestoS3FileSystem;
+import org.junit.After;
 import org.junit.Test;
 
 import java.lang.reflect.Field;
@@ -41,6 +41,11 @@ import static org.junit.Assert.assertTrue;
  */
 public class PrestoS3FileSystemTest {
 
+	@After
+	public void resetFileSystemConfig() throws Exception {
+		FileSystem.initialize(new Configuration());
+	}
+
 	@Test
 	public void testConfigPropagation() throws Exception{
 		final Configuration conf = new Configuration();
@@ -90,14 +95,29 @@ public class PrestoS3FileSystemTest {
 			hadoopConfig.get("presto.s3.credentials-provider"));
 	}
 
+	@Test
+	public void testEntropyInjectionConfig() throws Exception {
+		final Configuration conf = new Configuration();
+		conf.setString("s3.entropy.key", "__entropy__");
+		conf.setInteger("s3.entropy.length", 7);
+
+		FileSystem.initialize(conf);
+
+		FileSystem fs = FileSystem.get(new URI("s3://test"));
+		S3PrestoFileSystem s3fs = (S3PrestoFileSystem) fs;
+
+		assertEquals("__entropy__", s3fs.getEntropyInjectionKey());
+		assertEquals(7, s3fs.getEntropyLength());
+	}
+
 	// ------------------------------------------------------------------------
 	//  utilities
 	// ------------------------------------------------------------------------
 
 	private static void validateBasicCredentials(FileSystem fs) throws Exception {
-		assertTrue(fs instanceof HadoopFileSystem);
+		assertTrue(fs instanceof S3PrestoFileSystem);
 
-		org.apache.hadoop.fs.FileSystem hadoopFs = ((HadoopFileSystem) fs).getHadoopFileSystem();
+		org.apache.hadoop.fs.FileSystem hadoopFs = ((S3PrestoFileSystem) fs).getHadoopFileSystem();
 		assertTrue(hadoopFs instanceof PrestoS3FileSystem);
 
 		try (PrestoS3FileSystem prestoFs = (PrestoS3FileSystem) hadoopFs) {