You are viewing a plain text version of this content; the link to the canonical version is not preserved in this copy.
Posted to commits@flink.apache.org by se...@apache.org on 2018/09/25 06:52:03 UTC

[flink] 05/06: [FLINK-9061] [s3] Make S3 Presto file system entropy injecting

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a2b2041bf3885ee16d0333b4f153a12d435edff4
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Sep 24 22:11:09 2018 +0200

    [FLINK-9061] [s3] Make S3 Presto file system entropy injecting
---
 .../flink/fs/s3presto/S3FileSystemFactory.java     | 103 ++++++++++++++++++---
 .../flink/fs/s3presto/S3PrestoFileSystem.java      |  94 +++++++++++++++++++
 .../flink/fs/s3presto/PrestoS3EntropyTest.java     |  47 ++++++++++
 3 files changed, 231 insertions(+), 13 deletions(-)

diff --git a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
index a04f9c9..6cf0267 100644
--- a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
@@ -19,12 +19,20 @@
 package org.apache.flink.fs.s3presto;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.runtime.fs.hdfs.AbstractFileSystemFactory;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
 import org.apache.flink.runtime.fs.hdfs.HadoopConfigLoader;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import com.facebook.presto.hive.PrestoS3FileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Collections;
@@ -34,7 +42,35 @@ import java.util.Set;
 /**
  * Simple factory for the S3 file system.
  */
