Posted to common-commits@hadoop.apache.org by su...@apache.org on 2018/08/10 23:32:04 UTC
[04/25] hadoop git commit: HADOOP-15583. Stabilize S3A Assumed Role
support. Contributed by Steve Loughran.
HADOOP-15583. Stabilize S3A Assumed Role support.
Contributed by Steve Loughran.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/da9a39ee
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/da9a39ee
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/da9a39ee
Branch: refs/heads/HDFS-12943
Commit: da9a39eed138210de29b59b90c449b28da1c04f9
Parents: d81cd36
Author: Steve Loughran <st...@apache.org>
Authored: Wed Aug 8 22:57:10 2018 -0700
Committer: Steve Loughran <st...@apache.org>
Committed: Wed Aug 8 22:57:24 2018 -0700
----------------------------------------------------------------------
.../src/main/resources/core-default.xml | 18 +-
.../fs/s3a/AWSCredentialProviderList.java | 101 ++++++--
.../org/apache/hadoop/fs/s3a/Constants.java | 19 +-
.../hadoop/fs/s3a/DefaultS3ClientFactory.java | 190 ++++----------
.../fs/s3a/InconsistentAmazonS3Client.java | 10 +
.../fs/s3a/InconsistentS3ClientFactory.java | 11 +
.../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 35 ++-
.../apache/hadoop/fs/s3a/S3ARetryPolicy.java | 4 +-
.../java/org/apache/hadoop/fs/s3a/S3AUtils.java | 245 +++++++++++++++++--
.../apache/hadoop/fs/s3a/S3ClientFactory.java | 7 +-
.../s3a/auth/AssumedRoleCredentialProvider.java | 78 +++++-
.../fs/s3a/auth/NoAuthWithAWSException.java | 37 +++
.../apache/hadoop/fs/s3a/auth/RoleModel.java | 8 +
.../apache/hadoop/fs/s3a/auth/RolePolicies.java | 143 +++++++++--
.../hadoop/fs/s3a/auth/STSClientFactory.java | 78 ++++++
.../fs/s3a/s3guard/DynamoDBClientFactory.java | 18 +-
.../fs/s3a/s3guard/DynamoDBMetadataStore.java | 62 ++++-
.../markdown/tools/hadoop-aws/assumed_roles.md | 191 +++++++++++----
.../src/site/markdown/tools/hadoop-aws/index.md | 6 +-
.../hadoop/fs/s3a/ITestS3AConfiguration.java | 117 ++++-----
.../fs/s3a/ITestS3ATemporaryCredentials.java | 71 +++---
.../fs/s3a/ITestS3GuardListConsistency.java | 68 +++--
.../hadoop/fs/s3a/ITestS3GuardWriteBack.java | 57 +++--
.../hadoop/fs/s3a/MockS3ClientFactory.java | 6 +-
.../fs/s3a/TestS3AAWSCredentialsProvider.java | 76 +++++-
.../hadoop/fs/s3a/auth/ITestAssumeRole.java | 151 ++++++++++--
.../auth/ITestAssumedRoleCommitOperations.java | 5 +-
.../hadoop/fs/s3a/auth/RoleTestUtils.java | 24 +-
.../s3guard/AbstractS3GuardToolTestBase.java | 7 +-
.../s3a/s3guard/ITestS3GuardConcurrentOps.java | 147 ++++++-----
30 files changed, 1461 insertions(+), 529 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 75acf48..29c2bc2 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -1033,7 +1033,19 @@
<name>fs.s3a.assumed.role.sts.endpoint</name>
<value/>
<description>
- AWS Simple Token Service Endpoint. If unset, uses the default endpoint.
+ AWS Security Token Service Endpoint.
+ If unset, uses the default endpoint.
+ Only used if AssumedRoleCredentialProvider is the AWS credential provider.
+ </description>
+</property>
+
+<property>
+ <name>fs.s3a.assumed.role.sts.endpoint.region</name>
+ <value>us-west-1</value>
+ <description>
+ AWS Security Token Service Endpoint's region.
+ Needed if fs.s3a.assumed.role.sts.endpoint points to an endpoint
+ other than the default one and the v4 signature is used.
Only used if AssumedRoleCredentialProvider is the AWS credential provider.
</description>
</property>
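For example, assuming roles through a regional STS endpoint with v4 signing
requires both settings to agree. A minimal sketch (the endpoint and region
values are illustrative only):

    // sketch: both settings must be consistent when a non-default
    // endpoint is used (Configuration is org.apache.hadoop.conf.Configuration)
    Configuration conf = new Configuration();
    conf.set("fs.s3a.assumed.role.sts.endpoint",
        "sts.eu-central-1.amazonaws.com");
    conf.set("fs.s3a.assumed.role.sts.endpoint.region", "eu-central-1");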
@@ -1058,7 +1070,9 @@
<property>
<name>fs.s3a.connection.ssl.enabled</name>
<value>true</value>
- <description>Enables or disables SSL connections to S3.</description>
+ <description>Enables or disables SSL connections to AWS services.
+ Also sets the default port to use for the s3a proxy settings,
+ when not explicitly set in fs.s3a.proxy.port.</description>
</property>
<property>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSCredentialProviderList.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSCredentialProviderList.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSCredentialProviderList.java
index 10201f0..f9052fa 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSCredentialProviderList.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSCredentialProviderList.java
@@ -18,25 +18,29 @@
package org.apache.hadoop.fs.s3a;
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AnonymousAWSCredentials;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException;
import org.apache.hadoop.io.IOUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Closeable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.stream.Collectors;
-
/**
* A list of providers.
*
@@ -62,10 +66,18 @@ public class AWSCredentialProviderList implements AWSCredentialsProvider,
public static final String NO_AWS_CREDENTIAL_PROVIDERS
= "No AWS Credential Providers";
+ static final String
+ CREDENTIALS_REQUESTED_WHEN_CLOSED
+ = "Credentials requested after provider list was closed";
+
private final List<AWSCredentialsProvider> providers = new ArrayList<>(1);
private boolean reuseLastProvider = true;
private AWSCredentialsProvider lastProvider;
+ private final AtomicInteger refCount = new AtomicInteger(1);
+
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+
/**
* Empty instance. This is not ready to be used.
*/
@@ -94,6 +106,9 @@ public class AWSCredentialProviderList implements AWSCredentialsProvider,
*/
@Override
public void refresh() {
+ if (isClosed()) {
+ return;
+ }
for (AWSCredentialsProvider provider : providers) {
provider.refresh();
}
@@ -106,6 +121,11 @@ public class AWSCredentialProviderList implements AWSCredentialsProvider,
*/
@Override
public AWSCredentials getCredentials() {
+ if (isClosed()) {
+ LOG.warn(CREDENTIALS_REQUESTED_WHEN_CLOSED);
+ throw new NoAuthWithAWSException(
+ CREDENTIALS_REQUESTED_WHEN_CLOSED);
+ }
checkNotEmpty();
if (reuseLastProvider && lastProvider != null) {
return lastProvider.getCredentials();
@@ -136,8 +156,7 @@ public class AWSCredentialProviderList implements AWSCredentialsProvider,
if (lastException != null) {
message += ": " + lastException;
}
- throw new AmazonClientException(message, lastException);
-
+ throw new NoAuthWithAWSException(message, lastException);
}
/**
@@ -156,7 +175,7 @@ public class AWSCredentialProviderList implements AWSCredentialsProvider,
*/
public void checkNotEmpty() {
if (providers.isEmpty()) {
- throw new AmazonClientException(NO_AWS_CREDENTIAL_PROVIDERS);
+ throw new NoAuthWithAWSException(NO_AWS_CREDENTIAL_PROVIDERS);
}
}
@@ -178,8 +197,38 @@ public class AWSCredentialProviderList implements AWSCredentialsProvider,
*/
@Override
public String toString() {
- return "AWSCredentialProviderList: " +
- StringUtils.join(providers, " ");
+ return "AWSCredentialProviderList[" +
+ "refcount= " + refCount.get() + ": [" +
+ StringUtils.join(providers, ", ") + "]]";
+ }
+
+ /**
+ * Get a reference to this object with an updated reference count.
+ *
+ * @return a reference to this
+ */
+ public synchronized AWSCredentialProviderList share() {
+ Preconditions.checkState(!closed.get(), "Provider list is closed");
+ refCount.incrementAndGet();
+ return this;
+ }
+
+ /**
+ * Get the current reference count.
+ * @return the current ref count
+ */
+ @VisibleForTesting
+ public int getRefCount() {
+ return refCount.get();
+ }
+
+ /**
+ * Get the closed flag.
+ * @return true iff the list is closed.
+ */
+ @VisibleForTesting
+ public boolean isClosed() {
+ return closed.get();
}
/**
@@ -190,9 +239,29 @@ public class AWSCredentialProviderList implements AWSCredentialsProvider,
*/
@Override
public void close() {
- for(AWSCredentialsProvider p: providers) {
+ synchronized (this) {
+ if (closed.get()) {
+ // already closed: no-op
+ return;
+ }
+ int remainder = refCount.decrementAndGet();
+ if (remainder != 0) {
+ // still actively used, or somehow things are
+ // now negative
+ LOG.debug("Not closing {}", this);
+ return;
+ }
+ // at this point, the closing is going to happen
+ LOG.debug("Closing {}", this);
+ closed.set(true);
+ }
+
+ // do this outside the synchronized block.
+ for (AWSCredentialsProvider p : providers) {
if (p instanceof Closeable) {
- IOUtils.closeStream((Closeable)p);
+ IOUtils.closeStream((Closeable) p);
+ } else if (p instanceof AutoCloseable) {
+ S3AUtils.closeAutocloseables(LOG, (AutoCloseable) p);
}
}
}
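The reference counting lets the list be handed to other AWS clients (such as
an S3Guard DynamoDB client) without the filesystem's own copy being closed
underneath it. A sketch of the intended lifecycle, using an anonymous
provider purely for illustration:

    // sketch: share() increments the refcount, close() decrements it;
    // the wrapped providers are only closed when the count reaches zero.
    AWSCredentialProviderList credentials = new AWSCredentialProviderList();
    credentials.add(new AnonymousAWSCredentialsProvider());
    AWSCredentialProviderList shared = credentials.share(); // refcount 1 -> 2
    shared.close();               // refcount 2 -> 1; providers stay open
    credentials.getCredentials(); // still valid
    credentials.close();          // refcount 1 -> 0; providers are closed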
http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index c521936..a8da6ec 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -84,10 +84,27 @@ public final class Constants {
public static final String ASSUMED_ROLE_SESSION_DURATION =
"fs.s3a.assumed.role.session.duration";
- /** Simple Token Service Endpoint. If unset, uses the default endpoint. */
+ /** Security Token Service Endpoint. If unset, uses the default endpoint. */
public static final String ASSUMED_ROLE_STS_ENDPOINT =
"fs.s3a.assumed.role.sts.endpoint";
+ /**
+ * Region for the STS endpoint; only relevant if the endpoint
+ * is set.
+ */
+ public static final String ASSUMED_ROLE_STS_ENDPOINT_REGION =
+ "fs.s3a.assumed.role.sts.endpoint.region";
+
+ /**
+ * Default value for the STS endpoint region; needed for
+ * v4 signing.
+ */
+ public static final String ASSUMED_ROLE_STS_ENDPOINT_REGION_DEFAULT =
+ "us-west-1";
+
+ /**
+ * Default duration of an assumed role.
+ */
public static final String ASSUMED_ROLE_SESSION_DURATION_DEFAULT = "30m";
/** list of providers to authenticate for the assumed role. */
http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
index f33b25e..ade317f 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
@@ -18,59 +18,45 @@
package org.apache.hadoop.fs.s3a;
+import java.io.IOException;
+import java.net.URI;
+
import com.amazonaws.ClientConfiguration;
-import com.amazonaws.Protocol;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.S3ClientOptions;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.util.VersionInfo;
import org.slf4j.Logger;
-import java.io.IOException;
-import java.net.URI;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
-import static org.apache.hadoop.fs.s3a.Constants.*;
-import static org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProviderSet;
-import static org.apache.hadoop.fs.s3a.S3AUtils.intOption;
+import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
+import static org.apache.hadoop.fs.s3a.Constants.PATH_STYLE_ACCESS;
/**
- * The default factory implementation, which calls the AWS SDK to configure
- * and create an {@link AmazonS3Client} that communicates with the S3 service.
+ * The default {@link S3ClientFactory} implementation.
+ * This calls the AWS SDK to configure and create an
+ * {@link AmazonS3Client} that communicates with the S3 service.
*/
-public class DefaultS3ClientFactory extends Configured implements
- S3ClientFactory {
+public class DefaultS3ClientFactory extends Configured
+ implements S3ClientFactory {
protected static final Logger LOG = S3AFileSystem.LOG;
@Override
- public AmazonS3 createS3Client(URI name) throws IOException {
+ public AmazonS3 createS3Client(URI name,
+ final String bucket,
+ final AWSCredentialsProvider credentials) throws IOException {
Configuration conf = getConf();
- AWSCredentialsProvider credentials =
- createAWSCredentialProviderSet(name, conf);
- final ClientConfiguration awsConf = createAwsConf(getConf());
- AmazonS3 s3 = newAmazonS3Client(credentials, awsConf);
- return createAmazonS3Client(s3, conf, credentials, awsConf);
+ final ClientConfiguration awsConf = S3AUtils.createAwsConf(getConf(), bucket);
+ return configureAmazonS3Client(
+ newAmazonS3Client(credentials, awsConf), conf);
}
/**
- * Create a new {@link ClientConfiguration}.
- * @param conf The Hadoop configuration
- * @return new AWS client configuration
- */
- public static ClientConfiguration createAwsConf(Configuration conf) {
- final ClientConfiguration awsConf = new ClientConfiguration();
- initConnectionSettings(conf, awsConf);
- initProxySupport(conf, awsConf);
- initUserAgent(conf, awsConf);
- return awsConf;
- }
-
- /**
- * Wrapper around constructor for {@link AmazonS3} client. Override this to
- * provide an extended version of the client
+ * Wrapper around constructor for {@link AmazonS3} client.
+ * Override this to provide an extended version of the client
* @param credentials credentials to use
* @param awsConf AWS configuration
* @return new AmazonS3 client
@@ -81,120 +67,17 @@ public class DefaultS3ClientFactory extends Configured implements
}
/**
- * Initializes all AWS SDK settings related to connection management.
- *
- * @param conf Hadoop configuration
- * @param awsConf AWS SDK configuration
- */
- private static void initConnectionSettings(Configuration conf,
- ClientConfiguration awsConf) {
- awsConf.setMaxConnections(intOption(conf, MAXIMUM_CONNECTIONS,
- DEFAULT_MAXIMUM_CONNECTIONS, 1));
- boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS,
- DEFAULT_SECURE_CONNECTIONS);
- awsConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP);
- awsConf.setMaxErrorRetry(intOption(conf, MAX_ERROR_RETRIES,
- DEFAULT_MAX_ERROR_RETRIES, 0));
- awsConf.setConnectionTimeout(intOption(conf, ESTABLISH_TIMEOUT,
- DEFAULT_ESTABLISH_TIMEOUT, 0));
- awsConf.setSocketTimeout(intOption(conf, SOCKET_TIMEOUT,
- DEFAULT_SOCKET_TIMEOUT, 0));
- int sockSendBuffer = intOption(conf, SOCKET_SEND_BUFFER,
- DEFAULT_SOCKET_SEND_BUFFER, 2048);
- int sockRecvBuffer = intOption(conf, SOCKET_RECV_BUFFER,
- DEFAULT_SOCKET_RECV_BUFFER, 2048);
- awsConf.setSocketBufferSizeHints(sockSendBuffer, sockRecvBuffer);
- String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, "");
- if (!signerOverride.isEmpty()) {
- LOG.debug("Signer override = {}", signerOverride);
- awsConf.setSignerOverride(signerOverride);
- }
- }
-
- /**
- * Initializes AWS SDK proxy support if configured.
- *
- * @param conf Hadoop configuration
- * @param awsConf AWS SDK configuration
- * @throws IllegalArgumentException if misconfigured
- */
- private static void initProxySupport(Configuration conf,
- ClientConfiguration awsConf) throws IllegalArgumentException {
- String proxyHost = conf.getTrimmed(PROXY_HOST, "");
- int proxyPort = conf.getInt(PROXY_PORT, -1);
- if (!proxyHost.isEmpty()) {
- awsConf.setProxyHost(proxyHost);
- if (proxyPort >= 0) {
- awsConf.setProxyPort(proxyPort);
- } else {
- if (conf.getBoolean(SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS)) {
- LOG.warn("Proxy host set without port. Using HTTPS default 443");
- awsConf.setProxyPort(443);
- } else {
- LOG.warn("Proxy host set without port. Using HTTP default 80");
- awsConf.setProxyPort(80);
- }
- }
- String proxyUsername = conf.getTrimmed(PROXY_USERNAME);
- String proxyPassword = conf.getTrimmed(PROXY_PASSWORD);
- if ((proxyUsername == null) != (proxyPassword == null)) {
- String msg = "Proxy error: " + PROXY_USERNAME + " or " +
- PROXY_PASSWORD + " set without the other.";
- LOG.error(msg);
- throw new IllegalArgumentException(msg);
- }
- awsConf.setProxyUsername(proxyUsername);
- awsConf.setProxyPassword(proxyPassword);
- awsConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN));
- awsConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION));
- if (LOG.isDebugEnabled()) {
- LOG.debug("Using proxy server {}:{} as user {} with password {} on " +
- "domain {} as workstation {}", awsConf.getProxyHost(),
- awsConf.getProxyPort(),
- String.valueOf(awsConf.getProxyUsername()),
- awsConf.getProxyPassword(), awsConf.getProxyDomain(),
- awsConf.getProxyWorkstation());
- }
- } else if (proxyPort >= 0) {
- String msg =
- "Proxy error: " + PROXY_PORT + " set without " + PROXY_HOST;
- LOG.error(msg);
- throw new IllegalArgumentException(msg);
- }
- }
-
- /**
- * Initializes the User-Agent header to send in HTTP requests to the S3
- * back-end. We always include the Hadoop version number. The user also
- * may set an optional custom prefix to put in front of the Hadoop version
- * number. The AWS SDK interally appends its own information, which seems
- * to include the AWS SDK version, OS and JVM version.
+ * Configure S3 client from the Hadoop configuration.
*
- * @param conf Hadoop configuration
- * @param awsConf AWS SDK configuration
- */
- private static void initUserAgent(Configuration conf,
- ClientConfiguration awsConf) {
- String userAgent = "Hadoop " + VersionInfo.getVersion();
- String userAgentPrefix = conf.getTrimmed(USER_AGENT_PREFIX, "");
- if (!userAgentPrefix.isEmpty()) {
- userAgent = userAgentPrefix + ", " + userAgent;
- }
- LOG.debug("Using User-Agent: {}", userAgent);
- awsConf.setUserAgentPrefix(userAgent);
- }
-
- /**
- * Creates an {@link AmazonS3Client} from the established configuration.
+ * This includes: endpoint, path-style access and possibly other
+ * options.
*
* @param conf Hadoop configuration
- * @param credentials AWS credentials
- * @param awsConf AWS SDK configuration
* @return S3 client
* @throws IllegalArgumentException if misconfigured
*/
- private static AmazonS3 createAmazonS3Client(AmazonS3 s3, Configuration conf,
- AWSCredentialsProvider credentials, ClientConfiguration awsConf)
+ private static AmazonS3 configureAmazonS3Client(AmazonS3 s3,
+ Configuration conf)
throws IllegalArgumentException {
String endPoint = conf.getTrimmed(ENDPOINT, "");
if (!endPoint.isEmpty()) {
@@ -206,21 +89,29 @@ public class DefaultS3ClientFactory extends Configured implements
throw new IllegalArgumentException(msg, e);
}
}
- enablePathStyleAccessIfRequired(s3, conf);
- return s3;
+ return applyS3ClientOptions(s3, conf);
}
/**
- * Enables path-style access to S3 buckets if configured. By default, the
+ * Perform any tuning of the {@code S3ClientOptions} settings based on
+ * the Hadoop configuration.
+ * This is different from the general AWS configuration creation as
+ * it is unique to S3 connections.
+ *
+ * The {@link Constants#PATH_STYLE_ACCESS} option enables path-style access
+ * to S3 buckets if configured. By default, the
* behavior is to use virtual hosted-style access with URIs of the form
- * http://bucketname.s3.amazonaws.com. Enabling path-style access and a
+ * {@code http://bucketname.s3.amazonaws.com}.
+ * Enabling path-style access and a
* region-specific endpoint switches the behavior to use URIs of the form
- * http://s3-eu-west-1.amazonaws.com/bucketname.
- *
+ * {@code http://s3-eu-west-1.amazonaws.com/bucketname}.
+ * It is common to use this when connecting to private S3 servers, as it
+ * avoids the need to play with DNS entries.
* @param s3 S3 client
* @param conf Hadoop configuration
+ * @return the S3 client
*/
- private static void enablePathStyleAccessIfRequired(AmazonS3 s3,
+ private static AmazonS3 applyS3ClientOptions(AmazonS3 s3,
Configuration conf) {
final boolean pathStyleAccess = conf.getBoolean(PATH_STYLE_ACCESS, false);
if (pathStyleAccess) {
@@ -229,5 +120,6 @@ public class DefaultS3ClientFactory extends Configured implements
.setPathStyleAccess(true)
.build());
}
+ return s3;
}
}
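The protected newAmazonS3Client() method remains the extension point for
supplying a modified client. A sketch of a subclass (the class name is
hypothetical):

    // sketch: hypothetical factory which logs client creation before
    // delegating to the default implementation.
    public class LoggingS3ClientFactory extends DefaultS3ClientFactory {
      @Override
      protected AmazonS3 newAmazonS3Client(AWSCredentialsProvider credentials,
          ClientConfiguration awsConf) {
        LOG.debug("Creating S3 client with AWS configuration {}", awsConf);
        return super.newAmazonS3Client(credentials, awsConf);
      }
    }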
http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java
index 99ed87d..2cd1aae 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java
@@ -114,6 +114,16 @@ public class InconsistentAmazonS3Client extends AmazonS3Client {
/** Map of key to delay -> time it was created. */
private Map<String, Long> delayedPutKeys = new HashMap<>();
+ /**
+ * Instantiate.
+ * This subclasses a deprecated constructor of the parent
+ * {@code AmazonS3Client} class; we can't use the builder API because
+ * that only creates the consistent client.
+ * @param credentials credentials to auth.
+ * @param clientConfiguration connection settings
+ * @param conf hadoop configuration.
+ */
+ @SuppressWarnings("deprecation")
public InconsistentAmazonS3Client(AWSCredentialsProvider credentials,
ClientConfiguration clientConfiguration, Configuration conf) {
super(credentials, clientConfiguration);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentS3ClientFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentS3ClientFactory.java
index 17d268b..932c472 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentS3ClientFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentS3ClientFactory.java
@@ -21,16 +21,27 @@ package org.apache.hadoop.fs.s3a;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3;
+
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* S3 Client factory used for testing with eventual consistency fault injection.
+ * This client is for testing <i>only</i>; it is in the production
+ * {@code hadoop-aws} module so that integration tests can use it
+ * simply by editing the Hadoop configuration used to bring up the client.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class InconsistentS3ClientFactory extends DefaultS3ClientFactory {
+ /**
+ * Create the inconsistent client.
+ * Logs a warning that this is being done.
+ * @param credentials credentials to use
+ * @param awsConf AWS configuration
+ * @return an inconsistent client.
+ */
@Override
protected AmazonS3 newAmazonS3Client(AWSCredentialsProvider credentials,
ClientConfiguration awsConf) {
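Because the factory is chosen through configuration, a test run can switch to
the fault-injecting client without any code changes; something along these
lines:

    // sketch: select the inconsistent client factory for a test run
    conf.set("fs.s3a.s3.client.factory.impl",
        "org.apache.hadoop.fs.s3a.InconsistentS3ClientFactory");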
http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 737d7da..72a5fde 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -77,8 +77,9 @@ import com.amazonaws.event.ProgressListener;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListeningExecutorService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@@ -124,9 +125,6 @@ import static org.apache.hadoop.fs.s3a.Statistic.*;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.commons.lang3.StringUtils.isNotEmpty;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
/**
* The core S3A Filesystem implementation.
*
@@ -205,6 +203,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
private boolean useListV1;
private MagicCommitIntegration committerIntegration;
+ private AWSCredentialProviderList credentials;
+
/** Add any deprecated keys. */
@SuppressWarnings("deprecation")
private static void addDeprecatedKeys() {
@@ -252,8 +252,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
Class<? extends S3ClientFactory> s3ClientFactoryClass = conf.getClass(
S3_CLIENT_FACTORY_IMPL, DEFAULT_S3_CLIENT_FACTORY_IMPL,
S3ClientFactory.class);
+
+ credentials = createAWSCredentialProviderSet(name, conf);
s3 = ReflectionUtils.newInstance(s3ClientFactoryClass, conf)
- .createS3Client(name);
+ .createS3Client(name, bucket, credentials);
invoker = new Invoker(new S3ARetryPolicy(getConf()), onRetry);
s3guardInvoker = new Invoker(new S3GuardExistsRetryPolicy(getConf()),
onRetry);
@@ -2470,12 +2472,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
transfers.shutdownNow(true);
transfers = null;
}
- if (metadataStore != null) {
- metadataStore.close();
- metadataStore = null;
- }
- IOUtils.closeQuietly(instrumentation);
+ S3AUtils.closeAll(LOG, metadataStore, instrumentation);
+ metadataStore = null;
instrumentation = null;
+ closeAutocloseables(LOG, credentials);
+ credentials = null;
}
}
@@ -2885,6 +2886,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
}
sb.append(", boundedExecutor=").append(boundedThreadPool);
sb.append(", unboundedExecutor=").append(unboundedThreadPool);
+ sb.append(", credentials=").append(credentials);
sb.append(", statistics {")
.append(statistics)
.append("}");
@@ -3319,4 +3321,17 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
return false;
}
}
+
+ /**
+ * Get a shared copy of the AWS credentials, with its reference
+ * counter updated.
+ * Caller is required to call {@code close()} on this after
+ * they have finished using it.
+ * @param purpose why the credentials are being shared; currently for logging only.
+ * @return a reference to shared credentials.
+ */
+ public AWSCredentialProviderList shareCredentials(final String purpose) {
+ LOG.debug("Sharing credentials for: {}", purpose);
+ return credentials.share();
+ }
}
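A caller of shareCredentials() owns one reference and must release it when
done. A minimal usage sketch, assuming fs is an S3AFileSystem instance and
with an arbitrary purpose string:

    // sketch: borrow the filesystem's credential chain, e.g. to build
    // an STS or DynamoDB client, then release the reference.
    AWSCredentialProviderList credentials =
        fs.shareCredentials("dynamodb table access");
    try {
      // ... use credentials to build another AWS client ...
    } finally {
      // decrements the refcount; the filesystem's own reference
      // keeps the underlying providers open.
      credentials.close();
    }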
http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java
index 2b361fd..e6e7895 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java
@@ -37,6 +37,7 @@ import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.InvalidRequestException;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.net.ConnectTimeoutException;
@@ -154,8 +155,9 @@ public class S3ARetryPolicy implements RetryPolicy {
policyMap.put(InterruptedException.class, fail);
// note this does not pick up subclasses (like socket timeout)
policyMap.put(InterruptedIOException.class, fail);
- // interesting question: should this be retried ever?
+ // Access denial and auth exceptions are not retried
policyMap.put(AccessDeniedException.class, fail);
+ policyMap.put(NoAuthWithAWSException.class, fail);
policyMap.put(FileNotFoundException.class, fail);
policyMap.put(InvalidRequestException.class, fail);
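The fail-fast behaviour can be checked directly against the policy; a sketch
assuming a default configuration (note that shouldRetry() declares
throws Exception):

    // sketch: an auth failure should map to a fail-fast decision
    RetryPolicy retry = new S3ARetryPolicy(new Configuration());
    RetryPolicy.RetryAction action = retry.shouldRetry(
        new NoAuthWithAWSException("no credentials"), 0, 0, true);
    // expected: action.action == RetryPolicy.RetryAction.RetryDecision.FAIL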
http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
index a5f7d75..9908fd1 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.fs.s3a;
import com.amazonaws.AbortedException;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.Protocol;
import com.amazonaws.SdkBaseException;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
@@ -44,15 +46,18 @@ import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException;
import org.apache.hadoop.fs.s3native.S3xLoginHelper;
import org.apache.hadoop.net.ConnectTimeoutException;
import org.apache.hadoop.security.ProviderUtils;
+import org.apache.hadoop.util.VersionInfo;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.io.Closeable;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -174,11 +179,17 @@ public final class S3AUtils {
// call considered a sign of connectivity failure
return (EOFException)new EOFException(message).initCause(exception);
}
+ if (exception instanceof NoAuthWithAWSException) {
+ // the exception raised by AWSCredentialProvider list if the
+ // credentials were not accepted.
+ return (AccessDeniedException)new AccessDeniedException(path, null,
+ exception.toString()).initCause(exception);
+ }
return new AWSClientIOException(message, exception);
} else {
if (exception instanceof AmazonDynamoDBException) {
// special handling for dynamo DB exceptions
- return translateDynamoDBException(message,
+ return translateDynamoDBException(path, message,
(AmazonDynamoDBException)exception);
}
IOException ioe;
@@ -373,20 +384,45 @@ public final class S3AUtils {
/**
* Translate a DynamoDB exception into an IOException.
+ *
+ * @param path path in the DDB
* @param message preformatted message for the exception
- * @param ex exception
+ * @param ddbException exception
* @return an exception to throw.
*/
- public static IOException translateDynamoDBException(String message,
- AmazonDynamoDBException ex) {
- if (isThrottleException(ex)) {
- return new AWSServiceThrottledException(message, ex);
+ public static IOException translateDynamoDBException(final String path,
+ final String message,
+ final AmazonDynamoDBException ddbException) {
+ if (isThrottleException(ddbException)) {
+ return new AWSServiceThrottledException(message, ddbException);
}
- if (ex instanceof ResourceNotFoundException) {
+ if (ddbException instanceof ResourceNotFoundException) {
return (FileNotFoundException) new FileNotFoundException(message)
- .initCause(ex);
+ .initCause(ddbException);
+ }
+ final int statusCode = ddbException.getStatusCode();
+ final String errorCode = ddbException.getErrorCode();
+ IOException result = null;
+ // 400 gets used a lot by DDB
+ if (statusCode == 400) {
+ switch (errorCode) {
+ case "AccessDeniedException":
+ result = (IOException) new AccessDeniedException(
+ path,
+ null,
+ ddbException.toString())
+ .initCause(ddbException);
+ break;
+
+ default:
+ result = new AWSBadRequestException(message, ddbException);
+ }
+
}
- return new AWSServiceIOException(message, ex);
+ if (result == null) {
+ result = new AWSServiceIOException(message, ddbException);
+ }
+ return result;
}
/**
@@ -738,6 +774,29 @@ public final class S3AUtils {
String baseKey,
String overrideVal)
throws IOException {
+ return lookupPassword(bucket, conf, baseKey, overrideVal, "");
+ }
+
+ /**
+ * Get a password from a configuration, including JCEKS files, handling both
+ * the absolute key and bucket override.
+ * @param bucket bucket or "" if none known
+ * @param conf configuration
+ * @param baseKey base key to look up, e.g. "fs.s3a.secret.key"
+ * @param overrideVal override value: if non-empty this is used instead of
+ * querying the configuration.
+ * @param defVal value to return if there is no password
+ * @return a password or the value of defVal.
+ * @throws IOException on any IO problem
+ * @throws IllegalArgumentException bad arguments
+ */
+ public static String lookupPassword(
+ String bucket,
+ Configuration conf,
+ String baseKey,
+ String overrideVal,
+ String defVal)
+ throws IOException {
String initialVal;
Preconditions.checkArgument(baseKey.startsWith(FS_S3A_PREFIX),
"%s does not start with $%s", baseKey, FS_S3A_PREFIX);
@@ -757,7 +816,7 @@ public final class S3AUtils {
// no bucket, make the initial value the override value
initialVal = overrideVal;
}
- return getPassword(conf, baseKey, initialVal);
+ return getPassword(conf, baseKey, initialVal, defVal);
}
/**
@@ -1059,6 +1118,134 @@ public final class S3AUtils {
}
}
+ /**
+ * Create a new AWS {@code ClientConfiguration}.
+ * All clients to AWS services <i>MUST</i> use this for consistent setup
+ * of connectivity, user agent and proxy settings.
+ * @param conf The Hadoop configuration
+ * @param bucket Optional bucket to use to look up per-bucket proxy secrets
+ * @return new AWS client configuration
+ * @throws IOException if the proxy secrets cannot be read
+ */
+ public static ClientConfiguration createAwsConf(Configuration conf,
+ String bucket)
+ throws IOException {
+ final ClientConfiguration awsConf = new ClientConfiguration();
+ initConnectionSettings(conf, awsConf);
+ initProxySupport(conf, bucket, awsConf);
+ initUserAgent(conf, awsConf);
+ return awsConf;
+ }
+
+ /**
+ * Initializes all AWS SDK settings related to connection management.
+ *
+ * @param conf Hadoop configuration
+ * @param awsConf AWS SDK configuration
+ */
+ public static void initConnectionSettings(Configuration conf,
+ ClientConfiguration awsConf) {
+ awsConf.setMaxConnections(intOption(conf, MAXIMUM_CONNECTIONS,
+ DEFAULT_MAXIMUM_CONNECTIONS, 1));
+ boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS,
+ DEFAULT_SECURE_CONNECTIONS);
+ awsConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP);
+ awsConf.setMaxErrorRetry(intOption(conf, MAX_ERROR_RETRIES,
+ DEFAULT_MAX_ERROR_RETRIES, 0));
+ awsConf.setConnectionTimeout(intOption(conf, ESTABLISH_TIMEOUT,
+ DEFAULT_ESTABLISH_TIMEOUT, 0));
+ awsConf.setSocketTimeout(intOption(conf, SOCKET_TIMEOUT,
+ DEFAULT_SOCKET_TIMEOUT, 0));
+ int sockSendBuffer = intOption(conf, SOCKET_SEND_BUFFER,
+ DEFAULT_SOCKET_SEND_BUFFER, 2048);
+ int sockRecvBuffer = intOption(conf, SOCKET_RECV_BUFFER,
+ DEFAULT_SOCKET_RECV_BUFFER, 2048);
+ awsConf.setSocketBufferSizeHints(sockSendBuffer, sockRecvBuffer);
+ String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, "");
+ if (!signerOverride.isEmpty()) {
+ LOG.debug("Signer override = {}", signerOverride);
+ awsConf.setSignerOverride(signerOverride);
+ }
+ }
+
+ /**
+ * Initializes AWS SDK proxy support in the AWS client configuration
+ * if the S3A settings enable it.
+ *
+ * @param conf Hadoop configuration
+ * @param bucket Optional bucket to use to look up per-bucket proxy secrets
+ * @param awsConf AWS SDK configuration to update
+ * @throws IllegalArgumentException if misconfigured
+ * @throws IOException problem getting username/secret from password source.
+ */
+ public static void initProxySupport(Configuration conf,
+ String bucket,
+ ClientConfiguration awsConf) throws IllegalArgumentException,
+ IOException {
+ String proxyHost = conf.getTrimmed(PROXY_HOST, "");
+ int proxyPort = conf.getInt(PROXY_PORT, -1);
+ if (!proxyHost.isEmpty()) {
+ awsConf.setProxyHost(proxyHost);
+ if (proxyPort >= 0) {
+ awsConf.setProxyPort(proxyPort);
+ } else {
+ if (conf.getBoolean(SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS)) {
+ LOG.warn("Proxy host set without port. Using HTTPS default 443");
+ awsConf.setProxyPort(443);
+ } else {
+ LOG.warn("Proxy host set without port. Using HTTP default 80");
+ awsConf.setProxyPort(80);
+ }
+ }
+ final String proxyUsername = lookupPassword(bucket, conf, PROXY_USERNAME,
+ null, null);
+ final String proxyPassword = lookupPassword(bucket, conf, PROXY_PASSWORD,
+ null, null);
+ if ((proxyUsername == null) != (proxyPassword == null)) {
+ String msg = "Proxy error: " + PROXY_USERNAME + " or " +
+ PROXY_PASSWORD + " set without the other.";
+ LOG.error(msg);
+ throw new IllegalArgumentException(msg);
+ }
+ awsConf.setProxyUsername(proxyUsername);
+ awsConf.setProxyPassword(proxyPassword);
+ awsConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN));
+ awsConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Using proxy server {}:{} as user {} with password {} on " +
+ "domain {} as workstation {}", awsConf.getProxyHost(),
+ awsConf.getProxyPort(),
+ String.valueOf(awsConf.getProxyUsername()),
+ awsConf.getProxyPassword(), awsConf.getProxyDomain(),
+ awsConf.getProxyWorkstation());
+ }
+ } else if (proxyPort >= 0) {
+ String msg =
+ "Proxy error: " + PROXY_PORT + " set without " + PROXY_HOST;
+ LOG.error(msg);
+ throw new IllegalArgumentException(msg);
+ }
+ }
+
+ /**
+ * Initializes the User-Agent header to send in HTTP requests to AWS
+ * services. We always include the Hadoop version number. The user also
+ * may set an optional custom prefix to put in front of the Hadoop version
+ * number. The AWS SDK internally appends its own information, which seems
+ * to include the AWS SDK version, OS and JVM version.
+ *
+ * @param conf Hadoop configuration
+ * @param awsConf AWS SDK configuration to update
+ */
+ private static void initUserAgent(Configuration conf,
+ ClientConfiguration awsConf) {
+ String userAgent = "Hadoop " + VersionInfo.getVersion();
+ String userAgentPrefix = conf.getTrimmed(USER_AGENT_PREFIX, "");
+ if (!userAgentPrefix.isEmpty()) {
+ userAgent = userAgentPrefix + ", " + userAgent;
+ }
+ LOG.debug("Using User-Agent: {}", userAgent);
+ awsConf.setUserAgentPrefix(userAgent);
+ }
/**
* An interface for use in lambda-expressions working with
@@ -1289,18 +1476,40 @@ public final class S3AUtils {
* @param closeables the objects to close
*/
public static void closeAll(Logger log,
- java.io.Closeable... closeables) {
- for (java.io.Closeable c : closeables) {
+ Closeable... closeables) {
+ if (log == null) {
+ log = LOG;
+ }
+ for (Closeable c : closeables) {
if (c != null) {
try {
- if (log != null) {
- log.debug("Closing {}", c);
- }
+ log.debug("Closing {}", c);
c.close();
} catch (Exception e) {
- if (log != null && log.isDebugEnabled()) {
- log.debug("Exception in closing {}", c, e);
- }
+ log.debug("Exception in closing {}", c, e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Close the AutoCloseable objects and <b>ignore</b> any Exception or
+ * null pointers.
+ * (This is the SLF4J equivalent of that in {@code IOUtils}).
+ * @param log the log to log at debug level. Can be null.
+ * @param closeables the objects to close
+ */
+ public static void closeAutocloseables(Logger log,
+ AutoCloseable... closeables) {
+ if (log == null) {
+ log = LOG;
+ }
+ for (AutoCloseable c : closeables) {
+ if (c != null) {
+ try {
+ log.debug("Closing {}", c);
+ c.close();
+ } catch (Exception e) {
+ log.debug("Exception in closing {}", c, e);
}
}
}
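The new default-value overload of lookupPassword() is what lets the proxy
settings distinguish "unset" from an empty string; a usage sketch (the bucket
name is hypothetical):

    // sketch: per-bucket lookup of a proxy secret; returns null when the
    // key is set nowhere, rather than the empty string.
    String proxyUser = S3AUtils.lookupPassword(
        "example-bucket", conf, "fs.s3a.proxy.username", null, null);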
http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
index 9abb362..b237e85 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.fs.s3a;
import java.io.IOException;
import java.net.URI;
+import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -37,9 +38,13 @@ public interface S3ClientFactory {
* Creates a new {@link AmazonS3} client.
*
* @param name raw input S3A file system URI
+ * @param bucket Optional bucket to use to look up per-bucket proxy secrets
+ * @param credentialSet credentials to use
* @return S3 client
* @throws IOException IO problem
*/
- AmazonS3 createS3Client(URI name) throws IOException;
+ AmazonS3 createS3Client(URI name,
+ final String bucket,
+ final AWSCredentialsProvider credentialSet) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/AssumedRoleCredentialProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/AssumedRoleCredentialProvider.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/AssumedRoleCredentialProvider.java
index fdaf9bd..e5a3639 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/AssumedRoleCredentialProvider.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/AssumedRoleCredentialProvider.java
@@ -24,9 +24,11 @@ import java.net.URI;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
+import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
+import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
import com.amazonaws.services.securitytoken.model.AWSSecurityTokenServiceException;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
@@ -37,6 +39,9 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.AWSCredentialProviderList;
+import org.apache.hadoop.fs.s3a.S3AUtils;
+import org.apache.hadoop.fs.s3a.Invoker;
+import org.apache.hadoop.fs.s3a.S3ARetryPolicy;
import org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider;
import org.apache.hadoop.security.UserGroupInformation;
@@ -77,17 +82,21 @@ public class AssumedRoleCredentialProvider implements AWSCredentialsProvider,
private final String arn;
+ private final AWSCredentialProviderList credentialsToSTS;
+
+ private final Invoker invoker;
+
/**
* Instantiate.
* This calls {@link #getCredentials()} to fail fast on the inner
* role credential retrieval.
- * @param uri URI of endpoint.
+ * @param fsUri URI of the filesystem.
* @param conf configuration
* @throws IOException on IO problems and some parameter checking
* @throws IllegalArgumentException invalid parameters
* @throws AWSSecurityTokenServiceException problems getting credentials
*/
- public AssumedRoleCredentialProvider(URI uri, Configuration conf)
+ public AssumedRoleCredentialProvider(URI fsUri, Configuration conf)
throws IOException {
arn = conf.getTrimmed(ASSUMED_ROLE_ARN, "");
@@ -99,13 +108,14 @@ public class AssumedRoleCredentialProvider implements AWSCredentialsProvider,
Class<?>[] awsClasses = loadAWSProviderClasses(conf,
ASSUMED_ROLE_CREDENTIALS_PROVIDER,
SimpleAWSCredentialsProvider.class);
- AWSCredentialProviderList credentials = new AWSCredentialProviderList();
+ credentialsToSTS = new AWSCredentialProviderList();
for (Class<?> aClass : awsClasses) {
if (this.getClass().equals(aClass)) {
throw new IOException(E_FORBIDDEN_PROVIDER);
}
- credentials.add(createAWSCredentialProvider(conf, aClass, uri));
+ credentialsToSTS.add(createAWSCredentialProvider(conf, aClass, fsUri));
}
+ LOG.debug("Credentials to obtain role credentials: {}", credentialsToSTS);
// then the STS binding
sessionName = conf.getTrimmed(ASSUMED_ROLE_SESSION_NAME,
@@ -122,14 +132,27 @@ public class AssumedRoleCredentialProvider implements AWSCredentialsProvider,
LOG.debug("Scope down policy {}", policy);
builder.withScopeDownPolicy(policy);
}
- String epr = conf.get(ASSUMED_ROLE_STS_ENDPOINT, "");
- if (StringUtils.isNotEmpty(epr)) {
- LOG.debug("STS Endpoint: {}", epr);
- builder.withServiceEndpoint(epr);
- }
- LOG.debug("Credentials to obtain role credentials: {}", credentials);
- builder.withLongLivedCredentialsProvider(credentials);
+ String endpoint = conf.get(ASSUMED_ROLE_STS_ENDPOINT, "");
+ String region = conf.get(ASSUMED_ROLE_STS_ENDPOINT_REGION,
+ ASSUMED_ROLE_STS_ENDPOINT_REGION_DEFAULT);
+ AWSSecurityTokenServiceClientBuilder stsbuilder =
+ STSClientFactory.builder(
+ conf,
+ fsUri.getHost(),
+ credentialsToSTS,
+ endpoint,
+ region);
+ // the STS client is not tracked for a shutdown in close(), because it
+ // (currently) throws an UnsupportedOperationException in shutdown().
+ builder.withStsClient(stsbuilder.build());
+
+ //now build the provider
stsProvider = builder.build();
+
+ // to handle STS throttling by the AWS account, we
+ // need to retry
+ invoker = new Invoker(new S3ARetryPolicy(conf), this::operationRetried);
+
// and force in a fail-fast check just to keep the stack traces less
// convoluted
getCredentials();
@@ -143,7 +166,17 @@ public class AssumedRoleCredentialProvider implements AWSCredentialsProvider,
@Override
public AWSCredentials getCredentials() {
try {
- return stsProvider.getCredentials();
+ return invoker.retryUntranslated("getCredentials",
+ true,
+ stsProvider::getCredentials);
+ } catch (IOException e) {
+ // this is in the signature of retryUntranslated;
+ // it's hard to see how this could be raised, but for
+ // completeness, it is wrapped as an Amazon Client Exception
+ // and rethrown.
+ throw new AmazonClientException(
+ "getCredentials failed: " + e,
+ e);
} catch (AWSSecurityTokenServiceException e) {
LOG.error("Failed to get credentials for role {}",
arn, e);
@@ -161,7 +194,7 @@ public class AssumedRoleCredentialProvider implements AWSCredentialsProvider,
*/
@Override
public void close() {
- stsProvider.close();
+ S3AUtils.closeAutocloseables(LOG, stsProvider, credentialsToSTS);
}
@Override
@@ -205,4 +238,23 @@ public class AssumedRoleCredentialProvider implements AWSCredentialsProvider,
return r.toString();
}
+ /**
+ * Callback from {@link Invoker} when an operation is retried.
+ * @param text text of the operation
+ * @param ex exception
+ * @param retries number of retries
+ * @param idempotent is the method idempotent
+ */
+ public void operationRetried(
+ String text,
+ Exception ex,
+ int retries,
+ boolean idempotent) {
+ if (retries == 0) {
+ // log on the first retry attempt of the credential access.
+ // At worst, this means one log entry every intermittent renewal
+ // time.
+ LOG.info("Retried {}", text);
+ }
+ }
}
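Putting it together, a typical assumed-role client configuration looks
something like this (the role ARN and duration are illustrative):

    // sketch: configure S3A to authenticate via an assumed role
    conf.set("fs.s3a.aws.credentials.provider",
        "org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider");
    conf.set("fs.s3a.assumed.role.arn",
        "arn:aws:iam::123456789012:role/s3-restricted"); // hypothetical ARN
    conf.set("fs.s3a.assumed.role.session.duration", "30m");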
http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/NoAuthWithAWSException.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/NoAuthWithAWSException.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/NoAuthWithAWSException.java
new file mode 100644
index 0000000..f48e17a
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/NoAuthWithAWSException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.auth;
+
+import com.amazonaws.AmazonClientException;
+
+/**
+ * A specific subclass of {@code AmazonClientException} which can
+ * be used in the retry logic to fail fast when there is any
+ * authentication problem.
+ */
+public class NoAuthWithAWSException extends AmazonClientException {
+
+ public NoAuthWithAWSException(final String message, final Throwable t) {
+ super(message, t);
+ }
+
+ public NoAuthWithAWSException(final String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/RoleModel.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/RoleModel.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/RoleModel.java
index ca2c993..d4568b0 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/RoleModel.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/RoleModel.java
@@ -205,6 +205,14 @@ public class RoleModel {
return new Policy(statements);
}
+ /**
+ * From a set of statements, create a policy.
+ * @param statements statements
+ * @return the policy
+ */
+ public static Policy policy(final List<RoleModel.Statement> statements) {
+ return new Policy(statements);
+ }
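This complements the existing varargs overload when statements are collected
dynamically; a usage sketch (the resource ARN is hypothetical):

    // sketch: build a policy from a dynamically assembled statement list
    List<RoleModel.Statement> statements = new ArrayList<>();
    statements.add(RoleModel.statement(true,
        "arn:aws:s3:::example-bucket/*", // hypothetical resource
        "s3:GetObject"));
    RoleModel.Policy policy = RoleModel.policy(statements);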
/**
* Effect options.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/RolePolicies.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/RolePolicies.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/RolePolicies.java
index 6711eee..34ed295 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/RolePolicies.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/RolePolicies.java
@@ -29,6 +29,55 @@ public final class RolePolicies {
private RolePolicies() {
}
+ /** All KMS operations: {@value}.*/
+ public static final String KMS_ALL_OPERATIONS = "kms:*";
+
+ /** KMS encryption. This is <i>Not</i> used by SSE-KMS: {@value}. */
+ public static final String KMS_ENCRYPT = "kms:Encrypt";
+
+ /**
+ * Decrypt data encrypted with SSE-KMS: {@value}.
+ */
+ public static final String KMS_DECRYPT = "kms:Decrypt";
+
+ /**
+ * Arn for all KMS keys: {@value}.
+ */
+ public static final String KMS_ALL_KEYS = "arn:aws:kms:*";
+
+ /**
+ * This is used by S3 to generate a per-object encryption key plus
+ * an encrypted copy of that key; the encrypted copy is what S3
+ * stores with the object for later decryption: {@value}.
+ */
+ public static final String KMS_GENERATE_DATA_KEY = "kms:GenerateDataKey";
+
+ /**
+ * Actions needed to read and write SSE-KMS data.
+ */
+ private static final String[] KMS_KEY_RW =
+ new String[]{KMS_DECRYPT, KMS_GENERATE_DATA_KEY};
+
+ /**
+ * Actions needed to read SSE-KMS data.
+ */
+ private static final String[] KMS_KEY_READ =
+ new String[] {KMS_DECRYPT};
+
+ /**
+ * Statement to allow KMS R/W access, so full use of
+ * SSE-KMS.
+ */
+ public static final Statement STATEMENT_ALLOW_SSE_KMS_RW =
+ statement(true, KMS_ALL_KEYS, KMS_KEY_RW);
+
+ /**
+ * Statement to allow read access to KMS keys, so the ability
+ * to read SSE-KMS data, but not to write it.
+ */
+ public static final Statement STATEMENT_ALLOW_SSE_KMS_READ =
+ statement(true, KMS_ALL_KEYS, KMS_KEY_READ);
+
/**
* All S3 operations: {@value}.
*/
@@ -52,7 +101,6 @@ public final class RolePolicies {
public static final String S3_LIST_BUCKET_MULTPART_UPLOADS =
"s3:ListBucketMultipartUploads";
-
/**
* List multipart upload is needed for the S3A Commit protocols.
*/
@@ -97,6 +145,8 @@ public final class RolePolicies {
public static final String S3_GET_OBJECT_VERSION = "s3:GetObjectVersion";
+ public static final String S3_GET_BUCKET_LOCATION = "s3:GetBucketLocation";
+
public static final String S3_GET_OBJECT_VERSION_ACL
= "s3:GetObjectVersionAcl";
@@ -128,7 +178,8 @@ public final class RolePolicies {
public static final String S3_RESTORE_OBJECT = "s3:RestoreObject";
/**
- * Actions needed to read data from S3 through S3A.
+ * Actions needed to read a file in S3 through S3A, excluding
+ * S3Guard and SSE-KMS.
*/
public static final String[] S3_PATH_READ_OPERATIONS =
new String[]{
@@ -136,18 +187,20 @@ public final class RolePolicies {
};
/**
- * Actions needed to read data from S3 through S3A.
+ * Base actions needed to read data from S3 through S3A,
+ * excluding SSE-KMS data and S3Guard-ed buckets.
*/
public static final String[] S3_ROOT_READ_OPERATIONS =
new String[]{
S3_LIST_BUCKET,
S3_LIST_BUCKET_MULTPART_UPLOADS,
- S3_GET_OBJECT,
+ S3_ALL_GET,
};
/**
* Actions needed to write data to an S3A Path.
- * This includes the appropriate read operations.
+ * This includes the appropriate read operations, but
+ * not SSE-KMS or S3Guard support.
*/
public static final String[] S3_PATH_RW_OPERATIONS =
new String[]{
@@ -163,6 +216,7 @@ public final class RolePolicies {
* This is purely the extra operations needed for writing atop
* of the read operation set.
* Deny these and a path is still readable, but not writeable.
+ * Excludes: SSE-KMS and S3Guard permissions.
*/
public static final String[] S3_PATH_WRITE_OPERATIONS =
new String[]{
@@ -173,6 +227,7 @@ public final class RolePolicies {
/**
* Actions needed for R/W IO from the root of a bucket.
+ * Excludes: SSE-KMS and S3Guard permissions.
*/
public static final String[] S3_ROOT_RW_OPERATIONS =
new String[]{
@@ -190,26 +245,57 @@ public final class RolePolicies {
*/
public static final String DDB_ALL_OPERATIONS = "dynamodb:*";
- public static final String DDB_ADMIN = "dynamodb:*";
+ /**
+ * Operations needed for DDB/S3Guard Admin.
+ * For now: make this {@link #DDB_ALL_OPERATIONS}.
+ */
+ public static final String DDB_ADMIN = DDB_ALL_OPERATIONS;
+ /**
+ * Permission for DDB describeTable() operation: {@value}.
+ * This is used during initialization.
+ */
+ public static final String DDB_DESCRIBE_TABLE = "dynamodb:DescribeTable";
- public static final String DDB_BATCH_WRITE = "dynamodb:BatchWriteItem";
+ /**
+ * Permission to query the DDB table: {@value}.
+ */
+ public static final String DDB_QUERY = "dynamodb:Query";
/**
- * All DynamoDB tables: {@value}.
+ * Permission for DDB operation to get a record: {@value}.
*/
- public static final String ALL_DDB_TABLES = "arn:aws:dynamodb:::*";
+ public static final String DDB_GET_ITEM = "dynamodb:GetItem";
+ /**
+ * Permission for DDB write record operation: {@value}.
+ */
+ public static final String DDB_PUT_ITEM = "dynamodb:PutItem";
+ /**
+ * Permission for DDB update single item operation: {@value}.
+ */
+ public static final String DDB_UPDATE_ITEM = "dynamodb:UpdateItem";
- public static final String WILDCARD = "*";
+ /**
+ * Permission for DDB delete operation: {@value}.
+ */
+ public static final String DDB_DELETE_ITEM = "dynamodb:DeleteItem";
/**
- * Allow all S3 Operations.
+ * Permission for DDB batch get operation: {@value}.
*/
- public static final Statement STATEMENT_ALL_S3 = statement(true,
- S3_ALL_BUCKETS,
- S3_ALL_OPERATIONS);
+ public static final String DDB_BATCH_GET_ITEM = "dynamodb:BatchGetItem";
+
+ /**
+ * Batch write permission for DDB: {@value}.
+ */
+ public static final String DDB_BATCH_WRITE_ITEM = "dynamodb:BatchWriteItem";
+
+ /**
+ * All DynamoDB tables: {@value}.
+ */
+ public static final String ALL_DDB_TABLES = "arn:aws:dynamodb:*";
/**
* Statement to allow all DDB access.
@@ -218,11 +304,36 @@ public final class RolePolicies {
ALL_DDB_TABLES, DDB_ALL_OPERATIONS);
/**
- * Allow all S3 and S3Guard operations.
+ * Statement to allow all client operations needed for S3Guard,
+ * but none of the admin operations.
+ */
+ public static final Statement STATEMENT_S3GUARD_CLIENT = statement(true,
+ ALL_DDB_TABLES,
+ DDB_BATCH_GET_ITEM,
+ DDB_BATCH_WRITE_ITEM,
+ DDB_DELETE_ITEM,
+ DDB_DESCRIBE_TABLE,
+ DDB_GET_ITEM,
+ DDB_PUT_ITEM,
+ DDB_QUERY,
+ DDB_UPDATE_ITEM
+ );
+
+ /**
+ * Allow all S3 Operations.
+ * This does not cover DDB or SSE-KMS.
+ */
+ public static final Statement STATEMENT_ALL_S3 = statement(true,
+ S3_ALL_BUCKETS,
+ S3_ALL_OPERATIONS);
+
+ /**
+ * Policy for all S3 and S3Guard operations, and SSE-KMS.
*/
public static final Policy ALLOW_S3_AND_SGUARD = policy(
STATEMENT_ALL_S3,
- STATEMENT_ALL_DDB
+ STATEMENT_ALL_DDB,
+ STATEMENT_ALLOW_SSE_KMS_RW
);
}
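
Taken together, these constants compose into restricted role policies
through the RoleModel statement() and policy() helpers already used
above. A minimal, illustrative sketch follows (the bucket ARN is a
placeholder; a production policy would normally scope object actions
to both the bucket and bucket/* ARNs):

    // hypothetical composition: S3Guard client access plus SSE-KMS,
    // with S3 read/write limited to a single bucket
    Policy restricted = policy(
        statement(true, "arn:aws:s3:::example-bucket/*",
            S3_PATH_RW_OPERATIONS),
        STATEMENT_S3GUARD_CLIENT,
        STATEMENT_ALLOW_SSE_KMS_RW);
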
http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/STSClientFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/STSClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/STSClientFactory.java
new file mode 100644
index 0000000..10bf88c
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/STSClientFactory.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.auth;
+
+import java.io.IOException;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.S3AUtils;
+
+/**
+ * Factory for creating STS Clients.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class STSClientFactory {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(STSClientFactory.class);
+
+ /**
+ * Create the builder ready for any final configuration options.
+ * Picks up connection settings from the Hadoop configuration, including
+ * proxy secrets.
+ * @param conf Configuration to act as source of options.
+ * @param bucket Optional bucket to use to look up per-bucket proxy secrets
+ * @param credentials AWS credential chain to use
+ * @param stsEndpoint optional endpoint, e.g. "https://sts.us-west-1.amazonaws.com"
+ * @param stsRegion the region, e.g. "us-west-1"
+ * @return the builder to call {@code build()}
+ * @throws IOException problem reading proxy secrets
+ */
+ public static AWSSecurityTokenServiceClientBuilder builder(
+ final Configuration conf,
+ final String bucket,
+ final AWSCredentialsProvider credentials, final String stsEndpoint,
+ final String stsRegion) throws IOException {
+ Preconditions.checkArgument(credentials != null, "No credentials");
+ final AWSSecurityTokenServiceClientBuilder builder
+ = AWSSecurityTokenServiceClientBuilder.standard();
+ final ClientConfiguration awsConf = S3AUtils.createAwsConf(conf, bucket);
+ builder.withClientConfiguration(awsConf);
+ builder.withCredentials(credentials);
+ if (StringUtils.isNotEmpty(stsEndpoint)) {
+ LOG.debug("STS Endpoint ={}", stsEndpoint);
+ builder.withEndpointConfiguration(
+ new AwsClientBuilder.EndpointConfiguration(stsEndpoint, stsRegion));
+ }
+ return builder;
+ }
+
+}
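
A usage sketch for the new factory (conf and credentials are assumed
to exist; the bucket, endpoint, and region values are illustrative):
the returned builder is completed with build(), after which the
standard AWS SDK STS calls apply.

    import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
    import com.amazonaws.services.securitytoken.model.Credentials;
    import com.amazonaws.services.securitytoken.model.GetSessionTokenRequest;

    AWSSecurityTokenService sts = STSClientFactory.builder(
        conf, "example-bucket", credentials,
        "https://sts.us-west-1.amazonaws.com", "us-west-1")
        .build();
    // request short-lived session credentials through the SDK
    Credentials session = sts.getSessionToken(
        new GetSessionTokenRequest().withDurationSeconds(900))
        .getCredentials();
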
http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBClientFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBClientFactory.java
index 91e64cd..9e1d2f4 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBClientFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBClientFactory.java
@@ -34,10 +34,9 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.s3a.DefaultS3ClientFactory;
+import org.apache.hadoop.fs.s3a.S3AUtils;
import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_REGION_KEY;
-import static org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProviderSet;
/**
* Interface to create a DynamoDB client.
@@ -58,10 +57,14 @@ public interface DynamoDBClientFactory extends Configurable {
* it will indicate an error.
*
* @param defaultRegion the default region of the AmazonDynamoDB client
+ * @param bucket Optional bucket to use to look up per-bucket proxy secrets
+ * @param credentials credentials to use for authentication.
* @return a new DynamoDB client
* @throws IOException if any IO error happens
*/
- AmazonDynamoDB createDynamoDBClient(String defaultRegion) throws IOException;
+ AmazonDynamoDB createDynamoDBClient(final String defaultRegion,
+ final String bucket,
+ final AWSCredentialsProvider credentials) throws IOException;
/**
* The default implementation for creating an AmazonDynamoDB.
@@ -69,16 +72,15 @@ public interface DynamoDBClientFactory extends Configurable {
class DefaultDynamoDBClientFactory extends Configured
implements DynamoDBClientFactory {
@Override
- public AmazonDynamoDB createDynamoDBClient(String defaultRegion)
+ public AmazonDynamoDB createDynamoDBClient(String defaultRegion,
+ final String bucket,
+ final AWSCredentialsProvider credentials)
throws IOException {
Preconditions.checkNotNull(getConf(),
"Should have been configured before usage");
final Configuration conf = getConf();
- final AWSCredentialsProvider credentials =
- createAWSCredentialProviderSet(null, conf);
- final ClientConfiguration awsConf =
- DefaultS3ClientFactory.createAwsConf(conf);
+ final ClientConfiguration awsConf = S3AUtils.createAwsConf(conf, bucket);
final String region = getRegion(conf, defaultRegion);
LOG.debug("Creating DynamoDB client in region {}", region);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
index 43849b1..ba80b88 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
@@ -22,6 +22,7 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.URI;
+import java.nio.file.AccessDeniedException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -34,6 +35,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import com.amazonaws.AmazonClientException;
+import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
@@ -67,6 +69,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.AWSCredentialProviderList;
import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.fs.s3a.Invoker;
import org.apache.hadoop.fs.s3a.Retries;
@@ -75,13 +78,14 @@ import org.apache.hadoop.fs.s3a.S3AInstrumentation;
import org.apache.hadoop.fs.s3a.S3ARetryPolicy;
import org.apache.hadoop.fs.s3a.S3AUtils;
import org.apache.hadoop.fs.s3a.Tristate;
+import org.apache.hadoop.fs.s3a.auth.RolePolicies;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import static org.apache.hadoop.fs.s3a.Constants.*;
-import static org.apache.hadoop.fs.s3a.S3AUtils.translateException;
+import static org.apache.hadoop.fs.s3a.S3AUtils.*;
import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.*;
import static org.apache.hadoop.fs.s3a.s3guard.S3Guard.*;
@@ -207,6 +211,7 @@ public class DynamoDBMetadataStore implements MetadataStore {
new ValueMap().withBoolean(":false", false);
private DynamoDB dynamoDB;
+ private AWSCredentialProviderList credentials;
private String region;
private Table table;
private String tableName;
@@ -242,10 +247,16 @@ public class DynamoDBMetadataStore implements MetadataStore {
* A utility function to create DynamoDB instance.
* @param conf the file system configuration
* @param s3Region region of the associated S3 bucket (if any).
+ * @param bucket Optional bucket to use to look up per-bucket proxy secrets
+ * @param credentials credentials.
* @return DynamoDB instance.
* @throws IOException I/O error.
*/
- private static DynamoDB createDynamoDB(Configuration conf, String s3Region)
+ private static DynamoDB createDynamoDB(
+ final Configuration conf,
+ final String s3Region,
+ final String bucket,
+ final AWSCredentialsProvider credentials)
throws IOException {
Preconditions.checkNotNull(conf);
final Class<? extends DynamoDBClientFactory> cls = conf.getClass(
@@ -254,10 +265,18 @@ public class DynamoDBMetadataStore implements MetadataStore {
DynamoDBClientFactory.class);
LOG.debug("Creating DynamoDB client {} with S3 region {}", cls, s3Region);
final AmazonDynamoDB dynamoDBClient = ReflectionUtils.newInstance(cls, conf)
- .createDynamoDBClient(s3Region);
+ .createDynamoDBClient(s3Region, bucket, credentials);
return new DynamoDB(dynamoDBClient);
}
+ /**
+ * {@inheritDoc}.
+ * The credentials for authenticating with S3 are requested from the
+ * FS via {@link S3AFileSystem#shareCredentials(String)}; this will
+ * increment the reference counter of these credentials.
+ * @param fs {@code S3AFileSystem} associated with the MetadataStore
+ * @throws IOException on a failure
+ */
@Override
@Retries.OnceRaw
public void initialize(FileSystem fs) throws IOException {
@@ -274,11 +293,23 @@ public class DynamoDBMetadataStore implements MetadataStore {
LOG.debug("Overriding S3 region with configured DynamoDB region: {}",
region);
} else {
- region = owner.getBucketLocation();
+ try {
+ region = owner.getBucketLocation();
+ } catch (AccessDeniedException e) {
+        // access denied here == can't call getBucketLocation(). Report meaningfully
+ URI uri = owner.getUri();
+ LOG.error("Failed to get bucket location from S3 bucket {}",
+ uri);
+ throw (IOException)new AccessDeniedException(
+ "S3 client role lacks permission "
+ + RolePolicies.S3_GET_BUCKET_LOCATION + " for " + uri)
+ .initCause(e);
+ }
LOG.debug("Inferring DynamoDB region from S3 bucket: {}", region);
}
username = owner.getUsername();
- dynamoDB = createDynamoDB(conf, region);
+ credentials = owner.shareCredentials("s3guard");
+ dynamoDB = createDynamoDB(conf, region, bucket, credentials);
// use the bucket as the DynamoDB table name if not specified in config
tableName = conf.getTrimmed(S3GUARD_DDB_TABLE_NAME_KEY, bucket);
@@ -311,6 +342,9 @@ public class DynamoDBMetadataStore implements MetadataStore {
* must declare the table name and region in the
* {@link Constants#S3GUARD_DDB_TABLE_NAME_KEY} and
* {@link Constants#S3GUARD_DDB_REGION_KEY} respectively.
+ * It also creates a new credential provider list from the configuration,
+ * using the base fs.s3a.* options, as there is no bucket to infer per-bucket
+ * settings from.
*
* @see #initialize(FileSystem)
* @throws IOException if there is an error
@@ -327,7 +361,8 @@ public class DynamoDBMetadataStore implements MetadataStore {
region = conf.getTrimmed(S3GUARD_DDB_REGION_KEY);
Preconditions.checkArgument(!StringUtils.isEmpty(region),
"No DynamoDB region configured");
- dynamoDB = createDynamoDB(conf, region);
+ credentials = createAWSCredentialProviderSet(null, conf);
+ dynamoDB = createDynamoDB(conf, region, null, credentials);
username = UserGroupInformation.getCurrentUser().getShortUserName();
initDataAccessRetries(conf);
@@ -778,12 +813,17 @@ public class DynamoDBMetadataStore implements MetadataStore {
if (instrumentation != null) {
instrumentation.storeClosed();
}
- if (dynamoDB != null) {
- LOG.debug("Shutting down {}", this);
- dynamoDB.shutdown();
- dynamoDB = null;
+ try {
+ if (dynamoDB != null) {
+ LOG.debug("Shutting down {}", this);
+ dynamoDB.shutdown();
+ dynamoDB = null;
+ }
+ } finally {
+ closeAutocloseables(LOG, credentials);
+ credentials = null;
}
- }
+  }
@Override
@Retries.OnceTranslated
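
The credential lifecycle established above can be sketched as follows
(assuming an initialized S3AFileSystem fs, plus the factory, region,
and bucket from the earlier sketch): shareCredentials() increments a
reference count on the filesystem's AWSCredentialProviderList, and
close() decrements it, so the underlying providers are only shut down
once every sharer has released them.

    AWSCredentialProviderList shared = fs.shareCredentials("s3guard");
    try {
      // use the shared chain, e.g. to build a DynamoDB client
      AmazonDynamoDB ddb = factory.createDynamoDBClient(
          region, bucket, shared);
    } finally {
      shared.close();  // releases this reference; the providers
                       // close only when the count reaches zero
    }
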