You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/09/24 21:56:48 UTC

[flink] branch master updated (51f6f7f -> e0ab834)

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 51f6f7f  [FLINK-10383] [s3] Prevent Hadoop configs in the classpath to seep into S3 configuration
     new 9aeb381  [hotfix] Add JavaDocs to CheckpointMetadataOutputStream
     new df02ed4  [FLINK-9061] [fs] Add entropy injector for file systems
     new fc7b9ac  [FLINK-9061] [checkpoints] FsStatebackend optionally injects entropy into state data file paths
     new d8a9c72  [FLINK-9061] [s3] Make base S3 file system entropy injecting
     new e0ab834  [hotfix] [runtime] Clean up some checkstyle violations

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/ops/filesystems.md                            |  30 ++++-
 ...emKind.java => EntropyInjectingFileSystem.java} |  25 ++--
 .../org/apache/flink/core/fs/EntropyInjector.java  | 135 +++++++++++++++++++
 ...ileSystemKind.java => OutputStreamAndPath.java} |  31 +++--
 .../java/org/apache/flink/util/StringUtils.java    |  33 +++++
 .../apache/flink/core/fs/EntropyInjectorTest.java  | 148 +++++++++++++++++++++
 .../org/apache/flink/util/StringUtilsTest.java     |  12 ++
 .../fs/s3/common/AbstractS3FileSystemFactory.java  |  71 +++++++++-
 .../flink/fs/s3/common/FlinkS3FileSystem.java      |  59 +++++++-
 .../flink/fs/s3/common/HadoopConfigLoader.java     |   1 -
 .../flink/fs/s3/common/S3EntropyFsFactoryTest.java |  79 +++++++++++
 .../src/test/resources/log4j-test.properties       |   0
 .../state/CheckpointMetadataOutputStream.java      |  15 +++
 .../FsCheckpointMetadataOutputStream.java          |   3 +-
 .../state/filesystem/FsCheckpointStorage.java      |  17 ++-
 .../filesystem/FsCheckpointStorageLocation.java    |   6 +-
 .../filesystem/FsCheckpointStreamFactory.java      |  33 +++--
 .../filesystem/FsStateBackendEntropyTest.java      | 123 +++++++++++++++++
 18 files changed, 772 insertions(+), 49 deletions(-)
 copy flink-core/src/main/java/org/apache/flink/core/fs/{FileSystemKind.java => EntropyInjectingFileSystem.java} (50%)
 create mode 100644 flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java
 copy flink-core/src/main/java/org/apache/flink/core/fs/{FileSystemKind.java => OutputStreamAndPath.java} (64%)
 create mode 100644 flink-core/src/test/java/org/apache/flink/core/fs/EntropyInjectorTest.java
 create mode 100644 flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java
 copy flink-filesystems/{flink-swift-fs-hadoop => flink-s3-fs-base}/src/test/resources/log4j-test.properties (100%)
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java


[flink] 05/05: [hotfix] [runtime] Clean up some checkstyle violations

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e0ab834ddf15a564034f3ecc6f981b29089b5ee4
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Sep 24 20:52:50 2018 +0200

    [hotfix] [runtime] Clean up some checkstyle violations
---
 .../state/filesystem/FsCheckpointStreamFactory.java  | 20 +++++++++-----------
 1 file changed, 9 insertions(+), 11 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
index 665e7b3..8253606 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
@@ -66,13 +66,13 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 
 	private static final Logger LOG = LoggerFactory.getLogger(FsCheckpointStreamFactory.class);
 
-	/** Maximum size of state that is stored with the metadata, rather than in files */
+	/** Maximum size of state that is stored with the metadata, rather than in files. */
 	public static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;
 
-	/** Default size for the write buffer */
+	/** Default size for the write buffer. */
 	public static final int DEFAULT_WRITE_BUFFER_SIZE = 4096;
 
-	/** State below this size will be stored as part of the metadata, rather than in files */
+	/** State below this size will be stored as part of the metadata, rather than in files. */
 	private final int fileStateThreshold;
 
 	/** The directory for checkpoint exclusive state data. */