-public class S3FileSystemFactory extends AbstractFileSystemFactory {
+public class S3FileSystemFactory implements FileSystemFactory {
+
+	/**
+	 * The path substring that is replaced by random entropy for file creations with entropy injection, and removed otherwise.
+	 */
+	public static final ConfigOption<String> ENTROPY_INJECT_KEY_OPTION = ConfigOptions
+			.key("s3.entropy.key")
+			.noDefaultValue()
+			.withDescription(
+					"This option can be used to improve performance due to sharding issues on Amazon S3. " +
+					"For file creations with entropy injection, this key will be replaced by random " +
+					"alphanumeric characters. For other file creations, the key will be filtered out.");
+
+	/**
+	 * The number of random characters that replace the entropy key; must be > 0 when the key is set.
+	 */
+	public static final ConfigOption<Integer> ENTROPY_INJECT_LENGTH_OPTION = ConfigOptions
+			.key("s3.entropy.length")
+			.defaultValue(4)
+			.withDescription(
+					"When '" + ENTROPY_INJECT_KEY_OPTION.key() + "' is set, this option defines the number of " +
+					"random characters to replace the entropy key with.");
+
+	// ------------------------------------------------------------------------
+
+	private static final Logger LOG = LoggerFactory.getLogger(S3FileSystemFactory.class);
+	// Entropy keys containing any of these characters are rejected: ~ # @ * + % { } < > [ ] | " \
+	private static final String INVALID_ENTROPY_KEY_CHARS = "^.*[~#@*+%{}<>\\[\\]|\"\\\\].*$";
+
 	private static final Set<String> PACKAGE_PREFIXES_TO_SHADE =
 		new HashSet<>(Collections.singletonList("com.amazonaws."));
 
@@ -50,28 +86,69 @@ public class S3FileSystemFactory extends AbstractFileSystemFactory {
 			{ "presto.s3.secret.key", "presto.s3.secret-key" }
 	};
 
-	public S3FileSystemFactory() {
-		super("Presto S3 File System", createHadoopConfigLoader());
-	}
+	// ------------------------------------------------------------------------
+	// Receives the Flink config in configure() and supplies the Hadoop config used by create().
+	private final HadoopConfigLoader hadoopConfigLoader = createHadoopConfigLoader();
+	// Set by configure(); while null, create() falls back to a new default Configuration.
+	private Configuration flinkConfig;
 
 	@Override
 	public String getScheme() {
 		return "s3";
 	}
 
+	@Override
+	public void configure(Configuration config) {
+		flinkConfig = config; // kept for create(), which reads the entropy options from it
+		hadoopConfigLoader.setFlinkConfig(config);
+	}
+
+	@Override
+	public FileSystem create(URI fsUri) throws IOException {
+		Configuration config = this.flinkConfig;
+
+		if (config == null) {
+			LOG.warn("Creating S3 Presto FileSystem without configuring the factory. All behavior will be default.");
+			config = new Configuration();
+		}
+
+		LOG.debug("Creating S3 file system backed by PrestoS3FileSystem");
+
+		try {
+			// validate the entropy settings first: failing after fs.initialize() would drop an
+			// initialized Presto file system without ever closing it
+			String entropyInjectionKey = config.getString(ENTROPY_INJECT_KEY_OPTION);
+			int numEntropyChars = -1;
+			if (entropyInjectionKey != null) {
+				if (entropyInjectionKey.matches(INVALID_ENTROPY_KEY_CHARS)) {
+					throw new IllegalConfigurationException("Invalid character in value for " +
+							ENTROPY_INJECT_KEY_OPTION.key() + " : " + entropyInjectionKey);
+				}
+				numEntropyChars = config.getInteger(ENTROPY_INJECT_LENGTH_OPTION);
+				if (numEntropyChars <= 0) {
+					throw new IllegalConfigurationException(
+							ENTROPY_INJECT_LENGTH_OPTION.key() + " must configure a value > 0");
+				}
+			}
+
+			org.apache.hadoop.conf.Configuration hadoopConfig = hadoopConfigLoader.getOrLoadHadoopConfig();
+			PrestoS3FileSystem fs = new PrestoS3FileSystem();
+			fs.initialize(createInitUri(fsUri), hadoopConfig);
+			return new S3PrestoFileSystem(fs, entropyInjectionKey, numEntropyChars);
+		} catch (IOException ioe) {
+			throw ioe;
+		} catch (Exception e) {
+			throw new IOException(e.getMessage(), e);
+		}
+	}
+
 	@VisibleForTesting
 	static HadoopConfigLoader createHadoopConfigLoader() {
 		return new HadoopConfigLoader(FLINK_CONFIG_PREFIXES, MIRRORED_CONFIG_KEYS,
 			"presto.s3.", PACKAGE_PREFIXES_TO_SHADE, CONFIG_KEYS_TO_SHADE, FLINK_SHADING_PREFIX);
 	}
 
-	@Override
-	protected org.apache.hadoop.fs.FileSystem createHadoopFileSystem() {
-		return new PrestoS3FileSystem();
-	}
-
-	@Override
-	protected URI getInitURI(URI fsUri, org.apache.hadoop.conf.Configuration hadoopConfig) {
+	private static URI createInitUri(URI fsUri) {
 		final String scheme = fsUri.getScheme();
 		final String authority = fsUri.getAuthority();
 		final URI initUri;
@@ -88,7 +165,7 @@ public class S3FileSystemFactory extends AbstractFileSystemFactory {
 		return initUri;
 	}
 
-	private URI createURI(String str) {
+	private static URI createURI(String str) {
 		try {
 			return new URI(str);
 		} catch (URISyntaxException e) {
diff --git a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3PrestoFileSystem.java b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3PrestoFileSystem.java
new file mode 100644
index 0000000..b901063
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3PrestoFileSystem.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3presto;
+
+import org.apache.flink.core.fs.EntropyInjectingFileSystem;
+import org.apache.flink.core.fs.FileSystemKind;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.flink.util.StringUtils;
+
+import com.facebook.presto.hive.PrestoS3FileSystem;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Implementation of the Flink {@link org.apache.flink.core.fs.FileSystem} interface for S3, backed by
+ * Presto's S3 file system. File system calls are delegated via {@link HadoopFileSystem}; in addition,
+ * this class exposes the optional entropy injection settings ({@link EntropyInjectingFileSystem}).
+ */
+public class S3PrestoFileSystem extends HadoopFileSystem implements EntropyInjectingFileSystem {
+
+	@Nullable
+	private final String entropyInjectionKey;
+
+	private final int entropyLength;
+
+	/**
+	 * Creates an S3PrestoFileSystem without entropy injection, based on the given Presto S3 file system.
+	 * The given Presto file system object is expected to be initialized already.
+	 */
+	public S3PrestoFileSystem(PrestoS3FileSystem prestoFs) {
+		this(prestoFs, null, -1);
+	}
+
+	/**
+	 * Creates an S3PrestoFileSystem based on the given Presto S3 file system.
+	 * The given Presto file system object is expected to be initialized already.
+	 *
+	 * <p>This constructor additionally configures the entropy injection for the file system.
+	 *
+	 * @param prestoFs The Presto S3 File System that will be used under the hood.
+	 * @param entropyInjectionKey The substring that will be replaced by entropy or removed; may be null.
+	 * @param entropyLength The number of random alphanumeric characters to inject; must be > 0 if a key is set.
+	 */
+	public S3PrestoFileSystem(
+			PrestoS3FileSystem prestoFs,
+			@Nullable String entropyInjectionKey,
+			int entropyLength) {
+
+		super(prestoFs);
+
+		if (entropyInjectionKey != null && entropyLength <= 0) {
+			throw new IllegalArgumentException("Entropy length must be > 0 when entropy injection key is set");
+		}
+
+		this.entropyInjectionKey = entropyInjectionKey;
+		this.entropyLength = entropyLength;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Nullable
+	@Override
+	public String getEntropyInjectionKey() {
+		return entropyInjectionKey;
+	}
+
+	@Override
+	public String generateEntropy() {
+		return StringUtils.generateRandomAlphanumericString(ThreadLocalRandom.current(), entropyLength);
+	}
+
+	@Override
+	public FileSystemKind getKind() {
+		return FileSystemKind.OBJECT_STORE;
+	}
+}
diff --git a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3EntropyTest.java b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3EntropyTest.java
new file mode 100644
index 0000000..02a0bd5
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3EntropyTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3presto;
+
+import org.apache.flink.configuration.Configuration;
+
+import org.junit.Test;
+
+import java.net.URI;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests that {@link S3FileSystemFactory} passes the configured entropy key and length to the created file system.
+ */
+public class PrestoS3EntropyTest {
+
+	@Test
+	public void testEntropyInjectionConfig() throws Exception {
+		final Configuration conf = new Configuration();
+		conf.setString("s3.entropy.key", "__entropy__");
+		conf.setInteger("s3.entropy.length", 7);
+
+		S3FileSystemFactory factory = new S3FileSystemFactory();
+		factory.configure(conf);
+
+		S3PrestoFileSystem fs = (S3PrestoFileSystem) factory.create(new URI("s3://test"));
+		assertEquals("__entropy__", fs.getEntropyInjectionKey());
+		assertEquals(7, fs.generateEntropy().length());
+	}
+}