You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2018/07/27 11:51:26 UTC

[flink] branch master updated: [FLINK-8439] Add Flink shading to AWS credential provider s3 hadoop config

This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 7be0787  [FLINK-8439] Add Flink shading to AWS credential provider s3 hadoop config
7be0787 is described below

commit 7be07871c23b56547add4cd85e15b95c757f882b
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Mon Jul 23 18:10:55 2018 +0200

    [FLINK-8439] Add Flink shading to AWS credential provider s3 hadoop config
---
 .../runtime/fs/hdfs/AbstractFileSystemFactory.java |  65 ++++++++++
 .../flink/runtime/fs/hdfs/HadoopConfigLoader.java  | 132 ++++++++++++++++++++
 .../flink/fs/s3hadoop/S3FileSystemFactory.java     | 130 ++++++-------------
 .../flink/fs/s3hadoop/HadoopS3FileSystemTest.java  |  44 +++++++
 .../flink/fs/s3presto/S3FileSystemFactory.java     | 137 +++++++--------------
 .../flink/fs/s3presto/PrestoS3FileSystemTest.java  |  15 +++
 6 files changed, 342 insertions(+), 181 deletions(-)

diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/AbstractFileSystemFactory.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/AbstractFileSystemFactory.java
new file mode 100644
index 0000000..201d580
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/AbstractFileSystemFactory.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+
+/** Base class for factories that create Flink file systems backed by a Hadoop file system. */
+public abstract class AbstractFileSystemFactory implements FileSystemFactory {
+	private static final Logger LOG = LoggerFactory.getLogger(AbstractFileSystemFactory.class);
+
+	/** Name of this factory for logging. */
+	private final String name;
+
+	private final HadoopConfigLoader hadoopConfigLoader;
+
+	protected AbstractFileSystemFactory(String name, HadoopConfigLoader hadoopConfigLoader) {
+		this.name = name;
+		this.hadoopConfigLoader = hadoopConfigLoader;
+	}
+
+	@Override
+	public void configure(Configuration config) {
+		hadoopConfigLoader.setFlinkConfig(config);
+	}
+
+	@Override
+	public FileSystem create(URI fsUri) throws IOException {
+		LOG.debug("Creating Hadoop file system (backed by {})", name);
+		LOG.debug("Loading Hadoop configuration for {}", name);
+		org.apache.hadoop.conf.Configuration hadoopConfig = hadoopConfigLoader.getOrLoadHadoopConfig();
+		org.apache.hadoop.fs.FileSystem fs = createHadoopFileSystem();
+		fs.initialize(getInitURI(fsUri, hadoopConfig), hadoopConfig);
+		return new HadoopFileSystem(fs);
+	}
+
+	/** Returns the Hadoop file system instance that {@link #create(URI)} initializes and wraps. */
+	protected abstract org.apache.hadoop.fs.FileSystem createHadoopFileSystem();
+	/** Returns the URI to initialize the Hadoop file system with, for the requested URI and config. */
+	protected abstract URI getInitURI(URI fsUri, org.apache.hadoop.conf.Configuration hadoopConfig);
+}
+
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopConfigLoader.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopConfigLoader.java
new file mode 100644
index 0000000..a40e24f
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopConfigLoader.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.HadoopUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.Set;
+
+/** Lazily loads the Hadoop configuration from a Flink configuration that can be reset at any time. */
+public class HadoopConfigLoader {
+	private static final Logger LOG = LoggerFactory.getLogger(HadoopConfigLoader.class);
+
+	/** The prefixes that Flink adds to the Hadoop fs config. */
+	private final String[] flinkConfigPrefixes;
+
+	/** Keys that are replaced (after prefix replacement), to give a more uniform experience
+	 * across different file system implementations. */
+	private final String[][] mirroredConfigKeys;
+
+	/** Hadoop config prefix to replace Flink prefix. */
+	private final String hadoopConfigPrefix;
+
+	/** Shading: class names under these packages, in values of these keys, get the Flink shading prefix. */
+	private final Set<String> packagePrefixesToShade;
+	private final Set<String> configKeysToShade;
+	private final String flinkShadingPrefix;
+
+	/** Flink's configuration object. */
+	private Configuration flinkConfig;
+
+	/** Hadoop's configuration for the file systems, lazily initialized. */
+	private org.apache.hadoop.conf.Configuration hadoopConfig;
+
+	public HadoopConfigLoader(
+		@Nonnull String[] flinkConfigPrefixes,
+		@Nonnull String[][] mirroredConfigKeys,
+		@Nonnull String hadoopConfigPrefix,
+		@Nonnull Set<String> packagePrefixesToShade,
+		@Nonnull Set<String> configKeysToShade,
+		@Nonnull String flinkShadingPrefix) {
+		this.flinkConfigPrefixes = flinkConfigPrefixes;
+		this.mirroredConfigKeys = mirroredConfigKeys;
+		this.hadoopConfigPrefix = hadoopConfigPrefix;
+		this.packagePrefixesToShade = packagePrefixesToShade;
+		this.configKeysToShade = configKeysToShade;
+		this.flinkShadingPrefix = flinkShadingPrefix;
+	}
+
+	/** Sets the Flink configuration and discards any previously loaded Hadoop configuration. */
+	public void setFlinkConfig(Configuration config) {
+		flinkConfig = config;
+		hadoopConfig = null;
+	}
+
+	/** Gets the loaded Hadoop config (or falls back to one loaded from the classpath). */
+	public org.apache.hadoop.conf.Configuration getOrLoadHadoopConfig() {
+		org.apache.hadoop.conf.Configuration hadoopConfig = this.hadoopConfig;
+		if (hadoopConfig == null) {
+			if (flinkConfig != null) {
+				hadoopConfig = mirrorCertainHadoopConfig(loadHadoopConfigFromFlink());
+			}
+			else {
+				LOG.warn("Flink configuration is not set prior to loading this configuration."
+					+ " Using Hadoop configuration from the classpath.");
+				hadoopConfig = new org.apache.hadoop.conf.Configuration();
+			}
+		}
+		this.hadoopConfig = hadoopConfig;
+		return hadoopConfig;
+	}
+
+	// add additional config entries from the Flink config to the Hadoop config
+	private org.apache.hadoop.conf.Configuration loadHadoopConfigFromFlink() {
+		org.apache.hadoop.conf.Configuration hadoopConfig = HadoopUtils.getHadoopConfiguration(flinkConfig);
+		for (String key : flinkConfig.keySet()) {
+			for (String prefix : flinkConfigPrefixes) {
+				if (key.startsWith(prefix)) {
+					String newKey = hadoopConfigPrefix + key.substring(prefix.length());
+					// also match the translated key, so keys given under an alternative Flink prefix are shaded
+					boolean shade = configKeysToShade.contains(key) || configKeysToShade.contains(newKey);
+					String value = flinkConfig.getString(key, null);
+					String newValue = shade ? shadeClassConfig(value) : value;
+					hadoopConfig.set(newKey, newValue);
+
+					LOG.debug("Adding Flink config entry for {} as {}={} to Hadoop config", key, newKey, newValue);
+				}
+			}
+		}
+		return hadoopConfig;
+	}
+
+	// mirror certain keys to make use more uniform across implementations
+	// with different keys
+	private org.apache.hadoop.conf.Configuration mirrorCertainHadoopConfig(
+		org.apache.hadoop.conf.Configuration hadoopConfig) {
+		for (String[] mirrored : mirroredConfigKeys) {
+			String value = hadoopConfig.get(mirrored[0], null);
+			if (value != null) {
+				hadoopConfig.set(mirrored[1], value);
+			}
+		}
+		return hadoopConfig;
+	}
+
+	// prepend the Flink shading prefix when the class name starts with one of the packages to shade
+	private String shadeClassConfig(String classConfig) {
+		return packagePrefixesToShade.stream().anyMatch(classConfig::startsWith) ?
+			flinkShadingPrefix + classConfig : classConfig;
+	}
+}
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
index bd272e5..4ba49bb 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
@@ -18,127 +18,77 @@
 
 package org.apache.flink.fs.s3hadoop;
 
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.FileSystemFactory;
-import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
-import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.fs.hdfs.AbstractFileSystemFactory;
+import org.apache.flink.runtime.fs.hdfs.HadoopConfigLoader;
 
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.net.URI;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
 
 /**
  * Simple factory for the S3 file system.
  */
