You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2018/07/27 11:52:48 UTC
[flink] branch release-1.6 updated: [FLINK-8439] Add Flink shading to AWS credential provider s3 hadoop config
This is an automated email from the ASF dual-hosted git repository.
aljoscha pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.6 by this push:
new 76bc0e9 [FLINK-8439] Add Flink shading to AWS credential provider s3 hadoop config
76bc0e9 is described below
commit 76bc0e96f58f450a2b96c240290fddb269931f06
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Mon Jul 23 18:10:55 2018 +0200
[FLINK-8439] Add Flink shading to AWS credential provider s3 hadoop config
---
.../runtime/fs/hdfs/AbstractFileSystemFactory.java | 65 ++++++++++
.../flink/runtime/fs/hdfs/HadoopConfigLoader.java | 132 ++++++++++++++++++++
.../flink/fs/s3hadoop/S3FileSystemFactory.java | 130 ++++++-------------
.../flink/fs/s3hadoop/HadoopS3FileSystemTest.java | 44 +++++++
.../flink/fs/s3presto/S3FileSystemFactory.java | 137 +++++++--------------
.../flink/fs/s3presto/PrestoS3FileSystemTest.java | 15 +++
6 files changed, 342 insertions(+), 181 deletions(-)
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/AbstractFileSystemFactory.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/AbstractFileSystemFactory.java
new file mode 100644
index 0000000..201d580
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/AbstractFileSystemFactory.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * Base class for factories of Flink file systems that are backed by a Hadoop file system.
+ *
+ * <p>Subclasses supply the Hadoop file system implementation and the URI it is initialized with;
+ * the Hadoop configuration is obtained from the given {@link HadoopConfigLoader}.
+ */
+public abstract class AbstractFileSystemFactory implements FileSystemFactory {
+	private static final Logger LOG = LoggerFactory.getLogger(AbstractFileSystemFactory.class);
+
+	/** Name of this factory for logging. */
+	private final String name;
+
+	/** Provides the (lazily loaded) Hadoop configuration used to initialize the file systems. */
+	private final HadoopConfigLoader hadoopConfigLoader;
+
+	/**
+	 * Creates a new factory.
+	 *
+	 * @param name name of the backing file system, only used in log messages
+	 * @param hadoopConfigLoader loader that derives the Hadoop configuration from the Flink configuration
+	 */
+	protected AbstractFileSystemFactory(String name, HadoopConfigLoader hadoopConfigLoader) {
+		this.name = name;
+		this.hadoopConfigLoader = hadoopConfigLoader;
+	}
+
+	/** Hands the Flink configuration to the config loader. */
+	@Override
+	public void configure(Configuration config) {
+		hadoopConfigLoader.setFlinkConfig(config);
+	}
+
+	/**
+	 * Creates the Hadoop file system, initializes it with the loaded Hadoop configuration and wraps
+	 * it as a Flink {@link FileSystem}.
+	 *
+	 * @param fsUri the URI of the file system to create
+	 * @return the initialized file system
+	 * @throws IOException if the file system cannot be created or initialized; unchecked exceptions
+	 *     raised while doing so are wrapped into an {@code IOException}
+	 */
+	@Override
+	public FileSystem create(URI fsUri) throws IOException {
+		LOG.debug("Creating Hadoop file system (backed by {})", name);
+		try {
+			LOG.debug("Loading Hadoop configuration for {}", name);
+			org.apache.hadoop.conf.Configuration hadoopConfig = hadoopConfigLoader.getOrLoadHadoopConfig();
+			org.apache.hadoop.fs.FileSystem fs = createHadoopFileSystem();
+			fs.initialize(getInitURI(fsUri, hadoopConfig), hadoopConfig);
+			return new HadoopFileSystem(fs);
+		}
+		catch (RuntimeException e) {
+			// keep the contract of the factories this class replaces: failures surface as IOException,
+			// e.g. an invalid init URI or a credential provider class that cannot be loaded
+			throw new IOException(e.getMessage(), e);
+		}
+	}
+
+	/** Instantiates the Hadoop file system; {@link #create(URI)} initializes it afterwards. */
+	protected abstract org.apache.hadoop.fs.FileSystem createHadoopFileSystem();
+
+	/**
+	 * Determines the URI with which the Hadoop file system from {@link #createHadoopFileSystem()}
+	 * is initialized.
+	 *
+	 * @param fsUri the URI passed to {@link #create(URI)}
+	 * @param hadoopConfig the loaded Hadoop configuration
+	 * @return the URI to initialize the Hadoop file system with
+	 */
+	protected abstract URI getInitURI(
+		URI fsUri, org.apache.hadoop.conf.Configuration hadoopConfig);
+}
+
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopConfigLoader.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopConfigLoader.java
new file mode 100644
index 0000000..a40e24f
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopConfigLoader.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.HadoopUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.Set;
+
+/**
+ * Lazily loads a Hadoop configuration from a resettable Flink configuration.
+ *
+ * <p>Every Flink config entry whose key starts with one of the Flink prefixes is copied into the
+ * Hadoop configuration with that prefix replaced by the Hadoop config prefix. For the keys to shade,
+ * class names from the packages to shade are rewritten to their Flink-shaded names. Finally, some
+ * keys are mirrored to alternative key names.
+ */
+public class HadoopConfigLoader {
+	private static final Logger LOG = LoggerFactory.getLogger(HadoopConfigLoader.class);
+
+	/** The prefixes that Flink adds to the Hadoop fs config. */
+	private final String[] flinkConfigPrefixes;
+
+	/** Pairs of {@code {sourceKey, targetKey}}: after prefix replacement, the value of the source key
+	 * (if set) is copied to the target key, to give a more uniform experience across different file
+	 * system implementations. */
+	private final String[][] mirroredConfigKeys;
+
+	/** Hadoop config prefix to replace Flink prefix. */
+	private final String hadoopConfigPrefix;
+
+	/** Package prefixes of class names that get {@link #flinkShadingPrefix} prepended. */
+	private final Set<String> packagePrefixesToShade;
+
+	/** Hadoop config keys (i.e. after prefix replacement) whose values are class names to shade. */
+	private final Set<String> configKeysToShade;
+
+	/** Prefix prepended to class names that start with one of {@link #packagePrefixesToShade}. */
+	private final String flinkShadingPrefix;
+
+	/** Flink's configuration object. */
+	private Configuration flinkConfig;
+
+	/** Hadoop's configuration for the file systems, lazily initialized. */
+	private org.apache.hadoop.conf.Configuration hadoopConfig;
+
+	public HadoopConfigLoader(
+		@Nonnull String[] flinkConfigPrefixes,
+		@Nonnull String[][] mirroredConfigKeys,
+		@Nonnull String hadoopConfigPrefix,
+		@Nonnull Set<String> packagePrefixesToShade,
+		@Nonnull Set<String> configKeysToShade,
+		@Nonnull String flinkShadingPrefix) {
+		this.flinkConfigPrefixes = flinkConfigPrefixes;
+		this.mirroredConfigKeys = mirroredConfigKeys;
+		this.hadoopConfigPrefix = hadoopConfigPrefix;
+		this.packagePrefixesToShade = packagePrefixesToShade;
+		this.configKeysToShade = configKeysToShade;
+		this.flinkShadingPrefix = flinkShadingPrefix;
+	}
+
+	/** Sets the Flink configuration and drops the cached Hadoop config so it is rebuilt on next access. */
+	public void setFlinkConfig(Configuration config) {
+		flinkConfig = config;
+		hadoopConfig = null;
+	}
+
+	/**
+	 * Returns the cached Hadoop config, loading it first if necessary. If no Flink configuration has
+	 * been set, falls back to a Hadoop configuration loaded from the classpath and logs a warning.
+	 */
+	public org.apache.hadoop.conf.Configuration getOrLoadHadoopConfig() {
+		org.apache.hadoop.conf.Configuration hadoopConfig = this.hadoopConfig;
+		if (hadoopConfig == null) {
+			if (flinkConfig != null) {
+				hadoopConfig = mirrorCertainHadoopConfig(loadHadoopConfigFromFlink());
+			}
+			else {
+				LOG.warn("Flink configuration is not set prior to loading this configuration."
+					+ " Using Hadoop configuration from the classpath.");
+				hadoopConfig = new org.apache.hadoop.conf.Configuration();
+			}
+			this.hadoopConfig = hadoopConfig;
+		}
+		return hadoopConfig;
+	}
+
+	/** Adds the prefixed entries of the Flink config to the Hadoop config obtained via {@link HadoopUtils}. */
+	private org.apache.hadoop.conf.Configuration loadHadoopConfigFromFlink() {
+		org.apache.hadoop.conf.Configuration hadoopConfig = HadoopUtils.getHadoopConfiguration(flinkConfig);
+		for (String key : flinkConfig.keySet()) {
+			for (String prefix : flinkConfigPrefixes) {
+				if (key.startsWith(prefix)) {
+					String newKey = hadoopConfigPrefix + key.substring(prefix.length());
+					// decide on shading by the Hadoop key, so that the same setting is shaded no matter
+					// which of the Flink prefixes the user chose
+					String newValue = fixHadoopConfig(newKey, flinkConfig.getString(key, null));
+					hadoopConfig.set(newKey, newValue);
+
+					// the value is deliberately not logged: entries like the secret key are credentials
+					LOG.debug("Adding Flink config entry for {} as {} to Hadoop config", key, newKey);
+				}
+			}
+		}
+		return hadoopConfig;
+	}
+
+	/** Copies the value of each mirrored source key (if set) to its target key. */
+	private org.apache.hadoop.conf.Configuration mirrorCertainHadoopConfig(
+		org.apache.hadoop.conf.Configuration hadoopConfig) {
+		for (String[] mirrored : mirroredConfigKeys) {
+			String value = hadoopConfig.get(mirrored[0], null);
+			if (value != null) {
+				hadoopConfig.set(mirrored[1], value);
+			}
+		}
+		return hadoopConfig;
+	}
+
+	/** Returns the shaded form of {@code value} if {@code key} is one of the keys to shade. */
+	private String fixHadoopConfig(String key, String value) {
+		return key != null && value != null && configKeysToShade.contains(key) ?
+			shadeClassConfig(value) : value;
+	}
+
+	/**
+	 * Shades every entry of a comma-separated list of class names (a single class name is a list of
+	 * one); the separators are kept unchanged.
+	 */
+	private String shadeClassConfig(String classConfig) {
+		String[] classNames = classConfig.split(",", -1);
+		StringBuilder shaded = new StringBuilder(classConfig.length());
+		for (int i = 0; i < classNames.length; i++) {
+			if (i > 0) {
+				shaded.append(',');
+			}
+			shaded.append(shadeClassName(classNames[i]));
+		}
+		return shaded.toString();
+	}
+
+	/** Prepends the shading prefix to a class name from a package to shade, keeping surrounding whitespace. */
+	private String shadeClassName(String className) {
+		String trimmed = className.trim();
+		if (packagePrefixesToShade.stream().noneMatch(trimmed::startsWith)) {
+			return className;
+		}
+		int start = className.indexOf(trimmed);
+		return className.substring(0, start) + flinkShadingPrefix + className.substring(start);
+	}
+}
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
index bd272e5..4ba49bb 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
@@ -18,127 +18,77 @@
package org.apache.flink.fs.s3hadoop;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.FileSystemFactory;
-import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
-import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.fs.hdfs.AbstractFileSystemFactory;
+import org.apache.flink.runtime.fs.hdfs.HadoopConfigLoader;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.net.URI;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
/**
 * Simple factory for the S3 file system.
 */
