You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/09/25 06:52:03 UTC
[flink] 05/06: [FLINK-9061] [s3] Make S3 Presto file system entropy
injecting
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git
commit a2b2041bf3885ee16d0333b4f153a12d435edff4
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Sep 24 22:11:09 2018 +0200
[FLINK-9061] [s3] Make S3 Presto file system entropy injecting
---
.../flink/fs/s3presto/S3FileSystemFactory.java | 103 ++++++++++++++++++---
.../flink/fs/s3presto/S3PrestoFileSystem.java | 94 +++++++++++++++++++
.../flink/fs/s3presto/PrestoS3EntropyTest.java | 47 ++++++++++
3 files changed, 231 insertions(+), 13 deletions(-)
diff --git a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
index a04f9c9..6cf0267 100644
--- a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
@@ -19,12 +19,20 @@
package org.apache.flink.fs.s3presto;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.runtime.fs.hdfs.AbstractFileSystemFactory;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
import org.apache.flink.runtime.fs.hdfs.HadoopConfigLoader;
import org.apache.flink.util.FlinkRuntimeException;
import com.facebook.presto.hive.PrestoS3FileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
@@ -34,7 +42,35 @@ import java.util.Set;
/**
* Simple factory for the S3 file system.
*/
-public class S3FileSystemFactory extends AbstractFileSystemFactory {
+public class S3FileSystemFactory implements FileSystemFactory {
+
+ /**
+ * The substring to be replaced by random entropy in checkpoint paths.
+ */
+ public static final ConfigOption<String> ENTROPY_INJECT_KEY_OPTION = ConfigOptions
+ .key("s3.entropy.key")
+ .noDefaultValue()
+ .withDescription(
+ "This option can be used to improve performance due to sharding issues on Amazon S3. " +
+ "For file creations with entropy injection, this key will be replaced by random " +
+ "alphanumeric characters. For other file creations, the key will be filtered out.");
+
+ /**
+ * The number of entropy characters, in case entropy injection is configured.
+ */
+ public static final ConfigOption<Integer> ENTROPY_INJECT_LENGTH_OPTION = ConfigOptions
+ .key("s3.entropy.length")
+ .defaultValue(4)
+ .withDescription(
+ "When '" + ENTROPY_INJECT_KEY_OPTION.key() + "' is set, this option defines the number of " +
+ "random characters to replace the entropy key with.");
+
+ // ------------------------------------------------------------------------
+
+ private static final Logger LOG = LoggerFactory.getLogger(S3FileSystemFactory.class);
+
+ private static final String INVALID_ENTROPY_KEY_CHARS = "^.*[~#@*+%{}<>\\[\\]|\"\\\\].*$";
+
private static final Set<String> PACKAGE_PREFIXES_TO_SHADE =
new HashSet<>(Collections.singletonList("com.amazonaws."));
@@ -50,28 +86,69 @@ public class S3FileSystemFactory extends AbstractFileSystemFactory {
{ "presto.s3.secret.key", "presto.s3.secret-key" }
};
- public S3FileSystemFactory() {
- super("Presto S3 File System", createHadoopConfigLoader());
- }
+ // ------------------------------------------------------------------------
+
+ private final HadoopConfigLoader hadoopConfigLoader = createHadoopConfigLoader();
+
+ private Configuration flinkConfig;
@Override
public String getScheme() {
return "s3";
}
+	@Override
+	public void configure(Configuration config) {
+		// remembered here because create(...) reads the entropy options from it
+		this.flinkConfig = config;
+		this.hadoopConfigLoader.setFlinkConfig(config);
+	}
+
+	/**
+	 * Creates the S3 file system for the given URI, backed by a {@link PrestoS3FileSystem}.
+	 *
+	 * <p>The entropy injection options are validated before the Presto file system is
+	 * created, so an invalid configuration fails without initializing a file system.
+	 *
+	 * @param fsUri The URI from which the initialization URI of the file system is derived.
+	 * @return The {@link S3PrestoFileSystem} wrapping the initialized Presto file system.
+	 * @throws IOException If the configuration is invalid or the file system cannot be
+	 *                     initialized. Non-I/O failures are wrapped, keeping the cause.
+	 */
+	@Override
+	public FileSystem create(URI fsUri) throws IOException {
+		Configuration flinkConfig = this.flinkConfig;
+
+		if (flinkConfig == null) {
+			LOG.warn("Creating S3 Presto FileSystem without configuring the factory. All behavior will be default.");
+			flinkConfig = new Configuration();
+		}
+
+		LOG.debug("Creating S3 file system backed by PrestoS3FileSystem");
+
+		try {
+			// load and validate the entropy injection settings first: a bad configuration
+			// should fail before a Presto file system has been created and initialized
+			String entropyInjectionKey = flinkConfig.getString(ENTROPY_INJECT_KEY_OPTION);
+			int numEntropyChars = -1;
+			if (entropyInjectionKey != null) {
+				if (entropyInjectionKey.matches(INVALID_ENTROPY_KEY_CHARS)) {
+					throw new IllegalConfigurationException("Invalid character in value for " +
+						ENTROPY_INJECT_KEY_OPTION.key() + " : " + entropyInjectionKey);
+				}
+				numEntropyChars = flinkConfig.getInteger(ENTROPY_INJECT_LENGTH_OPTION);
+				if (numEntropyChars <= 0) {
+					throw new IllegalConfigurationException(
+						ENTROPY_INJECT_LENGTH_OPTION.key() + " must configure a value > 0");
+				}
+			}
+
+			org.apache.hadoop.conf.Configuration hadoopConfig = hadoopConfigLoader.getOrLoadHadoopConfig();
+			PrestoS3FileSystem fs = new PrestoS3FileSystem();
+			fs.initialize(createInitUri(fsUri), hadoopConfig);
+
+			return new S3PrestoFileSystem(fs, entropyInjectionKey, numEntropyChars);
+		} catch (IOException ioe) {
+			throw ioe;
+		} catch (Exception e) {
+			// callers only expect IOException from this method; keep the original as cause
+			throw new IOException(e.getMessage(), e);
+		}
+	}
+
@VisibleForTesting
static HadoopConfigLoader createHadoopConfigLoader() {
return new HadoopConfigLoader(FLINK_CONFIG_PREFIXES, MIRRORED_CONFIG_KEYS,
"presto.s3.", PACKAGE_PREFIXES_TO_SHADE, CONFIG_KEYS_TO_SHADE, FLINK_SHADING_PREFIX);
}
- @Override
- protected org.apache.hadoop.fs.FileSystem createHadoopFileSystem() {
- return new PrestoS3FileSystem();
- }
-
- @Override
- protected URI getInitURI(URI fsUri, org.apache.hadoop.conf.Configuration hadoopConfig) {
+ private static URI createInitUri(URI fsUri) {
final String scheme = fsUri.getScheme();
final String authority = fsUri.getAuthority();
final URI initUri;
@@ -88,7 +165,7 @@ public class S3FileSystemFactory extends AbstractFileSystemFactory {
return initUri;
}
- private URI createURI(String str) {
+ private static URI createURI(String str) {
try {
return new URI(str);
} catch (URISyntaxException e) {
diff --git a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3PrestoFileSystem.java b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3PrestoFileSystem.java
new file mode 100644
index 0000000..b901063
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3PrestoFileSystem.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3presto;
+
+import org.apache.flink.core.fs.EntropyInjectingFileSystem;
+import org.apache.flink.core.fs.FileSystemKind;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.flink.util.StringUtils;
+
+import com.facebook.presto.hive.PrestoS3FileSystem;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Implementation of the Flink {@link org.apache.flink.core.fs.FileSystem} interface for S3.
+ * This class adds the entropy injection settings and the file system kind; everything else
+ * is inherited from {@link HadoopFileSystem}, which is given the Presto S3 file system.
+ */
+public class S3PrestoFileSystem extends HadoopFileSystem implements EntropyInjectingFileSystem {
+
+	@Nullable
+	private final String entropyInjectionKey;
+
+	private final int entropyLength;
+
+	/**
+	 * Creates a S3PrestoFileSystem based on the given Presto S3 file system, without entropy
+	 * injection. The given file system object is expected to be initialized already.
+	 */
+	public S3PrestoFileSystem(PrestoS3FileSystem prestoFs) {
+		this(prestoFs, null, -1);
+	}
+
+	/**
+	 * Creates a S3PrestoFileSystem based on the given Presto S3 file system.
+	 * The given file system object is expected to be initialized already.
+	 *
+	 * <p>This constructor additionally configures the entropy injection for the file system.
+	 *
+	 * @param prestoFs The Presto S3 File System that will be used under the hood.
+	 * @param entropyInjectionKey The substring that will be replaced by entropy or removed.
+	 * @param entropyLength The number of random characters to inject; must be > 0 if a key is set.
+	 */
+	public S3PrestoFileSystem(
+			PrestoS3FileSystem prestoFs,
+			@Nullable String entropyInjectionKey,
+			int entropyLength) {
+
+		super(prestoFs);
+
+		if (entropyInjectionKey != null && entropyLength <= 0) {
+			throw new IllegalArgumentException("Entropy length must be > 0 when entropy injection key is set");
+		}
+
+		this.entropyInjectionKey = entropyInjectionKey;
+		this.entropyLength = entropyLength;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Nullable
+	@Override
+	public String getEntropyInjectionKey() {
+		return entropyInjectionKey;
+	}
+
+	@Override
+	public String generateEntropy() {
+		return StringUtils.generateRandomAlphanumericString(ThreadLocalRandom.current(), entropyLength);
+	}
+
+	@Override
+	public FileSystemKind getKind() {
+		return FileSystemKind.OBJECT_STORE;
+	}
+}
diff --git a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3EntropyTest.java b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3EntropyTest.java
new file mode 100644
index 0000000..02a0bd5
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3EntropyTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3presto;
+
+import org.apache.flink.configuration.Configuration;
+
+import org.junit.Test;
+
+import java.net.URI;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Verifies that {@link S3FileSystemFactory} passes the configured entropy key and length on.
+ */
+public class PrestoS3EntropyTest {
+
+	@Test
+	public void testEntropyInjectionConfig() throws Exception {
+		final Configuration config = new Configuration();
+		config.setString("s3.entropy.key", "__entropy__");
+		config.setInteger("s3.entropy.length", 7);
+
+		final S3FileSystemFactory fsFactory = new S3FileSystemFactory();
+		fsFactory.configure(config);
+
+		final S3PrestoFileSystem prestoFs = (S3PrestoFileSystem) fsFactory.create(new URI("s3://test"));
+		assertEquals("__entropy__", prestoFs.getEntropyInjectionKey());
+		assertEquals(7, prestoFs.generateEntropy().length());
+	}
+}