@@ -121,9 +121,7 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 
 	@Override
 	public FsCheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws IOException {
-
-
-		Path target = scope == CheckpointedStateScope.EXCLUSIVE ?checkpointDirectory: sharedStateDirectory;
+		Path target = scope == CheckpointedStateScope.EXCLUSIVE ? checkpointDirectory : sharedStateDirectory;
 		int bufferSize = Math.max(DEFAULT_WRITE_BUFFER_SIZE, fileStateThreshold);
 
 		return new FsCheckpointStateOutputStream(target, filesystem, bufferSize, fileStateThreshold);
@@ -154,7 +152,7 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 		private int pos;
 
 		private FSDataOutputStream outStream;
-		
+
 		private final int localStateThreshold;
 
 		private final Path basePath;
@@ -167,8 +165,8 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 
 		public FsCheckpointStateOutputStream(
 					Path basePath, FileSystem fs,
-					int bufferSize, int localStateThreshold)
-		{
+					int bufferSize, int localStateThreshold) {
+
 			if (bufferSize < localStateThreshold) {
 				throw new IllegalArgumentException();
 			}
@@ -202,7 +200,7 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 					// flush the write buffer to make it clear again
 					flush();
 				}
-				
+
 				// copy what is in the buffer
 				System.arraycopy(b, off, writeBuffer, pos, len);
 				pos += len;
@@ -303,7 +301,7 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 							flush();
 
 							pos = writeBuffer.length;
-						
+
 							long size = -1L;
 
 							// make a best effort attempt to figure out the size


[flink] 04/05: [FLINK-9061] [s3] Make base S3 file system entropy injecting

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d8a9c72ed675669363e6c7d7499abee30fc19b88
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Sep 24 20:46:37 2018 +0200

    [FLINK-9061] [s3] Make base S3 file system entropy injecting
---
 docs/ops/filesystems.md                            | 30 +++++++-
 .../fs/s3/common/AbstractS3FileSystemFactory.java  | 71 +++++++++++++++++--
 .../flink/fs/s3/common/FlinkS3FileSystem.java      | 59 +++++++++++++++-
 .../flink/fs/s3/common/HadoopConfigLoader.java     |  1 -
 .../flink/fs/s3/common/S3EntropyFsFactoryTest.java | 79 ++++++++++++++++++++++
 .../src/test/resources/log4j-test.properties       | 27 ++++++++
 6 files changed, 258 insertions(+), 9 deletions(-)

diff --git a/docs/ops/filesystems.md b/docs/ops/filesystems.md
index 78f2c96..416302e 100644
--- a/docs/ops/filesystems.md
+++ b/docs/ops/filesystems.md
@@ -24,7 +24,7 @@ under the License.
 
 This page provides details on setting up and configuring distributed file systems for use with Flink.
 
-## Flink' File System support
+## Flink's File System support
 
 Flink uses file systems both as a source and sink in streaming/batch applications, and as a target for checkpointing.
 These file systems can for example be *Unix/Windows file systems*, *HDFS*, or even object stores like *S3*.
@@ -122,6 +122,34 @@ These limits are enforced per TaskManager, so each TaskManager in a Flink applic
 In addition, the limits are also only enforced per FileSystem instance. Because File Systems are created per scheme and authority, different
 authorities will have their own connection pool. For example `hdfs://myhdfs:50010/` and `hdfs://anotherhdfs:4399/` will have separate pools.
 
+## Entropy injection for S3 file systems
+
+The bundled S3 file systems (`flink-s3-fs-presto` and `flink-s3-fs-hadoop`) support entropy injection. Entropy injection is
+a technique to improve scalability of AWS S3 buckets through adding some random characters near the beginning of the key.
+
+If entropy injection is activated, a configured substring in the paths will be replaced by random characters. For example, path
+`s3://my-bucket/checkpoints/_entropy_/dashboard-job/` would be replaced by something like `s3://my-bucket/checkpoints/gf36ikvg/dashboard-job/`.
+
+**Note that this only happens when the file creation passes the option to inject entropy!** Otherwise, the entropy key
+substring is simply removed from the file path. See
+[FileSystem.create(Path, WriteOption)](https://ci.apache.org/projects/flink/flink-docs-release-1.6/api/java/org/apache/flink/core/fs/FileSystem.html#create-org.apache.flink.core.fs.Path-org.apache.flink.core.fs.FileSystem.WriteOptions-)
+for details.
+
+*Note: The Flink runtime currently passes the option to inject entropy only to checkpoint data files.*
+*All other files, including checkpoint metadata and external URI, do not inject entropy, to keep checkpoint URIs predictable.*
+
+To enable entropy injection, configure the *entropy key* and the *entropy length* parameters.
+
+```
+s3.entropy.key: _entropy_
+s3.entropy.length: 4 (default)
+
+```
+
+The `s3.entropy.key` defines the string in paths that is replaced by the random characters. Paths that do not contain the entropy key are left unchanged.
+If a file system operation does not pass the *"inject entropy"* write option, the entropy key substring is simply removed.
+The `s3.entropy.length` defines the number of random alphanumeric characters to replace the entropy key with.
+
 ## Adding new File System Implementations
 
 File system implementations are discovered by Flink through Java's service abstraction, making it easy to add additional file system implementations.
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
index ed24138..58144f0 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
@@ -18,7 +18,10 @@
 
 package org.apache.flink.fs.s3.common;
 
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.FileSystemFactory;
 
@@ -28,9 +31,36 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.net.URI;
 
-/** Base class for Hadoop file system factories. */
+/**
+ * Base class for file system factories that create S3 file systems.
+ */
 public abstract class AbstractS3FileSystemFactory implements FileSystemFactory {
 
+	/**
+	 * The substring to be replaced by random entropy in checkpoint paths.
+	 */
+	public static final ConfigOption<String> ENTROPY_INJECT_KEY_OPTION = ConfigOptions
+			.key("s3.entropy.key")
+			.noDefaultValue()
+			.withDescription(
+					"This option can be used to improve performance due to sharding issues on Amazon S3. " +
+					"For file creations with entropy injection, this key will be replaced by random " +
+					"alphanumeric characters. For other file creations, the key will be filtered out.");
+
+	/**
+	 * The number of entropy characters, in case entropy injection is configured.
+	 */
+	public static final ConfigOption<Integer> ENTROPY_INJECT_LENGTH_OPTION = ConfigOptions
+			.key("s3.entropy.length")
+			.defaultValue(4)
+			.withDescription(
+					"When '" + ENTROPY_INJECT_KEY_OPTION.key() + "' is set, this option defines the number of " +
+					"random characters to replace the entropy key with.");
+
+	// ------------------------------------------------------------------------
+
+	private static final String INVALID_ENTROPY_KEY_CHARS = "^.*[~#@*+%{}<>\\[\\]|\"\\\\].*$";
+
 	private static final Logger LOG = LoggerFactory.getLogger(AbstractS3FileSystemFactory.class);
 
 	/** Name of this factory for logging. */
@@ -38,29 +68,60 @@ public abstract class AbstractS3FileSystemFactory implements FileSystemFactory {
 
 	private final HadoopConfigLoader hadoopConfigLoader;
 
+	private Configuration flinkConfig;
+
 	protected AbstractS3FileSystemFactory(String name, HadoopConfigLoader hadoopConfigLoader) {
 		this.name = name;
 		this.hadoopConfigLoader = hadoopConfigLoader;
 	}
 
+	// ------------------------------------------------------------------------
+
 	@Override
 	public void configure(Configuration config) {
+		flinkConfig = config;
 		hadoopConfigLoader.setFlinkConfig(config);
 	}
 
 	@Override
 	public FileSystem create(URI fsUri) throws IOException {
+		Configuration flinkConfig = this.flinkConfig;
+
+		if (flinkConfig == null) {
+			LOG.warn("Creating S3 FileSystem without configuring the factory. All behavior will be default.");
+			flinkConfig = new Configuration();
+		}
+
 		LOG.debug("Creating S3 file system backed by {}", name);
 		LOG.debug("Loading Hadoop configuration for {}", name);
 
 		try {
+			// create the Hadoop FileSystem
 			org.apache.hadoop.conf.Configuration hadoopConfig = hadoopConfigLoader.getOrLoadHadoopConfig();
 			org.apache.hadoop.fs.FileSystem fs = createHadoopFileSystem();
 			fs.initialize(getInitURI(fsUri, hadoopConfig), hadoopConfig);
-			return new FlinkS3FileSystem(fs);
-		} catch (IOException ioe) {
-			throw ioe;
-		} catch (Exception e) {
+
+			// load the entropy injection settings
+			String entropyInjectionKey = flinkConfig.getString(ENTROPY_INJECT_KEY_OPTION);
+			int numEntropyChars = -1;
+			if (entropyInjectionKey != null) {
+				if (entropyInjectionKey.matches(INVALID_ENTROPY_KEY_CHARS)) {
+					throw new IllegalConfigurationException("Invalid character in value for " +
+							ENTROPY_INJECT_KEY_OPTION.key() + " : " + entropyInjectionKey);
+				}
+				numEntropyChars = flinkConfig.getInteger(ENTROPY_INJECT_LENGTH_OPTION);
+				if (numEntropyChars <= 0) {
+					throw new IllegalConfigurationException(
+							ENTROPY_INJECT_LENGTH_OPTION.key() + " must configure a value > 0");
+				}
+			}
+
+			return new FlinkS3FileSystem(fs, entropyInjectionKey, numEntropyChars);
+		}
+		catch (IOException e) {
+			throw e;
+		}
+		catch (Exception e) {
 			throw new IOException(e.getMessage(), e);
 		}
 	}
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
index a3d960a..84883bb 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
@@ -18,22 +18,77 @@
 
 package org.apache.flink.fs.s3.common;
 
+import org.apache.flink.core.fs.EntropyInjectingFileSystem;
+import org.apache.flink.core.fs.FileSystemKind;
 import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.flink.util.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.ThreadLocalRandom;
 
 /**
  * Implementation of the Flink {@link org.apache.flink.core.fs.FileSystem} interface for S3.
  * This class implements the common behavior implemented directly by Flink and delegates
  * common calls to an implementation of Hadoop's filesystem abstraction.
  */
-public class FlinkS3FileSystem extends HadoopFileSystem {
+public class FlinkS3FileSystem extends HadoopFileSystem implements EntropyInjectingFileSystem {
+
+	@Nullable
+	private final String entropyInjectionKey;
+
+	private final int entropyLength;
 
 	/**
-	 * Wraps the given Hadoop S3 File System object as a Flink S3 File System object.
+	 * Creates a FlinkS3FileSystem based on the given Hadoop S3 file system.
 	 * The given Hadoop file system object is expected to be initialized already.
 	 *
 	 * @param hadoopS3FileSystem The Hadoop FileSystem that will be used under the hood.
 	 */
 	public FlinkS3FileSystem(org.apache.hadoop.fs.FileSystem hadoopS3FileSystem) {
+		this(hadoopS3FileSystem, null, -1);
+	}
+
+	/**
+	 * Creates a FlinkS3FileSystem based on the given Hadoop S3 file system.
+	 * The given Hadoop file system object is expected to be initialized already.
+	 *
+	 * <p>This constructor additionally configures the entropy injection for the file system.
+	 *
+	 * @param hadoopS3FileSystem The Hadoop FileSystem that will be used under the hood.
+	 * @param entropyInjectionKey The substring that will be replaced by entropy or removed.
+	 * @param entropyLength The number of random alphanumeric characters to inject as entropy.
+	 */
+	public FlinkS3FileSystem(
+			org.apache.hadoop.fs.FileSystem hadoopS3FileSystem,
+			@Nullable String entropyInjectionKey,
+			int entropyLength) {
+
 		super(hadoopS3FileSystem);
+
+		if (entropyInjectionKey != null && entropyLength <= 0) {
+			throw new IllegalArgumentException("Entropy length must be > 0 when entropy injection key is set");
+		}
+
+		this.entropyInjectionKey = entropyInjectionKey;
+		this.entropyLength = entropyLength;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Nullable
+	@Override
+	public String getEntropyInjectionKey() {
+		return entropyInjectionKey;
+	}
+
+	@Override
+	public String generateEntropy() {
+		return StringUtils.generateRandomAlphanumericString(ThreadLocalRandom.current(), entropyLength);
+	}
+
+	@Override
+	public FileSystemKind getKind() {
+		return FileSystemKind.OBJECT_STORE;
 	}
 }
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/HadoopConfigLoader.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/HadoopConfigLoader.java
index 5ca497c..1bbb757 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/HadoopConfigLoader.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/HadoopConfigLoader.java
@@ -97,7 +97,6 @@ public class HadoopConfigLoader {
 		for (String key : flinkConfig.keySet()) {
 			for (String prefix : flinkConfigPrefixes) {
 				if (key.startsWith(prefix)) {
-					String value = flinkConfig.getString(key, null);
 					String newKey = hadoopConfigPrefix + key.substring(prefix.length());
 					String newValue = fixHadoopConfig(key, flinkConfig.getString(key, null));
 					hadoopConfig.set(newKey, newValue);
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java
new file mode 100644
index 0000000..832bbf4
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3.common;
+
+import org.apache.flink.configuration.Configuration;
+
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.net.URI;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests that the file system factory picks up the entropy configuration properly.
+ */
+public class S3EntropyFsFactoryTest {
+
+	@Test
+	public void testEntropyInjectionConfig() throws Exception {
+		final Configuration conf = new Configuration();
+		conf.setString("s3.entropy.key", "__entropy__");
+		conf.setInteger("s3.entropy.length", 7);
+
+		TestFsFactory factory = new TestFsFactory();
+		factory.configure(conf);
+
+		FlinkS3FileSystem fs = (FlinkS3FileSystem) factory.create(new URI("s3://test"));
+		assertEquals("__entropy__", fs.getEntropyInjectionKey());
+		assertEquals(7, fs.generateEntropy().length());
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static final class TestFsFactory extends AbstractS3FileSystemFactory {
+
+		TestFsFactory() {
+			super("testFs", new HadoopConfigLoader(
+					new String[0],
+					new String[0][],
+					"",
+					Collections.emptySet(),
+					Collections.emptySet(),
+					""));
+		}
+
+		@Override
+		protected org.apache.hadoop.fs.FileSystem createHadoopFileSystem() {
+			return Mockito.mock(org.apache.hadoop.fs.FileSystem.class);
+		}
+
+		@Override
+		protected URI getInitURI(URI fsUri, org.apache.hadoop.conf.Configuration hadoopConfig) {
+			return fsUri;
+		}
+
+		@Override
+		public String getScheme() {
+			return "test";
+		}
+	}
+}
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/resources/log4j-test.properties b/flink-filesystems/flink-s3-fs-base/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..2be3589
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-base/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=OFF, testlogger
+
+# testlogger is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n


[flink] 01/05: [hotfix] Add JavaDocs to CheckpointMetadataOutputStream

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9aeb3813593613868cbf106d922986bb28201881
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sun Sep 23 22:37:56 2018 +0200

    [hotfix] Add JavaDocs to CheckpointMetadataOutputStream
---
 .../runtime/state/CheckpointMetadataOutputStream.java     | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointMetadataOutputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointMetadataOutputStream.java
index 4180e88..70aff7e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointMetadataOutputStream.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointMetadataOutputStream.java
@@ -22,8 +22,23 @@ import org.apache.flink.core.fs.FSDataOutputStream;
 
 import java.io.IOException;
 
+/**
+ * An output stream for checkpoint metadata.
+ *
+ * <p>This stream is similar to the {@link CheckpointStreamFactory.CheckpointStateOutputStream},
+ * but for metadata files rather than data files.
+ *
+ * <p>This stream always creates a file, regardless of the amount of data written.
+ */
 public abstract class CheckpointMetadataOutputStream extends FSDataOutputStream {
 
+	/**
+	 * Closes the stream after all metadata was written and finalizes the checkpoint location.
+	 *
+	 * @return An object representing a finalized checkpoint storage location.
+	 *
+	 * @throws IOException Thrown, if the stream cannot be closed or the finalization fails.
+	 */
 	public abstract CompletedCheckpointStorageLocation closeAndFinalizeCheckpoint() throws IOException;
 
 	/**


[flink] 03/05: [FLINK-9061] [checkpoints] FsStatebackend optionally injects entropy into state data file paths

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fc7b9ac7172afcad9c928cbb0d39da49a89b9838
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Sep 24 20:45:48 2018 +0200

    [FLINK-9061] [checkpoints] FsStatebackend optionally injects entropy into state data file paths
---
 .../FsCheckpointMetadataOutputStream.java          |   3 +-
 .../state/filesystem/FsCheckpointStorage.java      |  17 ++-
 .../filesystem/FsCheckpointStorageLocation.java    |   6 +-
 .../filesystem/FsCheckpointStreamFactory.java      |  13 ++-
 .../filesystem/FsStateBackendEntropyTest.java      | 123 +++++++++++++++++++++
 5 files changed, 153 insertions(+), 9 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointMetadataOutputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointMetadataOutputStream.java
index f18d578..95f78b8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointMetadataOutputStream.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointMetadataOutputStream.java
@@ -133,7 +133,8 @@ public final class FsCheckpointMetadataOutputStream extends CheckpointMetadataOu
 					FileStateHandle metaDataHandle = new FileStateHandle(metadataFilePath, size);
 
 					return new FsCompletedCheckpointStorageLocation(
-							fileSystem, exclusiveCheckpointDir, metaDataHandle, exclusiveCheckpointDir.toString());
+							fileSystem, exclusiveCheckpointDir, metaDataHandle,
+							metaDataHandle.getFilePath().getParent().toString());
 				}
 				catch (Exception e) {
 					try {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
index af80af7..1549e01 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
@@ -32,6 +32,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * An implementation of durable checkpoint storage to file systems.
@@ -54,11 +55,25 @@ public class FsCheckpointStorage extends AbstractFsCheckpointStorage {
 			JobID jobId,
 			int fileSizeThreshold) throws IOException {
 
+		this(checkpointBaseDirectory.getFileSystem(),
+				checkpointBaseDirectory,
+				defaultSavepointDirectory,
+				jobId,
+				fileSizeThreshold);
+	}
+
+	public FsCheckpointStorage(
+			FileSystem fs,
+			Path checkpointBaseDirectory,
+			@Nullable Path defaultSavepointDirectory,
+			JobID jobId,
+			int fileSizeThreshold) throws IOException {
+
 		super(jobId, defaultSavepointDirectory);
 
 		checkArgument(fileSizeThreshold >= 0);
 
-		this.fileSystem = checkpointBaseDirectory.getFileSystem();
+		this.fileSystem = checkNotNull(fs);
 		this.checkpointsDirectory = getCheckpointDirectoryForJob(checkpointBaseDirectory, jobId);
 		this.sharedStateDirectory = new Path(checkpointsDirectory, CHECKPOINT_SHARED_STATE_DIR);
 		this.taskOwnedStateDirectory = new Path(checkpointsDirectory, CHECKPOINT_TASK_OWNED_STATE_DIR);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageLocation.java
index 5637b40..360ae2b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageLocation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageLocation.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.state.filesystem;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.EntropyInjector;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
@@ -67,7 +68,10 @@ public class FsCheckpointStorageLocation extends FsCheckpointStreamFactory imple
 		this.taskOwnedStateDirectory = checkNotNull(taskOwnedStateDir);
 		this.reference = checkNotNull(reference);
 
-		this.metadataFilePath = new Path(checkpointDir, AbstractFsCheckpointStorage.METADATA_FILE_NAME);
+		// the metadata file should not have entropy in its path
+		Path metadataDir = EntropyInjector.removeEntropyMarkerIfPresent(fileSystem, checkpointDir);
+
+		this.metadataFilePath = new Path(metadataDir, AbstractFsCheckpointStorage.METADATA_FILE_NAME);
 		this.fileStateSizeThreshold = fileStateSizeThreshold;
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
index 609ef69..665e7b3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
@@ -18,8 +18,11 @@
 
 package org.apache.flink.runtime.state.filesystem;
 
+import org.apache.flink.core.fs.EntropyInjector;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.core.fs.OutputStreamAndPath;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
@@ -344,12 +347,10 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 			Exception latestException = null;
 			for (int attempt = 0; attempt < 10; attempt++) {
 				try {
-					Path statePath = createStatePath();
-					FSDataOutputStream outStream = fs.create(statePath, FileSystem.WriteMode.NO_OVERWRITE);
-
-					// success, managed to open the stream
-					this.statePath = statePath;
-					this.outStream = outStream;
+					OutputStreamAndPath streamAndPath = EntropyInjector.createEntropyAware(
+							fs, createStatePath(), WriteMode.NO_OVERWRITE);
+					this.outStream = streamAndPath.stream();
+					this.statePath = streamAndPath.path();
 					return;
 				}
 				catch (Exception e) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java
new file mode 100644
index 0000000..12a4c42
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.EntropyInjectingFileSystem;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.local.LocalFileSystem;
+import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
+import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.startsWith;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests verifying that the FsStateBackend passes the entropy injection option
+ * to the FileSystem for state payload files, but not for metadata files.
+ */
+public class FsStateBackendEntropyTest {
+
+	static final String ENTROPY_MARKER = "__ENTROPY__";
+	static final String RESOLVED_MARKER = "+RESOLVED+";
+
+	@Rule
+	public final TemporaryFolder tmp = new TemporaryFolder();
+
+	@Test
+	public void testEntropyInjection() throws Exception {
+		final FileSystem fs = new TestEntropyAwareFs();
+
+		final Path checkpointDir = new Path(Path.fromLocalFile(tmp.newFolder()), ENTROPY_MARKER + "/checkpoints");
+		final String checkpointDirStr = checkpointDir.toString();
+
+		FsCheckpointStorage storage = new FsCheckpointStorage(
+				fs, checkpointDir, null, new JobID(), 1024);
+
+		FsCheckpointStorageLocation location = (FsCheckpointStorageLocation)
+				storage.initializeLocationForCheckpoint(96562);
+
+		assertThat(location.getCheckpointDirectory().toString(), startsWith(checkpointDirStr));
+		assertThat(location.getSharedStateDirectory().toString(), startsWith(checkpointDirStr));
+		assertThat(location.getTaskOwnedStateDirectory().toString(), startsWith(checkpointDirStr));
+		assertThat(location.getMetadataFilePath().toString(), not(containsString(ENTROPY_MARKER)));
+
+		// check entropy in task-owned state
+		try (CheckpointStateOutputStream stream = storage.createTaskOwnedStateStream()) {
+			stream.flush();
+			FileStateHandle handle = (FileStateHandle) stream.closeAndGetHandle();
+
+			assertNotNull(handle);
+			assertThat(handle.getFilePath().toString(), not(containsString(ENTROPY_MARKER)));
+			assertThat(handle.getFilePath().toString(), containsString(RESOLVED_MARKER));
+		}
+
+		// check entropy in the exclusive/shared state
+		try (CheckpointStateOutputStream stream =
+				location.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE)) {
+
+			stream.flush();
+			FileStateHandle handle = (FileStateHandle) stream.closeAndGetHandle();
+
+			assertNotNull(handle);
+			assertThat(handle.getFilePath().toString(), not(containsString(ENTROPY_MARKER)));
+			assertThat(handle.getFilePath().toString(), containsString(RESOLVED_MARKER));
+		}
+
+		// check that there is no entropy in the metadata
+		// (neither the entropy marker nor the resolved entropy may appear in its path)
+		try (CheckpointMetadataOutputStream stream = location.createMetadataOutputStream()) {
+			stream.flush();
+			FsCompletedCheckpointStorageLocation handle =
+					(FsCompletedCheckpointStorageLocation) stream.closeAndFinalizeCheckpoint();
+
+			assertNotNull(handle);
+
+			// metadata files have no entropy
+			assertThat(handle.getMetadataHandle().getFilePath().toString(), not(containsString(ENTROPY_MARKER)));
+			assertThat(handle.getMetadataHandle().getFilePath().toString(), not(containsString(RESOLVED_MARKER)));
+
+			// external location is the same as metadata, without the file name
+			assertEquals(handle.getMetadataHandle().getFilePath().getParent().toString(), handle.getExternalPointer());
+		}
+	}
+
+	private static class TestEntropyAwareFs extends LocalFileSystem implements EntropyInjectingFileSystem {
+
+		@Override
+		public String getEntropyInjectionKey() {
+			return ENTROPY_MARKER;
+		}
+
+		@Override
+		public String generateEntropy() {
+			return RESOLVED_MARKER;
+		}
+	}
+}


[flink] 02/05: [FLINK-9061] [fs] Add entropy injector for file systems

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit df02ed4d905c46c36f37b8c95e0413d461150458
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Sep 24 20:40:57 2018 +0200

    [FLINK-9061] [fs] Add entropy injector for file systems
---
 .../flink/core/fs/EntropyInjectingFileSystem.java  |  49 +++++++
 .../org/apache/flink/core/fs/EntropyInjector.java  | 135 +++++++++++++++++++
 .../apache/flink/core/fs/OutputStreamAndPath.java  |  47 +++++++
 .../java/org/apache/flink/util/StringUtils.java    |  33 +++++
 .../apache/flink/core/fs/EntropyInjectorTest.java  | 148 +++++++++++++++++++++
 .../org/apache/flink/util/StringUtilsTest.java     |  12 ++
 6 files changed, 424 insertions(+)

diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjectingFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjectingFileSystem.java
new file mode 100644
index 0000000..14a15be
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjectingFileSystem.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * An interface to be implemented by a {@link FileSystem} that is aware of entropy injection.
+ *
+ * <p>Entropy injection is a technique to spread files/objects across more parallel shards of
+ * a distributed storage (typically object store) by adding random characters to the beginning
+ * of the path/key and hence spreading the keys across a wider domain of prefixes.
+ *
+ * <p>Entropy injection typically works by having a recognized marker string in paths
+ * and replacing that marker with random characters.
+ *
+ * <p>This interface is used in conjunction with the {@link EntropyInjector} (as a poor man's
+ * way to build a mix-in in Java).
+ */
+@PublicEvolving
+public interface EntropyInjectingFileSystem {
+
+	/**
+	 * Gets the marker string that represents the substring of a path to be replaced
+	 * by the entropy characters.
+	 */
+	String getEntropyInjectionKey();
+
+	/**
+	 * Creates a string with random entropy to be injected into a path.
+	 */
+	String generateEntropy();
+}
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java b/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java
new file mode 100644
index 0000000..5bcb618
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+/**
+ * This class offers utilities for entropy injection for FileSystems that implement
+ * {@link EntropyInjectingFileSystem}.
+ */
+@PublicEvolving
+public class EntropyInjector {
+
+	/**
+	 * Handles entropy injection across regular and entropy-aware file systems.
+	 *
+	 * <p>If the given file system is entropy-aware (i.e., it implements {@link EntropyInjectingFileSystem}),
+	 * then this method replaces the entropy marker in the path with random characters.
+	 * The entropy marker is defined by {@link EntropyInjectingFileSystem#getEntropyInjectionKey()}.
+	 *
+	 * <p>If the given file system does not implement {@code EntropyInjectingFileSystem},
+	 * then this method delegates to {@link FileSystem#create(Path, WriteMode)} and
+	 * returns the same path in the resulting {@code OutputStreamAndPath}.
+	 */
+	public static OutputStreamAndPath createEntropyAware(
+			FileSystem fs,
+			Path path,
+			WriteMode writeMode) throws IOException {
+
+		if (!(fs instanceof EntropyInjectingFileSystem)) {
+			return new OutputStreamAndPath(fs.create(path, writeMode), path);
+		}
+
+		final EntropyInjectingFileSystem efs = (EntropyInjectingFileSystem) fs;
+		final Path pathWithEntropy = resolveEntropy(path, efs, true);
+
+		final FSDataOutputStream out = fs.create(pathWithEntropy, writeMode);
+		return new OutputStreamAndPath(out, pathWithEntropy);
+	}
+
+	/**
+	 * Removes the entropy marker string from the path, if the given file system is an
+	 * entropy-injecting file system (implements {@link EntropyInjectingFileSystem}) and
+	 * the entropy marker key is present. Otherwise, this returns the path as is.
+	 *
+	 * @param path The path to filter.
+	 * @return The path without the marker string.
+	 */
+	public static Path removeEntropyMarkerIfPresent(FileSystem fs, Path path) {
+		if (fs instanceof EntropyInjectingFileSystem) {
+			try {
+				return resolveEntropy(path, (EntropyInjectingFileSystem) fs, false);
+			}
+			catch (IOException e) {
+				// this should never happen, because the path was valid before. rethrow to silence the compiler
+				throw new FlinkRuntimeException(e.getMessage(), e);
+			}
+		}
+		else {
+			return path;
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	@VisibleForTesting
+	static Path resolveEntropy(Path path, EntropyInjectingFileSystem efs, boolean injectEntropy) throws IOException {
+		final String entropyInjectionKey = efs.getEntropyInjectionKey();
+
+		if (entropyInjectionKey == null) {
+			return path;
+		}
+		else {
+			final URI originalUri = path.toUri();
+			final String checkpointPath = originalUri.getPath();
+
+			final int indexOfKey = checkpointPath.indexOf(entropyInjectionKey);
+			if (indexOfKey == -1) {
+				return path;
+			}
+			else {
+				final StringBuilder buffer = new StringBuilder(checkpointPath.length());
+				buffer.append(checkpointPath, 0, indexOfKey);
+
+				if (injectEntropy) {
+					buffer.append(efs.generateEntropy());
+				}
+
+				buffer.append(checkpointPath, indexOfKey + entropyInjectionKey.length(), checkpointPath.length());
+
+				final String rewrittenPath = buffer.toString();
+				try {
+					return new Path(new URI(
+							originalUri.getScheme(),
+							originalUri.getAuthority(),
+							rewrittenPath,
+							originalUri.getQuery(),
+							originalUri.getFragment()).normalize());
+				}
+				catch (URISyntaxException e) {
+					// this could only happen if the injected entropy string contains invalid characters
+					throw new IOException("URI format error while processing path for entropy injection", e);
+				}
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/** This class is not meant to be instantiated. */
+	private EntropyInjector() {}
+}
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/OutputStreamAndPath.java b/flink-core/src/main/java/org/apache/flink/core/fs/OutputStreamAndPath.java
new file mode 100644
index 0000000..62c9479
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/OutputStreamAndPath.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An output stream and a path.
+ */
+public final class OutputStreamAndPath {
+
+	private final FSDataOutputStream stream;
+
+	private final Path path;
+
+	/**
+	 * Creates an OutputStreamAndPath from the given stream and path, both of which must be non-null.
+	 */
+	public OutputStreamAndPath(FSDataOutputStream stream, Path path) {
+		this.stream = checkNotNull(stream);
+		this.path = checkNotNull(path);
+	}
+
+	public FSDataOutputStream stream() {
+		return stream;
+	}
+
+	public Path path() {
+		return path;
+	}
+}
diff --git a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
index b9ff319..a968a8a 100644
--- a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
@@ -32,6 +32,7 @@ import java.util.Objects;
 import java.util.Random;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -249,6 +250,38 @@ public final class StringUtils {
 	}
 
 	/**
+	 * Creates a random alphanumeric string of given length.
+	 *
+	 * @param rnd The random number generator to use.
+	 * @param length The number of alphanumeric characters to generate.
+	 */
+	public static String generateRandomAlphanumericString(Random rnd, int length) {
+		checkNotNull(rnd);
+		checkArgument(length >= 0);
+
+		StringBuilder buffer = new StringBuilder(length);
+		for (int i = 0; i < length; i++) {
+			buffer.append(nextAlphanumericChar(rnd));
+		}
+		return buffer.toString();
+	}
+
+	private static char nextAlphanumericChar(Random rnd) {
+		int which = rnd.nextInt(62);
+		char c;
+		if (which < 10) {
+			c = (char) ('0' + which);
+		}
+		else if (which < 36) {
+			c = (char) ('A' - 10 + which);
+		}
+		else {
+			c = (char) ('a' - 36 + which);
+		}
+		return c;
+	}
+
+	/**
 	 * Writes a String to the given output.
 	 * The written string can be read with {@link #readString(DataInputView)}.
 	 *
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/EntropyInjectorTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/EntropyInjectorTest.java
new file mode 100644
index 0000000..a425826
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/EntropyInjectorTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.core.fs.local.LocalFileSystem;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the {@link EntropyInjector}.
+ */
+public class EntropyInjectorTest {
+
+	@ClassRule
+	public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
+
+	@Test
+	public void testEmptyPath() throws Exception {
+		EntropyInjectingFileSystem efs = new TestEntropyInjectingFs("test", "ignored");
+		Path path = new Path("hdfs://localhost:12345");
+
+		assertEquals(path, EntropyInjector.resolveEntropy(path, efs, true));
+		assertEquals(path, EntropyInjector.resolveEntropy(path, efs, false));
+	}
+
+	@Test
+	public void testFullUriNonMatching() throws Exception {
+		EntropyInjectingFileSystem efs = new TestEntropyInjectingFs("_entropy_key_", "ignored");
+		Path path = new Path("s3://hugo@myawesomehost:55522/path/to/the/file");
+
+		assertEquals(path, EntropyInjector.resolveEntropy(path, efs, true));
+		assertEquals(path, EntropyInjector.resolveEntropy(path, efs, false));
+	}
+
+	@Test
+	public void testFullUriMatching() throws Exception {
+		EntropyInjectingFileSystem efs = new TestEntropyInjectingFs("s0mek3y", "12345678");
+		Path path = new Path("s3://hugo@myawesomehost:55522/path/s0mek3y/the/file");
+
+		assertEquals(new Path("s3://hugo@myawesomehost:55522/path/12345678/the/file"), EntropyInjector.resolveEntropy(path, efs, true));
+		assertEquals(new Path("s3://hugo@myawesomehost:55522/path/the/file"), EntropyInjector.resolveEntropy(path, efs, false));
+	}
+
+	@Test
+	public void testPathOnlyNonMatching() throws Exception {
+		EntropyInjectingFileSystem efs = new TestEntropyInjectingFs("_entropy_key_", "ignored");
+		Path path = new Path("/path/file");
+
+		assertEquals(path, EntropyInjector.resolveEntropy(path, efs, true));
+		assertEquals(path, EntropyInjector.resolveEntropy(path, efs, false));
+	}
+
+	@Test
+	public void testPathOnlyMatching() throws Exception {
+		EntropyInjectingFileSystem efs = new TestEntropyInjectingFs("_entropy_key_", "xyzz");
+		Path path = new Path("/path/_entropy_key_/file");
+
+		assertEquals(new Path("/path/xyzz/file"), EntropyInjector.resolveEntropy(path, efs, true));
+		assertEquals(new Path("/path/file"), EntropyInjector.resolveEntropy(path, efs, false));
+	}
+
+	@Test
+	public void testEntropyNotFullSegment() throws Exception {
+		EntropyInjectingFileSystem efs = new TestEntropyInjectingFs("_entropy_key_", "pqr");
+		Path path = new Path("s3://myhost:122/entropy-_entropy_key_-suffix/file");
+
+		assertEquals(new Path("s3://myhost:122/entropy-pqr-suffix/file"), EntropyInjector.resolveEntropy(path, efs, true));
+		assertEquals(new Path("s3://myhost:122/entropy--suffix/file"), EntropyInjector.resolveEntropy(path, efs, false));
+	}
+
+	@Test
+	public void testCreateEntropyAwarePlainFs() throws Exception {
+		File folder = TMP_FOLDER.newFolder();
+		Path path = new Path(Path.fromLocalFile(folder), "_entropy_/file");
+
+		OutputStreamAndPath out = EntropyInjector.createEntropyAware(
+				LocalFileSystem.getSharedInstance(), path, WriteMode.NO_OVERWRITE);
+
+		out.stream().close();
+
+		assertEquals(path, out.path());
+		assertTrue(new File (new File(folder, "_entropy_"), "file").exists());
+	}
+
+	@Test
+	public void testCreateEntropyAwareEntropyFs() throws Exception {
+		File folder = TMP_FOLDER.newFolder();
+		Path path = new Path(Path.fromLocalFile(folder), "_entropy_/file");
+		Path pathWithEntropy = new Path(Path.fromLocalFile(folder), "test-entropy/file");
+
+		FileSystem fs = new TestEntropyInjectingFs("_entropy_", "test-entropy");
+
+		OutputStreamAndPath out = EntropyInjector.createEntropyAware(fs, path, WriteMode.NO_OVERWRITE);
+
+		out.stream().close();
+		// the marker must be replaced by the generated entropy, both in the reported path and on disk
+		assertEquals(pathWithEntropy, out.path());
+		assertTrue(new File(new File(folder, "test-entropy"), "file").exists());
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static final class TestEntropyInjectingFs extends LocalFileSystem implements EntropyInjectingFileSystem {
+
+		private final String key;
+
+		private final String entropy;
+
+		TestEntropyInjectingFs(String key, String entropy) {
+			this.key = key;
+			this.entropy = entropy;
+		}
+
+		@Override
+		public String getEntropyInjectionKey() {
+			return key;
+		}
+
+		@Override
+		public String generateEntropy() {
+			return entropy;
+		}
+	}
+}
diff --git a/flink-core/src/test/java/org/apache/flink/util/StringUtilsTest.java b/flink-core/src/test/java/org/apache/flink/util/StringUtilsTest.java
index 5f705b4..7aa8e6e 100644
--- a/flink-core/src/test/java/org/apache/flink/util/StringUtilsTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/StringUtilsTest.java
@@ -20,8 +20,11 @@ package org.apache.flink.util;
 
 import org.junit.Test;
 
+import java.util.Random;
+
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 /**
  * Tests for the {@link StringUtils}.
@@ -56,4 +59,13 @@ public class StringUtilsTest extends TestLogger {
 		String hex = StringUtils.byteToHexString(byteArray);
 		assertEquals("019f314a", hex);
 	}
+
+	@Test
+	public void testGenerateAlphanumeric() {
+		String str = StringUtils.generateRandomAlphanumericString(new Random(), 256);
+
+		if (!str.matches("[a-zA-Z0-9]{256}")) {
+			fail("Not alphanumeric: " + str);
+		}
+	}
 }