You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2021/10/05 10:46:29 UTC

[hadoop] 04/04: HADOOP-17922. move to fs.s3a.encryption.algorithm - JCEKS integration (#3466)

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 6f7b45641aba6d8ab4e7d7961fdc785c266a27e7
Author: Steve Loughran <st...@cloudera.com>
AuthorDate: Thu Sep 30 10:38:53 2021 +0100

    HADOOP-17922. move to fs.s3a.encryption.algorithm - JCEKS integration (#3466)
    
    The ordering of the resolution of new and deprecated s3a encryption options
    & secrets is the same when JCEKS and other hadoop credentials stores are used
    to store them as when they are in XML files: per-bucket settings always take
    priority over global values, even when the bucket-level options use the
    old option names.
    
    Contributed by Mehakmeet Singh and Steve Loughran
    
    Change-Id: I871672071efa2eb6b600cb2658fceeef57f658a3
---
 .../hadoop/fs/s3a/DefaultS3ClientFactory.java      |  27 ++-
 .../apache/hadoop/fs/s3a/S3AEncryptionMethods.java |  54 +++--
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java    |  10 +-
 .../java/org/apache/hadoop/fs/s3a/S3AUtils.java    | 145 ++++++++++--
 .../src/site/markdown/tools/hadoop-aws/index.md    |  53 +++--
 .../hadoop/fs/s3a/AbstractTestS3AEncryption.java   |   7 +-
 .../fs/s3a/ITestS3AClientSideEncryption.java       |  15 +-
 .../fs/s3a/ITestS3AClientSideEncryptionKms.java    |   8 +-
 .../hadoop/fs/s3a/ITestS3AConfiguration.java       | 123 ----------
 .../ITestS3AEncryptionSSEKMSUserDefinedKey.java    |  11 +-
 .../ITestS3AEncryptionWithDefaultS3Settings.java   |  12 +-
 .../org/apache/hadoop/fs/s3a/S3ATestUtils.java     |  31 ++-
 .../hadoop/fs/s3a/TestBucketConfiguration.java     | 261 +++++++++++++++++++++
 .../ITestSessionDelegationInFileystem.java         |   4 +-
 .../fs/s3a/scale/ITestS3AHugeFilesEncryption.java  |   8 +-
 15 files changed, 553 insertions(+), 216 deletions(-)

diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
index 441ae70..b9e51ef 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
@@ -58,8 +58,9 @@ import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION;
 import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_CENTRAL_REGION;
 import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING;
 import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING_DEFAULT;
-import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_ALGORITHM;
 import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
+import static org.apache.hadoop.fs.s3a.S3AUtils.getEncryptionAlgorithm;
+import static org.apache.hadoop.fs.s3a.S3AUtils.getS3EncryptionKey;
 import static org.apache.hadoop.fs.s3a.S3AUtils.translateException;
 
 /**
@@ -96,6 +97,9 @@ public class DefaultS3ClientFactory extends Configured
   /** Exactly once log to inform about ignoring the AWS-SDK Warnings for CSE. */
   private static final LogExactlyOnce IGNORE_CSE_WARN = new LogExactlyOnce(LOG);
 
+  /** Bucket name. */
+  private String bucket;
+
   /**
    * Create the client by preparing the AwsConf configuration
    * and then invoking {@code buildAmazonS3Client()}.
@@ -105,9 +109,10 @@ public class DefaultS3ClientFactory extends Configured
       final URI uri,
       final S3ClientCreationParameters parameters) throws IOException {
     Configuration conf = getConf();
+    bucket = uri.getHost();
     final ClientConfiguration awsConf = S3AUtils
         .createAwsConf(conf,
-            uri.getHost(),
+            bucket,
             Constants.AWS_SERVICE_IDENTIFIER_S3);
     // add any headers
     parameters.getHeaders().forEach((h, v) ->
@@ -126,10 +131,13 @@ public class DefaultS3ClientFactory extends Configured
       awsConf.setUserAgentSuffix(parameters.getUserAgentSuffix());
     }
 
+    // Get the encryption method for this bucket.
+    S3AEncryptionMethods encryptionMethods =
+        getEncryptionAlgorithm(bucket, conf);
     try {
-      if (S3AEncryptionMethods.getMethod(S3AUtils.
-          lookupPassword(conf, S3_ENCRYPTION_ALGORITHM, null))
-          .equals(S3AEncryptionMethods.CSE_KMS)) {
+      // If CSE is enabled then build an S3EncryptionClient.
+      if (S3AEncryptionMethods.CSE_KMS.getMethod()
+          .equals(encryptionMethods.getMethod())) {
         return buildAmazonS3EncryptionClient(
             awsConf,
             parameters);
@@ -163,12 +171,11 @@ public class DefaultS3ClientFactory extends Configured
         new AmazonS3EncryptionClientV2Builder();
     Configuration conf = getConf();
 
-    //CSE-KMS Method
-    String kmsKeyId = S3AUtils.lookupPassword(conf,
-        S3_ENCRYPTION_KEY, null);
+    // CSE-KMS Method
+    String kmsKeyId = getS3EncryptionKey(bucket, conf, true);
     // Check if kmsKeyID is not null
-    Preconditions.checkArgument(kmsKeyId != null, "CSE-KMS method "
-        + "requires KMS key ID. Use " + S3_ENCRYPTION_KEY
+    Preconditions.checkArgument(!StringUtils.isBlank(kmsKeyId), "CSE-KMS "
+        + "method requires KMS key ID. Use " + S3_ENCRYPTION_KEY
         + " property to set it. ");
 
     EncryptionMaterialsProvider materialsProvider =
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AEncryptionMethods.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AEncryptionMethods.java
index 85a00b11b..b599790 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AEncryptionMethods.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AEncryptionMethods.java
@@ -25,31 +25,45 @@ import org.apache.commons.lang3.StringUtils;
 /**
  * This enum is to centralize the encryption methods and
  * the value required in the configuration.
- *
- * There's two enum values for the two client encryption mechanisms the AWS
- * S3 SDK supports, even though these are not currently supported in S3A.
- * This is to aid supporting CSE in some form in future, fundamental
- * issues about file length of encrypted data notwithstanding.
- *
  */
 public enum S3AEncryptionMethods {
 
-  NONE("", false),
-  SSE_S3("AES256", true),
-  SSE_KMS("SSE-KMS", true),
-  SSE_C("SSE-C", true),
-  CSE_KMS("CSE-KMS", false),
-  CSE_CUSTOM("CSE-CUSTOM", false);
+  NONE("", false, false),
+  SSE_S3("AES256", true, false),
+  SSE_KMS("SSE-KMS", true, false),
+  SSE_C("SSE-C", true, true),
+  CSE_KMS("CSE-KMS", false, true),
+  CSE_CUSTOM("CSE-CUSTOM", false, true);
 
+  /**
+   * Error string when {@link #getMethod(String)} fails.
+   * Used in tests.
+   */
   static final String UNKNOWN_ALGORITHM
       = "Unknown encryption algorithm ";
 
-  private String method;
-  private boolean serverSide;
+  /**
+   * What is the encryption method?
+   */
+  private final String method;
+
+  /**
+   * Is this server side?
+   */
+  private final boolean serverSide;
+
+  /**
+   * Does the encryption method require a
+   * secret in the encryption.key property?
+   */
+  private final boolean requiresSecret;
 
-  S3AEncryptionMethods(String method, final boolean serverSide) {
+  S3AEncryptionMethods(String method,
+      final boolean serverSide,
+      final boolean requiresSecret) {
     this.method = method;
     this.serverSide = serverSide;
+    this.requiresSecret = requiresSecret;
   }
 
   public String getMethod() {
@@ -65,6 +79,14 @@ public enum S3AEncryptionMethods {
   }
 
   /**
+   * Does this encryption algorithm require a secret?
+   * @return true if a secret must be retrieved.
+   */
+  public boolean requiresSecret() {
+    return requiresSecret;
+  }
+
+  /**
    * Get the encryption mechanism from the value provided.
    * @param name algorithm name
    * @return the method
@@ -75,7 +97,7 @@ public enum S3AEncryptionMethods {
       return NONE;
     }
     for (S3AEncryptionMethods v : values()) {
-      if (v.getMethod().equals(name)) {
+      if (v.getMethod().equalsIgnoreCase(name)) {
         return v;
       }
     }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index a6b9862..3b4940d 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -433,17 +433,15 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
 
       // look for encryption data
       // DT Bindings may override this
-      setEncryptionSecrets(new EncryptionSecrets(
-          getEncryptionAlgorithm(bucket, conf),
-          getS3EncryptionKey(bucket, getConf())));
+      setEncryptionSecrets(
+          buildEncryptionSecrets(bucket, conf));
 
       invoker = new Invoker(new S3ARetryPolicy(getConf()), onRetry);
       instrumentation = new S3AInstrumentation(uri);
       initializeStatisticsBinding();
       // If CSE-KMS method is set then CSE is enabled.
-      isCSEEnabled = S3AUtils.lookupPassword(conf,
-          Constants.S3_ENCRYPTION_ALGORITHM, "")
-          .equals(S3AEncryptionMethods.CSE_KMS.getMethod());
+      isCSEEnabled = S3AEncryptionMethods.CSE_KMS.getMethod()
+          .equals(getS3EncryptionAlgorithm().getMethod());
       LOG.debug("Client Side Encryption enabled: {}", isCSEEnabled);
       setCSEGauge();
       // Username is the current user at the time the FS was instantiated.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
index b5dc56f..8a82417 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.util.functional.RemoteIterators;
+import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
 import org.apache.hadoop.fs.s3a.auth.IAMInstanceCredentialsProvider;
 import org.apache.hadoop.fs.s3a.impl.NetworkBinding;
 import org.apache.hadoop.fs.s3native.S3xLoginHelper;
@@ -64,6 +65,7 @@ import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.io.UncheckedIOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
@@ -1568,22 +1570,101 @@ public final class S3AUtils {
   }
 
   /**
-   * Get any SSE/CSE key from a configuration/credential provider.
-   * This operation handles the case where the option has been
-   * set in the provider or configuration to the option
-   * {@code OLD_S3A_SERVER_SIDE_ENCRYPTION_KEY}.
-   * IOExceptions raised during retrieval are swallowed.
+   * Lookup a per-bucket-secret from a configuration including JCEKS files.
+   * No attempt is made to look for the global configuration.
+   * @param bucket bucket or "" if none known
+   * @param conf configuration
+   * @param baseKey base key to look up, e.g "fs.s3a.secret.key"
+   * @return the secret or null.
+   * @throws IOException on any IO problem
+   * @throws IllegalArgumentException bad arguments
+   */
+  private static String lookupBucketSecret(
+      String bucket,
+      Configuration conf,
+      String baseKey)
+      throws IOException {
+
+    Preconditions.checkArgument(!isEmpty(bucket), "null/empty bucket argument");
+    Preconditions.checkArgument(baseKey.startsWith(FS_S3A_PREFIX),
+        "%s does not start with $%s", baseKey, FS_S3A_PREFIX);
+    String subkey = baseKey.substring(FS_S3A_PREFIX.length());
+
+    // set from the long key unless overridden.
+    String longBucketKey = String.format(
+        BUCKET_PATTERN, bucket, baseKey);
+    String initialVal = getPassword(conf, longBucketKey, null, null);
+    // then override from the short one if it is set
+    String shortBucketKey = String.format(
+        BUCKET_PATTERN, bucket, subkey);
+    return getPassword(conf, shortBucketKey, initialVal, null);
+  }
+
+  /**
+   * Get any S3 encryption key, without propagating exceptions from
+   * JCEKS files.
    * @param bucket bucket to query for
    * @param conf configuration to examine
    * @return the encryption key or ""
    * @throws IllegalArgumentException bad arguments.
    */
-  public static String getS3EncryptionKey(String bucket,
+  public static String getS3EncryptionKey(
+      String bucket,
       Configuration conf) {
     try {
-      return lookupPassword(bucket, conf, Constants.S3_ENCRYPTION_KEY);
+      return getS3EncryptionKey(bucket, conf, false);
     } catch (IOException e) {
-      LOG.error("Cannot retrieve " + Constants.S3_ENCRYPTION_KEY, e);
+      // never going to happen, but to make sure, convert to
+      // runtime exception
+      throw new UncheckedIOException(e);
+    }
+  }
+
+    /**
+     * Get any SSE/CSE key from a configuration/credential provider.
+     * This operation handles the case where the option has been
+     * set in the provider or configuration to the option
+     * {@code SERVER_SIDE_ENCRYPTION_KEY}.
+     * IOExceptions are swallowed unless {@code propagateExceptions} is true.
+     * @param bucket bucket to query for
+     * @param conf configuration to examine
+     * @param propagateExceptions should IO exceptions be rethrown?
+     * @return the encryption key or ""
+     * @throws IllegalArgumentException bad arguments.
+     * @throws IOException if propagateExceptions==true and reading a JCEKS file raised an IOE
+     */
+  @SuppressWarnings("deprecation")
+  public static String getS3EncryptionKey(
+      String bucket,
+      Configuration conf,
+      boolean propagateExceptions) throws IOException {
+    try {
+      // look up the per-bucket value of the new key,
+      // which implicitly includes the deprecation remapping
+      String key = lookupBucketSecret(bucket, conf, S3_ENCRYPTION_KEY);
+      if (key == null) {
+        // old key in bucket, jceks
+        key = lookupBucketSecret(bucket, conf, SERVER_SIDE_ENCRYPTION_KEY);
+      }
+      if (key == null) {
+        // new key, global; implicit translation of old key in XML files.
+        key = lookupPassword(null, conf, S3_ENCRYPTION_KEY);
+      }
+      if (key == null) {
+        // old key, JCEKS
+        key = lookupPassword(null, conf, SERVER_SIDE_ENCRYPTION_KEY);
+      }
+      if (key == null) {
+        // no key, return ""
+        key = "";
+      }
+      return key;
+    } catch (IOException e) {
+      if (propagateExceptions) {
+        throw e;
+      }
+      LOG.warn("Cannot retrieve {} for bucket {}",
+          S3_ENCRYPTION_KEY, bucket, e);
       return "";
     }
   }
@@ -1597,14 +1678,50 @@ public final class S3AUtils {
    * @param conf configuration to scan
    * @return the encryption mechanism (which will be {@code NONE} unless
    * one is set.
-   * @throws IOException on any validation problem.
+   * @throws IOException on JCEKS lookup or invalid method/key configuration.
    */
   public static S3AEncryptionMethods getEncryptionAlgorithm(String bucket,
       Configuration conf) throws IOException {
-    S3AEncryptionMethods encryptionMethod = S3AEncryptionMethods.getMethod(
-        lookupPassword(bucket, conf,
-            Constants.S3_ENCRYPTION_ALGORITHM));
-    String encryptionKey = getS3EncryptionKey(bucket, conf);
+    return buildEncryptionSecrets(bucket, conf).getEncryptionMethod();
+  }
+
+  /**
+   * Get the server-side encryption or client side encryption algorithm.
+   * This includes validation of the configuration, checking the state of
+   * the encryption key given the chosen algorithm.
+   *
+   * @param bucket bucket to query for
+   * @param conf configuration to scan
+   * @return the encryption secrets: the encryption mechanism (which will be
+   * {@code NONE} unless one is set) and the encryption key.
+   * @throws IOException on JCEKS lookup or invalid method/key configuration.
+   */
+  @SuppressWarnings("deprecation")
+  public static EncryptionSecrets buildEncryptionSecrets(String bucket,
+      Configuration conf) throws IOException {
+
+    // new key, per-bucket
+    // this will include fixup of the old key in config XML entries
+    String algorithm = lookupBucketSecret(bucket, conf, S3_ENCRYPTION_ALGORITHM);
+    if (algorithm == null) {
+      // try the old key, per-bucket setting, which will find JCEKS values
+      algorithm = lookupBucketSecret(bucket, conf, SERVER_SIDE_ENCRYPTION_ALGORITHM);
+    }
+    if (algorithm == null) {
+      // new key, global setting
+      // this will include fixup of the old key in config XML entries
+      algorithm = lookupPassword(null, conf, S3_ENCRYPTION_ALGORITHM);
+    }
+    if (algorithm == null) {
+      // old key, global setting, for JCEKS entries.
+      algorithm = lookupPassword(null, conf, SERVER_SIDE_ENCRYPTION_ALGORITHM);
+    }
+    // now determine the algorithm
+    final S3AEncryptionMethods encryptionMethod = S3AEncryptionMethods.getMethod(algorithm);
+
+    // look up the encryption key
+    String encryptionKey = getS3EncryptionKey(bucket, conf,
+        encryptionMethod.requiresSecret());
     int encryptionKeyLen =
         StringUtils.isBlank(encryptionKey) ? 0 : encryptionKey.length();
     String diagnostics = passwordDiagnostics(encryptionKey, "key");
@@ -1638,7 +1755,7 @@ public final class S3AUtils {
       LOG.debug("Data is unencrypted");
       break;
     }
-    return encryptionMethod;
+    return new EncryptionSecrets(encryptionMethod, encryptionKey);
   }
 
   /**
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
index f4f7144..3d08a1a 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
@@ -1426,32 +1426,47 @@ Finally, the public `s3a://landsat-pds/` bucket can be accessed anonymously:
 </property>
 ```
 
-### Customizing S3A secrets held in credential files
+#### per-bucket configuration and deprecated configuration options
 
+Per-bucket declaration of the deprecated encryption options
+will take priority over a global option -even when the
+global option uses the newer configuration keys.
 
-Secrets in JCEKS files or provided by other Hadoop credential providers
-can also be configured on a per bucket basis. The S3A client will
-look for the per-bucket secrets be
+This means that when setting encryption options in XML files,
+the option, `fs.s3a.bucket.BUCKET.server-side-encryption-algorithm`
+will take priority over the global value of `fs.s3a.encryption.algorithm`.
+The same holds for the encryption key option `fs.s3a.encryption.key`
+and its predecessor `fs.s3a.server-side-encryption.key`.
 
 
-Consider a JCEKS file with six keys:
+For a site configuration of:
 
-```
-fs.s3a.access.key
-fs.s3a.secret.key
-fs.s3a.encryption.algorithm
-fs.s3a.encryption.key
-fs.s3a.bucket.nightly.access.key
-fs.s3a.bucket.nightly.secret.key
-fs.s3a.bucket.nightly.session.token
-fs.s3a.bucket.nightly.server-side-encryption.key
-fs.s3a.bucket.nightly.server-side-encryption-algorithm
-```
+```xml
+<property>
+  <name>fs.s3a.bucket.nightly.server-side-encryption-algorithm</name>
+  <value>SSE-KMS</value>
+</property>
 
-When accessing the bucket `s3a://nightly/`, the per-bucket configuration
-options for that bucket will be used, here the access keys and token,
-and including the encryption algorithm and key.
+<property>
+  <name>fs.s3a.bucket.nightly.server-side-encryption.key</name>
+  <value>arn:aws:kms:eu-west-2:1528130000000:key/753778e4-2d0f-42e6-b894-6a3ae4ea4e5f</value>
+</property>
+
+<property>
+  <name>fs.s3a.encryption.algorithm</name>
+  <value>AES256</value>
+</property>
+
+<property>
+  <name>fs.s3a.encryption.key</name>
+  <value>unset</value>
+</property>
+
+
+```
 
+The bucket "nightly" will be encrypted with SSE-KMS using the KMS key
+`arn:aws:kms:eu-west-2:1528130000000:key/753778e4-2d0f-42e6-b894-6a3ae4ea4e5f`
 
 ###  <a name="per_bucket_endpoints"></a>Using Per-Bucket Configuration to access data round the world
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractTestS3AEncryption.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractTestS3AEncryption.java
index 8e3208c..7945c82 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractTestS3AEncryption.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractTestS3AEncryption.java
@@ -32,9 +32,11 @@ import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_ALGORITHM;
 import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
 import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionTestsDisabled;
 import static org.apache.hadoop.fs.s3a.S3AUtils.getEncryptionAlgorithm;
+import static org.apache.hadoop.fs.s3a.S3AUtils.getS3EncryptionKey;
 
 /**
  * Test whether or not encryption works by turning it on. Some checks
@@ -163,8 +165,9 @@ public abstract class AbstractTestS3AEncryption extends AbstractS3ATestBase {
    */
   protected void assertEncrypted(Path path) throws IOException {
     //S3 will return full arn of the key, so specify global arn in properties
-    String kmsKeyArn = this.getConfiguration().
-        getTrimmed(S3_ENCRYPTION_KEY);
+    String kmsKeyArn =
+        getS3EncryptionKey(getTestBucketName(getConfiguration()),
+            getConfiguration());
     S3AEncryptionMethods algorithm = getSSEAlgorithm();
     EncryptionTestUtils.assertEncrypted(getFileSystem(),
             path,
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java
index bb052ed..4094b22 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java
@@ -45,8 +45,12 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
 import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE;
 import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_ALGORITHM;
 import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
+import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBool;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 
 /**
@@ -193,6 +197,7 @@ public abstract class ITestS3AClientSideEncryption extends AbstractS3ATestBase {
    * Testing how unencrypted and encrypted data behaves when read through
    * CSE enabled and disabled FS respectively.
    */
+  @SuppressWarnings("deprecation")
   @Test
   public void testEncryptionEnabledAndDisabledFS() throws Exception {
     maybeSkipTest();
@@ -203,8 +208,12 @@ public abstract class ITestS3AClientSideEncryption extends AbstractS3ATestBase {
     Path encryptedFilePath = path(getMethodName() + "cse");
 
     // Initialize a CSE disabled FS.
-    cseDisabledConf.unset(S3_ENCRYPTION_ALGORITHM);
-    cseDisabledConf.unset(S3_ENCRYPTION_KEY);
+    removeBaseAndBucketOverrides(getTestBucketName(cseDisabledConf),
+        cseDisabledConf,
+        S3_ENCRYPTION_ALGORITHM,
+        S3_ENCRYPTION_KEY,
+        SERVER_SIDE_ENCRYPTION_ALGORITHM,
+        SERVER_SIDE_ENCRYPTION_KEY);
     cseDisabledFS.initialize(getFileSystem().getUri(),
         cseDisabledConf);
 
@@ -288,7 +297,7 @@ public abstract class ITestS3AClientSideEncryption extends AbstractS3ATestBase {
   /**
    * Skip tests if certain conditions are met.
    */
-  protected abstract void maybeSkipTest();
+  protected abstract void maybeSkipTest() throws IOException;
 
   /**
    * Assert that at path references an encrypted blob.
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryptionKms.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryptionKms.java
index 085c0f9..bcc37c8 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryptionKms.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryptionKms.java
@@ -28,8 +28,10 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.impl.HeaderProcessing;
 
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionNotSet;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionTestsDisabled;
+import static org.apache.hadoop.fs.s3a.S3AUtils.getS3EncryptionKey;
 
 /**
  * Testing the S3 CSE - KMS method.
@@ -53,7 +55,7 @@ public class ITestS3AClientSideEncryptionKms
   }
 
   @Override
-  protected void maybeSkipTest() {
+  protected void maybeSkipTest() throws IOException {
     skipIfEncryptionTestsDisabled(getConfiguration());
     // skip the test if CSE-KMS or KMS key is not set.
     skipIfEncryptionNotSet(getConfiguration(), S3AEncryptionMethods.CSE_KMS);
@@ -71,8 +73,8 @@ public class ITestS3AClientSideEncryptionKms
 
     // Assert content encryption algo for KMS, is present in the
     // materials description and KMS key ID isn't.
-    String keyId =
-        getConfiguration().get(Constants.S3_ENCRYPTION_KEY);
+    String keyId = getS3EncryptionKey(getTestBucketName(getConfiguration()),
+        getConfiguration());
     Assertions.assertThat(processHeader(fsXAttrs,
         xAttrPrefix + Headers.MATERIALS_DESCRIPTION))
         .describedAs("Materials Description should contain the content "
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java
index eb68eed..dce99a6 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java
@@ -42,7 +42,6 @@ import java.io.IOException;
 import java.io.File;
 import java.net.URI;
 import java.security.PrivilegedExceptionAction;
-import java.util.Collection;
 
 import org.apache.hadoop.security.ProviderUtils;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -502,81 +501,6 @@ public class ITestS3AConfiguration {
   }
 
   @Test
-  public void testBucketConfigurationPropagation() throws Throwable {
-    Configuration config = new Configuration(false);
-    setBucketOption(config, "b", "base", "1024");
-    String basekey = "fs.s3a.base";
-    assertOptionEquals(config, basekey, null);
-    String bucketKey = "fs.s3a.bucket.b.base";
-    assertOptionEquals(config, bucketKey, "1024");
-    Configuration updated = propagateBucketOptions(config, "b");
-    assertOptionEquals(updated, basekey, "1024");
-    // original conf is not updated
-    assertOptionEquals(config, basekey, null);
-
-    String[] sources = updated.getPropertySources(basekey);
-    assertEquals(1, sources.length);
-    String sourceInfo = sources[0];
-    assertTrue("Wrong source " + sourceInfo, sourceInfo.contains(bucketKey));
-  }
-
-  @Test
-  public void testBucketConfigurationPropagationResolution() throws Throwable {
-    Configuration config = new Configuration(false);
-    String basekey = "fs.s3a.base";
-    String baseref = "fs.s3a.baseref";
-    String baseref2 = "fs.s3a.baseref2";
-    config.set(basekey, "orig");
-    config.set(baseref2, "${fs.s3a.base}");
-    setBucketOption(config, "b", basekey, "1024");
-    setBucketOption(config, "b", baseref, "${fs.s3a.base}");
-    Configuration updated = propagateBucketOptions(config, "b");
-    assertOptionEquals(updated, basekey, "1024");
-    assertOptionEquals(updated, baseref, "1024");
-    assertOptionEquals(updated, baseref2, "1024");
-  }
-
-  @Test
-  public void testMultipleBucketConfigurations() throws Throwable {
-    Configuration config = new Configuration(false);
-    setBucketOption(config, "b", USER_AGENT_PREFIX, "UA-b");
-    setBucketOption(config, "c", USER_AGENT_PREFIX, "UA-c");
-    config.set(USER_AGENT_PREFIX, "UA-orig");
-    Configuration updated = propagateBucketOptions(config, "c");
-    assertOptionEquals(updated, USER_AGENT_PREFIX, "UA-c");
-  }
-
-  @Test
-  public void testClearBucketOption() throws Throwable {
-    Configuration config = new Configuration();
-    config.set(USER_AGENT_PREFIX, "base");
-    setBucketOption(config, "bucket", USER_AGENT_PREFIX, "overridden");
-    clearBucketOption(config, "bucket", USER_AGENT_PREFIX);
-    Configuration updated = propagateBucketOptions(config, "c");
-    assertOptionEquals(updated, USER_AGENT_PREFIX, "base");
-  }
-
-  @Test
-  public void testBucketConfigurationSkipsUnmodifiable() throws Throwable {
-    Configuration config = new Configuration(false);
-    String impl = "fs.s3a.impl";
-    config.set(impl, "orig");
-    setBucketOption(config, "b", impl, "b");
-    String metastoreImpl = "fs.s3a.metadatastore.impl";
-    String ddb = "org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore";
-    setBucketOption(config, "b", metastoreImpl, ddb);
-    setBucketOption(config, "b", "impl2", "b2");
-    setBucketOption(config, "b", "bucket.b.loop", "b3");
-    assertOptionEquals(config, "fs.s3a.bucket.b.impl", "b");
-
-    Configuration updated = propagateBucketOptions(config, "b");
-    assertOptionEquals(updated, impl, "orig");
-    assertOptionEquals(updated, "fs.s3a.impl2", "b2");
-    assertOptionEquals(updated, metastoreImpl, ddb);
-    assertOptionEquals(updated, "fs.s3a.bucket.b.loop", null);
-  }
-
-  @Test
   public void testConfOptionPropagationToFS() throws Exception {
     Configuration config = new Configuration();
     String testFSName = config.getTrimmed(TEST_FS_S3A_NAME, "");
@@ -587,53 +511,6 @@ public class ITestS3AConfiguration {
     assertOptionEquals(updated, "fs.s3a.propagation", "propagated");
   }
 
-  @Test
-  public void testSecurityCredentialPropagationNoOverride() throws Exception {
-    Configuration config = new Configuration();
-    config.set(CREDENTIAL_PROVIDER_PATH, "base");
-    patchSecurityCredentialProviders(config);
-    assertOptionEquals(config, CREDENTIAL_PROVIDER_PATH,
-        "base");
-  }
-
-  @Test
-  public void testSecurityCredentialPropagationOverrideNoBase()
-      throws Exception {
-    Configuration config = new Configuration();
-    config.unset(CREDENTIAL_PROVIDER_PATH);
-    config.set(S3A_SECURITY_CREDENTIAL_PROVIDER_PATH, "override");
-    patchSecurityCredentialProviders(config);
-    assertOptionEquals(config, CREDENTIAL_PROVIDER_PATH,
-        "override");
-  }
-
-  @Test
-  public void testSecurityCredentialPropagationOverride() throws Exception {
-    Configuration config = new Configuration();
-    config.set(CREDENTIAL_PROVIDER_PATH, "base");
-    config.set(S3A_SECURITY_CREDENTIAL_PROVIDER_PATH, "override");
-    patchSecurityCredentialProviders(config);
-    assertOptionEquals(config, CREDENTIAL_PROVIDER_PATH,
-        "override,base");
-    Collection<String> all = config.getStringCollection(
-        CREDENTIAL_PROVIDER_PATH);
-    assertTrue(all.contains("override"));
-    assertTrue(all.contains("base"));
-  }
-
-  @Test
-  public void testSecurityCredentialPropagationEndToEnd() throws Exception {
-    Configuration config = new Configuration();
-    config.set(CREDENTIAL_PROVIDER_PATH, "base");
-    setBucketOption(config, "b", S3A_SECURITY_CREDENTIAL_PROVIDER_PATH,
-        "override");
-    Configuration updated = propagateBucketOptions(config, "b");
-
-    patchSecurityCredentialProviders(updated);
-    assertOptionEquals(updated, CREDENTIAL_PROVIDER_PATH,
-        "override,base");
-  }
-
   @Test(timeout = 10_000L)
   public void testS3SpecificSignerOverride() throws IOException {
     ClientConfiguration clientConfiguration = null;
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSUserDefinedKey.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSUserDefinedKey.java
index c281ae1..93e8ae6 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSUserDefinedKey.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSUserDefinedKey.java
@@ -18,11 +18,13 @@
 
 package org.apache.hadoop.fs.s3a;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 
+import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
 import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
 import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.SSE_KMS;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionNotSet;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
 
 /**
  * Concrete class that extends {@link AbstractTestS3AEncryption}
@@ -36,9 +38,12 @@ public class ITestS3AEncryptionSSEKMSUserDefinedKey
   protected Configuration createConfiguration() {
     // get the KMS key for this test.
     Configuration c = new Configuration();
-    String kmsKey = c.get(S3_ENCRYPTION_KEY);
+    String kmsKey = S3AUtils.getS3EncryptionKey(getTestBucketName(c), c);
     // skip the test if SSE-KMS or KMS key not set.
-    skipIfEncryptionNotSet(c, getSSEAlgorithm());
+    if (StringUtils.isBlank(kmsKey)) {
+      skip(S3_ENCRYPTION_KEY + " is not set for " +
+          SSE_KMS.getMethod());
+    }
     Configuration conf = super.createConfiguration();
     conf.set(S3_ENCRYPTION_KEY, kmsKey);
     return conf;
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionWithDefaultS3Settings.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionWithDefaultS3Settings.java
index 0f48825..a0fb762 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionWithDefaultS3Settings.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionWithDefaultS3Settings.java
@@ -34,11 +34,13 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
 import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_ALGORITHM;
-import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
 import static org.apache.hadoop.fs.s3a.EncryptionTestUtils.AWS_KMS_SSE_ALGORITHM;
 import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.SSE_KMS;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionNotSet;
+import static org.apache.hadoop.fs.s3a.S3AUtils.getS3EncryptionKey;
 
 /**
  * Concrete class that extends {@link AbstractTestS3AEncryption}
@@ -60,11 +62,13 @@ public class ITestS3AEncryptionWithDefaultS3Settings extends
     skipIfEncryptionNotSet(c, getSSEAlgorithm());
   }
 
+  @SuppressWarnings("deprecation")
   @Override
   protected void patchConfigurationEncryptionSettings(
       final Configuration conf) {
     removeBaseAndBucketOverrides(conf,
-        S3_ENCRYPTION_ALGORITHM);
+        S3_ENCRYPTION_ALGORITHM,
+        SERVER_SIDE_ENCRYPTION_ALGORITHM);
     conf.set(S3_ENCRYPTION_ALGORITHM,
             getSSEAlgorithm().getMethod());
   }
@@ -89,7 +93,7 @@ public class ITestS3AEncryptionWithDefaultS3Settings extends
   protected void assertEncrypted(Path path) throws IOException {
     S3AFileSystem fs = getFileSystem();
     Configuration c = fs.getConf();
-    String kmsKey = c.getTrimmed(S3_ENCRYPTION_KEY);
+    String kmsKey = getS3EncryptionKey(getTestBucketName(c), c);
     EncryptionTestUtils.assertEncrypted(fs, path, SSE_KMS, kmsKey);
   }
 
@@ -145,7 +149,7 @@ public class ITestS3AEncryptionWithDefaultS3Settings extends
       ContractTestUtils.rename(kmsFS, src, targetDir);
       Path renamedFile = new Path(targetDir, src.getName());
       ContractTestUtils.verifyFileContents(fs, renamedFile, data);
-      String kmsKey = fs2Conf.getTrimmed(S3_ENCRYPTION_KEY);
+      String kmsKey = getS3EncryptionKey(getTestBucketName(fs2Conf), fs2Conf);
       // we assert that the renamed file has picked up the KMS key of our FS
       EncryptionTestUtils.assertEncrypted(fs, renamedFile, SSE_KMS, kmsKey);
     }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
index 49b7f77..113b68a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.s3a.auth.MarshalledCredentialBinding;
 import org.apache.hadoop.fs.s3a.auth.MarshalledCredentials;
+import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
 import org.apache.hadoop.fs.s3a.commit.CommitConstants;
 
 import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
@@ -61,6 +62,7 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.functional.CallableRaisingIOE;
 
 import com.amazonaws.auth.AWSCredentialsProvider;
+import org.assertj.core.api.Assertions;
 import org.hamcrest.core.Is;
 import org.junit.Assert;
 import org.junit.Assume;
@@ -91,6 +93,8 @@ import static org.apache.hadoop.fs.impl.FutureIOSupport.awaitFuture;
 import static org.apache.hadoop.fs.s3a.FailureInjectionPolicy.*;
 import static org.apache.hadoop.fs.s3a.S3ATestConstants.*;
 import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.S3AUtils.buildEncryptionSecrets;
+import static org.apache.hadoop.fs.s3a.S3AUtils.getEncryptionAlgorithm;
 import static org.apache.hadoop.fs.s3a.S3AUtils.propagateBucketOptions;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 import static org.junit.Assert.*;
@@ -246,9 +250,10 @@ public final class S3ATestUtils {
    *
    * @param conf Test Configuration.
    */
-  private static void skipIfS3GuardAndS3CSEEnabled(Configuration conf) {
-    String encryptionMethod =
-        conf.getTrimmed(Constants.S3_ENCRYPTION_ALGORITHM, "");
+  private static void skipIfS3GuardAndS3CSEEnabled(Configuration conf)
+      throws IOException {
+    String encryptionMethod = getEncryptionAlgorithm(getTestBucketName(conf),
+        conf).getMethod();
     String metaStore = conf.getTrimmed(S3_METADATA_STORE_IMPL, "");
     if (encryptionMethod.equals(S3AEncryptionMethods.CSE_KMS.getMethod()) &&
         !metaStore.equals(S3GUARD_METASTORE_NULL)) {
@@ -1238,7 +1243,13 @@ public final class S3ATestUtils {
   public static void assertOptionEquals(Configuration conf,
       String key,
       String expected) {
-    assertEquals("Value of " + key, expected, conf.get(key));
+    String actual = conf.get(key);
+    String origin = actual == null
+        ? "(none)"
+        : "[" + StringUtils.join(conf.getPropertySources(key), ", ") + "]";
+    Assertions.assertThat(actual)
+        .describedAs("Value of %s with origin %s", key, origin)
+        .isEqualTo(expected);
   }
 
   /**
@@ -1538,15 +1549,17 @@ public final class S3ATestUtils {
    * @param configuration configuration to probe.
    */
   public static void skipIfEncryptionNotSet(Configuration configuration,
-      S3AEncryptionMethods s3AEncryptionMethod) {
+      S3AEncryptionMethods s3AEncryptionMethod) throws IOException {
     // if S3 encryption algorithm is not set to desired method or AWS encryption
     // key is not set, then skip.
-    if (!configuration.getTrimmed(S3_ENCRYPTION_ALGORITHM, "")
-        .equals(s3AEncryptionMethod.getMethod())
-        || configuration.get(Constants.S3_ENCRYPTION_KEY) == null) {
+    String bucket = getTestBucketName(configuration);
+    final EncryptionSecrets secrets = buildEncryptionSecrets(bucket, configuration);
+    if (!s3AEncryptionMethod.getMethod().equals(secrets.getEncryptionMethod().getMethod())
+        || StringUtils.isBlank(secrets.getEncryptionKey())) {
       skip(S3_ENCRYPTION_KEY + " is not set for " + s3AEncryptionMethod
           .getMethod() + " or " + S3_ENCRYPTION_ALGORITHM + " is not set to "
-          + s3AEncryptionMethod.getMethod());
+          + s3AEncryptionMethod.getMethod()
+          + " in " + secrets);
     }
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestBucketConfiguration.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestBucketConfiguration.java
new file mode 100644
index 0000000..0ac49a3
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestBucketConfiguration.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.File;
+import java.net.URI;
+import java.util.Collection;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
+import org.apache.hadoop.security.ProviderUtils;
+import org.apache.hadoop.security.alias.CredentialProvider;
+import org.apache.hadoop.security.alias.CredentialProviderFactory;
+import org.apache.hadoop.test.AbstractHadoopTestBase;
+
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_BUCKET_PREFIX;
+import static org.apache.hadoop.fs.s3a.Constants.S3A_SECURITY_CREDENTIAL_PROVIDER_PATH;
+import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_ALGORITHM;
+import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
+import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.USER_AGENT_PREFIX;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.assertOptionEquals;
+import static org.apache.hadoop.fs.s3a.S3AUtils.CREDENTIAL_PROVIDER_PATH;
+import static org.apache.hadoop.fs.s3a.S3AUtils.clearBucketOption;
+import static org.apache.hadoop.fs.s3a.S3AUtils.getEncryptionAlgorithm;
+import static org.apache.hadoop.fs.s3a.S3AUtils.patchSecurityCredentialProviders;
+import static org.apache.hadoop.fs.s3a.S3AUtils.propagateBucketOptions;
+import static org.apache.hadoop.fs.s3a.S3AUtils.setBucketOption;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * S3A tests for configuration option propagation.
+ */
+@SuppressWarnings("deprecation")
+public class TestBucketConfiguration extends AbstractHadoopTestBase {
+
+  private static final String NEW_ALGORITHM_KEY_GLOBAL = "CSE-KMS";
+  private static final String OLD_ALGORITHM_KEY_BUCKET = "SSE-KMS";
+  @Rule
+  public final TemporaryFolder tempDir = new TemporaryFolder();
+
+  /**
+   * Setup: initialize the S3AFileSystem class before each test case.
+   * @throws Exception on any failure
+   */
+  @Before
+  public void setup() throws Exception {
+    // forces the deprecation wireup, even when a test method is run in isolation
+    S3AFileSystem.initializeClass();
+  }
+
+  @Test
+  public void testBucketConfigurationPropagation() throws Throwable {
+    Configuration config = new Configuration(false);
+    setBucketOption(config, "b", "base", "1024");
+    String basekey = "fs.s3a.base";
+    assertOptionEquals(config, basekey, null);
+    String bucketKey = "fs.s3a.bucket.b.base";
+    assertOptionEquals(config, bucketKey, "1024");
+    Configuration updated = propagateBucketOptions(config, "b");
+    assertOptionEquals(updated, basekey, "1024");
+    // original conf is not updated
+    assertOptionEquals(config, basekey, null);
+
+    String[] sources = updated.getPropertySources(basekey);
+    assertEquals(1, sources.length);
+    Assertions.assertThat(sources)
+        .describedAs("base key property sources")
+        .hasSize(1);
+    Assertions.assertThat(sources[0])
+        .describedAs("Property source")
+        .contains(bucketKey);
+  }
+
+  @Test
+  public void testBucketConfigurationPropagationResolution() throws Throwable {
+    Configuration config = new Configuration(false);
+    String basekey = "fs.s3a.base";
+    String baseref = "fs.s3a.baseref";
+    String baseref2 = "fs.s3a.baseref2";
+    config.set(basekey, "orig");
+    config.set(baseref2, "${fs.s3a.base}");
+    setBucketOption(config, "b", basekey, "1024");
+    setBucketOption(config, "b", baseref, "${fs.s3a.base}");
+    Configuration updated = propagateBucketOptions(config, "b");
+    assertOptionEquals(updated, basekey, "1024");
+    assertOptionEquals(updated, baseref, "1024");
+    assertOptionEquals(updated, baseref2, "1024");
+  }
+
+  @Test
+  public void testMultipleBucketConfigurations() throws Throwable {
+    Configuration config = new Configuration(false);
+    setBucketOption(config, "b", USER_AGENT_PREFIX, "UA-b");
+    setBucketOption(config, "c", USER_AGENT_PREFIX, "UA-c");
+    config.set(USER_AGENT_PREFIX, "UA-orig");
+    Configuration updated = propagateBucketOptions(config, "c");
+    assertOptionEquals(updated, USER_AGENT_PREFIX, "UA-c");
+  }
+
+  @Test
+  public void testClearBucketOption() throws Throwable {
+    Configuration config = new Configuration();
+    config.set(USER_AGENT_PREFIX, "base");
+    setBucketOption(config, "bucket", USER_AGENT_PREFIX, "overridden");
+    clearBucketOption(config, "bucket", USER_AGENT_PREFIX);
+    // propagate the cleared bucket itself, so that a failed clear is detected
+    assertOptionEquals(propagateBucketOptions(config, "bucket"), USER_AGENT_PREFIX, "base");
+  }
+
+  @Test
+  public void testBucketConfigurationSkipsUnmodifiable() throws Throwable {
+    Configuration config = new Configuration(false);
+    String impl = "fs.s3a.impl";
+    config.set(impl, "orig");
+    setBucketOption(config, "b", impl, "b");
+    String metastoreImpl = "fs.s3a.metadatastore.impl";
+    String ddb = "org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore";
+    setBucketOption(config, "b", metastoreImpl, ddb);
+    setBucketOption(config, "b", "impl2", "b2");
+    setBucketOption(config, "b", "bucket.b.loop", "b3");
+    assertOptionEquals(config, "fs.s3a.bucket.b.impl", "b");
+
+    Configuration updated = propagateBucketOptions(config, "b");
+    assertOptionEquals(updated, impl, "orig");
+    assertOptionEquals(updated, "fs.s3a.impl2", "b2");
+    assertOptionEquals(updated, metastoreImpl, ddb);
+    assertOptionEquals(updated, "fs.s3a.bucket.b.loop", null);
+  }
+
+  @Test
+  public void testSecurityCredentialPropagationNoOverride() throws Exception {
+    Configuration config = new Configuration();
+    config.set(CREDENTIAL_PROVIDER_PATH, "base");
+    patchSecurityCredentialProviders(config);
+    assertOptionEquals(config, CREDENTIAL_PROVIDER_PATH,
+        "base");
+  }
+
+  @Test
+  public void testSecurityCredentialPropagationOverrideNoBase()
+      throws Exception {
+    Configuration config = new Configuration();
+    config.unset(CREDENTIAL_PROVIDER_PATH);
+    config.set(S3A_SECURITY_CREDENTIAL_PROVIDER_PATH, "override");
+    patchSecurityCredentialProviders(config);
+    assertOptionEquals(config, CREDENTIAL_PROVIDER_PATH,
+        "override");
+  }
+
+  @Test
+  public void testSecurityCredentialPropagationOverride() throws Exception {
+    Configuration config = new Configuration();
+    config.set(CREDENTIAL_PROVIDER_PATH, "base");
+    config.set(S3A_SECURITY_CREDENTIAL_PROVIDER_PATH, "override");
+    patchSecurityCredentialProviders(config);
+    assertOptionEquals(config, CREDENTIAL_PROVIDER_PATH,
+        "override,base");
+    Collection<String> all = config.getStringCollection(
+        CREDENTIAL_PROVIDER_PATH);
+    assertTrue(all.contains("override"));
+    assertTrue(all.contains("base"));
+  }
+
+  @Test
+  public void testSecurityCredentialPropagationEndToEnd() throws Exception {
+    Configuration config = new Configuration();
+    config.set(CREDENTIAL_PROVIDER_PATH, "base");
+    setBucketOption(config, "b", S3A_SECURITY_CREDENTIAL_PROVIDER_PATH,
+        "override");
+    Configuration updated = propagateBucketOptions(config, "b");
+
+    patchSecurityCredentialProviders(updated);
+    assertOptionEquals(updated, CREDENTIAL_PROVIDER_PATH,
+        "override,base");
+  }
+
+  /**
+   * This test shows that a per-bucket value of the older key takes priority
+   * over a global value of the new key in an XML configuration file.
+   */
+  @Test
+  public void testBucketConfigurationDeprecatedEncryptionAlgorithm()
+      throws Throwable {
+    Configuration config = new Configuration(false);
+    config.set(S3_ENCRYPTION_ALGORITHM, NEW_ALGORITHM_KEY_GLOBAL);
+    setBucketOption(config, "b", SERVER_SIDE_ENCRYPTION_ALGORITHM,
+        OLD_ALGORITHM_KEY_BUCKET);
+    Configuration updated = propagateBucketOptions(config, "b");
+
+    // Resolve the encryption method and verify that the per-bucket value
+    // set under the old key is the one returned.
+    String value = getEncryptionAlgorithm("b", updated).getMethod();
+    Assertions.assertThat(value)
+        .describedAs("getEncryptionAlgorithm() resolution of %s", S3_ENCRYPTION_ALGORITHM)
+        .isEqualTo(OLD_ALGORITHM_KEY_BUCKET);
+  }
+
+  @Test
+  public void testJceksDeprecatedEncryptionAlgorithm() throws Exception {
+    // set up conf to have a cred provider
+    final Configuration conf = new Configuration(false);
+    final File file = tempDir.newFile("test.jks");
+    final URI jks = ProviderUtils.nestURIForLocalJavaKeyStoreProvider(
+        file.toURI());
+    conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH,
+        jks.toString());
+
+    // add our creds to the provider
+    final CredentialProvider provider =
+        CredentialProviderFactory.getProviders(conf).get(0);
+    provider.createCredentialEntry(S3_ENCRYPTION_ALGORITHM,
+        NEW_ALGORITHM_KEY_GLOBAL.toCharArray());
+    provider.createCredentialEntry(S3_ENCRYPTION_KEY,
+        "global s3 encryption key".toCharArray());
+    provider.createCredentialEntry(
+        FS_S3A_BUCKET_PREFIX + "b." + SERVER_SIDE_ENCRYPTION_ALGORITHM,
+        OLD_ALGORITHM_KEY_BUCKET.toCharArray());
+    final String bucketKey = "bucket-server-side-encryption-key";
+    provider.createCredentialEntry(
+        FS_S3A_BUCKET_PREFIX + "b." + SERVER_SIDE_ENCRYPTION_KEY,
+        bucketKey.toCharArray());
+    provider.flush();
+
+    // Get the encryption method and verify that the value is per-bucket of
+    // old keys.
+    final EncryptionSecrets secrets = S3AUtils.buildEncryptionSecrets("b", conf);
+    Assertions.assertThat(secrets.getEncryptionMethod().getMethod())
+        .describedAs("buildEncryptionSecrets() encryption algorithm resolved to %s", secrets)
+        .isEqualTo(OLD_ALGORITHM_KEY_BUCKET);
+
+    Assertions.assertThat(secrets.getEncryptionKey())
+        .describedAs("buildEncryptionSecrets() encryption key resolved to %s", secrets)
+        .isEqualTo(bucketKey);
+
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/delegation/ITestSessionDelegationInFileystem.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/delegation/ITestSessionDelegationInFileystem.java
index b3fc5de..6aed9e7 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/delegation/ITestSessionDelegationInFileystem.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/delegation/ITestSessionDelegationInFileystem.java
@@ -66,8 +66,10 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeSessionTestsEnabled;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.unsetHadoopCredentialProviders;
+import static org.apache.hadoop.fs.s3a.S3AUtils.getS3EncryptionKey;
 import static org.apache.hadoop.fs.s3a.auth.delegation.DelegationConstants.*;
 import static org.apache.hadoop.fs.s3a.auth.delegation.DelegationTokenIOException.TOKEN_MISMATCH;
 import static org.apache.hadoop.fs.s3a.auth.delegation.MiniKerberizedHadoopCluster.ALICE;
@@ -146,7 +148,7 @@ public class ITestSessionDelegationInFileystem extends AbstractDelegationIT {
     String s3EncryptionMethod =
         conf.getTrimmed(Constants.S3_ENCRYPTION_ALGORITHM,
             S3AEncryptionMethods.SSE_KMS.getMethod());
-    String s3EncryptionKey = conf.getTrimmed(Constants.S3_ENCRYPTION_KEY, "");
+    String s3EncryptionKey = getS3EncryptionKey(getTestBucketName(conf), conf);
     removeBaseAndBucketOverrides(conf,
         DELEGATION_TOKEN_BINDING,
         Constants.S3_ENCRYPTION_ALGORITHM,
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesEncryption.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesEncryption.java
index 9325feb..0448f9d 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesEncryption.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesEncryption.java
@@ -20,15 +20,17 @@ package org.apache.hadoop.fs.s3a.scale;
 
 import java.io.IOException;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.Constants;
 import org.apache.hadoop.fs.s3a.EncryptionTestUtils;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 
-import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
 import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.SSE_KMS;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionNotSet;
+import static org.apache.hadoop.fs.s3a.S3AUtils.getS3EncryptionKey;
 
 /**
  * Class to test SSE_KMS encryption settings for huge files.
@@ -58,13 +60,13 @@ public class ITestS3AHugeFilesEncryption extends AbstractSTestS3AHugeFiles {
   @Override
   protected boolean isEncrypted(S3AFileSystem fileSystem) {
     Configuration c = new Configuration();
-    return c.get(S3_ENCRYPTION_KEY) != null;
+    return StringUtils.isNotBlank(getS3EncryptionKey(getTestBucketName(c), c));
   }
 
   @Override
   protected void assertEncrypted(Path hugeFile) throws IOException {
     Configuration c = new Configuration();
-    String kmsKey = c.get(S3_ENCRYPTION_KEY);
+    String kmsKey = getS3EncryptionKey(getTestBucketName(c), c);
     EncryptionTestUtils.assertEncrypted(getFileSystem(), hugeFile,
             SSE_KMS, kmsKey);
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org