Posted to commits@flink.apache.org by se...@apache.org on 2018/09/25 06:51:59 UTC

[flink] 01/06: Revert "[FLINK-9061] Add entropy injection to S3 file system"

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit aff9994862331a81ab93dd206073a0503d784c84
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu Sep 20 21:57:22 2018 +0200

    Revert "[FLINK-9061] Add entropy injection to S3 file system"
    
    This was incorrectly applied to the 1.6 branch
---
 .../java/org/apache/flink/core/fs/FileSystem.java  |  44 +--
 .../org/apache/flink/core/fs/WriteOptions.java     | 144 ----------
 .../java/org/apache/flink/util/StringUtils.java    |  32 ---
 .../org/apache/flink/util/StringUtilsTest.java     |  14 -
 .../flink/runtime/fs/hdfs/HadoopFileSystem.java    |  25 --
 .../flink/fs/s3presto/S3FileSystemFactory.java     | 100 +------
 .../flink/fs/s3presto/S3PrestoFileSystem.java      | 301 ---------------------
 .../fs/s3presto/PrestoS3FileSystemEntropyTest.java | 133 ---------
 .../flink/fs/s3presto/PrestoS3FileSystemTest.java  |  26 +-
 .../state/filesystem/FsCheckpointStorage.java      |  17 +-
 .../filesystem/FsCheckpointStreamFactory.java      |   7 +-
 .../FsCheckpointStateOutputStreamTest.java         |  12 +-
 .../filesystem/FsStateBackendEntropyTest.java      |  99 -------
 13 files changed, 48 insertions(+), 906 deletions(-)
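
For context on the reverted feature: entropy injection rewrites checkpoint paths so that a
configured marker substring (the "entropy key") is replaced by random alphanumeric characters,
spreading S3 object keys across shards. The following is a minimal, self-contained sketch of
that rewriting, distilled from the deleted S3PrestoFileSystem#rewritePathForEntropyKey further
below; the class name, method name, and example values are illustrative only, not Flink API.

    import java.util.concurrent.ThreadLocalRandom;

    public class EntropySketch {

        private static final String ALPHANUMERIC =
                "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";

        // Replaces the first occurrence of entropyKey with `length` random
        // alphanumeric characters; returns the path unchanged if the key is absent.
        static String injectEntropy(String path, String entropyKey, int length) {
            final int idx = path.indexOf(entropyKey);
            if (idx < 0) {
                return path;
            }
            final StringBuilder sb = new StringBuilder(path.length());
            sb.append(path, 0, idx);
            final ThreadLocalRandom rnd = ThreadLocalRandom.current();
            for (int i = 0; i < length; i++) {
                sb.append(ALPHANUMERIC.charAt(rnd.nextInt(ALPHANUMERIC.length())));
            }
            sb.append(path, idx + entropyKey.length(), path.length());
            return sb.toString();
        }

        public static void main(String[] args) {
            // prints something like s3://bucket/checkpoints/x7Kp/chk-17
            System.out.println(injectEntropy(
                    "s3://bucket/checkpoints/_entropy_/chk-17", "_entropy_", 4));
        }
    }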

diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index ba2113a..d451109 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -576,8 +576,29 @@ public abstract class FileSystem {
 	/**
 	 * Opens an FSDataOutputStream at the indicated Path.
 	 *
-	 * @deprecated Deprecated in favor of {@link #create(Path, WriteOptions)} which offers better extensibility
-	 *             to options that are supported only by some filesystems implementations.
+	 * <p>This method is deprecated because most of its parameters are ignored by most file systems.
+	 * To control, for example, the replication factor and block size in the Hadoop Distributed File
+	 * System, make sure that the respective Hadoop configuration file is either linked from the Flink
+	 * configuration or on the classpath of either Flink or the user code.
+	 *
+	 * @param f
+	 *        the file name to open
+	 * @param overwrite
+	 *        if a file with this name already exists: if true, the file
+	 *        will be overwritten; if false, an error will be thrown.
+	 * @param bufferSize
+	 *        the size of the buffer to be used.
+	 * @param replication
+	 *        required block replication for the file.
+	 * @param blockSize
+	 *        the size of the file blocks
+	 *
+	 * @throws IOException Thrown if the stream could not be opened because of an I/O error, or because
+	 *                     a file already exists at that path and the write mode indicates not to
+	 *                     overwrite the file.
+	 *
+	 * @deprecated Deprecated because it is not well supported across the different types of file systems.
+	 *             Control the behavior of specific file systems via configuration instead.
 	 */
 	@Deprecated
 	public FSDataOutputStream create(
@@ -628,25 +649,6 @@ public abstract class FileSystem {
 	public abstract FSDataOutputStream create(Path f, WriteMode overwriteMode) throws IOException;
 
 	/**
-	 * Creates a new file at the given path and opens an FSDataOutputStream to that new file.
-	 *
-	 * <p>This method takes various options, some of which are not supported by all file systems
-	 * (such as controlling block size).
-	 *
-	 * <p>Implementation note: This method should be abstract, but is currently not in order to not break
-	 * backwards compatibility of this class with earlier Flink versions.
-	 *
-	 * @param f The path for the new file.
-	 * @param options The options to parametrize the file and stream creation.
-	 * @return The stream to the new file at the target path.
-	 *
-	 * @throws IOException Thrown if an error occurs while creating the file or opening the stream.
-	 */
-	public FSDataOutputStream create(Path f, WriteOptions options) throws IOException {
-		return create(f, options.getOverwrite());
-	}
-
-	/**
 	 * Renames the file/directory src to dst.
 	 *
 	 * @param src
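
With the WriteOptions variant removed, callers of this class fall back to the remaining
create(Path, WriteMode) method. A minimal usage sketch (the path and payload are hypothetical):

    Path path = new Path("hdfs:///tmp/output");
    FileSystem fs = path.getFileSystem();
    try (FSDataOutputStream out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE)) {
        out.write(new byte[] {1, 2, 3, 4, 5});
    }
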
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/WriteOptions.java b/flink-core/src/main/java/org/apache/flink/core/fs/WriteOptions.java
deleted file mode 100644
index 70f4973..0000000
--- a/flink-core/src/main/java/org/apache/flink/core/fs/WriteOptions.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.core.fs;
-
-import org.apache.flink.annotation.Public;
-import org.apache.flink.core.fs.FileSystem.WriteMode;
-
-import javax.annotation.Nullable;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Write options that can be passed to the methods that write files.
- */
-@Public
-public class WriteOptions {
-
-	private WriteMode overwrite = WriteMode.NO_OVERWRITE;
-
-	@Nullable
-	private BlockOptions blockSettings;
-
-	private boolean injectEntropy;
-
-	// ------------------------------------------------------------------------
-	//  getters & setters
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Gets the overwrite option.
-	 */
-	public WriteMode getOverwrite() {
-		return overwrite;
-	}
-
-	/**
-	 * Sets the overwrite option.
-	 *
-	 * <p>Method returns this object for fluent function call chaining.
-	 */
-	public WriteOptions setOverwrite(WriteMode overwrite) {
-		this.overwrite = checkNotNull(overwrite);
-		return this;
-	}
-
-	/**
-	 * Gets the block writing settings, like size and replication factor.
-	 * Returns null if no settings are defined.
-	 */
-	@Nullable
-	public BlockOptions getBlockSettings() {
-		return blockSettings;
-	}
-
-	/**
-	 * Sets the block settings, for file systems working with block replication and
-	 * exposing those settings
-	 *
-	 * <p>Method returns this object for fluent function call chaining.
-	 */
-	public WriteOptions setBlockSettings(@Nullable BlockOptions blockSettings) {
-		this.blockSettings = blockSettings;
-		return this;
-	}
-
-	/**
-	 * Gets whether to inject entropy into the path.
-	 */
-	public boolean isInjectEntropy() {
-		return injectEntropy;
-	}
-
-	/**
-	 * Sets whether to inject entropy into the path.
-	 *
-	 * <p>Entropy injection is only supported by select filesystems, like S3, to overcome
-	 * scalability issues in their sharding. For this option to have any effect, the
-	 * file system must be configured to replace an entropy key with entropy, and the
-	 * path that is written to must contain the entropy key.
-	 *
-	 * <p>Method returns this object for fluent function call chaining.
-	 */
-	public WriteOptions setInjectEntropy(boolean injectEntropy) {
-		this.injectEntropy = injectEntropy;
-		return this;
-	}
-
-	// ------------------------------------------------------------------------
-	//  nested options classes
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Settings for block replication. Interpreted only by filesystems that
-	 * expose block replication settings.
-	 */
-	@Public
-	public static class BlockOptions {
-
-		/** The size of the blocks, in bytes. */
-		private long blockSize;
-
-		/** The number of times the block should be replicated. */
-		private int replicationFactor;
-
-		public BlockOptions(long blockSize, int replicationFactor) {
-			checkArgument(blockSize > 0, "blockSize must be >0");
-			checkArgument(replicationFactor > 0, "replicationFactor must be >=1");
-
-			this.blockSize = blockSize;
-			this.replicationFactor = replicationFactor;
-		}
-
-		/**
-		 * Gets the block size, in bytes.
-		 */
-		public long getBlockSize() {
-			return blockSize;
-		}
-
-		/**
-		 * Gets the number of times the block should be replicated.
-		 */
-		public int getReplicationFactor() {
-			return replicationFactor;
-		}
-	}
-}
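
Before this revert, the deleted options object was used fluently; the usage below is taken
verbatim from the FsCheckpointStreamFactory hunk later in this commit:

    FSDataOutputStream outStream = fs.create(
            statePath,
            new WriteOptions()
                    .setOverwrite(FileSystem.WriteMode.NO_OVERWRITE)
                    .setInjectEntropy(true));
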
diff --git a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
index c3b3808..208a301 100644
--- a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
@@ -30,7 +30,6 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Random;
 
-import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -248,37 +247,6 @@ public final class StringUtils {
 	}
 
 	/**
-	 * Appends a random alphanumeric string of given length to the given string buffer.
-	 *
-	 * @param rnd The random number generator to use.
-	 * @param buffer The buffer to append to.
-	 * @param length The number of alphanumeric characters to append.
-	 */
-	public static void appendRandomAlphanumericString(Random rnd, StringBuilder buffer, int length) {
-		checkNotNull(rnd);
-		checkArgument(length >= 0);
-
-		for (int i = 0; i < length; i++) {
-			buffer.append(nextAlphanumericChar(rnd));
-		}
-	}
-
-	private static char nextAlphanumericChar(Random rnd) {
-		int which = rnd.nextInt(62);
-		char c;
-		if (which < 10) {
-			c = (char) ('0' + which);
-		}
-		else if (which < 36) {
-			c = (char) ('A' - 10 + which);
-		}
-		else {
-			c = (char) ('a' - 36 + which);
-		}
-		return c;
-	}
-
-	/**
 	 * Writes a String to the given output.
 	 * The written string can be read with {@link #readString(DataInputView)}.
 	 *
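
The removed helper was used as follows (adapted from the deleted test in the next hunk;
the length of 16 is an arbitrary example value):

    StringBuilder bld = new StringBuilder();
    StringUtils.appendRandomAlphanumericString(new Random(), bld, 16);
    String randomSuffix = bld.toString();  // 16 characters from [0-9A-Za-z]
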
diff --git a/flink-core/src/test/java/org/apache/flink/util/StringUtilsTest.java b/flink-core/src/test/java/org/apache/flink/util/StringUtilsTest.java
index 1c9abf2..5f705b4 100644
--- a/flink-core/src/test/java/org/apache/flink/util/StringUtilsTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/StringUtilsTest.java
@@ -20,11 +20,8 @@ package org.apache.flink.util;
 
 import org.junit.Test;
 
-import java.util.Random;
-
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
 
 /**
  * Tests for the {@link StringUtils}.
@@ -59,15 +56,4 @@ public class StringUtilsTest extends TestLogger {
 		String hex = StringUtils.byteToHexString(byteArray);
 		assertEquals("019f314a", hex);
 	}
-
-	@Test
-	public void testAppendAlphanumeric() {
-		StringBuilder bld = new StringBuilder();
-		StringUtils.appendRandomAlphanumericString(new Random(), bld, 256);
-		String str = bld.toString();
-
-		if (!str.matches("[a-zA-Z0-9]+")) {
-			fail("Not alphanumeric: " + str);
-		}
-	}
 }
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
index bceed5e..065ba5a 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
@@ -19,20 +19,16 @@
 package org.apache.flink.runtime.fs.hdfs;
 
 import org.apache.flink.core.fs.BlockLocation;
-import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.FileSystemKind;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.RecoverableWriter;
-import org.apache.flink.core.fs.WriteOptions;
-import org.apache.flink.core.fs.WriteOptions.BlockOptions;
 
 import java.io.IOException;
 import java.net.URI;
 import java.util.Locale;
 
-import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -40,9 +36,6 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class HadoopFileSystem extends FileSystem {
 
-	/** The write buffer size used by default. */
-	public static final int DEFAULT_WRITE_BUFFER_SIZE = 4096;
-
 	/** The wrapped Hadoop File System. */
 	private final org.apache.hadoop.fs.FileSystem fs;
 
@@ -150,24 +143,6 @@ public class HadoopFileSystem extends FileSystem {
 	}
 
 	@Override
-	public FSDataOutputStream create(Path f, WriteOptions options) throws IOException {
-		BlockOptions blockSettings = options.getBlockSettings();
-		if (blockSettings == null) {
-			return create(f, options.getOverwrite());
-		}
-		else {
-			checkArgument(blockSettings.getReplicationFactor() <= Short.MAX_VALUE,
-					"block replication factor out of bounds");
-
-			return create(f,
-					options.getOverwrite() == WriteMode.OVERWRITE,
-					DEFAULT_WRITE_BUFFER_SIZE,
-					(short) blockSettings.getReplicationFactor(),
-					blockSettings.getBlockSize());
-		}
-	}
-
-	@Override
 	public boolean delete(final Path f, final boolean recursive) throws IOException {
 		return this.fs.delete(toHadoopPath(f), recursive);
 	}
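
The removed override let callers request HDFS block settings through WriteOptions; a usage
sketch under the pre-revert API (the block size and replication values are hypothetical):

    FSDataOutputStream out = hadoopFs.create(
            path,
            new WriteOptions()
                    .setOverwrite(FileSystem.WriteMode.OVERWRITE)
                    .setBlockSettings(new WriteOptions.BlockOptions(64 * 1024 * 1024, 3)));
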
diff --git a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
index 230d18b..a04f9c9 100644
--- a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
@@ -19,20 +19,12 @@
 package org.apache.flink.fs.s3presto;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.runtime.fs.hdfs.AbstractFileSystemFactory;
 import org.apache.flink.runtime.fs.hdfs.HadoopConfigLoader;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import com.facebook.presto.hive.PrestoS3FileSystem;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Collections;
@@ -42,31 +34,7 @@ import java.util.Set;
 /**
  * Simple factory for the S3 file system.
  */
-public class S3FileSystemFactory implements FileSystemFactory {
-
-	/**
-	 * The substring to be replaced by random entropy in checkpoint paths.
-	 */
-	public static final ConfigOption<String> ENTROPY_INJECT_KEY_OPTION = ConfigOptions
-			.key("s3.entropy.key")
-			.noDefaultValue()
-			.withDescription(
-					"This option can be used to improve performance due to sharding issues on Amazon S3. " +
-					"For file creations with entropy injection, this key will be replaced by random " +
-					"alphanumeric characters. For other file creations, the key will be filtered out.");
-
-	/**
-	 * The number of entropy characters, in case entropy injection is configured.
-	 */
-	public static final ConfigOption<Integer> ENTROPY_INJECT_LENGTH_OPTION = ConfigOptions
-			.key("s3.entropy.length")
-			.defaultValue(4)
-			.withDescription(
-					"When '" + ENTROPY_INJECT_KEY_OPTION.key() + "' is set, this option defines the number of " +
-					"random characters to replace the entropy key with.");
-
-	private static final Logger LOG = LoggerFactory.getLogger(S3FileSystemFactory.class);
-
+public class S3FileSystemFactory extends AbstractFileSystemFactory {
 	private static final Set<String> PACKAGE_PREFIXES_TO_SHADE =
 		new HashSet<>(Collections.singletonList("com.amazonaws."));
 
@@ -82,55 +50,8 @@ public class S3FileSystemFactory implements FileSystemFactory {
 			{ "presto.s3.secret.key", "presto.s3.secret-key" }
 	};
 
-	private static final String INVALID_ENTROPY_KEY_CHARS = "^.*[~#@*+%{}<>\\[\\]|\"\\\\].*$";
-
-	private final HadoopConfigLoader hadoopConfigLoader = createHadoopConfigLoader();
-
-	private Configuration flinkConfig;
-
-	@Override
-	public void configure(Configuration config) {
-		flinkConfig = config;
-		hadoopConfigLoader.setFlinkConfig(config);
-	}
-
-	@Override
-	public FileSystem create(URI fsUri) throws IOException {
-		LOG.debug("Creating S3 FileSystem backed by Presto S3 FileSystem");
-		LOG.debug("Loading Hadoop configuration for Presto S3 File System");
-
-		try {
-			// instantiate the presto file system
-			org.apache.hadoop.conf.Configuration hadoopConfig = hadoopConfigLoader.getOrLoadHadoopConfig();
-			org.apache.hadoop.fs.FileSystem fs = new PrestoS3FileSystem();
-			fs.initialize(getInitURI(fsUri, hadoopConfig), hadoopConfig);
-
-			// load the entropy injection settings
-			String entropyInjectionKey = flinkConfig.getString(ENTROPY_INJECT_KEY_OPTION);
-			int numEntropyChars = -1;
-
-			if (entropyInjectionKey != null) {
-				if (entropyInjectionKey.matches(INVALID_ENTROPY_KEY_CHARS)) {
-					throw new IllegalConfigurationException("Invalid character in value for " +
-							ENTROPY_INJECT_KEY_OPTION.key() + " : " + entropyInjectionKey);
-				}
-
-				numEntropyChars = flinkConfig.getInteger(ENTROPY_INJECT_LENGTH_OPTION);
-
-				if (numEntropyChars <= 0) {
-					throw new IllegalConfigurationException(
-							ENTROPY_INJECT_LENGTH_OPTION.key() + " must configure a value > 0");
-				}
-			}
-
-			return new S3PrestoFileSystem(fs, entropyInjectionKey, numEntropyChars);
-		}
-		catch (IOException e) {
-			throw e;
-		}
-		catch (Exception e) {
-			throw new IOException(e.getMessage(), e);
-		}
+	public S3FileSystemFactory() {
+		super("Presto S3 File System", createHadoopConfigLoader());
 	}
 
 	@Override
@@ -144,7 +65,13 @@ public class S3FileSystemFactory implements FileSystemFactory {
 			"presto.s3.", PACKAGE_PREFIXES_TO_SHADE, CONFIG_KEYS_TO_SHADE, FLINK_SHADING_PREFIX);
 	}
 
-	static URI getInitURI(URI fsUri, org.apache.hadoop.conf.Configuration hadoopConfig) {
+	@Override
+	protected org.apache.hadoop.fs.FileSystem createHadoopFileSystem() {
+		return new PrestoS3FileSystem();
+	}
+
+	@Override
+	protected URI getInitURI(URI fsUri, org.apache.hadoop.conf.Configuration hadoopConfig) {
 		final String scheme = fsUri.getScheme();
 		final String authority = fsUri.getAuthority();
 		final URI initUri;
@@ -161,11 +88,10 @@ public class S3FileSystemFactory implements FileSystemFactory {
 		return initUri;
 	}
 
-	static URI createURI(String str) {
+	private URI createURI(String str) {
 		try {
 			return new URI(str);
-		}
-		catch (URISyntaxException e) {
+		} catch (URISyntaxException e) {
 			throw new FlinkRuntimeException("Error in s3 aws URI - " + str, e);
 		}
 	}
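
Before the revert, the entropy options were read from the Flink Configuration; the keys and
values below are taken from the deleted testEntropyInjectionConfig test later in this commit:

    Configuration conf = new Configuration();
    conf.setString("s3.entropy.key", "__entropy__");
    conf.setInteger("s3.entropy.length", 7);
    FileSystem.initialize(conf);
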
diff --git a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3PrestoFileSystem.java b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3PrestoFileSystem.java
deleted file mode 100644
index e6a6ae4..0000000
--- a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3PrestoFileSystem.java
+++ /dev/null
@@ -1,301 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.fs.s3presto;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.core.fs.BlockLocation;
-import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.FileSystemKind;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.WriteOptions;
-import org.apache.flink.runtime.fs.hdfs.HadoopBlockLocation;
-import org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream;
-import org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream;
-import org.apache.flink.runtime.fs.hdfs.HadoopFileStatus;
-import org.apache.flink.util.StringUtils;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.concurrent.ThreadLocalRandom;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A Flink FileSystem against S3, wrapping the Presto Hadoop S3 File System implementation.
- *
-	 * <p>This class is heavily based on the {@link org.apache.flink.runtime.fs.hdfs.HadoopFileSystem} class.
- * Code is copied here for the sake of minimal changes to the original class within a minor release.
- */
-class S3PrestoFileSystem extends FileSystem {
-
-	/** The wrapped Hadoop File System. */
-	private final org.apache.hadoop.fs.FileSystem fs;
-
-	@Nullable
-	private final String entropyInjectionKey;
-
-	private final int entropyLength;
-
-	/**
-	 * Wraps the given Hadoop File System object as a Flink File System object.
-	 * The given Hadoop file system object is expected to be initialized already.
-	 *
-	 * @param hadoopFileSystem The Hadoop FileSystem that will be used under the hood.
-	 */
-	public S3PrestoFileSystem(org.apache.hadoop.fs.FileSystem hadoopFileSystem) {
-		this(hadoopFileSystem, null, -1);
-	}
-
-	/**
-	 * Wraps the given Hadoop File System object as a Flink File System object.
-	 * The given Hadoop file system object is expected to be initialized already.
-	 *
-	 * <p>This constructor additionally configures the entropy injection for the file system.
-	 *
-	 * @param hadoopFileSystem The Hadoop FileSystem that will be used under the hood.
-	 * @param entropyInjectionKey The substring that will be replaced by entropy or removed.
-	 * @param entropyLength The number of random alphanumeric characters to inject as entropy.
-	 */
-	public S3PrestoFileSystem(
-			org.apache.hadoop.fs.FileSystem hadoopFileSystem,
-			@Nullable String entropyInjectionKey,
-			int entropyLength) {
-
-		if (entropyInjectionKey != null && entropyLength <= 0) {
-			throw new IllegalArgumentException("Entropy length must be > 0 when entropy injection key is set");
-		}
-
-		this.fs = checkNotNull(hadoopFileSystem, "hadoopFileSystem");
-		this.entropyInjectionKey = entropyInjectionKey;
-		this.entropyLength = entropyLength;
-	}
-
-	// ------------------------------------------------------------------------
-	//  properties
-	// ------------------------------------------------------------------------
-
-	public org.apache.hadoop.fs.FileSystem getHadoopFileSystem() {
-		return fs;
-	}
-
-	@Nullable
-	public String getEntropyInjectionKey() {
-		return entropyInjectionKey;
-	}
-
-	public int getEntropyLength() {
-		return entropyLength;
-	}
-
-	// ------------------------------------------------------------------------
-	//  file system methods
-	// ------------------------------------------------------------------------
-
-	@Override
-	public Path getWorkingDirectory() {
-		return new Path(fs.getWorkingDirectory().toUri());
-	}
-
-	public Path getHomeDirectory() {
-		return new Path(fs.getHomeDirectory().toUri());
-	}
-
-	@Override
-	public URI getUri() {
-		return fs.getUri();
-	}
-
-	@Override
-	public FileStatus getFileStatus(final Path f) throws IOException {
-		org.apache.hadoop.fs.FileStatus status = fs.getFileStatus(toHadoopPath(f));
-		return new HadoopFileStatus(status);
-	}
-
-	@Override
-	public BlockLocation[] getFileBlockLocations(
-			final FileStatus file,
-			final long start,
-			final long len) throws IOException {
-
-		if (!(file instanceof HadoopFileStatus)) {
-			throw new IOException("file is not an instance of HadoopFileStatus");
-		}
-
-		final HadoopFileStatus f = (HadoopFileStatus) file;
-
-		final org.apache.hadoop.fs.BlockLocation[] blkLocations = fs.getFileBlockLocations(f.getInternalFileStatus(),
-				start, len);
-
-		// Wrap up HDFS specific block location objects
-		final HadoopBlockLocation[] distBlkLocations = new HadoopBlockLocation[blkLocations.length];
-		for (int i = 0; i < distBlkLocations.length; i++) {
-			distBlkLocations[i] = new HadoopBlockLocation(blkLocations[i]);
-		}
-
-		return distBlkLocations;
-	}
-
-	@Override
-	public HadoopDataInputStream open(final Path f, final int bufferSize) throws IOException {
-		final org.apache.hadoop.fs.Path path = toHadoopPath(f);
-		final org.apache.hadoop.fs.FSDataInputStream fdis = this.fs.open(path, bufferSize);
-		return new HadoopDataInputStream(fdis);
-	}
-
-	@Override
-	public HadoopDataInputStream open(final Path f) throws IOException {
-		final org.apache.hadoop.fs.Path path = toHadoopPath(f);
-		final org.apache.hadoop.fs.FSDataInputStream fdis = fs.open(path);
-		return new HadoopDataInputStream(fdis);
-	}
-
-	@Override
-	public HadoopDataOutputStream create(final Path f, final WriteMode overwrite) throws IOException {
-		final org.apache.hadoop.fs.FSDataOutputStream fsDataOutputStream =
-				fs.create(toHadoopPath(f), overwrite == WriteMode.OVERWRITE);
-		return new HadoopDataOutputStream(fsDataOutputStream);
-	}
-
-	@Override
-	public FSDataOutputStream create(final Path f, final WriteOptions options) throws IOException {
-		final org.apache.hadoop.fs.Path path = options.isInjectEntropy()
-				? toHadoopPathInjectEntropy(f)
-				: toHadoopPath(f);
-
-		final org.apache.hadoop.fs.FSDataOutputStream fsDataOutputStream = fs.create(
-				path, options.getOverwrite() == WriteMode.OVERWRITE);
-
-		return new HadoopDataOutputStream(fsDataOutputStream);
-	}
-
-	@Override
-	public boolean delete(final Path f, final boolean recursive) throws IOException {
-		return fs.delete(toHadoopPath(f), recursive);
-	}
-
-	@Override
-	public boolean exists(Path f) throws IOException {
-		return fs.exists(toHadoopPath(f));
-	}
-
-	@Override
-	public FileStatus[] listStatus(final Path f) throws IOException {
-		final org.apache.hadoop.fs.FileStatus[] hadoopFiles = fs.listStatus(toHadoopPath(f));
-		final FileStatus[] files = new FileStatus[hadoopFiles.length];
-
-		// Convert types
-		for (int i = 0; i < files.length; i++) {
-			files[i] = new HadoopFileStatus(hadoopFiles[i]);
-		}
-
-		return files;
-	}
-
-	@Override
-	public boolean mkdirs(final Path f) throws IOException {
-		return fs.mkdirs(toHadoopPath(f));
-	}
-
-	@Override
-	public boolean rename(final Path src, final Path dst) throws IOException {
-		return fs.rename(toHadoopPath(src), toHadoopPath(dst));
-	}
-
-	@SuppressWarnings("deprecation")
-	@Override
-	public long getDefaultBlockSize() {
-		return fs.getDefaultBlockSize();
-	}
-
-	@Override
-	public boolean isDistributedFS() {
-		return true;
-	}
-
-	@Override
-	public FileSystemKind getKind() {
-		return FileSystemKind.OBJECT_STORE;
-	}
-
-	// ------------------------------------------------------------------------
-	//  entropy utilities
-	// ------------------------------------------------------------------------
-
-	@VisibleForTesting
-	org.apache.hadoop.fs.Path toHadoopPath(Path path) throws IOException {
-		return rewritePathForEntropyKey(path, false);
-	}
-
-	@VisibleForTesting
-	org.apache.hadoop.fs.Path toHadoopPathInjectEntropy(Path path) throws IOException {
-		return rewritePathForEntropyKey(path, true);
-	}
-
-	private org.apache.hadoop.fs.Path rewritePathForEntropyKey(Path path, boolean addEntropy) throws IOException {
-		if (entropyInjectionKey == null) {
-			return convertToHadoopPath(path);
-		}
-		else {
-			final URI originalUri = path.toUri();
-			final String checkpointPath = originalUri.getPath();
-
-			final int indexOfKey = checkpointPath.indexOf(entropyInjectionKey);
-			if (indexOfKey == -1) {
-				return convertToHadoopPath(path);
-			}
-			else {
-				final StringBuilder buffer = new StringBuilder(checkpointPath.length());
-				buffer.append(checkpointPath, 0, indexOfKey);
-
-				if (addEntropy) {
-					StringUtils.appendRandomAlphanumericString(ThreadLocalRandom.current(), buffer, entropyLength);
-				}
-
-				buffer.append(checkpointPath, indexOfKey + entropyInjectionKey.length(), checkpointPath.length());
-
-				final String rewrittenPath = buffer.toString();
-				try {
-					return convertToHadoopPath(new URI(
-							originalUri.getScheme(),
-							originalUri.getAuthority(),
-							rewrittenPath,
-							originalUri.getQuery(),
-							originalUri.getFragment()));
-				}
-				catch (URISyntaxException e) {
-					// this should actually never happen, because the URI was valid before
-					throw new IOException("URI format error while processing path for entropy injection", e);
-				}
-			}
-		}
-	}
-
-	private static org.apache.hadoop.fs.Path convertToHadoopPath(URI uri) {
-		return new org.apache.hadoop.fs.Path(uri);
-	}
-
-	private static org.apache.hadoop.fs.Path convertToHadoopPath(Path path) {
-		return convertToHadoopPath(path.toUri());
-	}
-}
diff --git a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemEntropyTest.java b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemEntropyTest.java
deleted file mode 100644
index 587b02e..0000000
--- a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemEntropyTest.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.fs.s3presto;
-
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.WriteOptions;
-
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests for the entropy injection in the {@link S3PrestoFileSystem}.
- */
-public class PrestoS3FileSystemEntropyTest {
-
-	@Test
-	public void testEmptyPath() throws Exception {
-		Path path = new Path("hdfs://localhost:12345");
-		S3PrestoFileSystem fs = createFs("test", 4);
-
-		assertEquals(toHadoopPath(path), fs.toHadoopPathInjectEntropy(path));
-		assertEquals(toHadoopPath(path), fs.toHadoopPath(path));
-	}
-
-	@Test
-	public void testFullUriNonMatching() throws Exception {
-		Path path = new Path("s3://hugo@myawesomehost:55522/path/to/the/file");
-		S3PrestoFileSystem fs = createFs("_entropy_key_", 4);
-
-		assertEquals(toHadoopPath(path), fs.toHadoopPathInjectEntropy(path));
-		assertEquals(toHadoopPath(path), fs.toHadoopPath(path));
-	}
-
-	@Test
-	public void testFullUriMatching() throws Exception {
-		Path path = new Path("s3://hugo@myawesomehost:55522/path/s0mek3y/the/file");
-		S3PrestoFileSystem fs = createFs("s0mek3y", 8);
-
-		org.apache.hadoop.fs.Path withEntropy = fs.toHadoopPathInjectEntropy(path);
-		org.apache.hadoop.fs.Path withoutEntropy = fs.toHadoopPath(path);
-
-		validateMatches(withEntropy, "s3://hugo@myawesomehost:55522/path/[a-zA-Z0-9]{8}/the/file");
-		assertEquals(new org.apache.hadoop.fs.Path("s3://hugo@myawesomehost:55522/path/the/file"), withoutEntropy);
-	}
-
-	@Test
-	public void testPathOnlyNonMatching() throws Exception {
-		Path path = new Path("/path/file");
-		S3PrestoFileSystem fs = createFs("_entropy_key_", 4);
-
-		assertEquals(toHadoopPath(path), fs.toHadoopPathInjectEntropy(path));
-		assertEquals(toHadoopPath(path), fs.toHadoopPath(path));
-	}
-
-	@Test
-	public void testPathOnlyMatching() throws Exception {
-		Path path = new Path("/path/_entropy_key_/file");
-		S3PrestoFileSystem fs = createFs("_entropy_key_", 4);
-
-		org.apache.hadoop.fs.Path withEntropy = fs.toHadoopPathInjectEntropy(path);
-		org.apache.hadoop.fs.Path withoutEntropy = fs.toHadoopPath(path);
-
-		validateMatches(withEntropy, "/path/[a-zA-Z0-9]{4}/file");
-		assertEquals(new org.apache.hadoop.fs.Path("/path/file"), withoutEntropy);
-	}
-
-	@Test
-	public void testEntropyNotFullSegment() throws Exception {
-		Path path = new Path("s3://myhost:122/entropy-_entropy_key_-suffix/file");
-		S3PrestoFileSystem fs = createFs("_entropy_key_", 3);
-
-		org.apache.hadoop.fs.Path withEntropy = fs.toHadoopPathInjectEntropy(path);
-		org.apache.hadoop.fs.Path withoutEntropy = fs.toHadoopPath(path);
-
-		validateMatches(withEntropy, "s3://myhost:122/entropy-[a-zA-Z0-9]{3}-suffix/file");
-		assertEquals(new org.apache.hadoop.fs.Path("s3://myhost:122/entropy--suffix/file"), withoutEntropy);
-	}
-
-	@Test
-	public void testWriteOptionWithEntropy() throws Exception {
-		FileSystem underlyingFs = mock(FileSystem.class);
-		when(underlyingFs.create(any(org.apache.hadoop.fs.Path.class), anyBoolean())).thenReturn(mock(FSDataOutputStream.class));
-		ArgumentCaptor<org.apache.hadoop.fs.Path> pathCaptor = ArgumentCaptor.forClass(org.apache.hadoop.fs.Path.class);
-
-		Path path = new Path("s3://hugo@myawesomehost:55522/path/s0mek3y/the/file");
-		S3PrestoFileSystem fs = new S3PrestoFileSystem(underlyingFs, "s0mek3y", 11);
-
-		fs.create(path, new WriteOptions().setInjectEntropy(true));
-		verify(underlyingFs).create(pathCaptor.capture(), anyBoolean());
-
-		validateMatches(pathCaptor.getValue(), "s3://hugo@myawesomehost:55522/path/[a-zA-Z0-9]{11}/the/file");
-	}
-
-	private static void validateMatches(org.apache.hadoop.fs.Path path, String pattern) {
-		if (!path.toString().matches(pattern)) {
-			fail("Path " + path + " does not match " + pattern);
-		}
-	}
-
-	private static S3PrestoFileSystem createFs(String entropyKey, int entropyLen) {
-		return new S3PrestoFileSystem(mock(FileSystem.class), entropyKey, entropyLen);
-	}
-
-	private org.apache.hadoop.fs.Path toHadoopPath(Path path) {
-		return new org.apache.hadoop.fs.Path(path.toUri());
-	}
-}
diff --git a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java
index 9afad57..4eeb2d4 100644
--- a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java
+++ b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java
@@ -21,12 +21,12 @@ package org.apache.flink.fs.s3presto;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.runtime.fs.hdfs.HadoopConfigLoader;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
 
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.auth.AWSStaticCredentialsProvider;
 import com.amazonaws.services.s3.AmazonS3Client;
 import com.facebook.presto.hive.PrestoS3FileSystem;
-import org.junit.After;
 import org.junit.Test;
 
 import java.lang.reflect.Field;
@@ -41,11 +41,6 @@ import static org.junit.Assert.assertTrue;
  */
 public class PrestoS3FileSystemTest {
 
-	@After
-	public void resetFileSystemConfig() throws Exception {
-		FileSystem.initialize(new Configuration());
-	}
-
 	@Test
 	public void testConfigPropagation() throws Exception{
 		final Configuration conf = new Configuration();
@@ -95,29 +90,14 @@ public class PrestoS3FileSystemTest {
 			hadoopConfig.get("presto.s3.credentials-provider"));
 	}
 
-	@Test
-	public void testEntropyInjectionConfig() throws Exception {
-		final Configuration conf = new Configuration();
-		conf.setString("s3.entropy.key", "__entropy__");
-		conf.setInteger("s3.entropy.length", 7);
-
-		FileSystem.initialize(conf);
-
-		FileSystem fs = FileSystem.get(new URI("s3://test"));
-		S3PrestoFileSystem s3fs = (S3PrestoFileSystem) fs;
-
-		assertEquals("__entropy__", s3fs.getEntropyInjectionKey());
-		assertEquals(7, s3fs.getEntropyLength());
-	}
-
 	// ------------------------------------------------------------------------
 	//  utilities
 	// ------------------------------------------------------------------------
 
 	private static void validateBasicCredentials(FileSystem fs) throws Exception {
-		assertTrue(fs instanceof S3PrestoFileSystem);
+		assertTrue(fs instanceof HadoopFileSystem);
 
-		org.apache.hadoop.fs.FileSystem hadoopFs = ((S3PrestoFileSystem) fs).getHadoopFileSystem();
+		org.apache.hadoop.fs.FileSystem hadoopFs = ((HadoopFileSystem) fs).getHadoopFileSystem();
 		assertTrue(hadoopFs instanceof PrestoS3FileSystem);
 
 		try (PrestoS3FileSystem prestoFs = (PrestoS3FileSystem) hadoopFs) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
index 1549e01..af80af7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
@@ -32,7 +32,6 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * An implementation of durable checkpoint storage to file systems.
@@ -55,25 +54,11 @@ public class FsCheckpointStorage extends AbstractFsCheckpointStorage {
 			JobID jobId,
 			int fileSizeThreshold) throws IOException {
 
-		this(checkpointBaseDirectory.getFileSystem(),
-				checkpointBaseDirectory,
-				defaultSavepointDirectory,
-				jobId,
-				fileSizeThreshold);
-	}
-
-	public FsCheckpointStorage(
-			FileSystem fs,
-			Path checkpointBaseDirectory,
-			@Nullable Path defaultSavepointDirectory,
-			JobID jobId,
-			int fileSizeThreshold) throws IOException {
-
 		super(jobId, defaultSavepointDirectory);
 
 		checkArgument(fileSizeThreshold >= 0);
 
-		this.fileSystem = checkNotNull(fs);
+		this.fileSystem = checkpointBaseDirectory.getFileSystem();
 		this.checkpointsDirectory = getCheckpointDirectoryForJob(checkpointBaseDirectory, jobId);
 		this.sharedStateDirectory = new Path(checkpointsDirectory, CHECKPOINT_SHARED_STATE_DIR);
 		this.taskOwnedStateDirectory = new Path(checkpointsDirectory, CHECKPOINT_TASK_OWNED_STATE_DIR);
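
The deleted constructor overload existed so that tests could inject a (spied) FileSystem
directly; the deleted FsStateBackendEntropyTest at the end of this commit used it like this:

    FileSystem fs = spy(LocalFileSystem.getSharedInstance());
    FsCheckpointStorage storage = new FsCheckpointStorage(
            fs, checkpointDir, null, new JobID(), 1024);
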
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
index 228c5b4..609ef69 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.state.filesystem;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.WriteOptions;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
 import org.apache.flink.runtime.state.StreamStateHandle;
@@ -346,11 +345,7 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 			for (int attempt = 0; attempt < 10; attempt++) {
 				try {
 					Path statePath = createStatePath();
-					FSDataOutputStream outStream = fs.create(
-							statePath,
-							new WriteOptions()
-									.setOverwrite(FileSystem.WriteMode.NO_OVERWRITE)
-									.setInjectEntropy(true));
+					FSDataOutputStream outStream = fs.create(statePath, FileSystem.WriteMode.NO_OVERWRITE);
 
 					// success, managed to open the stream
 					this.statePath = statePath;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
index c962146..8bafdf7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.WriteOptions;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.StreamStateHandle;
@@ -139,13 +138,13 @@ public class FsCheckpointStateOutputStreamTest {
 	 */
 	@Test
 	public void testCleanupWhenClosingStream() throws IOException {
+
 		final FileSystem fs = mock(FileSystem.class);
 		final FSDataOutputStream outputStream = mock(FSDataOutputStream.class);
 
 		final ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
 
 		when(fs.create(pathCaptor.capture(), any(FileSystem.WriteMode.class))).thenReturn(outputStream);
-		when(fs.create(pathCaptor.capture(), any(WriteOptions.class))).thenReturn(outputStream);
 
 		CheckpointStreamFactory.CheckpointStateOutputStream stream = new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
 			Path.fromLocalFile(tempDir.newFolder()),
@@ -155,6 +154,9 @@ public class FsCheckpointStateOutputStreamTest {
 
 		// this should create the underlying file stream
 		stream.write(new byte[] {1, 2, 3, 4, 5});
+
+		verify(fs).create(any(Path.class), any(FileSystem.WriteMode.class));
+
 		stream.close();
 
 		verify(fs).delete(eq(pathCaptor.getValue()), anyBoolean());
@@ -168,11 +170,9 @@ public class FsCheckpointStateOutputStreamTest {
 		final FileSystem fs = mock(FileSystem.class);
 		final FSDataOutputStream outputStream = mock(FSDataOutputStream.class);
 
-		final ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
+		final ArgumentCaptor<Path>  pathCaptor = ArgumentCaptor.forClass(Path.class);
 
 		when(fs.create(pathCaptor.capture(), any(FileSystem.WriteMode.class))).thenReturn(outputStream);
-		when(fs.create(pathCaptor.capture(), any(WriteOptions.class))).thenReturn(outputStream);
-
 		doThrow(new IOException("Test IOException.")).when(outputStream).close();
 
 		CheckpointStreamFactory.CheckpointStateOutputStream stream = new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
@@ -184,6 +184,8 @@ public class FsCheckpointStateOutputStreamTest {
 		// this should create the underlying file stream
 		stream.write(new byte[] {1, 2, 3, 4, 5});
 
+		verify(fs).create(any(Path.class), any(FileSystem.WriteMode.class));
+
 		try {
 			stream.closeAndGetHandle();
 			fail("Expected IOException");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java
deleted file mode 100644
index d1aa118..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.filesystem;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.WriteOptions;
-import org.apache.flink.core.fs.local.LocalFileSystem;
-import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
-import org.apache.flink.runtime.state.CheckpointStorageLocation;
-import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
-import org.apache.flink.runtime.state.CheckpointedStateScope;
-
-import org.junit.After;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.mockito.ArgumentCaptor;
-
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-/**
- * Tests verifying that the FsStateBackend passes the entropy injection option
- * to the FileSystem for state payload files, but not for metadata files.
- */
-public class FsStateBackendEntropyTest {
-
-	@Rule
-	public final TemporaryFolder tmp = new TemporaryFolder();
-
-	@After
-	public void resetFileSystems() throws Exception {
-		FileSystem.initialize(new Configuration());
-	}
-
-	@Test
-	public void testInjection() throws Exception {
-		FileSystem fs = spy(LocalFileSystem.getSharedInstance());
-		ArgumentCaptor<WriteOptions> optionsCaptor = ArgumentCaptor.forClass(WriteOptions.class);
-
-		Path checkpointDir = Path.fromLocalFile(tmp.newFolder());
-
-		FsCheckpointStorage storage = new FsCheckpointStorage(
-				fs, checkpointDir, null, new JobID(), 1024);
-
-		CheckpointStorageLocation location = storage.initializeLocationForCheckpoint(96562);
-
-		// check entropy in task-owned state
-		try (CheckpointStateOutputStream stream = storage.createTaskOwnedStateStream()) {
-			stream.flush();
-		}
-
-		verify(fs, times(1)).create(any(Path.class), optionsCaptor.capture());
-		assertTrue(optionsCaptor.getValue().isInjectEntropy());
-		reset(fs);
-
-		// check entropy in the exclusive/shared state
-		try (CheckpointStateOutputStream stream =
-				location.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE)) {
-
-			stream.flush();
-		}
-
-		verify(fs, times(1)).create(any(Path.class), optionsCaptor.capture());
-		assertTrue(optionsCaptor.getValue().isInjectEntropy());
-		reset(fs);
-
-		// check that there is no entropy in the metadata:
-		// the metadata stream must not use the entropy-injecting create path
-		try (CheckpointMetadataOutputStream stream = location.createMetadataOutputStream()) {
-			stream.flush();
-		}
-
-		verify(fs, times(0)).create(any(Path.class), any(WriteOptions.class));
-	}
-}