Posted to common-commits@hadoop.apache.org by st...@apache.org on 2018/08/09 05:57:55 UTC

[1/2] hadoop git commit: HADOOP-15583. Stabilize S3A Assumed Role support. Contributed by Steve Loughran.

Repository: hadoop
Updated Branches:
  refs/heads/trunk d81cd3611 -> da9a39eed


http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/assumed_roles.md
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/assumed_roles.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/assumed_roles.md
index 3afd63f..8af0457 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/assumed_roles.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/assumed_roles.md
@@ -29,7 +29,7 @@ assumed roles for different buckets.
 *IAM Assumed Roles are unlikely to be supported by third-party systems
 supporting the S3 APIs.*
 
-## Using IAM Assumed Roles
+## <a name="using_assumed_roles"></a> Using IAM Assumed Roles
 
 ### Before You Begin
 
@@ -40,6 +40,8 @@ are, how to configure their policies, etc.
 * You need a pair of long-lived IAM User credentials, not the root account set.
 * Have the AWS CLI installed, and test that it works there.
 * Give the role access to S3, and, if using S3Guard, to DynamoDB.
+* For working with data encrypted with SSE-KMS, the role must
+have access to the appropriate KMS keys.
 
 Trying to learn how IAM Assumed Roles work by debugging stack traces from
 the S3A client is "suboptimal".
@@ -51,7 +53,7 @@ To use assumed roles, the client must be configured to use the
 in the configuration option `fs.s3a.aws.credentials.provider`.
 
 This AWS Credential provider will read in the `fs.s3a.assumed.role` options needed to connect to the
-Session Token Service [Assumed Role API](https://docs.aws.amazon.com/STS/latest/APIReference/API_AssumeRole.html),
+Security Token Service [Assumed Role API](https://docs.aws.amazon.com/STS/latest/APIReference/API_AssumeRole.html),
 first authenticating with the full credentials, then assuming the specific role
 specified. It will then refresh this login at the configured rate of
 `fs.s3a.assumed.role.session.duration`
@@ -69,7 +71,7 @@ which uses `fs.s3a.access.key` and `fs.s3a.secret.key`.
 Note: although you can list other AWS credential providers alongside the
 Assumed Role Credential Provider, it can only cause confusion.
 
-### <a name="using"></a> Using Assumed Roles
+### <a name="using"></a> Configuring Assumed Roles
 
 To use assumed roles, the S3A client credentials provider must be set to
 the `AssumedRoleCredentialProvider`, and `fs.s3a.assumed.role.arn` to
@@ -78,7 +80,6 @@ the previously created ARN.
 ```xml
 <property>
   <name>fs.s3a.aws.credentials.provider</name>
-  <value>org.apache.hadoop.fs.s3a.AssumedRoleCredentialProvider</value>
   <value>org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider</value>
 </property>
 
@@ -159,7 +160,18 @@ Here are the full set of configuration options.
   <name>fs.s3a.assumed.role.sts.endpoint</name>
   <value/>
   <description>
-    AWS Simple Token Service Endpoint. If unset, uses the default endpoint.
+    AWS Security Token Service Endpoint. If unset, uses the default endpoint.
+    Only used if AssumedRoleCredentialProvider is the AWS credential provider.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.assumed.role.sts.endpoint.region</name>
+  <value>us-west-1</value>
+  <description>
+    Region for the AWS Security Token Service endpoint.
+    Needed if fs.s3a.assumed.role.sts.endpoint points to an endpoint
+    other than the default one and the v4 signature is used.
     Only used if AssumedRoleCredentialProvider is the AWS credential provider.
   </description>
 </property>
@@ -194,39 +206,101 @@ These lists represent the minimum actions to which the client's principal
 must have in order to work with a bucket.
 
 
-### Read Access Permissions
+### <a name="read-permissions"></a> Read Access Permissions
 
 Permissions which must be granted when reading from a bucket:
 
 
-| Action | S3A operations |
-|--------|----------|
-| `s3:ListBucket` | `listStatus()`, `getFileStatus()` and elsewhere |
-| `s3:GetObject` | `getFileStatus()`, `open()` and elsewhere |
-| `s3:ListBucketMultipartUploads` |  Aborting/cleaning up S3A commit operations|
+```
+s3:Get*
+s3:ListBucket
+```
+
+When using S3Guard, the client needs the appropriate
+<a href="#s3guard-permissions">DynamoDB access permissions</a>.
+
+To use SSE-KMS encryption, the client needs the
+<a href="#sse-kms-permissions">SSE-KMS Permissions</a> to access the
+KMS key(s).
+
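For illustration, a minimal sketch of a policy statement granting this read access to a single
bucket; `example-bucket` is a placeholder, and both the bucket ARN and the object ARNs are
listed because `s3:ListBucket` is granted on the bucket itself, while `s3:GetObject` (covered
by `s3:Get*`) applies to the objects under it:

```json
{
  "Effect" : "Allow",
  "Action" : [
    "s3:Get*",
    "s3:ListBucket"
    ],
  "Resource" : [
    "arn:aws:s3:::example-bucket",
    "arn:aws:s3:::example-bucket/*"
    ]
}
```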
+### <a name="write-permissions"></a> Write Access Permissions
+
+These permissions must all be granted for write access:
+
+```
+s3:Get*
+s3:Delete*
+s3:Put*
+s3:ListBucket
+s3:ListBucketMultipartUploads
+s3:AbortMultipartUpload
+```
+
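A similar sketch combining the write actions into one statement, again with `example-bucket`
as a placeholder and with both the bucket and object ARNs listed so that the bucket-level and
object-level actions are all covered:

```json
{
  "Effect" : "Allow",
  "Action" : [
    "s3:Get*",
    "s3:Delete*",
    "s3:Put*",
    "s3:ListBucket",
    "s3:ListBucketMultipartUploads",
    "s3:AbortMultipartUpload"
    ],
  "Resource" : [
    "arn:aws:s3:::example-bucket",
    "arn:aws:s3:::example-bucket/*"
    ]
}
```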
+### <a name="sse-kms-permissions"></a> SSE-KMS Permissions
+
+When reading data encrypted using SSE-KMS, the client must have
+`kms:Decrypt` permission for the specific key a file was encrypted with.
+
+```
+kms:Decrypt
+```
+
+To write data using SSE-KMS, the client must have all the following permissions.
+
+```
+kms:Decrypt
+kms:GenerateDataKey
+```
 
+This includes renaming: renamed files are encrypted with the encryption key
+of the current S3A client; it must decrypt the source file first.
 
-The `s3:ListBucketMultipartUploads` is only needed when committing work
-via the [S3A committers](committers.html).
-However, it must be granted to the root path in order to safely clean up jobs.
-It is simplest to permit this in all buckets, even if it is only actually
-needed when writing data.
+If the caller doesn't have these permissions, the operation will fail with an
+`AccessDeniedException`: the S3 Store does not provide the specifics of
+the cause of the failure.
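As a sketch, a statement granting both KMS permissions on a specific key; the key ARN is a
placeholder, and REGION, ACCOUNT and KEY-ID must be replaced with the details of the key(s)
actually used to encrypt the data:

```json
{
  "Effect" : "Allow",
  "Action" : [
    "kms:Decrypt",
    "kms:GenerateDataKey"
    ],
  "Resource" : "arn:aws:kms:REGION:ACCOUNT:key/KEY-ID"
}
```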
 
+### <a name="s3guard-permissions"></a> S3Guard Permissions
 
-### Write Access Permissions
+To use S3Guard, all clients must have a subset of the
+[AWS DynamoDB Permissions](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/api-permissions-reference.html).
 
-These permissions must *also* be granted for write access:
+To work with buckets protected with S3Guard, the client must have
+all the following rights on the DynamoDB Table used to protect that bucket.
 
+```
+dynamodb:BatchGetItem
+dynamodb:BatchWriteItem
+dynamodb:DeleteItem
+dynamodb:DescribeTable
+dynamodb:GetItem
+dynamodb:PutItem
+dynamodb:Query
+dynamodb:UpdateItem
+```
 
-| Action | S3A operations |
-|--------|----------|
-| `s3:PutObject` | `mkdir()`, `create()`, `rename()`, `delete()` |
-| `s3:DeleteObject` | `mkdir()`, `create()`, `rename()`, `delete()` |
-| `s3:AbortMultipartUpload` | S3A committer `abortJob()` and `cleanup()` operations |
-| `s3:ListMultipartUploadParts` | S3A committer `abortJob()` and `cleanup()` operations |
+This is true, *even if the client only has read access to the data*.
 
+For the `hadoop s3guard` table management commands, _extra_ permissions are required:
 
-### Mixed Permissions in a single S3 Bucket
+```
+dynamodb:CreateTable
+dynamodb:DescribeLimits
+dynamodb:DeleteTable
+dynamodb:Scan
+dynamodb:TagResource
+dynamodb:UntagResource
+dynamodb:UpdateTable
+```
+
+Without these permissions, tables cannot be created or destroyed, nor can their IO capacity
+be changed through the `s3guard set-capacity` call.
+The `dynamodb:Scan` permission is needed for `s3guard prune`.
+
+The `dynamodb:CreateTable` permission is needed when a client tries to
+create the DynamoDB table on startup, that is, when
+`fs.s3a.s3guard.ddb.table.create` is `true` and the table does not already exist.
+
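For illustration, a sketch of a statement granting the base client permissions on the table;
REGION and ACCOUNT are placeholders, and the table name defaults to the bucket name unless
`fs.s3a.s3guard.ddb.table` says otherwise:

```json
{
  "Effect" : "Allow",
  "Action" : [
    "dynamodb:BatchGetItem",
    "dynamodb:BatchWriteItem",
    "dynamodb:DeleteItem",
    "dynamodb:DescribeTable",
    "dynamodb:GetItem",
    "dynamodb:PutItem",
    "dynamodb:Query",
    "dynamodb:UpdateItem"
    ],
  "Resource" : "arn:aws:dynamodb:REGION:ACCOUNT:table/example-bucket"
}
```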
+### <a name="mixed-permissions"></a> Mixed Permissions in a single S3 Bucket
 
 Mixing permissions down the "directory tree" is limited
 only to the extent of supporting writeable directories under
@@ -274,7 +348,7 @@ This example has the base bucket read only, and a directory underneath,
     "Action" : [
       "s3:ListBucket",
       "s3:ListBucketMultipartUploads",
-      "s3:GetObject"
+      "s3:Get*"
       ],
     "Resource" : "arn:aws:s3:::example-bucket/*"
   }, {
@@ -320,7 +394,7 @@ the command line before trying to use the S3A client.
 `hadoop fs -mkdirs -p s3a://bucket/path/p1/`
 
 
-### <a name="no_role"></a>IOException: "Unset property fs.s3a.assumed.role.arn"
+### <a name="no_role"></a> IOException: "Unset property fs.s3a.assumed.role.arn"
 
 The Assumed Role Credential Provider is enabled, but `fs.s3a.assumed.role.arn` is unset.
 
@@ -339,7 +413,7 @@ java.io.IOException: Unset property fs.s3a.assumed.role.arn
   at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:474)
 ```
 
-### <a name="not_authorized_for_assumed_role"></a>"Not authorized to perform sts:AssumeRole"
+### <a name="not_authorized_for_assumed_role"></a> "Not authorized to perform sts:AssumeRole"
 
 This can arise if the role ARN set in `fs.s3a.assumed.role.arn` is invalid
 or one to which the caller has no access.
@@ -399,7 +473,8 @@ Caused by: com.amazonaws.services.securitytoken.model.AWSSecurityTokenServiceExc
 The value of `fs.s3a.assumed.role.session.duration` is out of range.
 
 ```
-java.lang.IllegalArgumentException: Assume Role session duration should be in the range of 15min - 1Hr
+java.lang.IllegalArgumentException: Assume Role session duration should be in the range of 15min
+- 1Hr
   at com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider$Builder.withRoleSessionDurationSeconds(STSAssumeRoleSessionCredentialsProvider.java:437)
   at org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider.<init>(AssumedRoleCredentialProvider.java:86)
 ```
@@ -603,7 +678,7 @@ Caused by: com.amazonaws.services.securitytoken.model.AWSSecurityTokenServiceExc
 
 ### <a name="invalid_token"></a> `AccessDeniedException/InvalidClientTokenId`: "The security token included in the request is invalid"
 
-The credentials used to authenticate with the AWS Simple Token Service are invalid.
+The credentials used to authenticate with the AWS Security Token Service are invalid.
 
 ```
 [ERROR] Failures:
@@ -682,26 +757,7 @@ org.apache.hadoop.fs.s3a.AWSBadRequestException: Instantiate org.apache.hadoop.f
   at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3354)
   at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:474)
   at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)
-  at org.apache.hadoop.fs.s3a.ITestAssumeRole.lambda$expectFileSystemFailure$0(ITestAssumeRole.java:70)
-  at org.apache.hadoop.fs.s3a.ITestAssumeRole.lambda$interceptC$1(ITestAssumeRole.java:84)
-  at org.apache.hadoop.test.LambdaTestUtils.intercept(LambdaTestUtils.java:491)
-  at org.apache.hadoop.test.LambdaTestUtils.intercept(LambdaTestUtils.java:377)
-  at org.apache.hadoop.test.LambdaTestUtils.intercept(LambdaTestUtils.java:446)
-  at org.apache.hadoop.fs.s3a.ITestAssumeRole.interceptC(ITestAssumeRole.java:82)
-  at org.apache.hadoop.fs.s3a.ITestAssumeRole.expectFileSystemFailure(ITestAssumeRole.java:68)
-  at org.apache.hadoop.fs.s3a.ITestAssumeRole.testAssumeRoleBadSession(ITestAssumeRole.java:216)
-  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
-  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
-  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
-  at java.lang.reflect.Method.invoke(Method.java:498)
-  at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
-  at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
-  at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
-  at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
-  at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
-  at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
-  at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
-  at org.junit.internal.runners.statements.FailOnTimeout$StatementThread.run(FailOnTimeout.java:74)
+
 Caused by: com.amazonaws.services.securitytoken.model.AWSSecurityTokenServiceException:
     1 validation error detected: Value 'Session Names cannot Hava Spaces!' at 'roleSessionName'
     failed to satisfy constraint:
@@ -742,10 +798,11 @@ Caused by: com.amazonaws.services.securitytoken.model.AWSSecurityTokenServiceExc
 ### <a name="access_denied"></a> `java.nio.file.AccessDeniedException` within a FileSystem API call
 
 If an operation fails with an `AccessDeniedException`, then the role does not have
-the permission for the S3 Operation invoked during the call
+the permission for the S3 Operation invoked during the call.
 
 ```
-java.nio.file.AccessDeniedException: s3a://bucket/readonlyDir: rename(s3a://bucket/readonlyDir, s3a://bucket/renameDest)
+java.nio.file.AccessDeniedException: s3a://bucket/readonlyDir:
+  rename(s3a://bucket/readonlyDir, s3a://bucket/renameDest)
  on s3a://bucket/readonlyDir:
   com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied
   (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; Request ID: 2805F2ABF5246BB1;
@@ -795,3 +852,33 @@ check the path for the operation.
 Make sure that all the read and write permissions are allowed for any bucket/path
 to which data is being written, and read permissions for all
 buckets read from.
+
+If the bucket is using SSE-KMS to encrypt data:
+
+1. The caller must have the `kms:Decrypt` permission to read the data.
+1. To write data, the caller needs both `kms:Decrypt` and `kms:GenerateDataKey` permissions.
+
+Without permissions, the request fails *and there is no explicit message indicating
+that this is an encryption-key issue*.
+
+### <a name="dynamodb_exception"></a> `AccessDeniedException` + `AmazonDynamoDBException`
+
+```
+java.nio.file.AccessDeniedException: bucket1:
+  com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException:
+  User: arn:aws:sts::980678866538:assumed-role/s3guard-test-role/test is not authorized to perform:
+  dynamodb:DescribeTable on resource: arn:aws:dynamodb:us-west-1:980678866538:table/bucket1
+   (Service: AmazonDynamoDBv2; Status Code: 400;
+```
+
+The caller is trying to access an S3 bucket which uses S3Guard, but the caller
+lacks the relevant DynamoDB access permissions.
+
+The `dynamodb:DescribeTable` operation is the first one used in S3Guard to access
+the DynamoDB table, so it is often the first to fail. It can be a sign
+that the role has no permissions at all to access the table named in the exception,
+or just that this specific permission has been omitted.
+
+If the role policy requested for the assumed role didn't ask for any DynamoDB
+permissions, this is where all attempts to work with an S3Guarded bucket will
+fail. Check the value of `fs.s3a.assumed.role.policy`.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
index 7d0f67b..2dee10a 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
@@ -33,7 +33,7 @@ See also:
 * [Working with IAM Assumed Roles](./assumed_roles.html)
 * [Testing](./testing.html)
 
-##<a name="overview"></a> Overview
+## <a name="overview"></a> Overview
 
 Apache Hadoop's `hadoop-aws` module provides support for AWS integration.
 applications to easily use this support.
@@ -88,7 +88,7 @@ maintain it.
    This connector is no longer available: users must migrate to the newer `s3a:` client.
 
 
-##<a name="getting_started"></a> Getting Started
+## <a name="getting_started"></a> Getting Started
 
 S3A depends upon two JARs, alongside `hadoop-common` and its dependencies.
 
@@ -1698,6 +1698,6 @@ as configured by the value `fs.s3a.multipart.size`.
 To disable checksum verification in `distcp`, use the `-skipcrccheck` option:
 
 ```bash
-hadoop distcp -update -skipcrccheck /user/alice/datasets s3a://alice-backup/datasets
+hadoop distcp -update -skipcrccheck -numListstatusThreads 40 /user/alice/datasets s3a://alice-backup/datasets
 ```
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java
index aa6b5d8..3214c76 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java
@@ -36,14 +36,6 @@ import org.junit.rules.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.hadoop.fs.s3a.S3ATestConstants.TEST_FS_S3A_NAME;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
 import java.io.File;
 import java.net.URI;
 import java.security.PrivilegedExceptionAction;
@@ -60,6 +52,9 @@ import org.junit.rules.TemporaryFolder;
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.S3AUtils.*;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.apache.hadoop.fs.s3a.S3ATestConstants.TEST_FS_S3A_NAME;
+import static org.junit.Assert.*;
 
 /**
  * S3A tests for configuration.
@@ -134,12 +129,26 @@ public class ITestS3AConfiguration {
     conf.setInt(Constants.PROXY_PORT, 1);
     String proxy =
         conf.get(Constants.PROXY_HOST) + ":" + conf.get(Constants.PROXY_PORT);
-    try {
-      fs = S3ATestUtils.createTestFileSystem(conf);
-      fail("Expected a connection error for proxy server at " + proxy);
-    } catch (AWSClientIOException e) {
-      // expected
-    }
+    expectFSCreateFailure(AWSClientIOException.class,
+        conf, "when using proxy " + proxy);
+  }
+
+  /**
+   * Expect a filesystem to not be created from a configuration.
+   * @param clazz class of exception to expect
+   * @param conf configuration to use when creating the filesystem
+   * @param text text to include in the assertion message
+   * @return the exception intercepted
+   * @throws Exception any other exception
+   */
+  private <E extends Throwable> E expectFSCreateFailure(
+      Class<E> clazz,
+      Configuration conf,
+      String text)
+      throws Exception {
+
+    return intercept(clazz,
+        () -> {
+          fs = S3ATestUtils.createTestFileSystem(conf);
+          return "expected failure creating FS " + text + " got " + fs;
+        });
   }
 
   @Test
@@ -148,15 +157,13 @@ public class ITestS3AConfiguration {
     conf.unset(Constants.PROXY_HOST);
     conf.setInt(Constants.MAX_ERROR_RETRIES, 2);
     conf.setInt(Constants.PROXY_PORT, 1);
-    try {
-      fs = S3ATestUtils.createTestFileSystem(conf);
-      fail("Expected a proxy configuration error");
-    } catch (IllegalArgumentException e) {
-      String msg = e.toString();
-      if (!msg.contains(Constants.PROXY_HOST) &&
-          !msg.contains(Constants.PROXY_PORT)) {
-        throw e;
-      }
+    IllegalArgumentException e = expectFSCreateFailure(
+        IllegalArgumentException.class,
+        conf, "Expected a connection error for proxy server");
+    String msg = e.toString();
+    if (!msg.contains(Constants.PROXY_HOST) &&
+        !msg.contains(Constants.PROXY_PORT)) {
+      throw e;
     }
   }
 
@@ -167,19 +174,11 @@ public class ITestS3AConfiguration {
     conf.setInt(Constants.MAX_ERROR_RETRIES, 2);
     conf.set(Constants.PROXY_HOST, "127.0.0.1");
     conf.set(Constants.SECURE_CONNECTIONS, "true");
-    try {
-      fs = S3ATestUtils.createTestFileSystem(conf);
-      fail("Expected a connection error for proxy server");
-    } catch (AWSClientIOException e) {
-      // expected
-    }
+    expectFSCreateFailure(AWSClientIOException.class,
+        conf, "Expected a connection error for proxy server");
     conf.set(Constants.SECURE_CONNECTIONS, "false");
-    try {
-      fs = S3ATestUtils.createTestFileSystem(conf);
-      fail("Expected a connection error for proxy server");
-    } catch (AWSClientIOException e) {
-      // expected
-    }
+    expectFSCreateFailure(AWSClientIOException.class,
+        conf, "Expected a connection error for proxy server");
   }
 
   @Test
@@ -189,31 +188,31 @@ public class ITestS3AConfiguration {
     conf.set(Constants.PROXY_HOST, "127.0.0.1");
     conf.setInt(Constants.PROXY_PORT, 1);
     conf.set(Constants.PROXY_USERNAME, "user");
-    try {
-      fs = S3ATestUtils.createTestFileSystem(conf);
-      fail("Expected a connection error for proxy server");
-    } catch (IllegalArgumentException e) {
-      String msg = e.toString();
-      if (!msg.contains(Constants.PROXY_USERNAME) &&
-          !msg.contains(Constants.PROXY_PASSWORD)) {
-        throw e;
-      }
+    IllegalArgumentException e = expectFSCreateFailure(
+        IllegalArgumentException.class,
+        conf, "Expected a connection error for proxy server");
+    assertIsProxyUsernameError(e);
+  }
+
+  private void assertIsProxyUsernameError(final IllegalArgumentException e) {
+    String msg = e.toString();
+    if (!msg.contains(Constants.PROXY_USERNAME) &&
+        !msg.contains(Constants.PROXY_PASSWORD)) {
+      throw e;
     }
+  }
+
+  @Test
+  public void testUsernameInconsistentWithPassword2() throws Exception {
     conf = new Configuration();
     conf.setInt(Constants.MAX_ERROR_RETRIES, 2);
     conf.set(Constants.PROXY_HOST, "127.0.0.1");
     conf.setInt(Constants.PROXY_PORT, 1);
     conf.set(Constants.PROXY_PASSWORD, "password");
-    try {
-      fs = S3ATestUtils.createTestFileSystem(conf);
-      fail("Expected a connection error for proxy server");
-    } catch (IllegalArgumentException e) {
-      String msg = e.toString();
-      if (!msg.contains(Constants.PROXY_USERNAME) &&
-          !msg.contains(Constants.PROXY_PASSWORD)) {
-        throw e;
-      }
-    }
+    IllegalArgumentException e = expectFSCreateFailure(
+        IllegalArgumentException.class,
+        conf, "Expected a connection error for proxy server");
+    assertIsProxyUsernameError(e);
   }
 
   @Test
@@ -393,7 +392,7 @@ public class ITestS3AConfiguration {
       // Catch/pass standard path style access behaviour when live bucket
       // isn't in the same region as the s3 client default. See
       // http://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html
-      assertEquals(e.getStatusCode(), HttpStatus.SC_MOVED_PERMANENTLY);
+      assertEquals(HttpStatus.SC_MOVED_PERMANENTLY, e.getStatusCode());
     }
   }
 
@@ -428,8 +427,16 @@ public class ITestS3AConfiguration {
   public void testCloseIdempotent() throws Throwable {
     conf = new Configuration();
     fs = S3ATestUtils.createTestFileSystem(conf);
+    AWSCredentialProviderList credentials =
+        fs.shareCredentials("testCloseIdempotent");
+    credentials.close();
     fs.close();
+    assertTrue("Closing FS didn't close credentials " + credentials,
+        credentials.isClosed());
+    assertEquals("refcount not zero in " + credentials, 0, credentials.getRefCount());
     fs.close();
+    // and the numbers should not change
+    assertEquals("refcount not zero in " + credentials, 0, credentials.getRefCount());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ATemporaryCredentials.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ATemporaryCredentials.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ATemporaryCredentials.java
index 44a2beb..afc4086 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ATemporaryCredentials.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ATemporaryCredentials.java
@@ -19,15 +19,14 @@
 package org.apache.hadoop.fs.s3a;
 
 import java.io.IOException;
-import java.net.URI;
 
-import com.amazonaws.auth.AWSCredentialsProvider;
-import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient;
+import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
+import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
 import com.amazonaws.services.securitytoken.model.GetSessionTokenRequest;
 import com.amazonaws.services.securitytoken.model.GetSessionTokenResult;
 import com.amazonaws.services.securitytoken.model.Credentials;
 
-import org.apache.hadoop.fs.s3native.S3xLoginHelper;
+import org.apache.hadoop.fs.s3a.auth.STSClientFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.test.LambdaTestUtils;
 
@@ -55,6 +54,14 @@ public class ITestS3ATemporaryCredentials extends AbstractS3ATestBase {
 
   private static final long TEST_FILE_SIZE = 1024;
 
+  private AWSCredentialProviderList credentials;
+
+  @Override
+  public void teardown() throws Exception {
+    S3AUtils.closeAutocloseables(LOG, credentials);
+    super.teardown();
+  }
+
   /**
    * Test use of STS for requesting temporary credentials.
    *
@@ -63,7 +70,7 @@ public class ITestS3ATemporaryCredentials extends AbstractS3ATestBase {
    * S3A tests to request temporary credentials, then attempt to use those
    * credentials instead.
    *
-   * @throws IOException
+   * @throws IOException failure
    */
   @Test
   public void testSTS() throws IOException {
@@ -71,21 +78,20 @@ public class ITestS3ATemporaryCredentials extends AbstractS3ATestBase {
     if (!conf.getBoolean(TEST_STS_ENABLED, true)) {
       skip("STS functional tests disabled");
     }
-
-    S3xLoginHelper.Login login = S3AUtils.getAWSAccessKeys(
-        URI.create("s3a://foobar"), conf);
-    if (!login.hasLogin()) {
-      skip("testSTS disabled because AWS credentials not configured");
-    }
-    AWSCredentialsProvider parentCredentials = new BasicAWSCredentialsProvider(
-        login.getUser(), login.getPassword());
-
-    String stsEndpoint = conf.getTrimmed(TEST_STS_ENDPOINT, "");
-    AWSSecurityTokenServiceClient stsClient;
-    stsClient = new AWSSecurityTokenServiceClient(parentCredentials);
-    if (!stsEndpoint.isEmpty()) {
-      LOG.debug("STS Endpoint ={}", stsEndpoint);
-      stsClient.setEndpoint(stsEndpoint);
+    S3AFileSystem testFS = getFileSystem();
+    credentials = testFS.shareCredentials("testSTS");
+
+    String bucket = testFS.getBucket();
+    AWSSecurityTokenServiceClientBuilder builder = STSClientFactory.builder(
+        conf,
+        bucket,
+        credentials,
+        conf.getTrimmed(TEST_STS_ENDPOINT, ""), "");
+    AWSSecurityTokenService stsClient = builder.build();
+
+    if (!conf.getTrimmed(TEST_STS_ENDPOINT, "").isEmpty()) {
+      LOG.debug("STS Endpoint ={}", conf.getTrimmed(TEST_STS_ENDPOINT, ""));
+      stsClient.setEndpoint(conf.getTrimmed(TEST_STS_ENDPOINT, ""));
     }
     GetSessionTokenRequest sessionTokenRequest = new GetSessionTokenRequest();
     sessionTokenRequest.setDurationSeconds(900);
@@ -93,23 +99,28 @@ public class ITestS3ATemporaryCredentials extends AbstractS3ATestBase {
     sessionTokenResult = stsClient.getSessionToken(sessionTokenRequest);
     Credentials sessionCreds = sessionTokenResult.getCredentials();
 
-    String childAccessKey = sessionCreds.getAccessKeyId();
-    conf.set(ACCESS_KEY, childAccessKey);
-    String childSecretKey = sessionCreds.getSecretAccessKey();
-    conf.set(SECRET_KEY, childSecretKey);
-    String sessionToken = sessionCreds.getSessionToken();
-    conf.set(SESSION_TOKEN, sessionToken);
+    // clone configuration so changes here do not affect the base FS.
+    Configuration conf2 = new Configuration(conf);
+    S3AUtils.clearBucketOption(conf2, bucket, AWS_CREDENTIALS_PROVIDER);
+    S3AUtils.clearBucketOption(conf2, bucket, ACCESS_KEY);
+    S3AUtils.clearBucketOption(conf2, bucket, SECRET_KEY);
+    S3AUtils.clearBucketOption(conf2, bucket, SESSION_TOKEN);
+
+    conf2.set(ACCESS_KEY, sessionCreds.getAccessKeyId());
+    conf2.set(SECRET_KEY, sessionCreds.getSecretAccessKey());
+    conf2.set(SESSION_TOKEN, sessionCreds.getSessionToken());
 
-    conf.set(AWS_CREDENTIALS_PROVIDER, PROVIDER_CLASS);
+    conf2.set(AWS_CREDENTIALS_PROVIDER, PROVIDER_CLASS);
 
-    try(S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf)) {
+    // with valid credentials, we can set properties.
+    try(S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf2)) {
       createAndVerifyFile(fs, path("testSTS"), TEST_FILE_SIZE);
     }
 
     // now create an invalid set of credentials by changing the session
     // token
-    conf.set(SESSION_TOKEN, "invalid-" + sessionToken);
-    try (S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf)) {
+    conf2.set(SESSION_TOKEN, "invalid-" + sessionCreds.getSessionToken());
+    try (S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf2)) {
       createAndVerifyFile(fs, path("testSTSInvalidToken"), TEST_FILE_SIZE);
       fail("Expected an access exception, but file access to "
           + fs.getUri() + " was allowed: " + fs);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
index 763819b..a1df1a5 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.fs.s3a;
 
 import com.amazonaws.services.s3.model.ListObjectsV2Request;
 import com.amazonaws.services.s3.model.ListObjectsV2Result;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -28,6 +29,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
 import org.apache.hadoop.fs.contract.s3a.S3AContract;
+
 import org.junit.Assume;
 import org.junit.Test;
 
@@ -37,6 +39,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
@@ -71,7 +74,9 @@ public class ITestS3GuardListConsistency extends AbstractS3ATestBase {
     // Other configs would break test assumptions
     conf.set(FAIL_INJECT_INCONSISTENCY_KEY, DEFAULT_DELAY_KEY_SUBSTRING);
     conf.setFloat(FAIL_INJECT_INCONSISTENCY_PROBABILITY, 1.0f);
-    conf.setLong(FAIL_INJECT_INCONSISTENCY_MSEC, DEFAULT_DELAY_KEY_MSEC);
+    // this is a long value to guarantee that the inconsistency holds
+    // even over long-haul connections, and in the debugger too.
+    conf.setLong(FAIL_INJECT_INCONSISTENCY_MSEC, (long) (60 * 1000));
     return new S3AContract(conf);
   }
 
@@ -524,37 +529,60 @@ public class ITestS3GuardListConsistency extends AbstractS3ATestBase {
 
     ListObjectsV2Result postDeleteDelimited = listObjectsV2(fs, key, "/");
     ListObjectsV2Result postDeleteUndelimited = listObjectsV2(fs, key, null);
-
-    assertEquals("InconsistentAmazonS3Client added back objects incorrectly " +
+    assertListSizeEqual(
+        "InconsistentAmazonS3Client added back objects incorrectly " +
             "in a non-recursive listing",
-        preDeleteDelimited.getObjectSummaries().size(),
-        postDeleteDelimited.getObjectSummaries().size()
-    );
-    assertEquals("InconsistentAmazonS3Client added back prefixes incorrectly " +
+        preDeleteDelimited.getObjectSummaries(),
+        postDeleteDelimited.getObjectSummaries());
+
+    assertListSizeEqual("InconsistentAmazonS3Client added back prefixes incorrectly " +
             "in a non-recursive listing",
-        preDeleteDelimited.getCommonPrefixes().size(),
-        postDeleteDelimited.getCommonPrefixes().size()
+        preDeleteDelimited.getCommonPrefixes(),
+        postDeleteDelimited.getCommonPrefixes()
     );
-    assertEquals("InconsistentAmazonS3Client added back objects incorrectly " +
+    assertListSizeEqual("InconsistentAmazonS3Client added back objects incorrectly " +
             "in a recursive listing",
-        preDeleteUndelimited.getObjectSummaries().size(),
-        postDeleteUndelimited.getObjectSummaries().size()
+        preDeleteUndelimited.getObjectSummaries(),
+        postDeleteUndelimited.getObjectSummaries()
     );
-    assertEquals("InconsistentAmazonS3Client added back prefixes incorrectly " +
+
+    assertListSizeEqual("InconsistentAmazonS3Client added back prefixes incorrectly " +
             "in a recursive listing",
-        preDeleteUndelimited.getCommonPrefixes().size(),
-        postDeleteUndelimited.getCommonPrefixes().size()
+        preDeleteUndelimited.getCommonPrefixes(),
+        postDeleteUndelimited.getCommonPrefixes()
     );
   }
 
   /**
-   * retrying v2 list.
-   * @param fs
-   * @param key
-   * @param delimiter
-   * @return
+   * Assert that the two list sizes match; failure message includes the lists.
+   * @param message text for the assertion
+   * @param expected expected list
+   * @param actual actual list
+   * @param <T> type of list
+   */
+  private <T> void assertListSizeEqual(String message,
+      List<T> expected,
+      List<T> actual) {
+    String leftContents = expected.stream()
+        .map(n -> n.toString())
+        .collect(Collectors.joining("\n"));
+    String rightContents = actual.stream()
+        .map(n -> n.toString())
+        .collect(Collectors.joining("\n"));
+    String summary = "\nExpected:" + leftContents
+        + "\n-----------\nActual:" + rightContents;
+    assertEquals(message + summary, expected.size(), actual.size());
+  }
+
+  /**
+   * Retrying v2 list directly through the s3 client.
+   * @param fs filesystem
+   * @param key key to list under
+   * @param delimiter any delimiter
+   * @return the listing
    * @throws IOException on error
    */
+  @Retries.RetryRaw
   private ListObjectsV2Result listObjectsV2(S3AFileSystem fs,
       String key, String delimiter) throws IOException {
     ListObjectsV2Request k = fs.createListObjectsRequest(key, delimiter)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardWriteBack.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardWriteBack.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardWriteBack.java
index c8a54b8..d5cd4d4 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardWriteBack.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardWriteBack.java
@@ -65,11 +65,12 @@ public class ITestS3GuardWriteBack extends AbstractS3ATestBase {
     // delete the existing directory (in case of last test failure)
     noS3Guard.delete(directory, true);
     // Create a directory on S3 only
-    noS3Guard.mkdirs(new Path(directory, "OnS3"));
+    Path onS3 = new Path(directory, "OnS3");
+    noS3Guard.mkdirs(onS3);
     // Create a directory on both S3 and metadata store
-    Path p = new Path(directory, "OnS3AndMS");
-    ContractTestUtils.assertPathDoesNotExist(noWriteBack, "path", p);
-    noWriteBack.mkdirs(p);
+    Path onS3AndMS = new Path(directory, "OnS3AndMS");
+    ContractTestUtils.assertPathDoesNotExist(noWriteBack, "path", onS3AndMS);
+    noWriteBack.mkdirs(onS3AndMS);
 
     FileStatus[] fsResults;
     DirListingMetadata mdResults;
@@ -83,6 +84,8 @@ public class ITestS3GuardWriteBack extends AbstractS3ATestBase {
     // Metadata store without write-back should still only contain /OnS3AndMS,
     // because newly discovered /OnS3 is not written back to metadata store
     mdResults = noWriteBack.getMetadataStore().listChildren(directory);
+    assertNotNull("No results from noWriteBack listChildren " + directory,
+        mdResults);
     assertEquals("Metadata store without write back should still only know "
             + "about /OnS3AndMS, but it has: " + mdResults,
         1, mdResults.numEntries());
@@ -102,8 +105,7 @@ public class ITestS3GuardWriteBack extends AbstractS3ATestBase {
 
     // If we don't clean this up, the next test run will fail because it will
     // have recorded /OnS3 being deleted even after it's written to noS3Guard.
-    getFileSystem().getMetadataStore().forgetMetadata(
-        new Path(directory, "OnS3"));
+    getFileSystem().getMetadataStore().forgetMetadata(onS3);
   }
 
   /**
@@ -118,26 +120,33 @@ public class ITestS3GuardWriteBack extends AbstractS3ATestBase {
 
     // Create a FileSystem that is S3-backed only
     conf = createConfiguration();
-    S3ATestUtils.disableFilesystemCaching(conf);
     String host = fsURI.getHost();
-    if (disableS3Guard) {
-      conf.set(Constants.S3_METADATA_STORE_IMPL,
-          Constants.S3GUARD_METASTORE_NULL);
-      S3AUtils.setBucketOption(conf, host,
-          S3_METADATA_STORE_IMPL,
-          S3GUARD_METASTORE_NULL);
-    } else {
-      S3ATestUtils.maybeEnableS3Guard(conf);
-      conf.setBoolean(METADATASTORE_AUTHORITATIVE, authoritativeMeta);
-      S3AUtils.setBucketOption(conf, host,
-          METADATASTORE_AUTHORITATIVE,
-          Boolean.toString(authoritativeMeta));
-      S3AUtils.setBucketOption(conf, host,
-          S3_METADATA_STORE_IMPL,
-          conf.get(S3_METADATA_STORE_IMPL));
+    String metastore;
+
+    metastore = S3GUARD_METASTORE_NULL;
+    if (!disableS3Guard) {
+      // pick up the metadata store used by the main test
+      metastore = getFileSystem().getConf().get(S3_METADATA_STORE_IMPL);
+      assertNotEquals(S3GUARD_METASTORE_NULL, metastore);
     }
-    FileSystem fs = FileSystem.get(fsURI, conf);
-    return asS3AFS(fs);
+
+    conf.set(Constants.S3_METADATA_STORE_IMPL, metastore);
+    conf.setBoolean(METADATASTORE_AUTHORITATIVE, authoritativeMeta);
+    S3AUtils.setBucketOption(conf, host,
+        METADATASTORE_AUTHORITATIVE,
+        Boolean.toString(authoritativeMeta));
+    S3AUtils.setBucketOption(conf, host,
+        S3_METADATA_STORE_IMPL, metastore);
+
+    S3AFileSystem fs = asS3AFS(FileSystem.newInstance(fsURI, conf));
+    // do a check to verify that everything got through
+    assertEquals("Metadata store should have been disabled: " + fs,
+        disableS3Guard, !fs.hasMetadataStore());
+    assertEquals("metastore option did not propagate",
+        metastore, fs.getConf().get(S3_METADATA_STORE_IMPL));
+
+    return fs;
+
   }
 
   private static S3AFileSystem asS3AFS(FileSystem fs) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3ClientFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3ClientFactory.java
index b746bfe5..dbf228d 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3ClientFactory.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3ClientFactory.java
@@ -23,6 +23,7 @@ import static org.mockito.Mockito.*;
 import java.net.URI;
 import java.util.ArrayList;
 
+import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.model.MultipartUploadListing;
 import com.amazonaws.services.s3.model.Region;
@@ -34,8 +35,9 @@ import com.amazonaws.services.s3.model.Region;
 public class MockS3ClientFactory implements S3ClientFactory {
 
   @Override
-  public AmazonS3 createS3Client(URI name) {
-    String bucket = name.getHost();
+  public AmazonS3 createS3Client(URI name,
+      final String bucket,
+      final AWSCredentialsProvider credentialSet) {
     AmazonS3 s3 = mock(AmazonS3.class);
     when(s3.doesBucketExist(bucket)).thenReturn(true);
     // this listing is used in startup if purging is enabled, so

http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AAWSCredentialsProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AAWSCredentialsProvider.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AAWSCredentialsProvider.java
index d731ae7..b28925c 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AAWSCredentialsProvider.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AAWSCredentialsProvider.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.fs.s3a;
 
 import java.io.IOException;
 import java.net.URI;
+import java.nio.file.AccessDeniedException;
 import java.util.Arrays;
 import java.util.List;
 
@@ -34,11 +35,15 @@ import org.junit.rules.ExpectedException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider;
+import org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.test.GenericTestUtils;
 
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.S3ATestConstants.*;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
 import static org.apache.hadoop.fs.s3a.S3AUtils.*;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 import static org.junit.Assert.*;
 
 /**
@@ -221,14 +226,13 @@ public class TestS3AAWSCredentialsProvider {
   }
 
   private void expectProviderInstantiationFailure(String option,
-      String expectedErrorText) throws IOException {
+      String expectedErrorText) throws Exception {
     Configuration conf = new Configuration();
     conf.set(AWS_CREDENTIALS_PROVIDER, option);
     Path testFile = new Path(
         conf.getTrimmed(KEY_CSVTEST_FILE, DEFAULT_CSVTEST_FILE));
-    expectException(IOException.class, expectedErrorText);
-    URI uri = testFile.toUri();
-    S3AUtils.createAWSCredentialProviderSet(uri, conf);
+    intercept(IOException.class, expectedErrorText,
+        () -> S3AUtils.createAWSCredentialProviderSet(testFile.toUri(), conf));
   }
 
   /**
@@ -288,4 +292,68 @@ public class TestS3AAWSCredentialsProvider {
         authenticationContains(conf, AssumedRoleCredentialProvider.NAME));
   }
 
+  @Test
+  public void testExceptionLogic() throws Throwable {
+    AWSCredentialProviderList providers
+        = new AWSCredentialProviderList();
+    // verify you can't get credentials from it
+    NoAuthWithAWSException noAuth = intercept(NoAuthWithAWSException.class,
+        AWSCredentialProviderList.NO_AWS_CREDENTIAL_PROVIDERS,
+        () -> providers.getCredentials());
+    // but that it closes safely
+    providers.close();
+
+    S3ARetryPolicy retryPolicy = new S3ARetryPolicy(new Configuration());
+    assertEquals("Expected no retry on auth failure",
+        RetryPolicy.RetryAction.FAIL.action,
+        retryPolicy.shouldRetry(noAuth, 0, 0, true).action);
+
+    try {
+      throw S3AUtils.translateException("login", "", noAuth);
+    } catch (AccessDeniedException expected) {
+      // this is what we want; other exceptions will be passed up
+      assertEquals("Expected no retry on AccessDeniedException",
+          RetryPolicy.RetryAction.FAIL.action,
+          retryPolicy.shouldRetry(expected, 0, 0, true).action);
+    }
+
+  }
+
+  @Test
+  public void testRefCounting() throws Throwable {
+    AWSCredentialProviderList providers
+        = new AWSCredentialProviderList();
+    assertEquals("Ref count for " + providers,
+        1, providers.getRefCount());
+    AWSCredentialProviderList replicate = providers.share();
+    assertEquals(providers, replicate);
+    assertEquals("Ref count after replication for " + providers,
+        2, providers.getRefCount());
+    assertFalse("Was closed " + providers, providers.isClosed());
+    providers.close();
+    assertFalse("Was closed " + providers, providers.isClosed());
+    assertEquals("Ref count after close() for " + providers,
+        1, providers.getRefCount());
+
+    // this should now close it
+    providers.close();
+    assertTrue("Was not closed " + providers, providers.isClosed());
+    assertEquals("Ref count after close() for " + providers,
+        0, providers.getRefCount());
+    assertEquals("Ref count after second close() for " + providers,
+        0, providers.getRefCount());
+    intercept(IllegalStateException.class, "closed",
+        () -> providers.share());
+    // final call harmless
+    providers.close();
+    assertEquals("Ref count after close() for " + providers,
+        0, providers.getRefCount());
+    providers.refresh();
+
+    intercept(NoAuthWithAWSException.class,
+        AWSCredentialProviderList.CREDENTIALS_REQUESTED_WHEN_CLOSED,
+        () -> providers.getCredentials());
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumeRole.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumeRole.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumeRole.java
index c6985b0..7451ef1 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumeRole.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumeRole.java
@@ -61,6 +61,7 @@ import static org.apache.hadoop.fs.s3a.auth.RoleTestUtils.*;
 import static org.apache.hadoop.fs.s3a.auth.RoleModel.*;
 import static org.apache.hadoop.fs.s3a.auth.RolePolicies.*;
 import static org.apache.hadoop.fs.s3a.auth.RoleTestUtils.forbidden;
+import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
 import static org.apache.hadoop.test.LambdaTestUtils.*;
 
 /**
@@ -85,6 +86,24 @@ public class ITestAssumeRole extends AbstractS3ATestBase {
    */
   private S3AFileSystem roleFS;
 
+  /**
+   * Duration range exception text on SDKs which check client-side.
+   */
+  protected static final String E_DURATION_RANGE_1
+      = "Assume Role session duration should be in the range of 15min - 1Hr";
+
+  /**
+   * Duration range too high text on SDKs which check on the server.
+   */
+  protected static final String E_DURATION_RANGE_2
+      = "Member must have value less than or equal to 43200";
+
+  /**
+   * Duration range too low text on SDKs which check on the server.
+   */
+  protected static final String E_DURATION_RANGE_3
+      = "Member must have value greater than or equal to 900";
+
   @Override
   public void setup() throws Exception {
     super.setup();
@@ -112,13 +131,14 @@ public class ITestAssumeRole extends AbstractS3ATestBase {
    * @param clazz class of exception to expect
    * @param text text in exception
    * @param <E> type of exception as inferred from clazz
+   * @return the caught exception if it was of the expected type and contents
    * @throws Exception if the exception was the wrong class
    */
-  private <E extends Throwable> void expectFileSystemCreateFailure(
+  private <E extends Throwable> E expectFileSystemCreateFailure(
       Configuration conf,
       Class<E> clazz,
       String text) throws Exception {
-    interceptClosing(clazz,
+    return interceptClosing(clazz,
         text,
         () -> new Path(getFileSystem().getUri()).getFileSystem(conf));
   }
@@ -246,6 +266,60 @@ public class ITestAssumeRole extends AbstractS3ATestBase {
         "Member must satisfy regular expression pattern");
   }
 
+  /**
+   * A duration >1h is forbidden client-side in AWS SDK 1.11.271;
+   * with the ability to extend durations deployed in March 2018,
+   * duration checks will need to go server-side, and, presumably,
+   * later SDKs will remove the client side checks.
+   * This code exists to see when this happens.
+   */
+  @Test
+  public void testAssumeRoleThreeHourSessionDuration() throws Exception {
+    describe("Try to authenticate with a long session duration");
+
+    Configuration conf = createAssumedRoleConfig();
+    // add a duration of three hours
+    conf.setInt(ASSUMED_ROLE_SESSION_DURATION, 3 * 60 * 60);
+    try {
+      new Path(getFileSystem().getUri()).getFileSystem(conf).close();
+      LOG.info("Successfully created token of a duration >3h");
+    } catch (IOException ioe) {
+      assertExceptionContains(E_DURATION_RANGE_1, ioe);
+    }
+  }
+
+  /**
+   * A duration >1h is forbidden client-side in AWS SDK 1.11.271;
+   * with the ability to extend durations deployed in March 2018,
+   * the checks go server-side in later SDKs, which will remove
+   * the client-side checks.
+   * This test asks for a duration which will still be rejected, and
+   * looks for either of the error messages raised.
+   */
+  @Test
+  public void testAssumeRoleThirtySixHourSessionDuration() throws Exception {
+    describe("Try to authenticate with a long session duration");
+
+    Configuration conf = createAssumedRoleConfig();
+    conf.setInt(ASSUMED_ROLE_SESSION_DURATION, 36 * 60 * 60);
+    IOException ioe = expectFileSystemCreateFailure(conf,
+        IOException.class, null);
+    assertIsRangeException(ioe);
+  }
+
+  /**
+   * Look for either the client-side or STS-side range exception.
+   * @param e exception
+   * @throws Exception the exception, if its text doesn't match
+   */
+  private void assertIsRangeException(final Exception e) throws Exception {
+    String message = e.toString();
+    if (!message.contains(E_DURATION_RANGE_1)
+        && !message.contains(E_DURATION_RANGE_2)
+        && !message.contains(E_DURATION_RANGE_3)) {
+      throw e;
+    }
+  }
 
   /**
    * Create the assumed role configuration.
@@ -280,11 +354,11 @@ public class ITestAssumeRole extends AbstractS3ATestBase {
     describe("Expect the constructor to fail if the session is to short");
     Configuration conf = new Configuration();
     conf.set(ASSUMED_ROLE_SESSION_DURATION, "30s");
-    interceptClosing(IllegalArgumentException.class, "",
+    Exception ex = interceptClosing(Exception.class, "",
         () -> new AssumedRoleCredentialProvider(uri, conf));
+    assertIsRangeException(ex);
   }
 
-
   @Test
   public void testAssumeRoleCreateFS() throws IOException {
     describe("Create an FS client with the role and do some basic IO");
@@ -296,24 +370,32 @@ public class ITestAssumeRole extends AbstractS3ATestBase {
         conf.get(ACCESS_KEY), roleARN);
 
     try (FileSystem fs = path.getFileSystem(conf)) {
-      fs.getFileStatus(new Path("/"));
+      fs.getFileStatus(ROOT);
       fs.mkdirs(path("testAssumeRoleFS"));
     }
   }
 
   @Test
   public void testAssumeRoleRestrictedPolicyFS() throws Exception {
-    describe("Restrict the policy for this session; verify that reads fail");
+    describe("Restrict the policy for this session; verify that reads fail.");
 
+    // there's some special handling of S3Guard here as operations
+    // which only go to DDB don't fail the way S3 would reject them.
     Configuration conf = createAssumedRoleConfig();
     bindRolePolicy(conf, RESTRICTED_POLICY);
     Path path = new Path(getFileSystem().getUri());
+    boolean guarded = getFileSystem().hasMetadataStore();
     try (FileSystem fs = path.getFileSystem(conf)) {
-      forbidden("getFileStatus",
-          () -> fs.getFileStatus(new Path("/")));
-      forbidden("getFileStatus",
-          () -> fs.listStatus(new Path("/")));
-      forbidden("getFileStatus",
+      if (!guarded) {
+        // when S3Guard is enabled, the restricted policy still
+        // permits S3Guard record lookup, so getFileStatus calls
+        // will work iff the record is in the database.
+        forbidden("getFileStatus",
+            () -> fs.getFileStatus(ROOT));
+      }
+      forbidden("",
+          () -> fs.listStatus(ROOT));
+      forbidden("",
           () -> fs.mkdirs(path("testAssumeRoleFS")));
     }
   }
@@ -333,7 +415,11 @@ public class ITestAssumeRole extends AbstractS3ATestBase {
     Configuration conf = createAssumedRoleConfig();
 
     bindRolePolicy(conf,
-        policy(statement(false, S3_ALL_BUCKETS, S3_GET_OBJECT_TORRENT)));
+        policy(
+            statement(false, S3_ALL_BUCKETS, S3_GET_OBJECT_TORRENT),
+            ALLOW_S3_GET_BUCKET_LOCATION,
+            STATEMENT_S3GUARD_CLIENT,
+            STATEMENT_ALLOW_SSE_KMS_RW));
     Path path = path("testAssumeRoleStillIncludesRolePerms");
     roleFS = (S3AFileSystem) path.getFileSystem(conf);
     assertTouchForbidden(roleFS, path);
@@ -342,6 +428,8 @@ public class ITestAssumeRole extends AbstractS3ATestBase {
   /**
    * After blocking all write verbs used by S3A, try to write data (fail)
    * and read data (succeed).
+   * For S3Guard: full DDB RW access is retained.
+   * SSE-KMS key access is set to decrypt only.
    */
   @Test
   public void testReadOnlyOperations() throws Throwable {
@@ -352,7 +440,9 @@ public class ITestAssumeRole extends AbstractS3ATestBase {
     bindRolePolicy(conf,
         policy(
             statement(false, S3_ALL_BUCKETS, S3_PATH_WRITE_OPERATIONS),
-            STATEMENT_ALL_S3, STATEMENT_ALL_DDB));
+            STATEMENT_ALL_S3,
+            STATEMENT_S3GUARD_CLIENT,
+            STATEMENT_ALLOW_SSE_KMS_READ));
     Path path = methodPath();
     roleFS = (S3AFileSystem) path.getFileSystem(conf);
     // list the root path, expect happy
@@ -399,8 +489,9 @@ public class ITestAssumeRole extends AbstractS3ATestBase {
     Configuration conf = createAssumedRoleConfig();
 
     bindRolePolicyStatements(conf,
-        STATEMENT_ALL_DDB,
+        STATEMENT_S3GUARD_CLIENT,
         statement(true, S3_ALL_BUCKETS, S3_ROOT_READ_OPERATIONS),
+        STATEMENT_ALLOW_SSE_KMS_RW,
         new Statement(Effects.Allow)
           .addActions(S3_ALL_OPERATIONS)
           .addResources(directory(restrictedDir)));
@@ -447,7 +538,7 @@ public class ITestAssumeRole extends AbstractS3ATestBase {
   }
 
   /**
-   * Execute a sequence of rename operations.
+   * Execute a sequence of rename operations with access locked down.
    * @param conf FS configuration
    */
   public void executeRestrictedRename(final Configuration conf)
@@ -461,7 +552,8 @@ public class ITestAssumeRole extends AbstractS3ATestBase {
     fs.delete(basePath, true);
 
     bindRolePolicyStatements(conf,
-        STATEMENT_ALL_DDB,
+        STATEMENT_S3GUARD_CLIENT,
+        STATEMENT_ALLOW_SSE_KMS_RW,
         statement(true, S3_ALL_BUCKETS, S3_ROOT_READ_OPERATIONS),
         new Statement(Effects.Allow)
           .addActions(S3_PATH_RW_OPERATIONS)
@@ -503,6 +595,25 @@ public class ITestAssumeRole extends AbstractS3ATestBase {
   }
 
   /**
+   * Without simulation of STS failures, and with STS overload likely to
+   * be very rare, there'll be no implicit test coverage of
+   * {@link AssumedRoleCredentialProvider#operationRetried(String, Exception, int, boolean)}.
+   * This test simply invokes the callback for both the first and second retry event.
+   *
+   * If the handler ever adds more than logging, this test ensures that things
+   * don't break.
+   */
+  @Test
+  public void testAssumedRoleRetryHandler() throws Throwable {
+    try(AssumedRoleCredentialProvider provider
+            = new AssumedRoleCredentialProvider(getFileSystem().getUri(),
+        createAssumedRoleConfig())) {
+      provider.operationRetried("retry", new IOException("failure"), 0, true);
+      provider.operationRetried("retry", new IOException("failure"), 1, true);
+    }
+  }
+
+  /**
    * Execute a sequence of rename operations where the source
    * data is read only to the client calling rename().
    * This will cause the inner delete() operations to fail, whose outcomes
@@ -534,7 +645,7 @@ public class ITestAssumeRole extends AbstractS3ATestBase {
     touch(fs, readOnlyFile);
 
     bindRolePolicyStatements(conf,
-        STATEMENT_ALL_DDB,
+        STATEMENT_S3GUARD_CLIENT,
         statement(true, S3_ALL_BUCKETS, S3_ROOT_READ_OPERATIONS),
           new Statement(Effects.Allow)
             .addActions(S3_PATH_RW_OPERATIONS)
@@ -614,7 +725,8 @@ public class ITestAssumeRole extends AbstractS3ATestBase {
     fs.mkdirs(readOnlyDir);
 
     bindRolePolicyStatements(conf,
-        STATEMENT_ALL_DDB,
+        STATEMENT_S3GUARD_CLIENT,
+        STATEMENT_ALLOW_SSE_KMS_RW,
         statement(true, S3_ALL_BUCKETS, S3_ROOT_READ_OPERATIONS),
         new Statement(Effects.Allow)
             .addActions(S3_PATH_RW_OPERATIONS)
@@ -752,7 +864,8 @@ public class ITestAssumeRole extends AbstractS3ATestBase {
     fs.delete(destDir, true);
 
     bindRolePolicyStatements(conf,
-        STATEMENT_ALL_DDB,
+        STATEMENT_S3GUARD_CLIENT,
+        STATEMENT_ALLOW_SSE_KMS_RW,
         statement(true, S3_ALL_BUCKETS, S3_ALL_OPERATIONS),
         new Statement(Effects.Deny)
             .addActions(S3_PATH_WRITE_OPERATIONS)

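The test hunks above all follow one pattern: a role policy is assembled from allow/deny statements, bound to the configuration with bindRolePolicy()/bindRolePolicyStatements(), and only then is the restricted filesystem created. A minimal sketch of that pattern, reusing the helpers visible in this patch; the path name and the exact statement mix are illustrative rather than lifted from any one test:

// Sketch only: deny S3 writes everywhere, keep the S3Guard (DynamoDB) and
// decrypt-only SSE-KMS statements from this patch, then allow full S3 access
// under a single directory. Assumes the static imports used by ITestAssumeRole.
Configuration conf = createAssumedRoleConfig();
Path restricted = path("example-restricted-dir");    // hypothetical test path
bindRolePolicy(conf,
    policy(
        statement(false, S3_ALL_BUCKETS, S3_PATH_WRITE_OPERATIONS),
        STATEMENT_S3GUARD_CLIENT,
        STATEMENT_ALLOW_SSE_KMS_READ,
        new Statement(Effects.Allow)
            .addActions(S3_ALL_OPERATIONS)
            .addResources(directory(restricted))));
S3AFileSystem roleFS = (S3AFileSystem) restricted.getFileSystem(conf);
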
http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumedRoleCommitOperations.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumedRoleCommitOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumedRoleCommitOperations.java
index bb66268..834826e 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumedRoleCommitOperations.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumedRoleCommitOperations.java
@@ -72,7 +72,8 @@ public class ITestAssumedRoleCommitOperations extends ITestCommitOperations {
     Configuration conf = newAssumedRoleConfig(getConfiguration(),
         getAssumedRoleARN());
     bindRolePolicyStatements(conf,
-        STATEMENT_ALL_DDB,
+        STATEMENT_S3GUARD_CLIENT,
+        STATEMENT_ALLOW_SSE_KMS_RW,
         statement(true, S3_ALL_BUCKETS, S3_ROOT_READ_OPERATIONS),
         new RoleModel.Statement(RoleModel.Effects.Allow)
             .addActions(S3_PATH_RW_OPERATIONS)
@@ -81,7 +82,6 @@ public class ITestAssumedRoleCommitOperations extends ITestCommitOperations {
     roleFS = (S3AFileSystem) restrictedDir.getFileSystem(conf);
   }
 
-
   @Override
   public void teardown() throws Exception {
     S3AUtils.closeAll(LOG, roleFS);
@@ -122,7 +122,6 @@ public class ITestAssumedRoleCommitOperations extends ITestCommitOperations {
     return new Path(restrictedDir, filepath);
   }
 
-
   private String getAssumedRoleARN() {
     return getContract().getConf().getTrimmed(ASSUMED_ROLE_ARN, "");
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/RoleTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/RoleTestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/RoleTestUtils.java
index 9fa2600..854e7ec 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/RoleTestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/RoleTestUtils.java
@@ -58,14 +58,23 @@ public final class RoleTestUtils {
 
 
   /** Deny GET requests to all buckets. */
-  public static final Statement DENY_GET_ALL =
+  public static final Statement DENY_S3_GET_OBJECT =
       statement(false, S3_ALL_BUCKETS, S3_GET_OBJECT);
 
+  public static final Statement ALLOW_S3_GET_BUCKET_LOCATION
+      =  statement(true, S3_ALL_BUCKETS, S3_GET_BUCKET_LOCATION);
+
   /**
-   * This is AWS policy removes read access.
+   * This AWS policy removes read access from S3 while leaving S3Guard
+   * access intact. Clients can still use S3Guard list/HEAD operations,
+   * and can even write records, but they cannot read the underlying
+   * object data.
+   * The client does need {@link RolePolicies#S3_GET_BUCKET_LOCATION} to
+   * get the bucket location.
    */
-  public static final Policy RESTRICTED_POLICY = policy(DENY_GET_ALL);
-
+  public static final Policy RESTRICTED_POLICY = policy(
+      DENY_S3_GET_OBJECT, STATEMENT_ALL_DDB, ALLOW_S3_GET_BUCKET_LOCATION
+      );
 
   /**
    * Error message to get from the AWS SDK if you can't assume the role.
@@ -145,7 +154,7 @@ public final class RoleTestUtils {
     Configuration conf = new Configuration(srcConf);
     conf.set(AWS_CREDENTIALS_PROVIDER, AssumedRoleCredentialProvider.NAME);
     conf.set(ASSUMED_ROLE_ARN, roleARN);
-    conf.set(ASSUMED_ROLE_SESSION_NAME, "valid");
+    conf.set(ASSUMED_ROLE_SESSION_NAME, "test");
     conf.set(ASSUMED_ROLE_SESSION_DURATION, "15m");
     disableFilesystemCaching(conf);
     return conf;
@@ -163,9 +172,8 @@ public final class RoleTestUtils {
       String contained,
       Callable<T> eval)
       throws Exception {
-    AccessDeniedException ex = intercept(AccessDeniedException.class, eval);
-    GenericTestUtils.assertExceptionContains(contained, ex);
-    return ex;
+    return intercept(AccessDeniedException.class,
+        contained, eval);
   }
 
 }

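RESTRICTED_POLICY now pairs the s3:GetObject deny with the DynamoDB and GetBucketLocation allows its javadoc describes. A hedged sketch of how a test might exercise it, assuming a test class that can supply the base configuration and role ARN, and a path whose data was already written with full credentials: metadata operations keep working through S3Guard, while the object GET is the call that fails.

// Sketch only; uses the RoleTestUtils helpers above plus LambdaTestUtils.intercept.
Configuration conf = newAssumedRoleConfig(getConfiguration(), getAssumedRoleARN());
bindRolePolicy(conf, RESTRICTED_POLICY);
Path path = new Path("s3a://example-bucket/restricted/file.txt");  // hypothetical
try (FileSystem roleFS = path.getFileSystem(conf)) {
  roleFS.getFileStatus(path);                  // satisfied from S3Guard metadata
  intercept(AccessDeniedException.class, () -> {
    try (FSDataInputStream in = roleFS.open(path)) {
      return in.read();                        // the object GET is denied
    }
  });
}
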
http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardToolTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardToolTestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardToolTestBase.java
index f591e32..9185fc5 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardToolTestBase.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardToolTestBase.java
@@ -32,6 +32,7 @@ import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.hadoop.fs.s3a.S3AUtils;
 import org.apache.hadoop.util.StopWatch;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.FileSystem;
@@ -51,6 +52,7 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.StringUtils;
 
+import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_AUTHORITATIVE;
 import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_NAME_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_METASTORE_NULL;
 import static org.apache.hadoop.fs.s3a.Constants.S3_METADATA_STORE_IMPL;
@@ -144,8 +146,11 @@ public abstract class AbstractS3GuardToolTestBase extends AbstractS3ATestBase {
 
     // Also create a "raw" fs without any MetadataStore configured
     Configuration conf = new Configuration(getConfiguration());
-    conf.set(S3_METADATA_STORE_IMPL, S3GUARD_METASTORE_NULL);
     URI fsUri = getFileSystem().getUri();
+    conf.set(S3_METADATA_STORE_IMPL, S3GUARD_METASTORE_NULL);
+    S3AUtils.setBucketOption(conf, fsUri.getHost(),
+        METADATASTORE_AUTHORITATIVE,
+        S3GUARD_METASTORE_NULL);
     rawFs = (S3AFileSystem) FileSystem.newInstance(fsUri, conf);
   }
 

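setBucketOption() is used above so that the bucket-specific form of the option is set for the "raw" filesystem, not just the base key. A minimal sketch of the helper on its own; the bucket name, the value and the expanded key quoted in the comment are assumptions based on the fs.s3a.bucket.* per-bucket convention, not something this patch spells out:

// Sketch only: write a per-bucket override of an S3A option.
Configuration conf = new Configuration();
S3AUtils.setBucketOption(conf, "example-bucket",          // hypothetical bucket
    METADATASTORE_AUTHORITATIVE, "false");
// expected to surface as a bucket-scoped key along the lines of
// fs.s3a.bucket.example-bucket.metadatastore.authoritative=false,
// which overrides fs.s3a.metadatastore.authoritative for that bucket only.
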
http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardConcurrentOps.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardConcurrentOps.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardConcurrentOps.java
index c6838a0..22a1efd 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardConcurrentOps.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardConcurrentOps.java
@@ -40,8 +40,10 @@ import org.junit.rules.Timeout;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.AWSCredentialProviderList;
 import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
 import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
 
 import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_REGION_KEY;
 
@@ -80,81 +82,102 @@ public class ITestS3GuardConcurrentOps extends AbstractS3ATestBase {
 
   @Test
   public void testConcurrentTableCreations() throws Exception {
-    final Configuration conf = getConfiguration();
+    S3AFileSystem fs = getFileSystem();
+    final Configuration conf = fs.getConf();
     Assume.assumeTrue("Test only applies when DynamoDB is used for S3Guard",
         conf.get(Constants.S3_METADATA_STORE_IMPL).equals(
             Constants.S3GUARD_METASTORE_DYNAMO));
 
-    DynamoDBMetadataStore ms = new DynamoDBMetadataStore();
-    ms.initialize(getFileSystem());
-    DynamoDB db = ms.getDynamoDB();
-
-    String tableName = "testConcurrentTableCreations" + new Random().nextInt();
-    conf.setBoolean(Constants.S3GUARD_DDB_TABLE_CREATE_KEY, true);
-    conf.set(Constants.S3GUARD_DDB_TABLE_NAME_KEY, tableName);
+    AWSCredentialProviderList sharedCreds =
+        fs.shareCredentials("testConcurrentTableCreations");
+    // close that shared copy.
+    sharedCreds.close();
+    // this is the original reference count.
+    int originalRefCount = sharedCreds.getRefCount();
 
-    String region = conf.getTrimmed(S3GUARD_DDB_REGION_KEY);
-    if (StringUtils.isEmpty(region)) {
-      // no region set, so pick it up from the test bucket
-      conf.set(S3GUARD_DDB_REGION_KEY, getFileSystem().getBucketLocation());
-    }
-    int concurrentOps = 16;
-    int iterations = 4;
+    // now init the store; this should increment the ref count.
+    DynamoDBMetadataStore ms = new DynamoDBMetadataStore();
+    ms.initialize(fs);
 
-    failIfTableExists(db, tableName);
+    // the ref count should have gone up
+    assertEquals("Credential Ref count unchanged after initializing metastore "
+        + sharedCreds,
+        originalRefCount + 1, sharedCreds.getRefCount());
+    try {
+      DynamoDB db = ms.getDynamoDB();
 
-    for (int i = 0; i < iterations; i++) {
-      ExecutorService executor = Executors.newFixedThreadPool(
-          concurrentOps, new ThreadFactory() {
-            private AtomicInteger count = new AtomicInteger(0);
+      String tableName = "testConcurrentTableCreations" + new Random().nextInt();
+      conf.setBoolean(Constants.S3GUARD_DDB_TABLE_CREATE_KEY, true);
+      conf.set(Constants.S3GUARD_DDB_TABLE_NAME_KEY, tableName);
 
-            public Thread newThread(Runnable r) {
-              return new Thread(r,
-                  "testConcurrentTableCreations" + count.getAndIncrement());
+      String region = conf.getTrimmed(S3GUARD_DDB_REGION_KEY);
+      if (StringUtils.isEmpty(region)) {
+        // no region set, so pick it up from the test bucket
+        conf.set(S3GUARD_DDB_REGION_KEY, fs.getBucketLocation());
+      }
+      int concurrentOps = 16;
+      int iterations = 4;
+
+      failIfTableExists(db, tableName);
+
+      for (int i = 0; i < iterations; i++) {
+        ExecutorService executor = Executors.newFixedThreadPool(
+            concurrentOps, new ThreadFactory() {
+              private AtomicInteger count = new AtomicInteger(0);
+
+              public Thread newThread(Runnable r) {
+                return new Thread(r,
+                    "testConcurrentTableCreations" + count.getAndIncrement());
+              }
+            });
+        ((ThreadPoolExecutor) executor).prestartAllCoreThreads();
+        Future<Exception>[] futures = new Future[concurrentOps];
+        for (int f = 0; f < concurrentOps; f++) {
+          final int index = f;
+          futures[f] = executor.submit(new Callable<Exception>() {
+            @Override
+            public Exception call() throws Exception {
+
+              ContractTestUtils.NanoTimer timer =
+                  new ContractTestUtils.NanoTimer();
+
+              Exception result = null;
+              try (DynamoDBMetadataStore store = new DynamoDBMetadataStore()) {
+                store.initialize(conf);
+              } catch (Exception e) {
+                LOG.error(e.getClass() + ": " + e.getMessage());
+                result = e;
+              }
+
+              timer.end("Parallel DynamoDB client creation %d", index);
+              LOG.info("Parallel DynamoDB client creation {} ran from {} to {}",
+                  index, timer.getStartTime(), timer.getEndTime());
+              return result;
             }
           });
-      ((ThreadPoolExecutor) executor).prestartAllCoreThreads();
-      Future<Exception>[] futures = new Future[concurrentOps];
-      for (int f = 0; f < concurrentOps; f++) {
-        final int index = f;
-        futures[f] = executor.submit(new Callable<Exception>() {
-          @Override
-          public Exception call() throws Exception {
-
-            ContractTestUtils.NanoTimer timer =
-                new ContractTestUtils.NanoTimer();
-
-            Exception result = null;
-            try (DynamoDBMetadataStore store = new DynamoDBMetadataStore()) {
-              store.initialize(conf);
-            } catch (Exception e) {
-              LOG.error(e.getClass() + ": " + e.getMessage());
-              result = e;
-            }
-
-            timer.end("Parallel DynamoDB client creation %d", index);
-            LOG.info("Parallel DynamoDB client creation {} ran from {} to {}",
-                index, timer.getStartTime(), timer.getEndTime());
-            return result;
+        }
+        List<Exception> exceptions = new ArrayList<>(concurrentOps);
+        for (int f = 0; f < concurrentOps; f++) {
+          Exception outcome = futures[f].get();
+          if (outcome != null) {
+            exceptions.add(outcome);
           }
-        });
-      }
-      List<Exception> exceptions = new ArrayList<>(concurrentOps);
-      for (int f = 0; f < concurrentOps; f++) {
-        Exception outcome = futures[f].get();
-        if (outcome != null) {
-          exceptions.add(outcome);
+        }
+        deleteTable(db, tableName);
+        int exceptionsThrown = exceptions.size();
+        if (exceptionsThrown > 0) {
+          // at least one exception was thrown. Fail the test & nest the first
+          // exception caught
+          throw new AssertionError(exceptionsThrown + "/" + concurrentOps +
+              " threads threw exceptions while initializing on iteration " + i,
+              exceptions.get(0));
         }
       }
-      deleteTable(db, tableName);
-      int exceptionsThrown = exceptions.size();
-      if (exceptionsThrown > 0) {
-        // at least one exception was thrown. Fail the test & nest the first
-        // exception caught
-        throw new AssertionError(exceptionsThrown + "/" + concurrentOps +
-            " threads threw exceptions while initializing on iteration " + i,
-            exceptions.get(0));
-      }
+    } finally {
+      ms.close();
     }
+    assertEquals("Credential Ref count unchanged after closing metastore: "
+            + sharedCreds,
+        originalRefCount, sharedCreds.getRefCount());
   }
 }




[2/2] hadoop git commit: HADOOP-15583. Stabilize S3A Assumed Role support. Contributed by Steve Loughran.

Posted by st...@apache.org.
HADOOP-15583. Stabilize S3A Assumed Role support.
Contributed by Steve Loughran.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/da9a39ee
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/da9a39ee
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/da9a39ee

Branch: refs/heads/trunk
Commit: da9a39eed138210de29b59b90c449b28da1c04f9
Parents: d81cd36
Author: Steve Loughran <st...@apache.org>
Authored: Wed Aug 8 22:57:10 2018 -0700
Committer: Steve Loughran <st...@apache.org>
Committed: Wed Aug 8 22:57:24 2018 -0700

----------------------------------------------------------------------
 .../src/main/resources/core-default.xml         |  18 +-
 .../fs/s3a/AWSCredentialProviderList.java       | 101 ++++++--
 .../org/apache/hadoop/fs/s3a/Constants.java     |  19 +-
 .../hadoop/fs/s3a/DefaultS3ClientFactory.java   | 190 ++++----------
 .../fs/s3a/InconsistentAmazonS3Client.java      |  10 +
 .../fs/s3a/InconsistentS3ClientFactory.java     |  11 +
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java |  35 ++-
 .../apache/hadoop/fs/s3a/S3ARetryPolicy.java    |   4 +-
 .../java/org/apache/hadoop/fs/s3a/S3AUtils.java | 245 +++++++++++++++++--
 .../apache/hadoop/fs/s3a/S3ClientFactory.java   |   7 +-
 .../s3a/auth/AssumedRoleCredentialProvider.java |  78 +++++-
 .../fs/s3a/auth/NoAuthWithAWSException.java     |  37 +++
 .../apache/hadoop/fs/s3a/auth/RoleModel.java    |   8 +
 .../apache/hadoop/fs/s3a/auth/RolePolicies.java | 143 +++++++++--
 .../hadoop/fs/s3a/auth/STSClientFactory.java    |  78 ++++++
 .../fs/s3a/s3guard/DynamoDBClientFactory.java   |  18 +-
 .../fs/s3a/s3guard/DynamoDBMetadataStore.java   |  62 ++++-
 .../markdown/tools/hadoop-aws/assumed_roles.md  | 191 +++++++++++----
 .../src/site/markdown/tools/hadoop-aws/index.md |   6 +-
 .../hadoop/fs/s3a/ITestS3AConfiguration.java    | 117 ++++-----
 .../fs/s3a/ITestS3ATemporaryCredentials.java    |  71 +++---
 .../fs/s3a/ITestS3GuardListConsistency.java     |  68 +++--
 .../hadoop/fs/s3a/ITestS3GuardWriteBack.java    |  57 +++--
 .../hadoop/fs/s3a/MockS3ClientFactory.java      |   6 +-
 .../fs/s3a/TestS3AAWSCredentialsProvider.java   |  76 +++++-
 .../hadoop/fs/s3a/auth/ITestAssumeRole.java     | 151 ++++++++++--
 .../auth/ITestAssumedRoleCommitOperations.java  |   5 +-
 .../hadoop/fs/s3a/auth/RoleTestUtils.java       |  24 +-
 .../s3guard/AbstractS3GuardToolTestBase.java    |   7 +-
 .../s3a/s3guard/ITestS3GuardConcurrentOps.java  | 147 ++++++-----
 30 files changed, 1461 insertions(+), 529 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 75acf48..29c2bc2 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -1033,7 +1033,19 @@
   <name>fs.s3a.assumed.role.sts.endpoint</name>
   <value/>
   <description>
-    AWS Simple Token Service Endpoint. If unset, uses the default endpoint.
+    AWS Security Token Service Endpoint.
+    If unset, uses the default endpoint.
+    Only used if AssumedRoleCredentialProvider is the AWS credential provider.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.assumed.role.sts.endpoint.region</name>
+  <value>us-west-1</value>
+  <description>
+    AWS Security Token Service Endpoint's region;
+    Needed if fs.s3a.assumed.role.sts.endpoint points to an endpoint
+    other than the default one and the v4 signature is used.
     Only used if AssumedRoleCredentialProvider is the AWS credential provider.
   </description>
 </property>
@@ -1058,7 +1070,9 @@
 <property>
   <name>fs.s3a.connection.ssl.enabled</name>
   <value>true</value>
-  <description>Enables or disables SSL connections to S3.</description>
+  <description>Enables or disables SSL connections to AWS services.
+    Also sets the default port to use for the s3a proxy settings,
+    when not explicitly set in fs.s3a.proxy.port.</description>
 </property>
 
 <property>

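The two new STS properties above only matter when the AssumedRoleCredentialProvider is the credential provider. A hedged sketch of a client configuration that ties them together; the role ARN and the regional endpoint are placeholders:

// Sketch only: programmatic equivalent of the XML settings above.
Configuration conf = new Configuration();
conf.set("fs.s3a.aws.credentials.provider", AssumedRoleCredentialProvider.NAME);
conf.set("fs.s3a.assumed.role.arn",
    "arn:aws:iam::123456789012:role/example-role");         // placeholder ARN
conf.set("fs.s3a.assumed.role.session.duration", "30m");
// A non-default STS endpoint needs its signing region set alongside it,
// as the description above notes.
conf.set("fs.s3a.assumed.role.sts.endpoint", "sts.eu-west-1.amazonaws.com");
conf.set("fs.s3a.assumed.role.sts.endpoint.region", "eu-west-1");
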
http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSCredentialProviderList.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSCredentialProviderList.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSCredentialProviderList.java
index 10201f0..f9052fa 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSCredentialProviderList.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSCredentialProviderList.java
@@ -18,25 +18,29 @@
 
 package org.apache.hadoop.fs.s3a;
 
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
 import com.amazonaws.AmazonClientException;
 import com.amazonaws.auth.AWSCredentials;
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.auth.AnonymousAWSCredentials;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException;
 import org.apache.hadoop.io.IOUtils;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Closeable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.stream.Collectors;
-
 /**
  * A list of providers.
  *
@@ -62,10 +66,18 @@ public class AWSCredentialProviderList implements AWSCredentialsProvider,
   public static final String NO_AWS_CREDENTIAL_PROVIDERS
       = "No AWS Credential Providers";
 
+  static final String
+      CREDENTIALS_REQUESTED_WHEN_CLOSED
+      = "Credentials requested after provider list was closed";
+
   private final List<AWSCredentialsProvider> providers = new ArrayList<>(1);
   private boolean reuseLastProvider = true;
   private AWSCredentialsProvider lastProvider;
 
+  private final AtomicInteger refCount = new AtomicInteger(1);
+
+  private final AtomicBoolean closed = new AtomicBoolean(false);
+
   /**
    * Empty instance. This is not ready to be used.
    */
@@ -94,6 +106,9 @@ public class AWSCredentialProviderList implements AWSCredentialsProvider,
    */
   @Override
   public void refresh() {
+    if (isClosed()) {
+      return;
+    }
     for (AWSCredentialsProvider provider : providers) {
       provider.refresh();
     }
@@ -106,6 +121,11 @@ public class AWSCredentialProviderList implements AWSCredentialsProvider,
    */
   @Override
   public AWSCredentials getCredentials() {
+    if (isClosed()) {
+      LOG.warn(CREDENTIALS_REQUESTED_WHEN_CLOSED);
+      throw new NoAuthWithAWSException(
+          CREDENTIALS_REQUESTED_WHEN_CLOSED);
+    }
     checkNotEmpty();
     if (reuseLastProvider && lastProvider != null) {
       return lastProvider.getCredentials();
@@ -136,8 +156,7 @@ public class AWSCredentialProviderList implements AWSCredentialsProvider,
     if (lastException != null) {
       message += ": " + lastException;
     }
-    throw new AmazonClientException(message, lastException);
-
+    throw new NoAuthWithAWSException(message, lastException);
   }
 
   /**
@@ -156,7 +175,7 @@ public class AWSCredentialProviderList implements AWSCredentialsProvider,
    */
   public void checkNotEmpty() {
     if (providers.isEmpty()) {
-      throw new AmazonClientException(NO_AWS_CREDENTIAL_PROVIDERS);
+      throw new NoAuthWithAWSException(NO_AWS_CREDENTIAL_PROVIDERS);
     }
   }
 
@@ -178,8 +197,38 @@ public class AWSCredentialProviderList implements AWSCredentialsProvider,
    */
   @Override
   public String toString() {
-    return "AWSCredentialProviderList: " +
-        StringUtils.join(providers, " ");
+    return "AWSCredentialProviderList[" +
+        "refcount= " + refCount.get() + ": [" +
+        StringUtils.join(providers, ", ") + ']';
+  }
+
+  /**
+   * Get a reference to this object with an updated reference count.
+   *
+   * @return a reference to this
+   */
+  public synchronized AWSCredentialProviderList share() {
+    Preconditions.checkState(!closed.get(), "Provider list is closed");
+    refCount.incrementAndGet();
+    return this;
+  }
+
+  /**
+   * Get the current reference count.
+   * @return the current ref count
+   */
+  @VisibleForTesting
+  public int getRefCount() {
+    return refCount.get();
+  }
+
+  /**
+   * Get the closed flag.
+   * @return true iff the list is closed.
+   */
+  @VisibleForTesting
+  public boolean isClosed() {
+    return closed.get();
   }
 
   /**
@@ -190,9 +239,29 @@ public class AWSCredentialProviderList implements AWSCredentialsProvider,
    */
   @Override
   public void close() {
-    for(AWSCredentialsProvider p: providers) {
+    synchronized (this) {
+      if (closed.get()) {
+        // already closed: no-op
+        return;
+      }
+      int remainder = refCount.decrementAndGet();
+      if (remainder != 0) {
+        // still actively used, or somehow things are
+        // now negative
+        LOG.debug("Not closing {}", this);
+        return;
+      }
+      // at this point, the closing is going to happen
+      LOG.debug("Closing {}", this);
+      closed.set(true);
+    }
+
+    // do this outside the synchronized block.
+    for (AWSCredentialsProvider p : providers) {
       if (p instanceof Closeable) {
-        IOUtils.closeStream((Closeable)p);
+        IOUtils.closeStream((Closeable) p);
+      } else if (p instanceof AutoCloseable) {
+        S3AUtils.closeAutocloseables(LOG, (AutoCloseable)p);
       }
     }
   }

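AWSCredentialProviderList is now reference counted: the creator holds the first reference, share() hands out more, and the wrapped providers are only closed when the count reaches zero. A minimal sketch of that contract; the URI and Configuration are assumed to come from the caller:

// Sketch only: the reference-counting lifecycle added in this patch.
AWSCredentialProviderList list =
    S3AUtils.createAWSCredentialProviderSet(fsUri, conf);
int base = list.getRefCount();                 // creator holds the first reference
AWSCredentialProviderList ref = list.share();  // hand out a second one
assert list.getRefCount() == base + 1;
ref.close();                                   // return it: the count drops back,
assert !list.isClosed();                       // the providers stay open
list.close();                                  // last reference: providers closed
assert list.isClosed();
// any later getCredentials() call throws NoAuthWithAWSException
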
http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index c521936..a8da6ec 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -84,10 +84,27 @@ public final class Constants {
   public static final String ASSUMED_ROLE_SESSION_DURATION =
       "fs.s3a.assumed.role.session.duration";
 
-  /** Simple Token Service Endpoint. If unset, uses the default endpoint. */
+  /** Security Token Service Endpoint. If unset, uses the default endpoint. */
   public static final String ASSUMED_ROLE_STS_ENDPOINT =
       "fs.s3a.assumed.role.sts.endpoint";
 
+  /**
+   * Region for the STS endpoint; only relevant if the endpoint
+   * is set.
+   */
+  public static final String ASSUMED_ROLE_STS_ENDPOINT_REGION =
+      "fs.s3a.assumed.role.sts.endpoint.region";
+
+  /**
+   * Default value for the STS endpoint region; needed for
+   * v4 signing.
+   */
+  public static final String ASSUMED_ROLE_STS_ENDPOINT_REGION_DEFAULT =
+      "us-west-1";
+
+  /**
+   * Default duration of an assumed role.
+   */
   public static final String ASSUMED_ROLE_SESSION_DURATION_DEFAULT = "30m";
 
   /** list of providers to authenticate for the assumed role. */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
index f33b25e..ade317f 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
@@ -18,59 +18,45 @@
 
 package org.apache.hadoop.fs.s3a;
 
+import java.io.IOException;
+import java.net.URI;
+
 import com.amazonaws.ClientConfiguration;
-import com.amazonaws.Protocol;
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.AmazonS3Client;
 import com.amazonaws.services.s3.S3ClientOptions;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.util.VersionInfo;
 import org.slf4j.Logger;
 
-import java.io.IOException;
-import java.net.URI;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
 
-import static org.apache.hadoop.fs.s3a.Constants.*;
-import static org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProviderSet;
-import static org.apache.hadoop.fs.s3a.S3AUtils.intOption;
+import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
+import static org.apache.hadoop.fs.s3a.Constants.PATH_STYLE_ACCESS;
 
 /**
- * The default factory implementation, which calls the AWS SDK to configure
- * and create an {@link AmazonS3Client} that communicates with the S3 service.
+ * The default {@link S3ClientFactory} implementation.
+ * This calls the AWS SDK to configure and create an
+ * {@link AmazonS3Client} that communicates with the S3 service.
  */
-public class DefaultS3ClientFactory extends Configured implements
-    S3ClientFactory {
+public class DefaultS3ClientFactory extends Configured
+    implements S3ClientFactory {
 
   protected static final Logger LOG = S3AFileSystem.LOG;
 
   @Override
-  public AmazonS3 createS3Client(URI name) throws IOException {
+  public AmazonS3 createS3Client(URI name,
+      final String bucket,
+      final AWSCredentialsProvider credentials) throws IOException {
     Configuration conf = getConf();
-    AWSCredentialsProvider credentials =
-        createAWSCredentialProviderSet(name, conf);
-    final ClientConfiguration awsConf = createAwsConf(getConf());
-    AmazonS3 s3 = newAmazonS3Client(credentials, awsConf);
-    return createAmazonS3Client(s3, conf, credentials, awsConf);
+    final ClientConfiguration awsConf = S3AUtils.createAwsConf(getConf(), bucket);
+    return configureAmazonS3Client(
+        newAmazonS3Client(credentials, awsConf), conf);
   }
 
   /**
-   * Create a new {@link ClientConfiguration}.
-   * @param conf The Hadoop configuration
-   * @return new AWS client configuration
-   */
-  public static ClientConfiguration createAwsConf(Configuration conf) {
-    final ClientConfiguration awsConf = new ClientConfiguration();
-    initConnectionSettings(conf, awsConf);
-    initProxySupport(conf, awsConf);
-    initUserAgent(conf, awsConf);
-    return awsConf;
-  }
-
-  /**
-   * Wrapper around constructor for {@link AmazonS3} client.  Override this to
-   * provide an extended version of the client
+   * Wrapper around constructor for {@link AmazonS3} client.
+   * Override this to provide an extended version of the client
    * @param credentials credentials to use
    * @param awsConf  AWS configuration
    * @return  new AmazonS3 client
@@ -81,120 +67,17 @@ public class DefaultS3ClientFactory extends Configured implements
   }
 
   /**
-   * Initializes all AWS SDK settings related to connection management.
-   *
-   * @param conf Hadoop configuration
-   * @param awsConf AWS SDK configuration
-   */
-  private static void initConnectionSettings(Configuration conf,
-      ClientConfiguration awsConf) {
-    awsConf.setMaxConnections(intOption(conf, MAXIMUM_CONNECTIONS,
-        DEFAULT_MAXIMUM_CONNECTIONS, 1));
-    boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS,
-        DEFAULT_SECURE_CONNECTIONS);
-    awsConf.setProtocol(secureConnections ?  Protocol.HTTPS : Protocol.HTTP);
-    awsConf.setMaxErrorRetry(intOption(conf, MAX_ERROR_RETRIES,
-        DEFAULT_MAX_ERROR_RETRIES, 0));
-    awsConf.setConnectionTimeout(intOption(conf, ESTABLISH_TIMEOUT,
-        DEFAULT_ESTABLISH_TIMEOUT, 0));
-    awsConf.setSocketTimeout(intOption(conf, SOCKET_TIMEOUT,
-        DEFAULT_SOCKET_TIMEOUT, 0));
-    int sockSendBuffer = intOption(conf, SOCKET_SEND_BUFFER,
-        DEFAULT_SOCKET_SEND_BUFFER, 2048);
-    int sockRecvBuffer = intOption(conf, SOCKET_RECV_BUFFER,
-        DEFAULT_SOCKET_RECV_BUFFER, 2048);
-    awsConf.setSocketBufferSizeHints(sockSendBuffer, sockRecvBuffer);
-    String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, "");
-    if (!signerOverride.isEmpty()) {
-      LOG.debug("Signer override = {}", signerOverride);
-      awsConf.setSignerOverride(signerOverride);
-    }
-  }
-
-  /**
-   * Initializes AWS SDK proxy support if configured.
-   *
-   * @param conf Hadoop configuration
-   * @param awsConf AWS SDK configuration
-   * @throws IllegalArgumentException if misconfigured
-   */
-  private static void initProxySupport(Configuration conf,
-      ClientConfiguration awsConf) throws IllegalArgumentException {
-    String proxyHost = conf.getTrimmed(PROXY_HOST, "");
-    int proxyPort = conf.getInt(PROXY_PORT, -1);
-    if (!proxyHost.isEmpty()) {
-      awsConf.setProxyHost(proxyHost);
-      if (proxyPort >= 0) {
-        awsConf.setProxyPort(proxyPort);
-      } else {
-        if (conf.getBoolean(SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS)) {
-          LOG.warn("Proxy host set without port. Using HTTPS default 443");
-          awsConf.setProxyPort(443);
-        } else {
-          LOG.warn("Proxy host set without port. Using HTTP default 80");
-          awsConf.setProxyPort(80);
-        }
-      }
-      String proxyUsername = conf.getTrimmed(PROXY_USERNAME);
-      String proxyPassword = conf.getTrimmed(PROXY_PASSWORD);
-      if ((proxyUsername == null) != (proxyPassword == null)) {
-        String msg = "Proxy error: " + PROXY_USERNAME + " or " +
-            PROXY_PASSWORD + " set without the other.";
-        LOG.error(msg);
-        throw new IllegalArgumentException(msg);
-      }
-      awsConf.setProxyUsername(proxyUsername);
-      awsConf.setProxyPassword(proxyPassword);
-      awsConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN));
-      awsConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION));
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Using proxy server {}:{} as user {} with password {} on " +
-                "domain {} as workstation {}", awsConf.getProxyHost(),
-            awsConf.getProxyPort(),
-            String.valueOf(awsConf.getProxyUsername()),
-            awsConf.getProxyPassword(), awsConf.getProxyDomain(),
-            awsConf.getProxyWorkstation());
-      }
-    } else if (proxyPort >= 0) {
-      String msg =
-          "Proxy error: " + PROXY_PORT + " set without " + PROXY_HOST;
-      LOG.error(msg);
-      throw new IllegalArgumentException(msg);
-    }
-  }
-
-  /**
-   * Initializes the User-Agent header to send in HTTP requests to the S3
-   * back-end.  We always include the Hadoop version number.  The user also
-   * may set an optional custom prefix to put in front of the Hadoop version
-   * number.  The AWS SDK interally appends its own information, which seems
-   * to include the AWS SDK version, OS and JVM version.
+   * Configure S3 client from the Hadoop configuration.
    *
-   * @param conf Hadoop configuration
-   * @param awsConf AWS SDK configuration
-   */
-  private static void initUserAgent(Configuration conf,
-      ClientConfiguration awsConf) {
-    String userAgent = "Hadoop " + VersionInfo.getVersion();
-    String userAgentPrefix = conf.getTrimmed(USER_AGENT_PREFIX, "");
-    if (!userAgentPrefix.isEmpty()) {
-      userAgent = userAgentPrefix + ", " + userAgent;
-    }
-    LOG.debug("Using User-Agent: {}", userAgent);
-    awsConf.setUserAgentPrefix(userAgent);
-  }
-
-  /**
-   * Creates an {@link AmazonS3Client} from the established configuration.
+   * This includes: endpoint, path-style access and possibly other
+   * options.
    *
    * @param conf Hadoop configuration
-   * @param credentials AWS credentials
-   * @param awsConf AWS SDK configuration
    * @return S3 client
    * @throws IllegalArgumentException if misconfigured
    */
-  private static AmazonS3 createAmazonS3Client(AmazonS3 s3, Configuration conf,
-      AWSCredentialsProvider credentials, ClientConfiguration awsConf)
+  private static AmazonS3 configureAmazonS3Client(AmazonS3 s3,
+      Configuration conf)
       throws IllegalArgumentException {
     String endPoint = conf.getTrimmed(ENDPOINT, "");
     if (!endPoint.isEmpty()) {
@@ -206,21 +89,29 @@ public class DefaultS3ClientFactory extends Configured implements
         throw new IllegalArgumentException(msg, e);
       }
     }
-    enablePathStyleAccessIfRequired(s3, conf);
-    return s3;
+    return applyS3ClientOptions(s3, conf);
   }
 
   /**
-   * Enables path-style access to S3 buckets if configured.  By default, the
+   * Perform any tuning of the {@code S3ClientOptions} settings based on
+   * the Hadoop configuration.
+   * This is different from the general AWS configuration creation as
+   * it is unique to S3 connections.
+   *
+   * The {@link Constants#PATH_STYLE_ACCESS} option enables path-style access
+   * to S3 buckets if configured.  By default, the
    * behavior is to use virtual hosted-style access with URIs of the form
-   * http://bucketname.s3.amazonaws.com.  Enabling path-style access and a
+   * {@code http://bucketname.s3.amazonaws.com}
+   * Enabling path-style access and a
    * region-specific endpoint switches the behavior to use URIs of the form
-   * http://s3-eu-west-1.amazonaws.com/bucketname.
-   *
+   * {@code http://s3-eu-west-1.amazonaws.com/bucketname}.
+   * It is common to use this when connecting to private S3 servers, as it
+   * avoids the need to play with DNS entries.
    * @param s3 S3 client
    * @param conf Hadoop configuration
+   * @return the S3 client
    */
-  private static void enablePathStyleAccessIfRequired(AmazonS3 s3,
+  private static AmazonS3 applyS3ClientOptions(AmazonS3 s3,
       Configuration conf) {
     final boolean pathStyleAccess = conf.getBoolean(PATH_STYLE_ACCESS, false);
     if (pathStyleAccess) {
@@ -229,5 +120,6 @@ public class DefaultS3ClientFactory extends Configured implements
           .setPathStyleAccess(true)
           .build());
     }
+    return s3;
   }
 }

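applyS3ClientOptions() above is where path-style access is switched on. For a private or third-party S3 endpoint, the relevant client-side settings are roughly the following; the endpoint is a placeholder:

// Sketch only: path-style access, so requests go to
// https://s3.internal.example.com/bucketname/... rather than a per-bucket hostname.
Configuration conf = new Configuration();
conf.set("fs.s3a.endpoint", "https://s3.internal.example.com");   // placeholder
conf.setBoolean("fs.s3a.path.style.access", true);
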
http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java
index 99ed87d..2cd1aae 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java
@@ -114,6 +114,16 @@ public class InconsistentAmazonS3Client extends AmazonS3Client {
   /** Map of key to delay -> time it was created. */
   private Map<String, Long> delayedPutKeys = new HashMap<>();
 
+  /**
+   * Instantiate.
+   * This invokes a deprecated constructor of the parent
+   * {@code AmazonS3Client} class; we can't use the builder API because
+   * that only creates the consistent client.
+   * @param credentials credentials to auth.
+   * @param clientConfiguration connection settings
+   * @param conf hadoop configuration.
+   */
+  @SuppressWarnings("deprecation")
   public InconsistentAmazonS3Client(AWSCredentialsProvider credentials,
       ClientConfiguration clientConfiguration, Configuration conf) {
     super(credentials, clientConfiguration);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentS3ClientFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentS3ClientFactory.java
index 17d268b..932c472 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentS3ClientFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentS3ClientFactory.java
@@ -21,16 +21,27 @@ package org.apache.hadoop.fs.s3a;
 import com.amazonaws.ClientConfiguration;
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.services.s3.AmazonS3;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
 /**
  * S3 Client factory used for testing with eventual consistency fault injection.
+ * This client is for testing <i>only</i>; it is in the production
+ * {@code hadoop-aws} module to enable integration tests to use this
+ * just by editing the Hadoop configuration used to bring up the client.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class InconsistentS3ClientFactory extends DefaultS3ClientFactory {
 
+  /**
+   * Create the inconsistent client.
+   * Logs a warning that this is being done.
+   * @param credentials credentials to use
+   * @param awsConf  AWS configuration
+   * @return an inconsistent client.
+   */
   @Override
   protected AmazonS3 newAmazonS3Client(AWSCredentialsProvider credentials,
       ClientConfiguration awsConf) {

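As the new javadoc says, the fault-injecting client is selected purely through configuration. A hedged sketch of that wiring for an integration-test setup, using the client-factory constant referenced in the S3AFileSystem change below:

// Sketch only: swap in the inconsistency-injecting factory.
Configuration conf = new Configuration();
conf.setClass(Constants.S3_CLIENT_FACTORY_IMPL,
    InconsistentS3ClientFactory.class,
    S3ClientFactory.class);
// every S3AFileSystem created from this configuration now talks to S3 through
// the InconsistentAmazonS3Client rather than the default client.
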
http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 737d7da..72a5fde 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -77,8 +77,9 @@ import com.amazonaws.event.ProgressListener;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ListeningExecutorService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -124,9 +125,6 @@ import static org.apache.hadoop.fs.s3a.Statistic.*;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.commons.lang3.StringUtils.isNotEmpty;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /**
  * The core S3A Filesystem implementation.
  *
@@ -205,6 +203,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
   private boolean useListV1;
   private MagicCommitIntegration committerIntegration;
 
+  private AWSCredentialProviderList credentials;
+
   /** Add any deprecated keys. */
   @SuppressWarnings("deprecation")
   private static void addDeprecatedKeys() {
@@ -252,8 +252,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
       Class<? extends S3ClientFactory> s3ClientFactoryClass = conf.getClass(
           S3_CLIENT_FACTORY_IMPL, DEFAULT_S3_CLIENT_FACTORY_IMPL,
           S3ClientFactory.class);
+
+      credentials = createAWSCredentialProviderSet(name, conf);
       s3 = ReflectionUtils.newInstance(s3ClientFactoryClass, conf)
-          .createS3Client(name);
+          .createS3Client(name, bucket, credentials);
       invoker = new Invoker(new S3ARetryPolicy(getConf()), onRetry);
       s3guardInvoker = new Invoker(new S3GuardExistsRetryPolicy(getConf()),
           onRetry);
@@ -2470,12 +2472,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
         transfers.shutdownNow(true);
         transfers = null;
       }
-      if (metadataStore != null) {
-        metadataStore.close();
-        metadataStore = null;
-      }
-      IOUtils.closeQuietly(instrumentation);
+      S3AUtils.closeAll(LOG, metadataStore, instrumentation);
+      metadataStore = null;
       instrumentation = null;
+      closeAutocloseables(LOG, credentials);
+      credentials = null;
     }
   }
 
@@ -2885,6 +2886,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
     }
     sb.append(", boundedExecutor=").append(boundedThreadPool);
     sb.append(", unboundedExecutor=").append(unboundedThreadPool);
+    sb.append(", credentials=").append(credentials);
     sb.append(", statistics {")
         .append(statistics)
         .append("}");
@@ -3319,4 +3321,17 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
       return false;
     }
   }
+
+  /**
+   * Get a shared copy of the AWS credentials, with its reference
+   * counter updated.
+   * Caller is required to call {@code close()} on this after
+   * they have finished using it.
+   * @param purpose what is this for? This is initially for logging
+   * @return a reference to shared credentials.
+   */
+  public AWSCredentialProviderList shareCredentials(final String purpose) {
+    LOG.debug("Sharing credentials for: {}", purpose);
+    return credentials.share();
+  }
 }

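shareCredentials() gives other AWS clients (the S3Guard DynamoDB client, STS) a counted reference to the filesystem's credential chain instead of building a new one. The caller-side contract, sketched with a caller-supplied URI and Configuration; the purpose string is only a log label:

// Sketch only: every shared reference must be closed so that the filesystem's
// own close() can release the underlying credential providers.
S3AFileSystem fs = (S3AFileSystem) FileSystem.get(fsUri, conf);
AWSCredentialProviderList creds = fs.shareCredentials("metastore setup");
try {
  // hand 'creds' to a DynamoDB or STS client builder here
} finally {
  creds.close();   // returns the reference; fs keeps its own until fs.close()
}
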
http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java
index 2b361fd..e6e7895 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java
@@ -37,6 +37,7 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.InvalidRequestException;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.net.ConnectTimeoutException;
@@ -154,8 +155,9 @@ public class S3ARetryPolicy implements RetryPolicy {
     policyMap.put(InterruptedException.class, fail);
     // note this does not pick up subclasses (like socket timeout)
     policyMap.put(InterruptedIOException.class, fail);
-    // interesting question: should this be retried ever?
+    // Access denial and auth exceptions are not retried
     policyMap.put(AccessDeniedException.class, fail);
+    policyMap.put(NoAuthWithAWSException.class, fail);
     policyMap.put(FileNotFoundException.class, fail);
     policyMap.put(InvalidRequestException.class, fail);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
index a5f7d75..9908fd1 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.fs.s3a;
 import com.amazonaws.AbortedException;
 import com.amazonaws.AmazonClientException;
 import com.amazonaws.AmazonServiceException;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.Protocol;
 import com.amazonaws.SdkBaseException;
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
@@ -44,15 +46,18 @@ import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException;
 import org.apache.hadoop.fs.s3native.S3xLoginHelper;
 import org.apache.hadoop.net.ConnectTimeoutException;
 import org.apache.hadoop.security.ProviderUtils;
+import org.apache.hadoop.util.VersionInfo;
 
 import com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+import java.io.Closeable;
 import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -174,11 +179,17 @@ public final class S3AUtils {
         // call considered an sign of connectivity failure
         return (EOFException)new EOFException(message).initCause(exception);
       }
+      if (exception instanceof NoAuthWithAWSException) {
+        // the exception raised by AWSCredentialProvider list if the
+        // credentials were not accepted.
+        return (AccessDeniedException)new AccessDeniedException(path, null,
+            exception.toString()).initCause(exception);
+      }
       return new AWSClientIOException(message, exception);
     } else {
       if (exception instanceof AmazonDynamoDBException) {
         // special handling for dynamo DB exceptions
-        return translateDynamoDBException(message,
+        return translateDynamoDBException(path, message,
             (AmazonDynamoDBException)exception);
       }
       IOException ioe;
@@ -373,20 +384,45 @@ public final class S3AUtils {
 
   /**
    * Translate a DynamoDB exception into an IOException.
+   *
+   * @param path path in the DDB
    * @param message preformatted message for the exception
-   * @param ex exception
+   * @param ddbException exception
    * @return an exception to throw.
    */
-  public static IOException translateDynamoDBException(String message,
-      AmazonDynamoDBException ex) {
-    if (isThrottleException(ex)) {
-      return new AWSServiceThrottledException(message, ex);
+  public static IOException translateDynamoDBException(final String path,
+      final String message,
+      final AmazonDynamoDBException ddbException) {
+    if (isThrottleException(ddbException)) {
+      return new AWSServiceThrottledException(message, ddbException);
     }
-    if (ex instanceof ResourceNotFoundException) {
+    if (ddbException instanceof ResourceNotFoundException) {
       return (FileNotFoundException) new FileNotFoundException(message)
-          .initCause(ex);
+          .initCause(ddbException);
+    }
+    final int statusCode = ddbException.getStatusCode();
+    final String errorCode = ddbException.getErrorCode();
+    IOException result = null;
+    // 400 gets used a lot by DDB
+    if (statusCode == 400) {
+      switch (errorCode) {
+      case "AccessDeniedException":
+        result = (IOException) new AccessDeniedException(
+            path,
+            null,
+            ddbException.toString())
+            .initCause(ddbException);
+        break;
+
+      default:
+        result = new AWSBadRequestException(message, ddbException);
+      }
+
     }
-    return new AWSServiceIOException(message, ex);
+    if (result == null) {
+      result = new AWSServiceIOException(message, ddbException);
+    }
+    return result;
   }
 
   /**
@@ -738,6 +774,29 @@ public final class S3AUtils {
       String baseKey,
       String overrideVal)
       throws IOException {
+    return lookupPassword(bucket, conf, baseKey, overrideVal, "");
+  }
+
+  /**
+   * Get a password from a configuration, including JCEKS files, handling both
+   * the absolute key and bucket override.
+   * @param bucket bucket or "" if none known
+   * @param conf configuration
+   * @param baseKey base key to look up, e.g "fs.s3a.secret.key"
+   * @param overrideVal override value: if non empty this is used instead of
+   * querying the configuration.
+   * @param defVal value to return if there is no password
+   * @return a password or the value of defVal.
+   * @throws IOException on any IO problem
+   * @throws IllegalArgumentException bad arguments
+   */
+  public static String lookupPassword(
+      String bucket,
+      Configuration conf,
+      String baseKey,
+      String overrideVal,
+      String defVal)
+      throws IOException {
     String initialVal;
     Preconditions.checkArgument(baseKey.startsWith(FS_S3A_PREFIX),
         "%s does not start with $%s", baseKey, FS_S3A_PREFIX);
@@ -757,7 +816,7 @@ public final class S3AUtils {
       // no bucket, make the initial value the override value
       initialVal = overrideVal;
     }
-    return getPassword(conf, baseKey, initialVal);
+    return getPassword(conf, baseKey, initialVal, defVal);
   }
 
   /**
@@ -1059,6 +1118,134 @@ public final class S3AUtils {
     }
   }
 
+  /**
+   * Create a new AWS {@code ClientConfiguration}.
+   * All clients to AWS services <i>MUST</i> use this for consistent setup
+   * of connectivity, UA, proxy settings.
+   * @param conf The Hadoop configuration
+   * @param bucket Optional bucket to use to look up per-bucket proxy secrets
+   * @return new AWS client configuration
+   */
+  public static ClientConfiguration createAwsConf(Configuration conf,
+      String bucket)
+      throws IOException {
+    final ClientConfiguration awsConf = new ClientConfiguration();
+    initConnectionSettings(conf, awsConf);
+    initProxySupport(conf, bucket, awsConf);
+    initUserAgent(conf, awsConf);
+    return awsConf;
+  }
+
+  /**
+   * Initializes all AWS SDK settings related to connection management.
+   *
+   * @param conf Hadoop configuration
+   * @param awsConf AWS SDK configuration
+   */
+  public static void initConnectionSettings(Configuration conf,
+      ClientConfiguration awsConf) {
+    awsConf.setMaxConnections(intOption(conf, MAXIMUM_CONNECTIONS,
+        DEFAULT_MAXIMUM_CONNECTIONS, 1));
+    boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS,
+        DEFAULT_SECURE_CONNECTIONS);
+    awsConf.setProtocol(secureConnections ?  Protocol.HTTPS : Protocol.HTTP);
+    awsConf.setMaxErrorRetry(intOption(conf, MAX_ERROR_RETRIES,
+        DEFAULT_MAX_ERROR_RETRIES, 0));
+    awsConf.setConnectionTimeout(intOption(conf, ESTABLISH_TIMEOUT,
+        DEFAULT_ESTABLISH_TIMEOUT, 0));
+    awsConf.setSocketTimeout(intOption(conf, SOCKET_TIMEOUT,
+        DEFAULT_SOCKET_TIMEOUT, 0));
+    int sockSendBuffer = intOption(conf, SOCKET_SEND_BUFFER,
+        DEFAULT_SOCKET_SEND_BUFFER, 2048);
+    int sockRecvBuffer = intOption(conf, SOCKET_RECV_BUFFER,
+        DEFAULT_SOCKET_RECV_BUFFER, 2048);
+    awsConf.setSocketBufferSizeHints(sockSendBuffer, sockRecvBuffer);
+    String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, "");
+    if (!signerOverride.isEmpty()) {
+      LOG.debug("Signer override = {}", signerOverride);
+      awsConf.setSignerOverride(signerOverride);
+    }
+  }
+
+  /**
+   * Initializes AWS SDK proxy support in the AWS client configuration
+   * if the S3A settings enable it.
+   *
+   * @param conf Hadoop configuration
+   * @param bucket Optional bucket to use to look up per-bucket proxy secrets
+   * @param awsConf AWS SDK configuration to update
+   * @throws IllegalArgumentException if misconfigured
+   * @throws IOException problem getting username/secret from password source.
+   */
+  public static void initProxySupport(Configuration conf,
+      String bucket,
+      ClientConfiguration awsConf) throws IllegalArgumentException,
+      IOException {
+    String proxyHost = conf.getTrimmed(PROXY_HOST, "");
+    int proxyPort = conf.getInt(PROXY_PORT, -1);
+    if (!proxyHost.isEmpty()) {
+      awsConf.setProxyHost(proxyHost);
+      if (proxyPort >= 0) {
+        awsConf.setProxyPort(proxyPort);
+      } else {
+        if (conf.getBoolean(SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS)) {
+          LOG.warn("Proxy host set without port. Using HTTPS default 443");
+          awsConf.setProxyPort(443);
+        } else {
+          LOG.warn("Proxy host set without port. Using HTTP default 80");
+          awsConf.setProxyPort(80);
+        }
+      }
+      final String proxyUsername = lookupPassword(bucket, conf, PROXY_USERNAME,
+          null, null);
+      final String proxyPassword = lookupPassword(bucket, conf, PROXY_PASSWORD,
+          null, null);
+      if ((proxyUsername == null) != (proxyPassword == null)) {
+        String msg = "Proxy error: " + PROXY_USERNAME + " or " +
+            PROXY_PASSWORD + " set without the other.";
+        LOG.error(msg);
+        throw new IllegalArgumentException(msg);
+      }
+      awsConf.setProxyUsername(proxyUsername);
+      awsConf.setProxyPassword(proxyPassword);
+      awsConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN));
+      awsConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Using proxy server {}:{} as user {} with password {} on " +
+                "domain {} as workstation {}", awsConf.getProxyHost(),
+            awsConf.getProxyPort(),
+            String.valueOf(awsConf.getProxyUsername()),
+            awsConf.getProxyPassword(), awsConf.getProxyDomain(),
+            awsConf.getProxyWorkstation());
+      }
+    } else if (proxyPort >= 0) {
+      String msg =
+          "Proxy error: " + PROXY_PORT + " set without " + PROXY_HOST;
+      LOG.error(msg);
+      throw new IllegalArgumentException(msg);
+    }
+  }
+
+  /**
+   * Initializes the User-Agent header to send in HTTP requests to AWS
+   * services.  We always include the Hadoop version number.  The user also
+   * may set an optional custom prefix to put in front of the Hadoop version
+   * number.  The AWS SDK internally appends its own information, which seems
+   * to include the AWS SDK version, OS and JVM version.
+   *
+   * @param conf Hadoop configuration
+   * @param awsConf AWS SDK configuration to update
+   */
+  private static void initUserAgent(Configuration conf,
+      ClientConfiguration awsConf) {
+    String userAgent = "Hadoop " + VersionInfo.getVersion();
+    String userAgentPrefix = conf.getTrimmed(USER_AGENT_PREFIX, "");
+    if (!userAgentPrefix.isEmpty()) {
+      userAgent = userAgentPrefix + ", " + userAgent;
+    }
+    LOG.debug("Using User-Agent: {}", userAgent);
+    awsConf.setUserAgentPrefix(userAgent);
+  }
 
   /**
    * An interface for use in lambda-expressions working with
@@ -1289,18 +1476,40 @@ public final class S3AUtils {
    * @param closeables the objects to close
    */
   public static void closeAll(Logger log,
-      java.io.Closeable... closeables) {
-    for (java.io.Closeable c : closeables) {
+      Closeable... closeables) {
+    if (log == null) {
+      log = LOG;
+    }
+    for (Closeable c : closeables) {
       if (c != null) {
         try {
-          if (log != null) {
-            log.debug("Closing {}", c);
-          }
+          log.debug("Closing {}", c);
           c.close();
         } catch (Exception e) {
-          if (log != null && log.isDebugEnabled()) {
-            log.debug("Exception in closing {}", c, e);
-          }
+          log.debug("Exception in closing {}", c, e);
+        }
+      }
+    }
+  }
+  /**
+   * Close the AutoCloseable objects and <b>ignore</b> any Exception or
+   * null pointers.
+   * (This is the SLF4J equivalent of that in {@code IOUtils}).
+   * @param log the log to log at debug level. Can be null.
+   * @param closeables the objects to close
+   */
+  public static void closeAutocloseables(Logger log,
+      AutoCloseable... closeables) {
+    if (log == null) {
+      log = LOG;
+    }
+    for (AutoCloseable c : closeables) {
+      if (c != null) {
+        try {
+          log.debug("Closing {}", c);
+          c.close();
+        } catch (Exception e) {
+          log.debug("Exception in closing {}", c, e);
         }
       }
     }
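
Reviewer note: a minimal sketch of how the new S3AUtils helpers above are
meant to compose. The wrapper method and its parameters are illustrative and
not part of this patch; only createAwsConf(), createAWSCredentialProviderSet()
and closeAutocloseables() are.

    // Sketch only: build the shared client configuration and credential
    // chain, then release the chain through the SLF4J-based helper.
    static void demoClientSetup(Configuration conf, String bucket, URI fsUri)
        throws IOException {
      ClientConfiguration awsConf = S3AUtils.createAwsConf(conf, bucket);
      AWSCredentialProviderList credentials =
          S3AUtils.createAWSCredentialProviderSet(fsUri, conf);
      try {
        // hand awsConf and credentials to whichever AWS client builder
        // needs them (S3, STS, DynamoDB, ...)
      } finally {
        // null log => closeAutocloseables() falls back to S3AUtils' own logger
        S3AUtils.closeAutocloseables(null, credentials);
      }
    }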

http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
index 9abb362..b237e85 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.fs.s3a;
 import java.io.IOException;
 import java.net.URI;
 
+import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.services.s3.AmazonS3;
 
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -37,9 +38,13 @@ public interface S3ClientFactory {
    * Creates a new {@link AmazonS3} client.
    *
    * @param name raw input S3A file system URI
+   * @param bucket Optional bucket to use to look up per-bucket proxy secrets
+   * @param credentialSet credentials to use
    * @return S3 client
    * @throws IOException IO problem
    */
-  AmazonS3 createS3Client(URI name) throws IOException;
+  AmazonS3 createS3Client(URI name,
+      final String bucket,
+      final AWSCredentialsProvider credentialSet) throws IOException;
 
 }
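
Reviewer note: with the widened signature, an S3ClientFactory implementation
is expected to look roughly like the sketch below. The AWS SDK builder calls
are illustrative only; the in-tree DefaultS3ClientFactory remains the
authoritative implementation.

    // Sketch of a factory honouring the new (name, bucket, credentialSet)
    // contract; getConf() assumes the factory extends Configured.
    public AmazonS3 createS3Client(URI name,
        String bucket,
        AWSCredentialsProvider credentialSet) throws IOException {
      ClientConfiguration awsConf = S3AUtils.createAwsConf(getConf(), bucket);
      return AmazonS3ClientBuilder.standard()
          .withCredentials(credentialSet)
          .withClientConfiguration(awsConf)
          .build();
    }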

http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/AssumedRoleCredentialProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/AssumedRoleCredentialProvider.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/AssumedRoleCredentialProvider.java
index fdaf9bd..e5a3639 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/AssumedRoleCredentialProvider.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/AssumedRoleCredentialProvider.java
@@ -24,9 +24,11 @@ import java.net.URI;
 import java.util.Locale;
 import java.util.concurrent.TimeUnit;
 
+import com.amazonaws.AmazonClientException;
 import com.amazonaws.auth.AWSCredentials;
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
+import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
 import com.amazonaws.services.securitytoken.model.AWSSecurityTokenServiceException;
 import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
@@ -37,6 +39,9 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.s3a.AWSCredentialProviderList;
+import org.apache.hadoop.fs.s3a.S3AUtils;
+import org.apache.hadoop.fs.s3a.Invoker;
+import org.apache.hadoop.fs.s3a.S3ARetryPolicy;
 import org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider;
 import org.apache.hadoop.security.UserGroupInformation;
 
@@ -77,17 +82,21 @@ public class AssumedRoleCredentialProvider implements AWSCredentialsProvider,
 
   private final String arn;
 
+  private final AWSCredentialProviderList credentialsToSTS;
+
+  private final Invoker invoker;
+
   /**
    * Instantiate.
    * This calls {@link #getCredentials()} to fail fast on the inner
    * role credential retrieval.
-   * @param uri URI of endpoint.
+   * @param fsUri URI of the filesystem.
    * @param conf configuration
    * @throws IOException on IO problems and some parameter checking
    * @throws IllegalArgumentException invalid parameters
    * @throws AWSSecurityTokenServiceException problems getting credentials
    */
-  public AssumedRoleCredentialProvider(URI uri, Configuration conf)
+  public AssumedRoleCredentialProvider(URI fsUri, Configuration conf)
       throws IOException {
 
     arn = conf.getTrimmed(ASSUMED_ROLE_ARN, "");
@@ -99,13 +108,14 @@ public class AssumedRoleCredentialProvider implements AWSCredentialsProvider,
     Class<?>[] awsClasses = loadAWSProviderClasses(conf,
         ASSUMED_ROLE_CREDENTIALS_PROVIDER,
         SimpleAWSCredentialsProvider.class);
-    AWSCredentialProviderList credentials = new AWSCredentialProviderList();
+    credentialsToSTS = new AWSCredentialProviderList();
     for (Class<?> aClass : awsClasses) {
       if (this.getClass().equals(aClass)) {
         throw new IOException(E_FORBIDDEN_PROVIDER);
       }
-      credentials.add(createAWSCredentialProvider(conf, aClass, uri));
+      credentialsToSTS.add(createAWSCredentialProvider(conf, aClass, fsUri));
     }
+    LOG.debug("Credentials to obtain role credentials: {}", credentialsToSTS);
 
     // then the STS binding
     sessionName = conf.getTrimmed(ASSUMED_ROLE_SESSION_NAME,
@@ -122,14 +132,27 @@ public class AssumedRoleCredentialProvider implements AWSCredentialsProvider,
       LOG.debug("Scope down policy {}", policy);
       builder.withScopeDownPolicy(policy);
     }
-    String epr = conf.get(ASSUMED_ROLE_STS_ENDPOINT, "");
-    if (StringUtils.isNotEmpty(epr)) {
-      LOG.debug("STS Endpoint: {}", epr);
-      builder.withServiceEndpoint(epr);
-    }
-    LOG.debug("Credentials to obtain role credentials: {}", credentials);
-    builder.withLongLivedCredentialsProvider(credentials);
+    String endpoint = conf.get(ASSUMED_ROLE_STS_ENDPOINT, "");
+    String region = conf.get(ASSUMED_ROLE_STS_ENDPOINT_REGION,
+        ASSUMED_ROLE_STS_ENDPOINT_REGION_DEFAULT);
+    AWSSecurityTokenServiceClientBuilder stsbuilder =
+        STSClientFactory.builder(
+          conf,
+          fsUri.getHost(),
+          credentialsToSTS,
+          endpoint,
+          region);
+    // the STS client is not tracked for a shutdown in close(), because it
+    // (currently) throws an UnsupportedOperationException in shutdown().
+    builder.withStsClient(stsbuilder.build());
+
+    //now build the provider
     stsProvider = builder.build();
+
+    // to handle STS throttling by the AWS account, we
+    // need to retry
+    invoker = new Invoker(new S3ARetryPolicy(conf), this::operationRetried);
+
     // and force in a fail-fast check just to keep the stack traces less
     // convoluted
     getCredentials();
@@ -143,7 +166,17 @@ public class AssumedRoleCredentialProvider implements AWSCredentialsProvider,
   @Override
   public AWSCredentials getCredentials() {
     try {
-      return stsProvider.getCredentials();
+      return invoker.retryUntranslated("getCredentials",
+          true,
+          stsProvider::getCredentials);
+    } catch (IOException e) {
+      // this is in the signature of retryUntranslated;
+      // it's hard to see how this could be raised, but for
+      // completeness, it is wrapped as an Amazon Client Exception
+      // and rethrown.
+      throw new AmazonClientException(
+          "getCredentials failed: " + e,
+          e);
     } catch (AWSSecurityTokenServiceException e) {
       LOG.error("Failed to get credentials for role {}",
           arn, e);
@@ -161,7 +194,7 @@ public class AssumedRoleCredentialProvider implements AWSCredentialsProvider,
    */
   @Override
   public void close() {
-    stsProvider.close();
+    S3AUtils.closeAutocloseables(LOG, stsProvider, credentialsToSTS);
   }
 
   @Override
@@ -205,4 +238,23 @@ public class AssumedRoleCredentialProvider implements AWSCredentialsProvider,
     return r.toString();
   }
 
+  /**
+   * Callback from {@link Invoker} when an operation is retried.
+   * @param text text of the operation
+   * @param ex exception
+   * @param retries number of retries
+   * @param idempotent is the method idempotent
+   */
+  public void operationRetried(
+      String text,
+      Exception ex,
+      int retries,
+      boolean idempotent) {
+    if (retries == 0) {
+      // log on the first retry attempt of the credential access.
+      // At worst, this means one log entry for each intermittent
+      // credential renewal.
+      LOG.info("Retried {}", text);
+    }
+  }
 }
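
Reviewer note: the Invoker wiring above is the standard S3A retry pattern;
condensed into one sketch (the method, lambda and LOG field are illustrative,
while Invoker, S3ARetryPolicy and retryUntranslated() are the calls used
above):

    // Sketch: retry a throttled STS call, logging on the first retry only.
    // retryUntranslated() declares IOException, hence the throws clause.
    AWSCredentials fetchCredentials(Configuration conf,
        STSAssumeRoleSessionCredentialsProvider stsProvider)
        throws IOException {
      Invoker invoker = new Invoker(new S3ARetryPolicy(conf),
          (text, ex, retries, idempotent) -> {
            if (retries == 0) {
              LOG.info("Retried {}", text);
            }
          });
      return invoker.retryUntranslated("getCredentials", true,
          stsProvider::getCredentials);
    }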

http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/NoAuthWithAWSException.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/NoAuthWithAWSException.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/NoAuthWithAWSException.java
new file mode 100644
index 0000000..f48e17a
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/NoAuthWithAWSException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.auth;
+
+import com.amazonaws.AmazonClientException;
+
+/**
+ * A specific subclass of {@code AmazonClientException} which can
+ * be used in the retry logic to fail fast when there is any
+ * authentication problem.
+ */
+public class NoAuthWithAWSException extends AmazonClientException {
+
+  public NoAuthWithAWSException(final String message, final Throwable t) {
+    super(message, t);
+  }
+
+  public NoAuthWithAWSException(final String message) {
+    super(message);
+  }
+}
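
Reviewer note: a sketch of the intended use, converting an authentication
failure into this exception so the S3A retry policies can fail fast instead
of retrying a hopeless request (the surrounding variables are illustrative):

    // Sketch: no provider in the chain produced credentials, so give up fast.
    if (lastAuthException != null) {
      throw new NoAuthWithAWSException(
          "No AWS credentials provided by " + providerNames,
          lastAuthException);
    }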

http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/RoleModel.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/RoleModel.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/RoleModel.java
index ca2c993..d4568b0 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/RoleModel.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/RoleModel.java
@@ -205,6 +205,14 @@ public class RoleModel {
     return new Policy(statements);
   }
 
+  /**
+   * From a set of statements, create a policy.
+   * @param statements statements
+   * @return the policy
+   */
+  public static Policy policy(final List<RoleModel.Statement> statements) {
+    return new Policy(statements);
+  }
 
   /**
    * Effect options.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/RolePolicies.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/RolePolicies.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/RolePolicies.java
index 6711eee..34ed295 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/RolePolicies.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/RolePolicies.java
@@ -29,6 +29,55 @@ public final class RolePolicies {
   private RolePolicies() {
   }
 
+  /** All KMS operations: {@value}.*/
+  public static final String KMS_ALL_OPERATIONS = "kms:*";
+
+  /** KMS encryption. This is <i>Not</i> used by SSE-KMS: {@value}. */
+  public static final String KMS_ENCRYPT = "kms:Encrypt";
+
+  /**
+   * Decrypt data encrypted with SSE-KMS: {@value}.
+   */
+  public static final String KMS_DECRYPT = "kms:Decrypt";
+
+  /**
+   * Arn for all KMS keys: {@value}.
+   */
+  public static final String KMS_ALL_KEYS = "arn:aws:kms:*";
+
+  /**
+   * This is used by S3 to generate a per-object encryption key and
+   * the encrypted value of this, the latter being what it tags
+   * the object with for later decryption: {@value}.
+   */
+  public static final String KMS_GENERATE_DATA_KEY = "kms:GenerateDataKey";
+
+  /**
+   * Actions needed to read and write SSE-KMS data.
+   */
+  private static final String[] KMS_KEY_RW =
+      new String[]{KMS_DECRYPT, KMS_GENERATE_DATA_KEY};
+
+  /**
+   * Actions needed to read SSE-KMS data.
+   */
+  private static final String[] KMS_KEY_READ =
+      new String[] {KMS_DECRYPT};
+
+  /**
+   * Statement to allow KMS R/W access, so full use of
+   * SSE-KMS.
+   */
+  public static final Statement STATEMENT_ALLOW_SSE_KMS_RW =
+      statement(true, KMS_ALL_KEYS, KMS_KEY_RW);
+
+  /**
+   * Statement to allow read access to KMS keys, so the ability
+   * to read SSE-KMS data, but not write it.
+   */
+  public static final Statement STATEMENT_ALLOW_SSE_KMS_READ =
+      statement(true, KMS_ALL_KEYS, KMS_KEY_READ);
+
   /**
    * All S3 operations: {@value}.
    */
@@ -52,7 +101,6 @@ public final class RolePolicies {
   public static final String S3_LIST_BUCKET_MULTPART_UPLOADS =
       "s3:ListBucketMultipartUploads";
 
-
   /**
    * List multipart upload is needed for the S3A Commit protocols.
    */
@@ -97,6 +145,8 @@ public final class RolePolicies {
 
   public static final String S3_GET_OBJECT_VERSION = "s3:GetObjectVersion";
 
+  public static final String S3_GET_BUCKET_LOCATION = "s3:GetBucketLocation";
+
   public static final String S3_GET_OBJECT_VERSION_ACL
       = "s3:GetObjectVersionAcl";
 
@@ -128,7 +178,8 @@ public final class RolePolicies {
   public static final String S3_RESTORE_OBJECT = "s3:RestoreObject";
 
   /**
-   * Actions needed to read data from S3 through S3A.
+   * Actions needed to read a file in S3 through S3A, excluding
+   * S3Guard and SSE-KMS.
    */
   public static final String[] S3_PATH_READ_OPERATIONS =
       new String[]{
@@ -136,18 +187,20 @@ public final class RolePolicies {
       };
 
   /**
-   * Actions needed to read data from S3 through S3A.
+   * Base actions needed to read data from S3 through S3A,
+   * excluding SSE-KMS data and S3Guard-ed buckets.
    */
   public static final String[] S3_ROOT_READ_OPERATIONS =
       new String[]{
           S3_LIST_BUCKET,
           S3_LIST_BUCKET_MULTPART_UPLOADS,
-          S3_GET_OBJECT,
+          S3_ALL_GET,
       };
 
   /**
    * Actions needed to write data to an S3A Path.
-   * This includes the appropriate read operations.
+   * This includes the appropriate read operations, but
+   * not SSE-KMS or S3Guard support.
    */
   public static final String[] S3_PATH_RW_OPERATIONS =
       new String[]{
@@ -163,6 +216,7 @@ public final class RolePolicies {
    * This is purely the extra operations needed for writing atop
    * of the read operation set.
    * Deny these and a path is still readable, but not writeable.
+   * Excludes: SSE-KMS and S3Guard permissions.
    */
   public static final String[] S3_PATH_WRITE_OPERATIONS =
       new String[]{
@@ -173,6 +227,7 @@ public final class RolePolicies {
 
   /**
    * Actions needed for R/W IO from the root of a bucket.
+   * Excludes: SSE-KMS and S3Guard permissions.
    */
   public static final String[] S3_ROOT_RW_OPERATIONS =
       new String[]{
@@ -190,26 +245,57 @@ public final class RolePolicies {
    */
   public static final String DDB_ALL_OPERATIONS = "dynamodb:*";
 
-  public static final String DDB_ADMIN = "dynamodb:*";
+  /**
+   * Operations needed for DDB/S3Guard Admin.
+   * For now: make this {@link #DDB_ALL_OPERATIONS}.
+   */
+  public static final String DDB_ADMIN = DDB_ALL_OPERATIONS;
 
+  /**
+   * Permission for DDB describeTable() operation: {@value}.
+   * This is used during initialization.
+   */
+  public static final String DDB_DESCRIBE_TABLE = "dynamodb:DescribeTable";
 
-  public static final String DDB_BATCH_WRITE = "dynamodb:BatchWriteItem";
+  /**
+   * Permission to query the DDB table: {@value}.
+   */
+  public static final String DDB_QUERY = "dynamodb:Query";
 
   /**
-   * All DynamoDB tables: {@value}.
+   * Permission for DDB operation to get a record: {@value}.
    */
-  public static final String ALL_DDB_TABLES = "arn:aws:dynamodb:::*";
+  public static final String DDB_GET_ITEM = "dynamodb:GetItem";
 
+  /**
+   * Permission for DDB write record operation: {@value}.
+   */
+  public static final String DDB_PUT_ITEM = "dynamodb:PutItem";
 
+  /**
+   * Permission for DDB update single item operation: {@value}.
+   */
+  public static final String DDB_UPDATE_ITEM = "dynamodb:UpdateItem";
 
-  public static final String WILDCARD = "*";
+  /**
+   * Permission for DDB delete operation: {@value}.
+   */
+  public static final String DDB_DELETE_ITEM = "dynamodb:DeleteItem";
 
   /**
-   * Allow all S3 Operations.
+   * Permission for the DDB BatchGetItem operation: {@value}.
    */
-  public static final Statement STATEMENT_ALL_S3 = statement(true,
-      S3_ALL_BUCKETS,
-      S3_ALL_OPERATIONS);
+  public static final String DDB_BATCH_GET_ITEM = "dynamodb:BatchGetItem";
+
+  /**
+   * Batch write permission for DDB: {@value}.
+   */
+  public static final String DDB_BATCH_WRITE_ITEM = "dynamodb:BatchWriteItem";
+
+  /**
+   * All DynamoDB tables: {@value}.
+   */
+  public static final String ALL_DDB_TABLES = "arn:aws:dynamodb:*";
 
   /**
    * Statement to allow all DDB access.
@@ -218,11 +304,36 @@ public final class RolePolicies {
       ALL_DDB_TABLES, DDB_ALL_OPERATIONS);
 
   /**
-   * Allow all S3 and S3Guard operations.
+   * Statement to allow all client operations needed for S3Guard,
+   * but none of the admin operations.
+   */
+  public static final Statement STATEMENT_S3GUARD_CLIENT = statement(true,
+      ALL_DDB_TABLES,
+      DDB_BATCH_GET_ITEM,
+      DDB_BATCH_WRITE_ITEM,
+      DDB_DELETE_ITEM,
+      DDB_DESCRIBE_TABLE,
+      DDB_GET_ITEM,
+      DDB_PUT_ITEM,
+      DDB_QUERY,
+      DDB_UPDATE_ITEM
+      );
+
+  /**
+   * Allow all S3 Operations.
+   * This does not cover DDB or SSE-KMS.
+   */
+  public static final Statement STATEMENT_ALL_S3 = statement(true,
+      S3_ALL_BUCKETS,
+      S3_ALL_OPERATIONS);
+
+  /**
+   * Policy for all S3 and S3Guard operations, and SSE-KMS.
    */
   public static final Policy ALLOW_S3_AND_SGUARD = policy(
       STATEMENT_ALL_S3,
-      STATEMENT_ALL_DDB
+      STATEMENT_ALL_DDB,
+      STATEMENT_ALLOW_SSE_KMS_RW
   );
 
 }
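
Reviewer note: these constants are designed to be composed into role
policies. A read-only S3 + S3Guard + SSE-KMS-decrypt policy could be
sketched as below (assumes the usual static imports of RoleModel.statement()
and java.util.Arrays; the exact statement mix a deployment needs may differ):

    // Sketch: restricted policy using the new List-based RoleModel.policy()
    // overload, combining S3 read access, S3Guard client access and
    // permission to decrypt SSE-KMS protected data.
    RoleModel.Policy readOnlyPolicy = RoleModel.policy(Arrays.asList(
        statement(true, S3_ALL_BUCKETS, S3_ROOT_READ_OPERATIONS),
        STATEMENT_S3GUARD_CLIENT,
        STATEMENT_ALLOW_SSE_KMS_READ));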

http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/STSClientFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/STSClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/STSClientFactory.java
new file mode 100644
index 0000000..10bf88c
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/STSClientFactory.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.auth;
+
+import java.io.IOException;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.S3AUtils;
+
+/**
+ * Factory for creating STS Clients.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class STSClientFactory {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(STSClientFactory.class);
+
+  /**
+   * Create the builder ready for any final configuration options.
+   * Picks up connection settings from the Hadoop configuration, including
+   * proxy secrets.
+   * @param conf Configuration to act as source of options.
+   * @param bucket Optional bucket to use to look up per-bucket proxy secrets
+   * @param credentials AWS credential chain to use
+   * @param stsEndpoint optional endpoint "https://sts.us-west-1.amazonaws.com"
+   * @param stsRegion the region, e.g. "us-west-1"
+   * @return the builder to call {@code build()}
+   * @throws IOException problem reading proxy secrets
+   */
+  public static AWSSecurityTokenServiceClientBuilder builder(
+      final Configuration conf,
+      final String bucket,
+      final AWSCredentialsProvider credentials, final String stsEndpoint,
+      final String stsRegion) throws IOException {
+    Preconditions.checkArgument(credentials != null, "No credentials");
+    final AWSSecurityTokenServiceClientBuilder builder
+        = AWSSecurityTokenServiceClientBuilder.standard();
+    final ClientConfiguration awsConf = S3AUtils.createAwsConf(conf, bucket);
+    builder.withClientConfiguration(awsConf);
+    builder.withCredentials(credentials);
+    if (StringUtils.isNotEmpty(stsEndpoint)) {
+      LOG.debug("STS Endpoint ={}", stsEndpoint);
+      builder.withEndpointConfiguration(
+          new AwsClientBuilder.EndpointConfiguration(stsEndpoint, stsRegion));
+    }
+    return builder;
+  }
+
+}
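
Reviewer note: a usage sketch, mirroring how AssumedRoleCredentialProvider
consumes this factory earlier in the patch (variable names and the empty
endpoint are illustrative):

    // Sketch: build an STS client that picks up the shared Hadoop
    // connection, proxy and user-agent settings.
    AWSSecurityTokenService stsClient = STSClientFactory.builder(
        conf,
        bucket,          // used for per-bucket proxy secrets
        credentials,     // AWSCredentialsProvider chain
        "",              // empty endpoint => SDK default
        ASSUMED_ROLE_STS_ENDPOINT_REGION_DEFAULT)
        .build();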

http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBClientFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBClientFactory.java
index 91e64cd..9e1d2f4 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBClientFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBClientFactory.java
@@ -34,10 +34,9 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.s3a.DefaultS3ClientFactory;
+import org.apache.hadoop.fs.s3a.S3AUtils;
 
 import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_REGION_KEY;
-import static org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProviderSet;
 
 /**
  * Interface to create a DynamoDB client.
@@ -58,10 +57,14 @@ public interface DynamoDBClientFactory extends Configurable {
    * it will indicate an error.
    *
    * @param defaultRegion the default region of the AmazonDynamoDB client
+   * @param bucket Optional bucket to use to look up per-bucket proxy secrets
+   * @param credentials credentials to use for authentication.
    * @return a new DynamoDB client
    * @throws IOException if any IO error happens
    */
-  AmazonDynamoDB createDynamoDBClient(String defaultRegion) throws IOException;
+  AmazonDynamoDB createDynamoDBClient(final String defaultRegion,
+      final String bucket,
+      final AWSCredentialsProvider credentials) throws IOException;
 
   /**
    * The default implementation for creating an AmazonDynamoDB.
@@ -69,16 +72,15 @@ public interface DynamoDBClientFactory extends Configurable {
   class DefaultDynamoDBClientFactory extends Configured
       implements DynamoDBClientFactory {
     @Override
-    public AmazonDynamoDB createDynamoDBClient(String defaultRegion)
+    public AmazonDynamoDB createDynamoDBClient(String defaultRegion,
+        final String bucket,
+        final AWSCredentialsProvider credentials)
         throws IOException {
       Preconditions.checkNotNull(getConf(),
           "Should have been configured before usage");
 
       final Configuration conf = getConf();
-      final AWSCredentialsProvider credentials =
-          createAWSCredentialProviderSet(null, conf);
-      final ClientConfiguration awsConf =
-          DefaultS3ClientFactory.createAwsConf(conf);
+      final ClientConfiguration awsConf = S3AUtils.createAwsConf(conf, bucket);
 
       final String region = getRegion(conf, defaultRegion);
       LOG.debug("Creating DynamoDB client in region {}", region);
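
Reviewer note: the remainder of the default factory (context not shown in
the hunk above) is expected to hand these values to the AWS SDK v1 builder,
roughly as sketched here; the committed code is the authoritative version.

      // Sketch: finish construction with the shared configuration, the
      // supplied credential chain and the resolved region.
      return AmazonDynamoDBClientBuilder.standard()
          .withCredentials(credentials)
          .withClientConfiguration(awsConf)
          .withRegion(region)
          .build();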

http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
index 43849b1..ba80b88 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
@@ -22,6 +22,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.net.URI;
+import java.nio.file.AccessDeniedException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -34,6 +35,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.amazonaws.AmazonClientException;
+import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
 import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
 import com.amazonaws.services.dynamodbv2.document.DynamoDB;
@@ -67,6 +69,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.AWSCredentialProviderList;
 import org.apache.hadoop.fs.s3a.Constants;
 import org.apache.hadoop.fs.s3a.Invoker;
 import org.apache.hadoop.fs.s3a.Retries;
@@ -75,13 +78,14 @@ import org.apache.hadoop.fs.s3a.S3AInstrumentation;
 import org.apache.hadoop.fs.s3a.S3ARetryPolicy;
 import org.apache.hadoop.fs.s3a.S3AUtils;
 import org.apache.hadoop.fs.s3a.Tristate;
+import org.apache.hadoop.fs.s3a.auth.RolePolicies;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
 
 import static org.apache.hadoop.fs.s3a.Constants.*;
-import static org.apache.hadoop.fs.s3a.S3AUtils.translateException;
+import static org.apache.hadoop.fs.s3a.S3AUtils.*;
 import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.*;
 import static org.apache.hadoop.fs.s3a.s3guard.S3Guard.*;
 
@@ -207,6 +211,7 @@ public class DynamoDBMetadataStore implements MetadataStore {
       new ValueMap().withBoolean(":false", false);
 
   private DynamoDB dynamoDB;
+  private AWSCredentialProviderList credentials;
   private String region;
   private Table table;
   private String tableName;
@@ -242,10 +247,16 @@ public class DynamoDBMetadataStore implements MetadataStore {
    * A utility function to create DynamoDB instance.
    * @param conf the file system configuration
    * @param s3Region region of the associated S3 bucket (if any).
+   * @param bucket Optional bucket to use to look up per-bucket proxy secrets
+   * @param credentials credentials.
    * @return DynamoDB instance.
    * @throws IOException I/O error.
    */
-  private static DynamoDB createDynamoDB(Configuration conf, String s3Region)
+  private static DynamoDB createDynamoDB(
+      final Configuration conf,
+      final String s3Region,
+      final String bucket,
+      final AWSCredentialsProvider credentials)
       throws IOException {
     Preconditions.checkNotNull(conf);
     final Class<? extends DynamoDBClientFactory> cls = conf.getClass(
@@ -254,10 +265,18 @@ public class DynamoDBMetadataStore implements MetadataStore {
         DynamoDBClientFactory.class);
     LOG.debug("Creating DynamoDB client {} with S3 region {}", cls, s3Region);
     final AmazonDynamoDB dynamoDBClient = ReflectionUtils.newInstance(cls, conf)
-        .createDynamoDBClient(s3Region);
+        .createDynamoDBClient(s3Region, bucket, credentials);
     return new DynamoDB(dynamoDBClient);
   }
 
+  /**
+   * {@inheritDoc}.
+   * The credentials for authenticating with S3 are requested from the
+   * FS via {@link S3AFileSystem#shareCredentials(String)}; this will
+   * increment the reference counter of these credentials.
+   * @param fs {@code S3AFileSystem} associated with the MetadataStore
+   * @throws IOException on a failure
+   */
   @Override
   @Retries.OnceRaw
   public void initialize(FileSystem fs) throws IOException {
@@ -274,11 +293,23 @@ public class DynamoDBMetadataStore implements MetadataStore {
       LOG.debug("Overriding S3 region with configured DynamoDB region: {}",
           region);
     } else {
-      region = owner.getBucketLocation();
+      try {
+        region = owner.getBucketLocation();
+      } catch (AccessDeniedException e) {
+        // access denied here == can't call getBucketLocation. Report meaningfully
+        URI uri = owner.getUri();
+        LOG.error("Failed to get bucket location from S3 bucket {}",
+            uri);
+        throw (IOException)new AccessDeniedException(
+            "S3 client role lacks permission "
+                + RolePolicies.S3_GET_BUCKET_LOCATION + " for " + uri)
+            .initCause(e);
+      }
       LOG.debug("Inferring DynamoDB region from S3 bucket: {}", region);
     }
     username = owner.getUsername();
-    dynamoDB = createDynamoDB(conf, region);
+    credentials = owner.shareCredentials("s3guard");
+    dynamoDB = createDynamoDB(conf, region, bucket, credentials);
 
     // use the bucket as the DynamoDB table name if not specified in config
     tableName = conf.getTrimmed(S3GUARD_DDB_TABLE_NAME_KEY, bucket);
@@ -311,6 +342,9 @@ public class DynamoDBMetadataStore implements MetadataStore {
    * must declare the table name and region in the
    * {@link Constants#S3GUARD_DDB_TABLE_NAME_KEY} and
    * {@link Constants#S3GUARD_DDB_REGION_KEY} respectively.
+   * It also creates a new credential provider list from the configuration,
+   * using the base fs.s3a.* options, as there is no bucket to infer per-bucket
+   * settings from.
    *
    * @see #initialize(FileSystem)
    * @throws IOException if there is an error
@@ -327,7 +361,8 @@ public class DynamoDBMetadataStore implements MetadataStore {
     region = conf.getTrimmed(S3GUARD_DDB_REGION_KEY);
     Preconditions.checkArgument(!StringUtils.isEmpty(region),
         "No DynamoDB region configured");
-    dynamoDB = createDynamoDB(conf, region);
+    credentials = createAWSCredentialProviderSet(null, conf);
+    dynamoDB = createDynamoDB(conf, region, null, credentials);
 
     username = UserGroupInformation.getCurrentUser().getShortUserName();
     initDataAccessRetries(conf);
@@ -778,12 +813,17 @@ public class DynamoDBMetadataStore implements MetadataStore {
     if (instrumentation != null) {
       instrumentation.storeClosed();
     }
-    if (dynamoDB != null) {
-      LOG.debug("Shutting down {}", this);
-      dynamoDB.shutdown();
-      dynamoDB = null;
+    try {
+      if (dynamoDB != null) {
+        LOG.debug("Shutting down {}", this);
+        dynamoDB.shutdown();
+        dynamoDB = null;
+      }
+    } finally {
+      closeAutocloseables(LOG, credentials);
+      credentials = null;
     }
-  }
+  }
 
   @Override
   @Retries.OnceTranslated
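
Reviewer note: the AccessDeniedException translation above names the missing
permission; a locked-down role that still lets S3Guard infer the DynamoDB
region therefore needs to grant it, sketched below (the bucket ARN is a
placeholder, and statement() is the RoleModel helper used in RolePolicies):

    // Sketch: allow the region lookup needed by DynamoDBMetadataStore
    // when fs.s3a.s3guard.ddb.region is not set.
    RoleModel.Statement allowRegionLookup = statement(true,
        "arn:aws:s3:::example-bucket",   // placeholder bucket ARN
        S3_GET_BUCKET_LOCATION);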


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org