You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/09/24 21:56:52 UTC

[flink] 04/05: [FLINK-9061] [s3] Make base S3 file system entropy injecting

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d8a9c72ed675669363e6c7d7499abee30fc19b88
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Sep 24 20:46:37 2018 +0200

    [FLINK-9061] [s3] Make base S3 file system entropy injecting
---
 docs/ops/filesystems.md                            | 30 +++++++-
 .../fs/s3/common/AbstractS3FileSystemFactory.java  | 71 +++++++++++++++++--
 .../flink/fs/s3/common/FlinkS3FileSystem.java      | 59 +++++++++++++++-
 .../flink/fs/s3/common/HadoopConfigLoader.java     |  1 -
 .../flink/fs/s3/common/S3EntropyFsFactoryTest.java | 79 ++++++++++++++++++++++
 .../src/test/resources/log4j-test.properties       | 27 ++++++++
 6 files changed, 258 insertions(+), 9 deletions(-)

diff --git a/docs/ops/filesystems.md b/docs/ops/filesystems.md
index 78f2c96..416302e 100644
--- a/docs/ops/filesystems.md
+++ b/docs/ops/filesystems.md
@@ -24,7 +24,7 @@ under the License.
 
 This page provides details on setting up and configuring distributed file systems for use with Flink.
 
-## Flink' File System support
+## Flink's File System support
 
 Flink uses file systems both as a source and sink in streaming/batch applications, and as a target for checkpointing.
 These file systems can for example be *Unix/Windows file systems*, *HDFS*, or even object stores like *S3*.
@@ -122,6 +122,34 @@ These limits are enforced per TaskManager, so each TaskManager in a Flink applic
 In addition, the limits are also only enforced per FileSystem instance. Because File Systems are created per scheme and authority, different
 authorities will have their own connection pool. For example `hdfs://myhdfs:50010/` and `hdfs://anotherhdfs:4399/` will have separate pools.
 