-public class S3FileSystemFactory implements FileSystemFactory {
-
+public class S3FileSystemFactory extends AbstractFileSystemFactory {
private static final Logger LOG = LoggerFactory.getLogger(S3FileSystemFactory.class);
- /** The prefixes that Flink adds to the Hadoop config under 'fs.s3a.'. */
- private static final String[] CONFIG_PREFIXES = { "s3.", "s3a.", "fs.s3a." };
+ /** Package prefixes of class names that get {@link #FLINK_SHADING_PREFIX} prepended. */
+ private static final Set<String> PACKAGE_PREFIXES_TO_SHADE =
+ Collections.unmodifiableSet(new HashSet<>(Collections.singletonList("com.amazonaws.")));
+
+ /** Hadoop config keys whose class-name values are shaded. */
+ private static final Set<String> CONFIG_KEYS_TO_SHADE =
+ Collections.unmodifiableSet(new HashSet<>(Collections.singleton("fs.s3a.aws.credentials.provider")));
+
+ /** Prefix prepended to the names of shaded classes. */
+ private static final String FLINK_SHADING_PREFIX = "org.apache.flink.fs.s3hadoop.shaded.";
+
+ /** The prefixes that Flink adds to the Hadoop config under 'fs.s3a.'. */
+ private static final String[] FLINK_CONFIG_PREFIXES = { "s3.", "s3a.", "fs.s3a." };
- /** Keys that are replaced (after prefix replacement, to give a more uniform experience
- * across different file system implementations. */
+ /** Keys whose values are mirrored to alternative keys (after prefix replacement), to give a
+ * more uniform experience across different file system implementations. */
private static final String[][] MIRRORED_CONFIG_KEYS = {
{ "fs.s3a.access-key", "fs.s3a.access.key" },
{ "fs.s3a.secret-key", "fs.s3a.secret.key" }
};
- /** Flink's configuration object. */
- private Configuration flinkConfig;
-
- /** Hadoop's configuration for the file systems, lazily initialized. */
- private org.apache.hadoop.conf.Configuration hadoopConfig;
+ /** Creates a factory for S3 file systems backed by Hadoop's {@link S3AFileSystem}. */
+ public S3FileSystemFactory() {
+ super("Hadoop s3a file system", createHadoopConfigLoader());
+ }
@Override
public String getScheme() {
return "s3";
}
- @Override
- public void configure(Configuration config) {
- flinkConfig = config;
- hadoopConfig = null;
+ /** Creates the loader that maps the 's3.', 's3a.' and 'fs.s3a.' Flink entries to 'fs.s3a.' Hadoop entries. */
+ @VisibleForTesting
+ static HadoopConfigLoader createHadoopConfigLoader() {
+ return new HadoopConfigLoader(FLINK_CONFIG_PREFIXES, MIRRORED_CONFIG_KEYS,
+ "fs.s3a.", PACKAGE_PREFIXES_TO_SHADE, CONFIG_KEYS_TO_SHADE, FLINK_SHADING_PREFIX);
}
@Override
- public FileSystem create(URI fsUri) throws IOException {
- LOG.debug("Creating S3 file system (backed by a Hadoop s3a file system)");
-
- try {
- // -- (1) get the loaded Hadoop config (or fall back to one loaded from the classpath)
-
- org.apache.hadoop.conf.Configuration hadoopConfig = this.hadoopConfig;
- if (hadoopConfig == null) {
- if (flinkConfig != null) {
- LOG.debug("Loading Hadoop configuration for s3a file system");
- hadoopConfig = HadoopUtils.getHadoopConfiguration(flinkConfig);
-
- // add additional config entries from the Flink config to the Presto Hadoop config
- for (String key : flinkConfig.keySet()) {
- for (String prefix : CONFIG_PREFIXES) {
- if (key.startsWith(prefix)) {
- String value = flinkConfig.getString(key, null);
- String newKey = "fs.s3a." + key.substring(prefix.length());
- hadoopConfig.set(newKey, flinkConfig.getString(key, null));
-
- LOG.debug("Adding Flink config entry for {} as {}={} to Hadoop config for " +
- "S3A File System", key, newKey, value);
- }
- }
- }
-
- // mirror certain keys to make use more uniform across s3 implementations
- // with different keys
- for (String[] mirrored : MIRRORED_CONFIG_KEYS) {
- String value = hadoopConfig.get(mirrored[0], null);
- if (value != null) {
- hadoopConfig.set(mirrored[1], value);
- }
- }
-
- this.hadoopConfig = hadoopConfig;
- }
- else {
- LOG.warn("The factory has not been configured prior to loading the S3 file system."
- + " Using Hadoop configuration from the classpath.");
-
- hadoopConfig = new org.apache.hadoop.conf.Configuration();
- this.hadoopConfig = hadoopConfig;
- }
- }
-
- // -- (2) Instantiate the Hadoop file system class for that scheme
+ protected org.apache.hadoop.fs.FileSystem createHadoopFileSystem() {
+ return new S3AFileSystem();
+ }
- final String scheme = fsUri.getScheme();
- final String authority = fsUri.getAuthority();
+ /**
+ * Uses the default file system URI from the Hadoop config if the given URI has neither scheme nor
+ * authority, or has only a scheme equal to the default URI's scheme (and the default URI has an
+ * authority); otherwise uses the given URI.
+ */
+ @Override
+ protected URI getInitURI(URI fsUri, org.apache.hadoop.conf.Configuration hadoopConfig) {
+ final String scheme = fsUri.getScheme();
+ final String authority = fsUri.getAuthority();
- if (scheme == null && authority == null) {
- fsUri = org.apache.hadoop.fs.FileSystem.getDefaultUri(hadoopConfig);
- }
- else if (scheme != null && authority == null) {
- URI defaultUri = org.apache.hadoop.fs.FileSystem.getDefaultUri(hadoopConfig);
- if (scheme.equals(defaultUri.getScheme()) && defaultUri.getAuthority() != null) {
- fsUri = defaultUri;
- }
+ if (scheme == null && authority == null) {
+ fsUri = org.apache.hadoop.fs.FileSystem.getDefaultUri(hadoopConfig);
+ }
+ else if (scheme != null && authority == null) {
+ URI defaultUri = org.apache.hadoop.fs.FileSystem.getDefaultUri(hadoopConfig);
+ if (scheme.equals(defaultUri.getScheme()) && defaultUri.getAuthority() != null) {
+ fsUri = defaultUri;
}
+ }
- LOG.debug("Using scheme {} for s3a file system backing the S3 File System", fsUri);
-
- final S3AFileSystem fs = new S3AFileSystem();
- fs.initialize(fsUri, hadoopConfig);
+ LOG.debug("Using scheme {} for s3a file system backing the S3 File System", fsUri);
- return new HadoopFileSystem(fs);
- }
- catch (IOException e) {
- throw e;
- }
- catch (Exception e) {
- throw new IOException(e.getMessage(), e);
- }
+ return fsUri;
}
}
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemTest.java b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemTest.java
new file mode 100644
index 0000000..647a937
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3hadoop;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.fs.hdfs.HadoopConfigLoader;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the S3 file system that is backed by Hadoop's {@link org.apache.hadoop.fs.s3a.S3AFileSystem}.
+ */
+public class HadoopS3FileSystemTest {
+
+	/**
+	 * Verifies that an AWS SDK credentials provider configured in the Flink config ends up in the
+	 * Hadoop config under its Flink-shaded class name.
+	 */
+	@Test
+	public void testShadingOfAwsCredProviderConfig() {
+		final String credentialsProviderKey = "fs.s3a.aws.credentials.provider";
+
+		final Configuration flinkConfig = new Configuration();
+		flinkConfig.setString(credentialsProviderKey, "com.amazonaws.auth.ContainerCredentialsProvider");
+
+		final HadoopConfigLoader loader = S3FileSystemFactory.createHadoopConfigLoader();
+		loader.setFlinkConfig(flinkConfig);
+		final org.apache.hadoop.conf.Configuration loaded = loader.getOrLoadHadoopConfig();
+
+		assertEquals(
+			"org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.auth.ContainerCredentialsProvider",
+			loaded.get(credentialsProviderKey));
+	}
+}
diff --git a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
index 8847dc9c..a04f9c9 100644
--- a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
@@ -18,126 +18,81 @@
package org.apache.flink.fs.s3presto;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.FileSystemFactory;
-import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
-import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.fs.hdfs.AbstractFileSystemFactory;
+import org.apache.flink.runtime.fs.hdfs.HadoopConfigLoader;
+import org.apache.flink.util.FlinkRuntimeException;
import com.facebook.presto.hive.PrestoS3FileSystem;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
/**
 * Simple factory for the S3 file system.
 */
-public class S3FileSystemFactory implements FileSystemFactory {
+public class S3FileSystemFactory extends AbstractFileSystemFactory {
+ /** Package prefixes of class names that get {@link #FLINK_SHADING_PREFIX} prepended. */
+ private static final Set<String> PACKAGE_PREFIXES_TO_SHADE =
+ Collections.unmodifiableSet(new HashSet<>(Collections.singletonList("com.amazonaws.")));
- private static final Logger LOG = LoggerFactory.getLogger(S3FileSystemFactory.class);
+ /** Hadoop config keys whose class-name values are shaded. */
+ private static final Set<String> CONFIG_KEYS_TO_SHADE =
+ Collections.unmodifiableSet(new HashSet<>(Collections.singleton("presto.s3.credentials-provider")));
- /** The prefixes that Flink adds to the Hadoop config under 'presto.s3.'. */
- private static final String[] CONFIG_PREFIXES = { "s3.", "presto.s3." };
+ /** Prefix prepended to the names of shaded classes. */
+ private static final String FLINK_SHADING_PREFIX = "org.apache.flink.fs.s3presto.shaded.";
+
+ /** The prefixes that Flink adds to the Hadoop config under 'presto.s3.'. */
+ private static final String[] FLINK_CONFIG_PREFIXES = { "s3.", "presto.s3." };
- /** Keys that are replaced (after prefix replacement, to give a more uniform experience
- * across different file system implementations. */
+ /** Keys whose values are mirrored to alternative keys (after prefix replacement), to give a
+ * more uniform experience across different file system implementations. */
private static final String[][] MIRRORED_CONFIG_KEYS = {
{ "presto.s3.access.key", "presto.s3.access-key" },
{ "presto.s3.secret.key", "presto.s3.secret-key" }
};
- /** Flink's configuration object. */
- private Configuration flinkConfig;
-
- /** Hadoop's configuration for the file systems, lazily initialized. */
- private org.apache.hadoop.conf.Configuration hadoopConfig;
+ /** Creates a factory for S3 file systems backed by {@link PrestoS3FileSystem}. */
+ public S3FileSystemFactory() {
+ super("Presto S3 File System", createHadoopConfigLoader());
+ }
@Override
public String getScheme() {
return "s3";
}
+ /** Creates the loader that maps the 's3.' and 'presto.s3.' Flink entries to 'presto.s3.' Hadoop entries. */
+ @VisibleForTesting
+ static HadoopConfigLoader createHadoopConfigLoader() {
+ return new HadoopConfigLoader(FLINK_CONFIG_PREFIXES, MIRRORED_CONFIG_KEYS,
+ "presto.s3.", PACKAGE_PREFIXES_TO_SHADE, CONFIG_KEYS_TO_SHADE, FLINK_SHADING_PREFIX);
+ }
+
@Override
- public void configure(Configuration config) {
- flinkConfig = config;
- hadoopConfig = null;
+ protected org.apache.hadoop.fs.FileSystem createHadoopFileSystem() {
+ return new PrestoS3FileSystem();
}
@Override
- public FileSystem create(URI fsUri) throws IOException {
- LOG.debug("Creating S3 file system (backed by a Hadoop s3a file system");
+ protected URI getInitURI(URI fsUri, org.apache.hadoop.conf.Configuration hadoopConfig) {
+ final String scheme = fsUri.getScheme();
+ final String authority = fsUri.getAuthority();
+ final URI initUri;
- try {
- // -- (1) get the loaded Hadoop config (or fall back to one loaded from the classpath)
-
- org.apache.hadoop.conf.Configuration hadoopConfig = this.hadoopConfig;
- if (hadoopConfig == null) {
- if (flinkConfig != null) {
- LOG.debug("Loading Hadoop configuration for Presto S3 file system");
- hadoopConfig = HadoopUtils.getHadoopConfiguration(flinkConfig);
-
- // add additional config entries from the Flink config to the Presto Hadoop config
- for (String key : flinkConfig.keySet()) {
- for (String prefix : CONFIG_PREFIXES) {
- if (key.startsWith(prefix)) {
- String value = flinkConfig.getString(key, null);
- String newKey = "presto.s3." + key.substring(prefix.length());
- hadoopConfig.set(newKey, flinkConfig.getString(key, null));
-
- LOG.debug("Adding Flink config entry for {} as {}={} to Hadoop config for " +
- "Presto S3 File System", key, newKey, value);
- }
- }
- }
-
- // mirror certain keys to make use more uniform across s3 implementations
- // with different keys
- for (String[] mirrored : MIRRORED_CONFIG_KEYS) {
- String value = hadoopConfig.get(mirrored[0], null);
- if (value != null) {
- hadoopConfig.set(mirrored[1], value);
- }
- }
-
- this.hadoopConfig = hadoopConfig;
- }
- else {
- LOG.warn("The factory has not been configured prior to loading the S3 file system."
- + " Using Hadoop configuration from the classpath.");
-
- hadoopConfig = new org.apache.hadoop.conf.Configuration();
- this.hadoopConfig = hadoopConfig;
- }
- }
-
- // -- (2) Instantiate the Presto file system class for that scheme
-
- final String scheme = fsUri.getScheme();
- final String authority = fsUri.getAuthority();
- final URI initUri;
-
- if (scheme == null && authority == null) {
- initUri = new URI("s3://s3.amazonaws.com");
- }
- else if (scheme != null && authority == null) {
- initUri = new URI(scheme + "://s3.amazonaws.com");
- }
- else {
- initUri = fsUri;
- }
-
- final PrestoS3FileSystem fs = new PrestoS3FileSystem();
- fs.initialize(initUri, hadoopConfig);
-
- return new HadoopFileSystem(fs);
+ // a URI without authority is initialized against the "s3.amazonaws.com" authority
+ if (scheme == null && authority == null) {
+ initUri = createURI("s3://s3.amazonaws.com");
+ }
+ else if (scheme != null && authority == null) {
+ initUri = createURI(scheme + "://s3.amazonaws.com");
}
- catch (IOException e) {
- throw e;
+ else {
+ initUri = fsUri;
}
- catch (Exception e) {
- throw new IOException(e.getMessage(), e);
+ return initUri;
+ }
+
+ /** Parses {@code str} as a URI, rethrowing a syntax error as a {@link FlinkRuntimeException}. */
+ private static URI createURI(String str) {
+ try {
+ return new URI(str);
+ } catch (URISyntaxException e) {
+ throw new FlinkRuntimeException("Error in s3 aws URI - " + str, e);
}
}
}
diff --git a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java
index 7e2d12a..4eeb2d4 100644
--- a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java
+++ b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.fs.s3presto;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.runtime.fs.hdfs.HadoopConfigLoader;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import com.amazonaws.auth.AWSCredentialsProvider;
@@ -31,6 +32,7 @@ import org.junit.Test;
import java.lang.reflect.Field;
import java.net.URI;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
@@ -75,6 +77,19 @@ public class PrestoS3FileSystemTest {
validateBasicCredentials(fs);
}
+	/**
+	 * Verifies that an AWS SDK credentials provider configured in the Flink config ends up in the
+	 * Hadoop config under its Flink-shaded class name.
+	 */
+	@Test
+	public void testShadingOfAwsCredProviderConfig() {
+		final String credentialsProviderKey = "presto.s3.credentials-provider";
+		final Configuration flinkConfig = new Configuration();
+		flinkConfig.setString(credentialsProviderKey, "com.amazonaws.auth.ContainerCredentialsProvider");
+
+		final HadoopConfigLoader loader = S3FileSystemFactory.createHadoopConfigLoader();
+		loader.setFlinkConfig(flinkConfig);
+
+		assertEquals(
+			"org.apache.flink.fs.s3presto.shaded.com.amazonaws.auth.ContainerCredentialsProvider",
+			loader.getOrLoadHadoopConfig().get(credentialsProviderKey));
+	}
+
// ------------------------------------------------------------------------
// utilities
// ------------------------------------------------------------------------