-public class S3FileSystemFactory implements FileSystemFactory {
-
+public class S3FileSystemFactory extends AbstractFileSystemFactory {
 	private static final Logger LOG = LoggerFactory.getLogger(S3FileSystemFactory.class);
 
-	/** The prefixes that Flink adds to the Hadoop config under 'fs.s3a.'. */
-	private static final String[] CONFIG_PREFIXES = { "s3.", "s3a.", "fs.s3a." };
+	private static final Set<String> PACKAGE_PREFIXES_TO_SHADE =
+		new HashSet<>(Collections.singletonList("com.amazonaws."));
+
+	private static final Set<String> CONFIG_KEYS_TO_SHADE =
+		Collections.unmodifiableSet(new HashSet<>(Collections.singleton("fs.s3a.aws.credentials.provider")));
+
+	private static final String FLINK_SHADING_PREFIX = "org.apache.flink.fs.s3hadoop.shaded.";
+
+	private static final String[] FLINK_CONFIG_PREFIXES = { "s3.", "s3a.", "fs.s3a." };
 
-	/** Keys that are replaced (after prefix replacement, to give a more uniform experience
-	 * across different file system implementations. */
 	private static final String[][] MIRRORED_CONFIG_KEYS = {
 			{ "fs.s3a.access-key", "fs.s3a.access.key" },
 			{ "fs.s3a.secret-key", "fs.s3a.secret.key" }
 	};
 
-	/** Flink's configuration object. */
-	private Configuration flinkConfig;
-
-	/** Hadoop's configuration for the file systems, lazily initialized. */
-	private org.apache.hadoop.conf.Configuration hadoopConfig;
+	public S3FileSystemFactory() {
+		super("Hadoop s3a file system", createHadoopConfigLoader());
+	}
 
 	@Override
 	public String getScheme() {
 		return "s3";
 	}
 
-	@Override
-	public void configure(Configuration config) {
-		flinkConfig = config;
-		hadoopConfig = null;
+	@VisibleForTesting
+	static HadoopConfigLoader createHadoopConfigLoader() {
+		return new HadoopConfigLoader(FLINK_CONFIG_PREFIXES, MIRRORED_CONFIG_KEYS,
+			"fs.s3a.", PACKAGE_PREFIXES_TO_SHADE, CONFIG_KEYS_TO_SHADE, FLINK_SHADING_PREFIX);
 	}
 
 	@Override
-	public FileSystem create(URI fsUri) throws IOException {
-		LOG.debug("Creating S3 file system (backed by a Hadoop s3a file system)");
-
-		try {
-			// -- (1) get the loaded Hadoop config (or fall back to one loaded from the classpath)
-
-			org.apache.hadoop.conf.Configuration hadoopConfig = this.hadoopConfig;
-			if (hadoopConfig == null) {
-				if (flinkConfig != null) {
-					LOG.debug("Loading Hadoop configuration for s3a file system");
-					hadoopConfig = HadoopUtils.getHadoopConfiguration(flinkConfig);
-
-					// add additional config entries from the Flink config to the Presto Hadoop config
-					for (String key : flinkConfig.keySet()) {
-						for (String prefix : CONFIG_PREFIXES) {
-							if (key.startsWith(prefix)) {
-								String value = flinkConfig.getString(key, null);
-								String newKey = "fs.s3a." + key.substring(prefix.length());
-								hadoopConfig.set(newKey, flinkConfig.getString(key, null));
-
-								LOG.debug("Adding Flink config entry for {} as {}={} to Hadoop config for " +
-										"S3A File System", key, newKey, value);
-							}
-						}
-					}
-
-					// mirror certain keys to make use more uniform across s3 implementations
-					// with different keys
-					for (String[] mirrored : MIRRORED_CONFIG_KEYS) {
-						String value = hadoopConfig.get(mirrored[0], null);
-						if (value != null) {
-							hadoopConfig.set(mirrored[1], value);
-						}
-					}
-
-					this.hadoopConfig = hadoopConfig;
-				}
-				else {
-					LOG.warn("The factory has not been configured prior to loading the S3 file system."
-							+ " Using Hadoop configuration from the classpath.");
-
-					hadoopConfig = new org.apache.hadoop.conf.Configuration();
-					this.hadoopConfig = hadoopConfig;
-				}
-			}
-
-			// -- (2) Instantiate the Hadoop file system class for that scheme
+	protected org.apache.hadoop.fs.FileSystem createHadoopFileSystem() {
+		return new S3AFileSystem();
+	}
 
-			final String scheme = fsUri.getScheme();
-			final String authority = fsUri.getAuthority();
+	@Override
+	protected URI getInitURI(URI fsUri, org.apache.hadoop.conf.Configuration hadoopConfig) {
+		final String scheme = fsUri.getScheme();
+		final String authority = fsUri.getAuthority();
 
-			if (scheme == null && authority == null) {
-				fsUri = org.apache.hadoop.fs.FileSystem.getDefaultUri(hadoopConfig);
-			}
-			else if (scheme != null && authority == null) {
-				URI defaultUri = org.apache.hadoop.fs.FileSystem.getDefaultUri(hadoopConfig);
-				if (scheme.equals(defaultUri.getScheme()) && defaultUri.getAuthority() != null) {
-					fsUri = defaultUri;
-				}
+		if (scheme == null && authority == null) {
+			fsUri = org.apache.hadoop.fs.FileSystem.getDefaultUri(hadoopConfig);
+		}
+		else if (scheme != null && authority == null) {
+			URI defaultUri = org.apache.hadoop.fs.FileSystem.getDefaultUri(hadoopConfig);
+			if (scheme.equals(defaultUri.getScheme()) && defaultUri.getAuthority() != null) {
+				fsUri = defaultUri;
 			}
+		}
 
-			LOG.debug("Using scheme {} for s3a file system backing the S3 File System", fsUri);
-
-			final S3AFileSystem fs = new S3AFileSystem();
-			fs.initialize(fsUri, hadoopConfig);
+		LOG.debug("Using scheme {} for s3a file system backing the S3 File System", fsUri);
 
-			return new HadoopFileSystem(fs);
-		}
-		catch (IOException e) {
-			throw e;
-		}
-		catch (Exception e) {
-			throw new IOException(e.getMessage(), e);
-		}
+		return fsUri;
 	}
 }
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemTest.java b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemTest.java
new file mode 100644
index 0000000..647a937
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3hadoop;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.fs.hdfs.HadoopConfigLoader;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests for the Hadoop configuration handling of the S3 file system backed by Hadoop's {@link org.apache.hadoop.fs.s3a.S3AFileSystem}.
+ */
+public class HadoopS3FileSystemTest {
+	@Test
+	public void testShadingOfAwsCredProviderConfig() {
+		final String providerKey = "fs.s3a.aws.credentials.provider";
+		final Configuration flinkConf = new Configuration();
+		flinkConf.setString(providerKey, "com.amazonaws.auth.ContainerCredentialsProvider");
+
+		final HadoopConfigLoader loader = S3FileSystemFactory.createHadoopConfigLoader();
+		loader.setFlinkConfig(flinkConf);
+		// the loader is expected to prepend the Flink shading prefix to the provider class name
+		final String actualProvider = loader.getOrLoadHadoopConfig().get(providerKey);
+		assertEquals("org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.auth.ContainerCredentialsProvider", actualProvider);
+	}
+}
diff --git a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
index 8847dc9c..a04f9c9 100644
--- a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
@@ -18,126 +18,81 @@
 
 package org.apache.flink.fs.s3presto;
 
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.FileSystemFactory;
-import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
-import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.fs.hdfs.AbstractFileSystemFactory;
+import org.apache.flink.runtime.fs.hdfs.HadoopConfigLoader;
+import org.apache.flink.util.FlinkRuntimeException;
 
 import com.facebook.presto.hive.PrestoS3FileSystem;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
 
 /**
  * Simple factory for the S3 file system.
  */
-public class S3FileSystemFactory implements FileSystemFactory {
+public class S3FileSystemFactory extends AbstractFileSystemFactory {
+	private static final Set<String> PACKAGE_PREFIXES_TO_SHADE =
+		new HashSet<>(Collections.singletonList("com.amazonaws."));
 
-	private static final Logger LOG = LoggerFactory.getLogger(S3FileSystemFactory.class);
+	private static final Set<String> CONFIG_KEYS_TO_SHADE =
+		Collections.unmodifiableSet(new HashSet<>(Collections.singleton("presto.s3.credentials-provider")));
 
-	/** The prefixes that Flink adds to the Hadoop config under 'presto.s3.'. */
-	private static final String[] CONFIG_PREFIXES = { "s3.", "presto.s3." };
+	private static final String FLINK_SHADING_PREFIX = "org.apache.flink.fs.s3presto.shaded.";
+
+	private static final String[] FLINK_CONFIG_PREFIXES = { "s3.", "presto.s3." };
 
-	/** Keys that are replaced (after prefix replacement, to give a more uniform experience
-	 * across different file system implementations. */
 	private static final String[][] MIRRORED_CONFIG_KEYS = {
 			{ "presto.s3.access.key", "presto.s3.access-key" },
 			{ "presto.s3.secret.key", "presto.s3.secret-key" }
 	};
 
-	/** Flink's configuration object. */
-	private Configuration flinkConfig;
-
-	/** Hadoop's configuration for the file systems, lazily initialized. */
-	private org.apache.hadoop.conf.Configuration hadoopConfig;
+	public S3FileSystemFactory() {
+		super("Presto S3 File System", createHadoopConfigLoader());
+	}
 
 	@Override
 	public String getScheme() {
 		return "s3";
 	}
 
+	@VisibleForTesting
+	static HadoopConfigLoader createHadoopConfigLoader() {
+		return new HadoopConfigLoader(FLINK_CONFIG_PREFIXES, MIRRORED_CONFIG_KEYS,
+			"presto.s3.", PACKAGE_PREFIXES_TO_SHADE, CONFIG_KEYS_TO_SHADE, FLINK_SHADING_PREFIX);
+	}
+
 	@Override
-	public void configure(Configuration config) {
-		flinkConfig = config;
-		hadoopConfig = null;
+	protected org.apache.hadoop.fs.FileSystem createHadoopFileSystem() {
+		return new PrestoS3FileSystem();
 	}
 
 	@Override
-	public FileSystem create(URI fsUri) throws IOException {
-		LOG.debug("Creating S3 file system (backed by a Hadoop s3a file system");
+	protected URI getInitURI(URI fsUri, org.apache.hadoop.conf.Configuration hadoopConfig) {
+		final String scheme = fsUri.getScheme();
+		final String authority = fsUri.getAuthority();
+		final URI initUri;
 
-		try {
-			// -- (1) get the loaded Hadoop config (or fall back to one loaded from the classpath)
-
-			org.apache.hadoop.conf.Configuration hadoopConfig = this.hadoopConfig;
-			if (hadoopConfig == null) {
-				if (flinkConfig != null) {
-					LOG.debug("Loading Hadoop configuration for Presto S3 file system");
-					hadoopConfig = HadoopUtils.getHadoopConfiguration(flinkConfig);
-
-					// add additional config entries from the Flink config to the Presto Hadoop config
-					for (String key : flinkConfig.keySet()) {
-						for (String prefix : CONFIG_PREFIXES) {
-							if (key.startsWith(prefix)) {
-								String value = flinkConfig.getString(key, null);
-								String newKey = "presto.s3." + key.substring(prefix.length());
-								hadoopConfig.set(newKey, flinkConfig.getString(key, null));
-
-								LOG.debug("Adding Flink config entry for {} as {}={} to Hadoop config for " +
-										"Presto S3 File System", key, newKey, value);
-							}
-						}
-					}
-
-					// mirror certain keys to make use more uniform across s3 implementations
-					// with different keys
-					for (String[] mirrored : MIRRORED_CONFIG_KEYS) {
-						String value = hadoopConfig.get(mirrored[0], null);
-						if (value != null) {
-							hadoopConfig.set(mirrored[1], value);
-						}
-					}
-
-					this.hadoopConfig = hadoopConfig;
-				}
-				else {
-					LOG.warn("The factory has not been configured prior to loading the S3 file system."
-							+ " Using Hadoop configuration from the classpath.");
-
-					hadoopConfig = new org.apache.hadoop.conf.Configuration();
-					this.hadoopConfig = hadoopConfig;
-				}
-			}
-
-			// -- (2) Instantiate the Presto file system class for that scheme
-
-			final String scheme = fsUri.getScheme();
-			final String authority = fsUri.getAuthority();
-			final URI initUri;
-
-			if (scheme == null && authority == null) {
-				initUri = new URI("s3://s3.amazonaws.com");
-			}
-			else if (scheme != null && authority == null) {
-				initUri = new URI(scheme + "://s3.amazonaws.com");
-			}
-			else {
-				initUri = fsUri;
-			}
-
-			final PrestoS3FileSystem fs = new PrestoS3FileSystem();
-			fs.initialize(initUri, hadoopConfig);
-
-			return new HadoopFileSystem(fs);
+		if (scheme == null && authority == null) {
+			initUri = createURI("s3://s3.amazonaws.com");
+		}
+		else if (scheme != null && authority == null) {
+			initUri = createURI(scheme + "://s3.amazonaws.com");
 		}
-		catch (IOException e) {
-			throw e;
+		else {
+			initUri = fsUri;
 		}
-		catch (Exception e) {
-			throw new IOException(e.getMessage(), e);
+		return initUri;
+	}
+
+	private URI createURI(String str) {
+		try {
+			return new URI(str);
+		} catch (URISyntaxException e) {
+			throw new FlinkRuntimeException("Error in s3 aws URI - " + str, e);
 		}
 	}
 }
diff --git a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java
index 7e2d12a..4eeb2d4 100644
--- a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java
+++ b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.fs.s3presto;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.runtime.fs.hdfs.HadoopConfigLoader;
 import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
 
 import com.amazonaws.auth.AWSCredentialsProvider;
@@ -31,6 +32,7 @@ import org.junit.Test;
 import java.lang.reflect.Field;
 import java.net.URI;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -75,6 +77,19 @@ public class PrestoS3FileSystemTest {
 		validateBasicCredentials(fs);
 	}
 
+	@Test
+	public void testShadingOfAwsCredProviderConfig() {
+		final String providerKey = "presto.s3.credentials-provider";
+		final Configuration flinkConf = new Configuration();
+		flinkConf.setString(providerKey, "com.amazonaws.auth.ContainerCredentialsProvider");
+
+		final HadoopConfigLoader loader = S3FileSystemFactory.createHadoopConfigLoader();
+		loader.setFlinkConfig(flinkConf);
+		// the loader is expected to prepend the Flink shading prefix to the provider class name
+		final String actualProvider = loader.getOrLoadHadoopConfig().get(providerKey);
+		assertEquals("org.apache.flink.fs.s3presto.shaded.com.amazonaws.auth.ContainerCredentialsProvider", actualProvider);
+	}
+
 	// ------------------------------------------------------------------------
 	//  utilities
 	// ------------------------------------------------------------------------