+## Entropy injection for S3 file systems
+
+The bundled S3 file systems (`flink-s3-fs-presto` and `flink-s3-fs-hadoop`) support entropy injection. Entropy injection is
+a technique to improve the scalability of AWS S3 buckets by adding some random characters near the beginning of the key.
+
+If entropy injection is activated, a configured substring in the paths will be replaced by random characters. For example, path
+`s3://my-bucket/checkpoints/_entropy_/dashboard-job/` would be replaced by something like `s3://my-bucket/checkpoints/gf36ikvg/dashboard-job/`.
+
+**Note that this only happens when the file creation passes the option to inject entropy!** Otherwise, the entropy key
+substring is simply removed from the file path. See
+[FileSystem.create(Path, WriteOptions)](https://ci.apache.org/projects/flink/flink-docs-release-1.6/api/java/org/apache/flink/core/fs/FileSystem.html#create-org.apache.flink.core.fs.Path-org.apache.flink.core.fs.FileSystem.WriteOptions-)
+for details.
+
+*Note: The Flink runtime currently passes the option to inject entropy only to checkpoint data files.*
+*For all other files, including checkpoint metadata and the external URI, no entropy is injected, to keep checkpoint URIs predictable.*
+
+To enable entropy injection, configure the *entropy key* and the *entropy length* parameters.
+
+```
+s3.entropy.key: _entropy_
+s3.entropy.length: 4 (default)
+
+```
+
+The `s3.entropy.key` defines the string in paths that is replaced by the random characters. Paths that do not contain the entropy key are left unchanged.
+If a file system operation does not pass the *"inject entropy"* write option, the entropy key substring is simply removed.
+The `s3.entropy.length` defines the number of random alphanumeric characters to replace the entropy key with.
+
 ## Adding new File System Implementations
 
 File system implementations are discovered by Flink through Java's service abstraction, making it easy to add additional file system implementations.
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
index ed24138..58144f0 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
@@ -18,7 +18,10 @@
 
 package org.apache.flink.fs.s3.common;
 
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.FileSystemFactory;
 
@@ -28,9 +31,36 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.net.URI;
 
-/** Base class for Hadoop file system factories. */
+/**
+ * Base class for file system factories that create S3 file systems.
+ */
 public abstract class AbstractS3FileSystemFactory implements FileSystemFactory {
 
+	/**
+	 * The substring to be replaced by random entropy in checkpoint paths.
+	 */
+	public static final ConfigOption<String> ENTROPY_INJECT_KEY_OPTION = ConfigOptions
+			.key("s3.entropy.key")
+			.noDefaultValue()
+			.withDescription(
+					"This option can be used to improve performance due to sharding issues on Amazon S3. " +
+					"For file creations with entropy injection, this key will be replaced by random " +
+					"alphanumeric characters. For other file creations, the key will be filtered out.");
+
+	/**
+	 * The number of entropy characters, in case entropy injection is configured.
+	 */
+	public static final ConfigOption<Integer> ENTROPY_INJECT_LENGTH_OPTION = ConfigOptions
+			.key("s3.entropy.length")
+			.defaultValue(4)
+			.withDescription(
+					"When '" + ENTROPY_INJECT_KEY_OPTION.key() + "' is set, this option defines the number of " +
+					"random characters to replace the entropy key with.");
+
+	// ------------------------------------------------------------------------
+
+	private static final String INVALID_ENTROPY_KEY_CHARS = "^.*[~#@*+%{}<>\\[\\]|\"\\\\].*$";
+
 	private static final Logger LOG = LoggerFactory.getLogger(AbstractS3FileSystemFactory.class);
 
 	/** Name of this factory for logging. */
@@ -38,29 +68,60 @@ public abstract class AbstractS3FileSystemFactory implements FileSystemFactory {
 
 	private final HadoopConfigLoader hadoopConfigLoader;
 
+	private Configuration flinkConfig;
+
 	protected AbstractS3FileSystemFactory(String name, HadoopConfigLoader hadoopConfigLoader) {
 		this.name = name;
 		this.hadoopConfigLoader = hadoopConfigLoader;
 	}
 
+	// ------------------------------------------------------------------------
+
 	@Override
 	public void configure(Configuration config) {
+		flinkConfig = config;
 		hadoopConfigLoader.setFlinkConfig(config);
 	}
 
 	@Override
 	public FileSystem create(URI fsUri) throws IOException {
+		Configuration flinkConfig = this.flinkConfig;
+
+		if (flinkConfig == null) {
+			LOG.warn("Creating S3 FileSystem without configuring the factory. All behavior will be default.");
+			flinkConfig = new Configuration();
+		}
+
 		LOG.debug("Creating S3 file system backed by {}", name);
 		LOG.debug("Loading Hadoop configuration for {}", name);
 
 		try {
+			// create the Hadoop FileSystem
 			org.apache.hadoop.conf.Configuration hadoopConfig = hadoopConfigLoader.getOrLoadHadoopConfig();
 			org.apache.hadoop.fs.FileSystem fs = createHadoopFileSystem();
 			fs.initialize(getInitURI(fsUri, hadoopConfig), hadoopConfig);
-			return new FlinkS3FileSystem(fs);
-		} catch (IOException ioe) {
-			throw ioe;
-		} catch (Exception e) {
+
+			// load the entropy injection settings
+			String entropyInjectionKey = flinkConfig.getString(ENTROPY_INJECT_KEY_OPTION);
+			int numEntropyChars = -1;
+			if (entropyInjectionKey != null) {
+				if (entropyInjectionKey.matches(INVALID_ENTROPY_KEY_CHARS)) {
+					throw new IllegalConfigurationException("Invalid character in value for " +
+							ENTROPY_INJECT_KEY_OPTION.key() + " : " + entropyInjectionKey);
+				}
+				numEntropyChars = flinkConfig.getInteger(ENTROPY_INJECT_LENGTH_OPTION);
+				if (numEntropyChars <= 0) {
+					throw new IllegalConfigurationException(
+							ENTROPY_INJECT_LENGTH_OPTION.key() + " must configure a value > 0");
+				}
+			}
+
+			return new FlinkS3FileSystem(fs, entropyInjectionKey, numEntropyChars);
+		}
+		catch (IOException e) {
+			throw e;
+		}
+		catch (Exception e) {
 			throw new IOException(e.getMessage(), e);
 		}
 	}
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
index a3d960a..84883bb 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
@@ -18,22 +18,77 @@
 
 package org.apache.flink.fs.s3.common;
 
+import org.apache.flink.core.fs.EntropyInjectingFileSystem;
+import org.apache.flink.core.fs.FileSystemKind;
 import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.flink.util.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.ThreadLocalRandom;
 
 /**
  * Implementation of the Flink {@link org.apache.flink.core.fs.FileSystem} interface for S3.
  * This class implements the common behavior implemented directly by Flink and delegates
  * common calls to an implementation of Hadoop's filesystem abstraction.
  */
-public class FlinkS3FileSystem extends HadoopFileSystem {
+public class FlinkS3FileSystem extends HadoopFileSystem implements EntropyInjectingFileSystem {
+
+	@Nullable
+	private final String entropyInjectionKey;
+
+	private final int entropyLength;
 
 	/**
-	 * Wraps the given Hadoop S3 File System object as a Flink S3 File System object.
+	 * Creates a FlinkS3FileSystem based on the given Hadoop S3 file system.
 	 * The given Hadoop file system object is expected to be initialized already.
 	 *
 	 * @param hadoopS3FileSystem The Hadoop FileSystem that will be used under the hood.
 	 */
 	public FlinkS3FileSystem(org.apache.hadoop.fs.FileSystem hadoopS3FileSystem) {
+		this(hadoopS3FileSystem, null, -1);
+	}
+
+	/**
+	 * Creates a FlinkS3FileSystem based on the given Hadoop S3 file system.
+	 * The given Hadoop file system object is expected to be initialized already.
+	 *
+	 * <p>This constructor additionally configures the entropy injection for the file system.
+	 *
+	 * @param hadoopS3FileSystem The Hadoop FileSystem that will be used under the hood.
+	 * @param entropyInjectionKey The substring that will be replaced by entropy or removed; may be null if no key is configured.
+	 * @param entropyLength The number of random alphanumeric characters to inject as entropy; must be > 0 when a key is set.
+	 */
+	public FlinkS3FileSystem(
+			org.apache.hadoop.fs.FileSystem hadoopS3FileSystem,
+			@Nullable String entropyInjectionKey,
+			int entropyLength) {
+
 		super(hadoopS3FileSystem);
+
+		if (entropyInjectionKey != null && entropyLength <= 0) {
+			throw new IllegalArgumentException("Entropy length must be > 0 when entropy injection key is set");
+		}
+
+		this.entropyInjectionKey = entropyInjectionKey;
+		this.entropyLength = entropyLength;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Nullable
+	@Override
+	public String getEntropyInjectionKey() {
+		return entropyInjectionKey;
+	}
+
+	@Override
+	public String generateEntropy() {
+		return StringUtils.generateRandomAlphanumericString(ThreadLocalRandom.current(), entropyLength);
+	}
+
+	@Override
+	public FileSystemKind getKind() {
+		return FileSystemKind.OBJECT_STORE;
 	}
 }
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/HadoopConfigLoader.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/HadoopConfigLoader.java
index 5ca497c..1bbb757 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/HadoopConfigLoader.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/HadoopConfigLoader.java
@@ -97,7 +97,6 @@ public class HadoopConfigLoader {
 		for (String key : flinkConfig.keySet()) {
 			for (String prefix : flinkConfigPrefixes) {
 				if (key.startsWith(prefix)) {
-					String value = flinkConfig.getString(key, null);
 					String newKey = hadoopConfigPrefix + key.substring(prefix.length());
 					String newValue = fixHadoopConfig(key, flinkConfig.getString(key, null));
 					hadoopConfig.set(newKey, newValue);
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java
new file mode 100644
index 0000000..832bbf4
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3.common;
+
+import org.apache.flink.configuration.Configuration;
+
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.net.URI;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests that the file system factory picks up the entropy configuration properly.
+ */
+public class S3EntropyFsFactoryTest {
+
+	@Test
+	public void testEntropyInjectionConfig() throws Exception {
+		final Configuration conf = new Configuration();
+		conf.setString("s3.entropy.key", "__entropy__");
+		conf.setInteger("s3.entropy.length", 7);
+
+		TestFsFactory factory = new TestFsFactory();
+		factory.configure(conf);
+
+		FlinkS3FileSystem fs = (FlinkS3FileSystem) factory.create(new URI("s3://test"));
+		assertEquals("__entropy__", fs.getEntropyInjectionKey());
+		assertEquals(7, fs.generateEntropy().length());
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static final class TestFsFactory extends AbstractS3FileSystemFactory {
+
+		TestFsFactory() {
+			super("testFs", new HadoopConfigLoader(
+					new String[0],
+					new String[0][],
+					"",
+					Collections.emptySet(),
+					Collections.emptySet(),
+					""));
+		}
+
+		@Override
+		protected org.apache.hadoop.fs.FileSystem createHadoopFileSystem() {
+			return Mockito.mock(org.apache.hadoop.fs.FileSystem.class);
+		}
+
+		@Override
+		protected URI getInitURI(URI fsUri, org.apache.hadoop.conf.Configuration hadoopConfig) {
+			return fsUri;
+		}
+
+		@Override
+		public String getScheme() {
+			return "test";
+		}
+	}
+}
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/resources/log4j-test.properties b/flink-filesystems/flink-s3-fs-base/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..2be3589
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-base/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=OFF, testlogger
+
+# testlogger is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n