Posted to common-commits@hadoop.apache.org by st...@apache.org on 2021/10/05 10:46:25 UTC

[hadoop] branch branch-3.3 updated (da011ba -> 6f7b456)

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a change to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git.


    from da011ba  HADOOP-17947. Provide alternative to Guava VisibleForTesting (#3505)
     new aee975a  HADOOP-13887. Support S3 client side encryption (S3-CSE) using AWS-SDK (#2706)
     new abb367a  HADOOP-17817. HADOOP-17823. S3A to raise IOE if both S3-CSE and S3Guard enabled (#3239)
     new 769059c  HADOOP-17871. S3A CSE: minor tuning (#3412)
     new 6f7b456  HADOOP-17922. move to fs.s3a.encryption.algorithm - JCEKS integration (#3466)

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../hadoop/fs/CommonConfigurationKeysPublic.java   |   2 +
 .../src/main/resources/core-default.xml            |  24 +-
 .../AbstractContractMultipartUploaderTest.java     |   3 +
 .../java/org/apache/hadoop/fs/s3a/Constants.java   |  31 ++-
 .../hadoop/fs/s3a/DefaultS3ClientFactory.java      | 149 ++++++++--
 .../java/org/apache/hadoop/fs/s3a/Listing.java     |   9 +-
 .../apache/hadoop/fs/s3a/S3ABlockOutputStream.java |  43 ++-
 .../apache/hadoop/fs/s3a/S3AEncryptionMethods.java |  54 ++--
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java    |  98 +++++--
 .../java/org/apache/hadoop/fs/s3a/S3AUtils.java    | 185 ++++++++++--
 .../java/org/apache/hadoop/fs/s3a/Statistic.java   |   9 +-
 .../hadoop/fs/s3a/impl/HeaderProcessing.java       |  11 +-
 .../hadoop/fs/s3a/impl/InternalConstants.java      |  13 +
 .../apache/hadoop/fs/s3a/impl/RenameOperation.java |   2 +-
 .../apache/hadoop/fs/s3a/impl/StoreContext.java    |  15 +-
 .../hadoop/fs/s3a/impl/StoreContextBuilder.java    |  16 +-
 .../apache/hadoop/fs/s3a/s3guard/S3GuardTool.java  |   4 +-
 .../site/markdown/tools/hadoop-aws/encryption.md   | 266 ++++++++++++++++--
 .../src/site/markdown/tools/hadoop-aws/index.md    |  81 +++---
 .../src/site/markdown/tools/hadoop-aws/testing.md  |  11 +-
 .../tools/hadoop-aws/troubleshooting_s3a.md        | 303 ++++++++++++++++++++
 .../fs/contract/s3a/ITestS3AContractSeek.java      |   3 +-
 .../apache/hadoop/fs/contract/s3a/S3AContract.java |  19 ++
 .../apache/hadoop/fs/s3a/AbstractS3AMockTest.java  |   2 +
 .../apache/hadoop/fs/s3a/AbstractS3ATestBase.java  |   9 +
 .../hadoop/fs/s3a/AbstractTestS3AEncryption.java   |  16 +-
 .../apache/hadoop/fs/s3a/EncryptionTestUtils.java  |   4 +-
 .../fs/s3a/ITestS3AClientSideEncryption.java       | 310 +++++++++++++++++++++
 .../fs/s3a/ITestS3AClientSideEncryptionKms.java    |  98 +++++++
 .../hadoop/fs/s3a/ITestS3AConfiguration.java       | 123 --------
 .../s3a/ITestS3AEncryptionAlgorithmValidation.java | 142 +++++-----
 .../hadoop/fs/s3a/ITestS3AEncryptionSSEC.java      |  19 +-
 .../fs/s3a/ITestS3AEncryptionSSEKMSDefaultKey.java |   2 +-
 .../ITestS3AEncryptionSSEKMSUserDefinedKey.java    |  12 +-
 .../hadoop/fs/s3a/ITestS3AEncryptionSSES3.java     |   2 +-
 .../ITestS3AEncryptionWithDefaultS3Settings.java   |  21 +-
 .../hadoop/fs/s3a/ITestS3AFSMainOperations.java    |  10 +-
 .../hadoop/fs/s3a/ITestS3AInconsistency.java       |   9 +
 .../hadoop/fs/s3a/ITestS3AMiscOperations.java      |   8 +-
 .../hadoop/fs/s3a/ITestS3GuardListConsistency.java |   6 +-
 .../org/apache/hadoop/fs/s3a/S3ATestConstants.java |  13 +
 .../org/apache/hadoop/fs/s3a/S3ATestUtils.java     |  70 ++++-
 .../hadoop/fs/s3a/TestBucketConfiguration.java     | 261 +++++++++++++++++
 .../hadoop/fs/s3a/TestS3AInputStreamRetry.java     |   2 +-
 .../apache/hadoop/fs/s3a/TestSSEConfiguration.java |  17 +-
 .../hadoop/fs/s3a/auth/ITestCustomSigner.java      |  34 ++-
 .../ITestSessionDelegationInFileystem.java         |  32 ++-
 .../hadoop/fs/s3a/commit/AbstractCommitITest.java  |   9 +-
 .../ITestS3AFileContextCreateMkdir.java            |   6 +
 .../fileContext/ITestS3AFileContextStatistics.java |  33 ++-
 .../fs/s3a/scale/AbstractSTestS3AHugeFiles.java    |   3 +-
 .../fs/s3a/scale/ITestS3AHugeFilesEncryption.java  |  19 +-
 .../s3a/scale/ITestS3AHugeFilesSSECDiskBlocks.java |  12 +-
 .../s3a/scale/ITestS3AInputStreamPerformance.java  |   5 +
 54 files changed, 2214 insertions(+), 446 deletions(-)
 create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java
 create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryptionKms.java
 create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestBucketConfiguration.java



[hadoop] 04/04: HADOOP-17922. move to fs.s3a.encryption.algorithm - JCEKS integration (#3466)

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 6f7b45641aba6d8ab4e7d7961fdc785c266a27e7
Author: Steve Loughran <st...@cloudera.com>
AuthorDate: Thu Sep 30 10:38:53 2021 +0100

    HADOOP-17922. move to fs.s3a.encryption.algorithm - JCEKS integration (#3466)
    
    New and deprecated s3a encryption options & secrets now resolve in the
    same order whether they are stored in JCEKS and other Hadoop credential
    stores or in XML configuration files: per-bucket settings always take
    priority over global values, even when the bucket-level options use the
    old option names. A short sketch of this ordering follows below.
    
    Contributed by Mehakmeet Singh and Steve Loughran
    
    Change-Id: I871672071efa2eb6b600cb2658fceeef57f658a3
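
A minimal sketch of that resolution order (the bucket name "nightly" and
the option values below are illustrative only; getEncryptionAlgorithm()
is the S3AUtils helper this patch reworks):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;
import org.apache.hadoop.fs.s3a.S3AUtils;

public class ResolutionOrderSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration(false);
    // Global value under the new option name.
    conf.set("fs.s3a.encryption.algorithm", "AES256");
    // Per-bucket value under the deprecated option name.
    conf.set("fs.s3a.bucket.nightly.server-side-encryption-algorithm",
        "SSE-KMS");
    // The per-bucket setting wins, even though it uses the old name.
    S3AEncryptionMethods method =
        S3AUtils.getEncryptionAlgorithm("nightly", conf);
    System.out.println(method);  // SSE_KMS
  }
}
```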
---
 .../hadoop/fs/s3a/DefaultS3ClientFactory.java      |  27 ++-
 .../apache/hadoop/fs/s3a/S3AEncryptionMethods.java |  54 +++--
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java    |  10 +-
 .../java/org/apache/hadoop/fs/s3a/S3AUtils.java    | 145 ++++++++++--
 .../src/site/markdown/tools/hadoop-aws/index.md    |  53 +++--
 .../hadoop/fs/s3a/AbstractTestS3AEncryption.java   |   7 +-
 .../fs/s3a/ITestS3AClientSideEncryption.java       |  15 +-
 .../fs/s3a/ITestS3AClientSideEncryptionKms.java    |   8 +-
 .../hadoop/fs/s3a/ITestS3AConfiguration.java       | 123 ----------
 .../ITestS3AEncryptionSSEKMSUserDefinedKey.java    |  11 +-
 .../ITestS3AEncryptionWithDefaultS3Settings.java   |  12 +-
 .../org/apache/hadoop/fs/s3a/S3ATestUtils.java     |  31 ++-
 .../hadoop/fs/s3a/TestBucketConfiguration.java     | 261 +++++++++++++++++++++
 .../ITestSessionDelegationInFileystem.java         |   4 +-
 .../fs/s3a/scale/ITestS3AHugeFilesEncryption.java  |   8 +-
 15 files changed, 553 insertions(+), 216 deletions(-)

diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
index 441ae70..b9e51ef 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
@@ -58,8 +58,9 @@ import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION;
 import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_CENTRAL_REGION;
 import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING;
 import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING_DEFAULT;
-import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_ALGORITHM;
 import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
+import static org.apache.hadoop.fs.s3a.S3AUtils.getEncryptionAlgorithm;
+import static org.apache.hadoop.fs.s3a.S3AUtils.getS3EncryptionKey;
 import static org.apache.hadoop.fs.s3a.S3AUtils.translateException;
 
 /**
@@ -96,6 +97,9 @@ public class DefaultS3ClientFactory extends Configured
   /** Exactly once log to inform about ignoring the AWS-SDK Warnings for CSE. */
   private static final LogExactlyOnce IGNORE_CSE_WARN = new LogExactlyOnce(LOG);
 
+  /** Bucket name. */
+  private String bucket;
+
   /**
    * Create the client by preparing the AwsConf configuration
    * and then invoking {@code buildAmazonS3Client()}.
@@ -105,9 +109,10 @@ public class DefaultS3ClientFactory extends Configured
       final URI uri,
       final S3ClientCreationParameters parameters) throws IOException {
     Configuration conf = getConf();
+    bucket = uri.getHost();
     final ClientConfiguration awsConf = S3AUtils
         .createAwsConf(conf,
-            uri.getHost(),
+            bucket,
             Constants.AWS_SERVICE_IDENTIFIER_S3);
     // add any headers
     parameters.getHeaders().forEach((h, v) ->
@@ -126,10 +131,13 @@ public class DefaultS3ClientFactory extends Configured
       awsConf.setUserAgentSuffix(parameters.getUserAgentSuffix());
     }
 
+    // Get the encryption method for this bucket.
+    S3AEncryptionMethods encryptionMethods =
+        getEncryptionAlgorithm(bucket, conf);
     try {
-      if (S3AEncryptionMethods.getMethod(S3AUtils.
-          lookupPassword(conf, S3_ENCRYPTION_ALGORITHM, null))
-          .equals(S3AEncryptionMethods.CSE_KMS)) {
+      // If CSE is enabled then build a S3EncryptionClient.
+      if (S3AEncryptionMethods.CSE_KMS.getMethod()
+          .equals(encryptionMethods.getMethod())) {
         return buildAmazonS3EncryptionClient(
             awsConf,
             parameters);
@@ -163,12 +171,11 @@ public class DefaultS3ClientFactory extends Configured
         new AmazonS3EncryptionClientV2Builder();
     Configuration conf = getConf();
 
-    //CSE-KMS Method
-    String kmsKeyId = S3AUtils.lookupPassword(conf,
-        S3_ENCRYPTION_KEY, null);
+    // CSE-KMS Method
+    String kmsKeyId = getS3EncryptionKey(bucket, conf, true);
     // Check if kmsKeyID is not null
-    Preconditions.checkArgument(kmsKeyId != null, "CSE-KMS method "
-        + "requires KMS key ID. Use " + S3_ENCRYPTION_KEY
+    Preconditions.checkArgument(!StringUtils.isBlank(kmsKeyId), "CSE-KMS "
+        + "method requires KMS key ID. Use " + S3_ENCRYPTION_KEY
         + " property to set it. ");
 
     EncryptionMaterialsProvider materialsProvider =
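
The change above reduces to a simple dispatch on the resolved encryption
method. A hedged sketch of the createS3Client() selection logic follows;
identifiers are from the diff, and the fall-through to the plain
buildAmazonS3Client() path is assumed from the surrounding code, which
the hunk truncates:

```java
// Sketch of the client selection inside createS3Client():
// the algorithm is resolved per-bucket, old and new option names alike.
S3AEncryptionMethods encryptionMethods =
    getEncryptionAlgorithm(bucket, conf);
if (S3AEncryptionMethods.CSE_KMS.getMethod()
    .equals(encryptionMethods.getMethod())) {
  // CSE-KMS: wrap the store in an AmazonS3EncryptionClientV2.
  return buildAmazonS3EncryptionClient(awsConf, parameters);
}
// Anything else (NONE, SSE-*): the ordinary AmazonS3 client.
return buildAmazonS3Client(awsConf, parameters);
```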
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AEncryptionMethods.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AEncryptionMethods.java
index 85a00b11b..b599790 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AEncryptionMethods.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AEncryptionMethods.java
@@ -25,31 +25,45 @@ import org.apache.commons.lang3.StringUtils;
 /**
  * This enum is to centralize the encryption methods and
  * the value required in the configuration.
- *
- * There's two enum values for the two client encryption mechanisms the AWS
- * S3 SDK supports, even though these are not currently supported in S3A.
- * This is to aid supporting CSE in some form in future, fundamental
- * issues about file length of encrypted data notwithstanding.
- *
  */
 public enum S3AEncryptionMethods {
 
-  NONE("", false),
-  SSE_S3("AES256", true),
-  SSE_KMS("SSE-KMS", true),
-  SSE_C("SSE-C", true),
-  CSE_KMS("CSE-KMS", false),
-  CSE_CUSTOM("CSE-CUSTOM", false);
+  NONE("", false, false),
+  SSE_S3("AES256", true, false),
+  SSE_KMS("SSE-KMS", true, false),
+  SSE_C("SSE-C", true, true),
+  CSE_KMS("CSE-KMS", false, true),
+  CSE_CUSTOM("CSE-CUSTOM", false, true);
 
+  /**
+   * Error string when {@link #getMethod(String)} fails.
+   * Used in tests.
+   */
   static final String UNKNOWN_ALGORITHM
       = "Unknown encryption algorithm ";
 
-  private String method;
-  private boolean serverSide;
+  /**
+   * What is the encryption method?
+   */
+  private final String method;
+
+  /**
+   * Is this server side?
+   */
+  private final boolean serverSide;
+
+  /**
+   * Does the encryption method require a
+   * secret in the encryption.key property?
+   */
+  private final boolean requiresSecret;
 
-  S3AEncryptionMethods(String method, final boolean serverSide) {
+  S3AEncryptionMethods(String method,
+      final boolean serverSide,
+      final boolean requiresSecret) {
     this.method = method;
     this.serverSide = serverSide;
+    this.requiresSecret = requiresSecret;
   }
 
   public String getMethod() {
@@ -65,6 +79,14 @@ public enum S3AEncryptionMethods {
   }
 
   /**
+   * Does this encryption algorithm require a secret?
+   * @return true if a secret must be retrieved.
+   */
+  public boolean requiresSecret() {
+    return requiresSecret;
+  }
+
+  /**
    * Get the encryption mechanism from the value provided.
    * @param name algorithm name
    * @return the method
@@ -75,7 +97,7 @@ public enum S3AEncryptionMethods {
       return NONE;
     }
     for (S3AEncryptionMethods v : values()) {
-      if (v.getMethod().equals(name)) {
+      if (v.getMethod().equalsIgnoreCase(name)) {
         return v;
       }
     }
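
A short usage sketch of the widened enum. Note that getMethod(String)
raises an IOException for unknown algorithm names (the UNKNOWN_ALGORITHM
string above), so the caller declares it; the class wrapper here is
illustrative:

```java
import java.io.IOException;

import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;

public class EncryptionMethodsSketch {
  public static void main(String[] args) throws IOException {
    // Matching is now case-insensitive.
    S3AEncryptionMethods kms = S3AEncryptionMethods.getMethod("sse-kms");
    System.out.println(kms);                   // SSE_KMS

    // The new flag: SSE-KMS can fall back to the bucket's default KMS
    // key, so no secret is required; SSE-C and CSE-KMS must have one.
    System.out.println(kms.requiresSecret());  // false
    System.out.println(
        S3AEncryptionMethods.getMethod("CSE-KMS").requiresSecret()); // true
  }
}
```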
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index a6b9862..3b4940d 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -433,17 +433,15 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
 
       // look for encryption data
       // DT Bindings may override this
-      setEncryptionSecrets(new EncryptionSecrets(
-          getEncryptionAlgorithm(bucket, conf),
-          getS3EncryptionKey(bucket, getConf())));
+      setEncryptionSecrets(
+          buildEncryptionSecrets(bucket, conf));
 
       invoker = new Invoker(new S3ARetryPolicy(getConf()), onRetry);
       instrumentation = new S3AInstrumentation(uri);
       initializeStatisticsBinding();
       // If CSE-KMS method is set then CSE is enabled.
-      isCSEEnabled = S3AUtils.lookupPassword(conf,
-          Constants.S3_ENCRYPTION_ALGORITHM, "")
-          .equals(S3AEncryptionMethods.CSE_KMS.getMethod());
+      isCSEEnabled = S3AEncryptionMethods.CSE_KMS.getMethod()
+          .equals(getS3EncryptionAlgorithm().getMethod());
       LOG.debug("Client Side Encryption enabled: {}", isCSEEnabled);
       setCSEGauge();
       // Username is the current user at the time the FS was instantiated.
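
Reduced to a sketch, the initialization sequence after this change reads
as below. Names are from the diff; the assumption that
getS3EncryptionAlgorithm() reads back the method held in the secrets
just stored is mine, not spelled out in the hunk:

```java
// Resolve algorithm and key once, honouring per-bucket overrides and
// deprecated option names; DT bindings may later override the secrets.
setEncryptionSecrets(buildEncryptionSecrets(bucket, conf));

// CSE is enabled iff the resolved method is CSE-KMS; no second raw
// configuration lookup takes place.
isCSEEnabled = S3AEncryptionMethods.CSE_KMS.getMethod()
    .equals(getS3EncryptionAlgorithm().getMethod());
```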
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
index b5dc56f..8a82417 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.util.functional.RemoteIterators;
+import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
 import org.apache.hadoop.fs.s3a.auth.IAMInstanceCredentialsProvider;
 import org.apache.hadoop.fs.s3a.impl.NetworkBinding;
 import org.apache.hadoop.fs.s3native.S3xLoginHelper;
@@ -64,6 +65,7 @@ import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.io.UncheckedIOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
@@ -1568,22 +1570,101 @@ public final class S3AUtils {
   }
 
   /**
-   * Get any SSE/CSE key from a configuration/credential provider.
-   * This operation handles the case where the option has been
-   * set in the provider or configuration to the option
-   * {@code OLD_S3A_SERVER_SIDE_ENCRYPTION_KEY}.
-   * IOExceptions raised during retrieval are swallowed.
+   * Look up a per-bucket secret from a configuration, including JCEKS files.
+   * No attempt is made to look up the global configuration value.
+   * @param bucket bucket or "" if none known
+   * @param conf configuration
+   * @param baseKey base key to look up, e.g "fs.s3a.secret.key"
+   * @return the secret or null.
+   * @throws IOException on any IO problem
+   * @throws IllegalArgumentException bad arguments
+   */
+  private static String lookupBucketSecret(
+      String bucket,
+      Configuration conf,
+      String baseKey)
+      throws IOException {
+
+    Preconditions.checkArgument(!isEmpty(bucket), "null/empty bucket argument");
+    Preconditions.checkArgument(baseKey.startsWith(FS_S3A_PREFIX),
+        "%s does not start with $%s", baseKey, FS_S3A_PREFIX);
+    String subkey = baseKey.substring(FS_S3A_PREFIX.length());
+
+    // set from the long key unless overridden.
+    String longBucketKey = String.format(
+        BUCKET_PATTERN, bucket, baseKey);
+    String initialVal = getPassword(conf, longBucketKey, null, null);
+    // then override from the short one if it is set
+    String shortBucketKey = String.format(
+        BUCKET_PATTERN, bucket, subkey);
+    return getPassword(conf, shortBucketKey, initialVal, null);
+  }
+
+  /**
+   * Get any S3 encryption key, without propagating exceptions from
+   * JCEKS files.
    * @param bucket bucket to query for
    * @param conf configuration to examine
    * @return the encryption key or ""
    * @throws IllegalArgumentException bad arguments.
    */
-  public static String getS3EncryptionKey(String bucket,
+  public static String getS3EncryptionKey(
+      String bucket,
       Configuration conf) {
     try {
-      return lookupPassword(bucket, conf, Constants.S3_ENCRYPTION_KEY);
+      return getS3EncryptionKey(bucket, conf, false);
     } catch (IOException e) {
-      LOG.error("Cannot retrieve " + Constants.S3_ENCRYPTION_KEY, e);
+      // never going to happen, but to make sure, convert to
+      // runtime exception
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  /**
+   * Get any SSE/CSE key from a configuration/credential provider.
+   * This operation handles the case where the option has been
+   * set in the provider or configuration to the option
+   * {@code SERVER_SIDE_ENCRYPTION_KEY}.
+   * IOExceptions raised during retrieval are swallowed unless
+   * {@code propagateExceptions} is true.
+   * @param bucket bucket to query for
+   * @param conf configuration to examine
+   * @param propagateExceptions should IO exceptions be rethrown?
+   * @return the encryption key or ""
+   * @throws IllegalArgumentException bad arguments.
+   * @throws IOException if propagateExceptions==true and reading a JCEKS file raised an IOE
+   */
+  @SuppressWarnings("deprecation")
+  public static String getS3EncryptionKey(
+      String bucket,
+      Configuration conf,
+      boolean propagateExceptions) throws IOException {
+    try {
+      // look up the per-bucket value of the new key,
+      // which implicitly includes the deprecation remapping
+      String key = lookupBucketSecret(bucket, conf, S3_ENCRYPTION_KEY);
+      if (key == null) {
+        // old key in bucket, jceks
+        key = lookupBucketSecret(bucket, conf, SERVER_SIDE_ENCRYPTION_KEY);
+      }
+      if (key == null) {
+        // new key, global; implicit translation of old key in XML files.
+        key = lookupPassword(null, conf, S3_ENCRYPTION_KEY);
+      }
+      if (key == null) {
+        // old key, JCEKS
+        key = lookupPassword(null, conf, SERVER_SIDE_ENCRYPTION_KEY);
+      }
+      if (key == null) {
+        // no key, return ""
+        key = "";
+      }
+      return key;
+    } catch (IOException e) {
+      if (propagateExceptions) {
+        throw e;
+      }
+      LOG.warn("Cannot retrieve {} for bucket {}",
+          S3_ENCRYPTION_KEY, bucket, e);
       return "";
     }
   }
@@ -1597,14 +1678,50 @@ public final class S3AUtils {
    * @param conf configuration to scan
    * @return the encryption mechanism (which will be {@code NONE} unless
   * one is set).
-   * @throws IOException on any validation problem.
+   * @throws IOException on JCEKS lookup or invalid method/key configuration.
    */
   public static S3AEncryptionMethods getEncryptionAlgorithm(String bucket,
       Configuration conf) throws IOException {
-    S3AEncryptionMethods encryptionMethod = S3AEncryptionMethods.getMethod(
-        lookupPassword(bucket, conf,
-            Constants.S3_ENCRYPTION_ALGORITHM));
-    String encryptionKey = getS3EncryptionKey(bucket, conf);
+    return buildEncryptionSecrets(bucket, conf).getEncryptionMethod();
+  }
+
+  /**
+   * Get the server-side encryption or client side encryption algorithm.
+   * This includes validation of the configuration, checking the state of
+   * the encryption key given the chosen algorithm.
+   *
+   * @param bucket bucket to query for
+   * @param conf configuration to scan
+   * @return the encryption mechanism (which will be {@code NONE} unless
+   * one is set) and the encryption secrets.
+   * @throws IOException on JCEKS lookup or invalid method/key configuration.
+   */
+  @SuppressWarnings("deprecation")
+  public static EncryptionSecrets buildEncryptionSecrets(String bucket,
+      Configuration conf) throws IOException {
+
+    // new key, per-bucket
+    // this will include fixup of the old key in config XML entries
+    String algorithm = lookupBucketSecret(bucket, conf, S3_ENCRYPTION_ALGORITHM);
+    if (algorithm == null) {
+      // try the old key, per-bucket setting, which will find JCEKS values
+      algorithm = lookupBucketSecret(bucket, conf, SERVER_SIDE_ENCRYPTION_ALGORITHM);
+    }
+    if (algorithm == null) {
+      // new key, global setting
+      // this will include fixup of the old key in config XML entries
+      algorithm = lookupPassword(null, conf, S3_ENCRYPTION_ALGORITHM);
+    }
+    if (algorithm == null) {
+      // old key, global setting, for JCEKS entries.
+      algorithm = lookupPassword(null, conf, SERVER_SIDE_ENCRYPTION_ALGORITHM);
+    }
+    // now determine the algorithm
+    final S3AEncryptionMethods encryptionMethod = S3AEncryptionMethods.getMethod(algorithm);
+
+    // look up the encryption key
+    String encryptionKey = getS3EncryptionKey(bucket, conf,
+        encryptionMethod.requiresSecret());
     int encryptionKeyLen =
         StringUtils.isBlank(encryptionKey) ? 0 : encryptionKey.length();
     String diagnostics = passwordDiagnostics(encryptionKey, "key");
@@ -1638,7 +1755,7 @@ public final class S3AUtils {
       LOG.debug("Data is unencrypted");
       break;
     }
-    return encryptionMethod;
+    return new EncryptionSecrets(encryptionMethod, encryptionKey);
   }
 
   /**
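
To make the lookup above concrete: lookupBucketSecret() probes two
spellings of a per-bucket override, and the callers then fall back to
the global options. A sketch with the hypothetical bucket "nightly"
(the long form is the spelling the JCEKS test below stores):

```java
// Per-bucket spellings probed by lookupBucketSecret(), long form first.
String longForm  = "fs.s3a.bucket.nightly.fs.s3a.encryption.key";
// Short form, with the fs.s3a. prefix stripped; if set, it overrides
// the long form.
String shortForm = "fs.s3a.bucket.nightly.encryption.key";

// Overall resolution order used for both the algorithm and the key:
//   1. new option, per-bucket    2. deprecated option, per-bucket
//   3. new option, global        4. deprecated option, global
```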
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
index f4f7144..3d08a1a 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
@@ -1426,32 +1426,47 @@ Finally, the public `s3a://landsat-pds/` bucket can be accessed anonymously:
 </property>
 ```
 
-### Customizing S3A secrets held in credential files
+#### Per-bucket configuration and deprecated configuration options
 
+Per-bucket declarations of the deprecated encryption options
+take priority over a global option, even when the
+global option uses the newer configuration keys.
 
-Secrets in JCEKS files or provided by other Hadoop credential providers
-can also be configured on a per bucket basis. The S3A client will
-look for the per-bucket secrets be
+This means that when setting encryption options in XML files,
+the option `fs.s3a.bucket.BUCKET.server-side-encryption-algorithm`
+will take priority over the global value of `fs.s3a.encryption.algorithm`.
+The same holds for the encryption key option `fs.s3a.encryption.key`
+and its predecessor `fs.s3a.server-side-encryption.key`.
 
 
-Consider a JCEKS file with six keys:
+For a site configuration of:
 
-```
-fs.s3a.access.key
-fs.s3a.secret.key
-fs.s3a.encryption.algorithm
-fs.s3a.encryption.key
-fs.s3a.bucket.nightly.access.key
-fs.s3a.bucket.nightly.secret.key
-fs.s3a.bucket.nightly.session.token
-fs.s3a.bucket.nightly.server-side-encryption.key
-fs.s3a.bucket.nightly.server-side-encryption-algorithm
-```
+```xml
+<property>
+  <name>fs.s3a.bucket.nightly.server-side-encryption-algorithm</name>
+  <value>SSE-KMS</value>
+</property>
 
-When accessing the bucket `s3a://nightly/`, the per-bucket configuration
-options for that bucket will be used, here the access keys and token,
-and including the encryption algorithm and key.
+<property>
+  <name>fs.s3a.bucket.nightly.server-side-encryption.key</name>
+  <value>arn:aws:kms:eu-west-2:1528130000000:key/753778e4-2d0f-42e6-b894-6a3ae4ea4e5f</value>
+</property>
+
+<property>
+  <name>fs.s3a.encryption.algorithm</name>
+  <value>AES256</value>
+</property>
+
+<property>
+  <name>fs.s3a.encryption.key</name>
+  <value>unset</value>
+</property>
+
+```
 
+The bucket "nightly" will be encrypted with SSE-KMS using the KMS key
+`arn:aws:kms:eu-west-2:1528130000000:key/753778e4-2d0f-42e6-b894-6a3ae4ea4e5f`.
 
 ###  <a name="per_bucket_endpoints"></a>Using Per-Bucket Configuration to access data round the world
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractTestS3AEncryption.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractTestS3AEncryption.java
index 8e3208c..7945c82 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractTestS3AEncryption.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractTestS3AEncryption.java
@@ -32,9 +32,11 @@ import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_ALGORITHM;
 import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
 import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionTestsDisabled;
 import static org.apache.hadoop.fs.s3a.S3AUtils.getEncryptionAlgorithm;
+import static org.apache.hadoop.fs.s3a.S3AUtils.getS3EncryptionKey;
 
 /**
  * Test whether or not encryption works by turning it on. Some checks
@@ -163,8 +165,9 @@ public abstract class AbstractTestS3AEncryption extends AbstractS3ATestBase {
    */
   protected void assertEncrypted(Path path) throws IOException {
     //S3 will return full arn of the key, so specify global arn in properties
-    String kmsKeyArn = this.getConfiguration().
-        getTrimmed(S3_ENCRYPTION_KEY);
+    String kmsKeyArn =
+        getS3EncryptionKey(getTestBucketName(getConfiguration()),
+            getConfiguration());
     S3AEncryptionMethods algorithm = getSSEAlgorithm();
     EncryptionTestUtils.assertEncrypted(getFileSystem(),
             path,
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java
index bb052ed..4094b22 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java
@@ -45,8 +45,12 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
 import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE;
 import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_ALGORITHM;
 import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
+import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBool;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 
 /**
@@ -193,6 +197,7 @@ public abstract class ITestS3AClientSideEncryption extends AbstractS3ATestBase {
    * Testing how unencrypted and encrypted data behaves when read through
    * CSE enabled and disabled FS respectively.
    */
+  @SuppressWarnings("deprecation")
   @Test
   public void testEncryptionEnabledAndDisabledFS() throws Exception {
     maybeSkipTest();
@@ -203,8 +208,12 @@ public abstract class ITestS3AClientSideEncryption extends AbstractS3ATestBase {
     Path encryptedFilePath = path(getMethodName() + "cse");
 
     // Initialize a CSE disabled FS.
-    cseDisabledConf.unset(S3_ENCRYPTION_ALGORITHM);
-    cseDisabledConf.unset(S3_ENCRYPTION_KEY);
+    removeBaseAndBucketOverrides(getTestBucketName(cseDisabledConf),
+        cseDisabledConf,
+        S3_ENCRYPTION_ALGORITHM,
+        S3_ENCRYPTION_KEY,
+        SERVER_SIDE_ENCRYPTION_ALGORITHM,
+        SERVER_SIDE_ENCRYPTION_KEY);
     cseDisabledFS.initialize(getFileSystem().getUri(),
         cseDisabledConf);
 
@@ -288,7 +297,7 @@ public abstract class ITestS3AClientSideEncryption extends AbstractS3ATestBase {
   /**
    * Skip tests if certain conditions are met.
    */
-  protected abstract void maybeSkipTest();
+  protected abstract void maybeSkipTest() throws IOException;
 
   /**
   * Assert that the path references an encrypted blob.
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryptionKms.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryptionKms.java
index 085c0f9..bcc37c8 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryptionKms.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryptionKms.java
@@ -28,8 +28,10 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.impl.HeaderProcessing;
 
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionNotSet;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionTestsDisabled;
+import static org.apache.hadoop.fs.s3a.S3AUtils.getS3EncryptionKey;
 
 /**
  * Testing the S3 CSE - KMS method.
@@ -53,7 +55,7 @@ public class ITestS3AClientSideEncryptionKms
   }
 
   @Override
-  protected void maybeSkipTest() {
+  protected void maybeSkipTest() throws IOException {
     skipIfEncryptionTestsDisabled(getConfiguration());
     // skip the test if CSE-KMS or KMS key is not set.
     skipIfEncryptionNotSet(getConfiguration(), S3AEncryptionMethods.CSE_KMS);
@@ -71,8 +73,8 @@ public class ITestS3AClientSideEncryptionKms
 
     // Assert content encryption algo for KMS, is present in the
     // materials description and KMS key ID isn't.
-    String keyId =
-        getConfiguration().get(Constants.S3_ENCRYPTION_KEY);
+    String keyId = getS3EncryptionKey(getTestBucketName(getConfiguration()),
+        getConfiguration());
     Assertions.assertThat(processHeader(fsXAttrs,
         xAttrPrefix + Headers.MATERIALS_DESCRIPTION))
         .describedAs("Materials Description should contain the content "
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java
index eb68eed..dce99a6 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java
@@ -42,7 +42,6 @@ import java.io.IOException;
 import java.io.File;
 import java.net.URI;
 import java.security.PrivilegedExceptionAction;
-import java.util.Collection;
 
 import org.apache.hadoop.security.ProviderUtils;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -502,81 +501,6 @@ public class ITestS3AConfiguration {
   }
 
   @Test
-  public void testBucketConfigurationPropagation() throws Throwable {
-    Configuration config = new Configuration(false);
-    setBucketOption(config, "b", "base", "1024");
-    String basekey = "fs.s3a.base";
-    assertOptionEquals(config, basekey, null);
-    String bucketKey = "fs.s3a.bucket.b.base";
-    assertOptionEquals(config, bucketKey, "1024");
-    Configuration updated = propagateBucketOptions(config, "b");
-    assertOptionEquals(updated, basekey, "1024");
-    // original conf is not updated
-    assertOptionEquals(config, basekey, null);
-
-    String[] sources = updated.getPropertySources(basekey);
-    assertEquals(1, sources.length);
-    String sourceInfo = sources[0];
-    assertTrue("Wrong source " + sourceInfo, sourceInfo.contains(bucketKey));
-  }
-
-  @Test
-  public void testBucketConfigurationPropagationResolution() throws Throwable {
-    Configuration config = new Configuration(false);
-    String basekey = "fs.s3a.base";
-    String baseref = "fs.s3a.baseref";
-    String baseref2 = "fs.s3a.baseref2";
-    config.set(basekey, "orig");
-    config.set(baseref2, "${fs.s3a.base}");
-    setBucketOption(config, "b", basekey, "1024");
-    setBucketOption(config, "b", baseref, "${fs.s3a.base}");
-    Configuration updated = propagateBucketOptions(config, "b");
-    assertOptionEquals(updated, basekey, "1024");
-    assertOptionEquals(updated, baseref, "1024");
-    assertOptionEquals(updated, baseref2, "1024");
-  }
-
-  @Test
-  public void testMultipleBucketConfigurations() throws Throwable {
-    Configuration config = new Configuration(false);
-    setBucketOption(config, "b", USER_AGENT_PREFIX, "UA-b");
-    setBucketOption(config, "c", USER_AGENT_PREFIX, "UA-c");
-    config.set(USER_AGENT_PREFIX, "UA-orig");
-    Configuration updated = propagateBucketOptions(config, "c");
-    assertOptionEquals(updated, USER_AGENT_PREFIX, "UA-c");
-  }
-
-  @Test
-  public void testClearBucketOption() throws Throwable {
-    Configuration config = new Configuration();
-    config.set(USER_AGENT_PREFIX, "base");
-    setBucketOption(config, "bucket", USER_AGENT_PREFIX, "overridden");
-    clearBucketOption(config, "bucket", USER_AGENT_PREFIX);
-    Configuration updated = propagateBucketOptions(config, "c");
-    assertOptionEquals(updated, USER_AGENT_PREFIX, "base");
-  }
-
-  @Test
-  public void testBucketConfigurationSkipsUnmodifiable() throws Throwable {
-    Configuration config = new Configuration(false);
-    String impl = "fs.s3a.impl";
-    config.set(impl, "orig");
-    setBucketOption(config, "b", impl, "b");
-    String metastoreImpl = "fs.s3a.metadatastore.impl";
-    String ddb = "org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore";
-    setBucketOption(config, "b", metastoreImpl, ddb);
-    setBucketOption(config, "b", "impl2", "b2");
-    setBucketOption(config, "b", "bucket.b.loop", "b3");
-    assertOptionEquals(config, "fs.s3a.bucket.b.impl", "b");
-
-    Configuration updated = propagateBucketOptions(config, "b");
-    assertOptionEquals(updated, impl, "orig");
-    assertOptionEquals(updated, "fs.s3a.impl2", "b2");
-    assertOptionEquals(updated, metastoreImpl, ddb);
-    assertOptionEquals(updated, "fs.s3a.bucket.b.loop", null);
-  }
-
-  @Test
   public void testConfOptionPropagationToFS() throws Exception {
     Configuration config = new Configuration();
     String testFSName = config.getTrimmed(TEST_FS_S3A_NAME, "");
@@ -587,53 +511,6 @@ public class ITestS3AConfiguration {
     assertOptionEquals(updated, "fs.s3a.propagation", "propagated");
   }
 
-  @Test
-  public void testSecurityCredentialPropagationNoOverride() throws Exception {
-    Configuration config = new Configuration();
-    config.set(CREDENTIAL_PROVIDER_PATH, "base");
-    patchSecurityCredentialProviders(config);
-    assertOptionEquals(config, CREDENTIAL_PROVIDER_PATH,
-        "base");
-  }
-
-  @Test
-  public void testSecurityCredentialPropagationOverrideNoBase()
-      throws Exception {
-    Configuration config = new Configuration();
-    config.unset(CREDENTIAL_PROVIDER_PATH);
-    config.set(S3A_SECURITY_CREDENTIAL_PROVIDER_PATH, "override");
-    patchSecurityCredentialProviders(config);
-    assertOptionEquals(config, CREDENTIAL_PROVIDER_PATH,
-        "override");
-  }
-
-  @Test
-  public void testSecurityCredentialPropagationOverride() throws Exception {
-    Configuration config = new Configuration();
-    config.set(CREDENTIAL_PROVIDER_PATH, "base");
-    config.set(S3A_SECURITY_CREDENTIAL_PROVIDER_PATH, "override");
-    patchSecurityCredentialProviders(config);
-    assertOptionEquals(config, CREDENTIAL_PROVIDER_PATH,
-        "override,base");
-    Collection<String> all = config.getStringCollection(
-        CREDENTIAL_PROVIDER_PATH);
-    assertTrue(all.contains("override"));
-    assertTrue(all.contains("base"));
-  }
-
-  @Test
-  public void testSecurityCredentialPropagationEndToEnd() throws Exception {
-    Configuration config = new Configuration();
-    config.set(CREDENTIAL_PROVIDER_PATH, "base");
-    setBucketOption(config, "b", S3A_SECURITY_CREDENTIAL_PROVIDER_PATH,
-        "override");
-    Configuration updated = propagateBucketOptions(config, "b");
-
-    patchSecurityCredentialProviders(updated);
-    assertOptionEquals(updated, CREDENTIAL_PROVIDER_PATH,
-        "override,base");
-  }
-
   @Test(timeout = 10_000L)
   public void testS3SpecificSignerOverride() throws IOException {
     ClientConfiguration clientConfiguration = null;
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSUserDefinedKey.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSUserDefinedKey.java
index c281ae1..93e8ae6 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSUserDefinedKey.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSUserDefinedKey.java
@@ -18,11 +18,13 @@
 
 package org.apache.hadoop.fs.s3a;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 
+import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
 import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
 import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.SSE_KMS;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionNotSet;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
 
 /**
  * Concrete class that extends {@link AbstractTestS3AEncryption}
@@ -36,9 +38,12 @@ public class ITestS3AEncryptionSSEKMSUserDefinedKey
   protected Configuration createConfiguration() {
     // get the KMS key for this test.
     Configuration c = new Configuration();
-    String kmsKey = c.get(S3_ENCRYPTION_KEY);
+    String kmsKey = S3AUtils.getS3EncryptionKey(getTestBucketName(c), c);
     // skip the test if SSE-KMS or KMS key not set.
-    skipIfEncryptionNotSet(c, getSSEAlgorithm());
+    if (StringUtils.isBlank(kmsKey)) {
+      skip(S3_ENCRYPTION_KEY + " is not set for " +
+          SSE_KMS.getMethod());
+    }
     Configuration conf = super.createConfiguration();
     conf.set(S3_ENCRYPTION_KEY, kmsKey);
     return conf;
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionWithDefaultS3Settings.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionWithDefaultS3Settings.java
index 0f48825..a0fb762 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionWithDefaultS3Settings.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionWithDefaultS3Settings.java
@@ -34,11 +34,13 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
 import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_ALGORITHM;
-import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
 import static org.apache.hadoop.fs.s3a.EncryptionTestUtils.AWS_KMS_SSE_ALGORITHM;
 import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.SSE_KMS;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionNotSet;
+import static org.apache.hadoop.fs.s3a.S3AUtils.getS3EncryptionKey;
 
 /**
  * Concrete class that extends {@link AbstractTestS3AEncryption}
@@ -60,11 +62,13 @@ public class ITestS3AEncryptionWithDefaultS3Settings extends
     skipIfEncryptionNotSet(c, getSSEAlgorithm());
   }
 
+  @SuppressWarnings("deprecation")
   @Override
   protected void patchConfigurationEncryptionSettings(
       final Configuration conf) {
     removeBaseAndBucketOverrides(conf,
-        S3_ENCRYPTION_ALGORITHM);
+        S3_ENCRYPTION_ALGORITHM,
+        SERVER_SIDE_ENCRYPTION_ALGORITHM);
     conf.set(S3_ENCRYPTION_ALGORITHM,
             getSSEAlgorithm().getMethod());
   }
@@ -89,7 +93,7 @@ public class ITestS3AEncryptionWithDefaultS3Settings extends
   protected void assertEncrypted(Path path) throws IOException {
     S3AFileSystem fs = getFileSystem();
     Configuration c = fs.getConf();
-    String kmsKey = c.getTrimmed(S3_ENCRYPTION_KEY);
+    String kmsKey = getS3EncryptionKey(getTestBucketName(c), c);
     EncryptionTestUtils.assertEncrypted(fs, path, SSE_KMS, kmsKey);
   }
 
@@ -145,7 +149,7 @@ public class ITestS3AEncryptionWithDefaultS3Settings extends
       ContractTestUtils.rename(kmsFS, src, targetDir);
       Path renamedFile = new Path(targetDir, src.getName());
       ContractTestUtils.verifyFileContents(fs, renamedFile, data);
-      String kmsKey = fs2Conf.getTrimmed(S3_ENCRYPTION_KEY);
+      String kmsKey = getS3EncryptionKey(getTestBucketName(fs2Conf), fs2Conf);
       // we assert that the renamed file has picked up the KMS key of our FS
       EncryptionTestUtils.assertEncrypted(fs, renamedFile, SSE_KMS, kmsKey);
     }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
index 49b7f77..113b68a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.s3a.auth.MarshalledCredentialBinding;
 import org.apache.hadoop.fs.s3a.auth.MarshalledCredentials;
+import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
 import org.apache.hadoop.fs.s3a.commit.CommitConstants;
 
 import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
@@ -61,6 +62,7 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.functional.CallableRaisingIOE;
 
 import com.amazonaws.auth.AWSCredentialsProvider;
+import org.assertj.core.api.Assertions;
 import org.hamcrest.core.Is;
 import org.junit.Assert;
 import org.junit.Assume;
@@ -91,6 +93,8 @@ import static org.apache.hadoop.fs.impl.FutureIOSupport.awaitFuture;
 import static org.apache.hadoop.fs.s3a.FailureInjectionPolicy.*;
 import static org.apache.hadoop.fs.s3a.S3ATestConstants.*;
 import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.S3AUtils.buildEncryptionSecrets;
+import static org.apache.hadoop.fs.s3a.S3AUtils.getEncryptionAlgorithm;
 import static org.apache.hadoop.fs.s3a.S3AUtils.propagateBucketOptions;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 import static org.junit.Assert.*;
@@ -246,9 +250,10 @@ public final class S3ATestUtils {
    *
    * @param conf Test Configuration.
    */
-  private static void skipIfS3GuardAndS3CSEEnabled(Configuration conf) {
-    String encryptionMethod =
-        conf.getTrimmed(Constants.S3_ENCRYPTION_ALGORITHM, "");
+  private static void skipIfS3GuardAndS3CSEEnabled(Configuration conf)
+      throws IOException {
+    String encryptionMethod = getEncryptionAlgorithm(getTestBucketName(conf),
+        conf).getMethod();
     String metaStore = conf.getTrimmed(S3_METADATA_STORE_IMPL, "");
     if (encryptionMethod.equals(S3AEncryptionMethods.CSE_KMS.getMethod()) &&
         !metaStore.equals(S3GUARD_METASTORE_NULL)) {
@@ -1238,7 +1243,13 @@ public final class S3ATestUtils {
   public static void assertOptionEquals(Configuration conf,
       String key,
       String expected) {
-    assertEquals("Value of " + key, expected, conf.get(key));
+    String actual = conf.get(key);
+    String origin = actual == null
+        ? "(none)"
+        : "[" + StringUtils.join(conf.getPropertySources(key), ", ") + "]";
+    Assertions.assertThat(actual)
+        .describedAs("Value of %s with origin %s", key, origin)
+        .isEqualTo(expected);
   }
 
   /**
@@ -1538,15 +1549,17 @@ public final class S3ATestUtils {
    * @param configuration configuration to probe.
    */
   public static void skipIfEncryptionNotSet(Configuration configuration,
-      S3AEncryptionMethods s3AEncryptionMethod) {
+      S3AEncryptionMethods s3AEncryptionMethod) throws IOException {
     // if S3 encryption algorithm is not set to desired method or AWS encryption
     // key is not set, then skip.
-    if (!configuration.getTrimmed(S3_ENCRYPTION_ALGORITHM, "")
-        .equals(s3AEncryptionMethod.getMethod())
-        || configuration.get(Constants.S3_ENCRYPTION_KEY) == null) {
+    String bucket = getTestBucketName(configuration);
+    final EncryptionSecrets secrets = buildEncryptionSecrets(bucket, configuration);
+    if (!s3AEncryptionMethod.getMethod().equals(secrets.getEncryptionMethod().getMethod())
+        || StringUtils.isBlank(secrets.getEncryptionKey())) {
       skip(S3_ENCRYPTION_KEY + " is not set for " + s3AEncryptionMethod
           .getMethod() + " or " + S3_ENCRYPTION_ALGORITHM + " is not set to "
-          + s3AEncryptionMethod.getMethod());
+          + s3AEncryptionMethod.getMethod()
+          + " in " + secrets);
     }
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestBucketConfiguration.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestBucketConfiguration.java
new file mode 100644
index 0000000..0ac49a3
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestBucketConfiguration.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.File;
+import java.net.URI;
+import java.util.Collection;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
+import org.apache.hadoop.security.ProviderUtils;
+import org.apache.hadoop.security.alias.CredentialProvider;
+import org.apache.hadoop.security.alias.CredentialProviderFactory;
+import org.apache.hadoop.test.AbstractHadoopTestBase;
+
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_BUCKET_PREFIX;
+import static org.apache.hadoop.fs.s3a.Constants.S3A_SECURITY_CREDENTIAL_PROVIDER_PATH;
+import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_ALGORITHM;
+import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
+import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.USER_AGENT_PREFIX;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.assertOptionEquals;
+import static org.apache.hadoop.fs.s3a.S3AUtils.CREDENTIAL_PROVIDER_PATH;
+import static org.apache.hadoop.fs.s3a.S3AUtils.clearBucketOption;
+import static org.apache.hadoop.fs.s3a.S3AUtils.getEncryptionAlgorithm;
+import static org.apache.hadoop.fs.s3a.S3AUtils.patchSecurityCredentialProviders;
+import static org.apache.hadoop.fs.s3a.S3AUtils.propagateBucketOptions;
+import static org.apache.hadoop.fs.s3a.S3AUtils.setBucketOption;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * S3A tests for configuration option propagation.
+ */
+@SuppressWarnings("deprecation")
+public class TestBucketConfiguration extends AbstractHadoopTestBase {
+
+  private static final String NEW_ALGORITHM_KEY_GLOBAL = "CSE-KMS";
+  private static final String OLD_ALGORITHM_KEY_BUCKET = "SSE-KMS";
+  @Rule
+  public final TemporaryFolder tempDir = new TemporaryFolder();
+
+  /**
+   * Setup: ensure the deprecated configuration key mappings are wired up.
+   * @throws Exception on any failure
+   */
+  @Before
+  public void setup() throws Exception {
+    // force in the deprecation wire-up, even when this test method runs in isolation
+    S3AFileSystem.initializeClass();
+  }
+
+  @Test
+  public void testBucketConfigurationPropagation() throws Throwable {
+    Configuration config = new Configuration(false);
+    setBucketOption(config, "b", "base", "1024");
+    String basekey = "fs.s3a.base";
+    assertOptionEquals(config, basekey, null);
+    String bucketKey = "fs.s3a.bucket.b.base";
+    assertOptionEquals(config, bucketKey, "1024");
+    Configuration updated = propagateBucketOptions(config, "b");
+    assertOptionEquals(updated, basekey, "1024");
+    // original conf is not updated
+    assertOptionEquals(config, basekey, null);
+
+    String[] sources = updated.getPropertySources(basekey);
+    Assertions.assertThat(sources)
+        .describedAs("base key property sources")
+        .hasSize(1);
+    Assertions.assertThat(sources[0])
+        .describedAs("Property source")
+        .contains(bucketKey);
+  }
+
+  @Test
+  public void testBucketConfigurationPropagationResolution() throws Throwable {
+    Configuration config = new Configuration(false);
+    String basekey = "fs.s3a.base";
+    String baseref = "fs.s3a.baseref";
+    String baseref2 = "fs.s3a.baseref2";
+    config.set(basekey, "orig");
+    config.set(baseref2, "${fs.s3a.base}");
+    setBucketOption(config, "b", basekey, "1024");
+    setBucketOption(config, "b", baseref, "${fs.s3a.base}");
+    Configuration updated = propagateBucketOptions(config, "b");
+    assertOptionEquals(updated, basekey, "1024");
+    assertOptionEquals(updated, baseref, "1024");
+    assertOptionEquals(updated, baseref2, "1024");
+  }
+
+  @Test
+  public void testMultipleBucketConfigurations() throws Throwable {
+    Configuration config = new Configuration(false);
+    setBucketOption(config, "b", USER_AGENT_PREFIX, "UA-b");
+    setBucketOption(config, "c", USER_AGENT_PREFIX, "UA-c");
+    config.set(USER_AGENT_PREFIX, "UA-orig");
+    Configuration updated = propagateBucketOptions(config, "c");
+    assertOptionEquals(updated, USER_AGENT_PREFIX, "UA-c");
+  }
+
+  @Test
+  public void testClearBucketOption() throws Throwable {
+    Configuration config = new Configuration();
+    config.set(USER_AGENT_PREFIX, "base");
+    setBucketOption(config, "bucket", USER_AGENT_PREFIX, "overridden");
+    clearBucketOption(config, "bucket", USER_AGENT_PREFIX);
+    Configuration updated = propagateBucketOptions(config, "c");
+    assertOptionEquals(updated, USER_AGENT_PREFIX, "base");
+  }
+
+  @Test
+  public void testBucketConfigurationSkipsUnmodifiable() throws Throwable {
+    Configuration config = new Configuration(false);
+    String impl = "fs.s3a.impl";
+    config.set(impl, "orig");
+    setBucketOption(config, "b", impl, "b");
+    String metastoreImpl = "fs.s3a.metadatastore.impl";
+    String ddb = "org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore";
+    setBucketOption(config, "b", metastoreImpl, ddb);
+    setBucketOption(config, "b", "impl2", "b2");
+    setBucketOption(config, "b", "bucket.b.loop", "b3");
+    assertOptionEquals(config, "fs.s3a.bucket.b.impl", "b");
+
+    Configuration updated = propagateBucketOptions(config, "b");
+    assertOptionEquals(updated, impl, "orig");
+    assertOptionEquals(updated, "fs.s3a.impl2", "b2");
+    assertOptionEquals(updated, metastoreImpl, ddb);
+    assertOptionEquals(updated, "fs.s3a.bucket.b.loop", null);
+  }
+
+  @Test
+  public void testSecurityCredentialPropagationNoOverride() throws Exception {
+    Configuration config = new Configuration();
+    config.set(CREDENTIAL_PROVIDER_PATH, "base");
+    patchSecurityCredentialProviders(config);
+    assertOptionEquals(config, CREDENTIAL_PROVIDER_PATH,
+        "base");
+  }
+
+  @Test
+  public void testSecurityCredentialPropagationOverrideNoBase()
+      throws Exception {
+    Configuration config = new Configuration();
+    config.unset(CREDENTIAL_PROVIDER_PATH);
+    config.set(S3A_SECURITY_CREDENTIAL_PROVIDER_PATH, "override");
+    patchSecurityCredentialProviders(config);
+    assertOptionEquals(config, CREDENTIAL_PROVIDER_PATH,
+        "override");
+  }
+
+  @Test
+  public void testSecurityCredentialPropagationOverride() throws Exception {
+    Configuration config = new Configuration();
+    config.set(CREDENTIAL_PROVIDER_PATH, "base");
+    config.set(S3A_SECURITY_CREDENTIAL_PROVIDER_PATH, "override");
+    patchSecurityCredentialProviders(config);
+    assertOptionEquals(config, CREDENTIAL_PROVIDER_PATH,
+        "override,base");
+    Collection<String> all = config.getStringCollection(
+        CREDENTIAL_PROVIDER_PATH);
+    assertTrue(all.contains("override"));
+    assertTrue(all.contains("base"));
+  }
+
+  @Test
+  public void testSecurityCredentialPropagationEndToEnd() throws Exception {
+    Configuration config = new Configuration();
+    config.set(CREDENTIAL_PROVIDER_PATH, "base");
+    setBucketOption(config, "b", S3A_SECURITY_CREDENTIAL_PROVIDER_PATH,
+        "override");
+    Configuration updated = propagateBucketOptions(config, "b");
+
+    patchSecurityCredentialProviders(updated);
+    assertOptionEquals(updated, CREDENTIAL_PROVIDER_PATH,
+        "override,base");
+  }
+
+  /**
+   * This test shows that a per-bucket value of the older key takes priority
+   * over a global value of a new key in an XML configuration file.
+   */
+  @Test
+  public void testBucketConfigurationDeprecatedEncryptionAlgorithm()
+      throws Throwable {
+    Configuration config = new Configuration(false);
+    config.set(S3_ENCRYPTION_ALGORITHM, NEW_ALGORITHM_KEY_GLOBAL);
+    setBucketOption(config, "b", SERVER_SIDE_ENCRYPTION_ALGORITHM,
+        OLD_ALGORITHM_KEY_BUCKET);
+    Configuration updated = propagateBucketOptions(config, "b");
+
+    // Get the encryption method and verify that the per-bucket value of
+    // the old key was used.
+    String value = getEncryptionAlgorithm("b", updated).getMethod();
+    Assertions.assertThat(value)
+        .describedAs("lookupPassword(%s)", S3_ENCRYPTION_ALGORITHM)
+        .isEqualTo(OLD_ALGORITHM_KEY_BUCKET);
+  }
+
+  @Test
+  public void testJceksDeprecatedEncryptionAlgorithm() throws Exception {
+    // set up conf to have a cred provider
+    final Configuration conf = new Configuration(false);
+    final File file = tempDir.newFile("test.jks");
+    final URI jks = ProviderUtils.nestURIForLocalJavaKeyStoreProvider(
+        file.toURI());
+    conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH,
+        jks.toString());
+
+    // add our creds to the provider
+    final CredentialProvider provider =
+        CredentialProviderFactory.getProviders(conf).get(0);
+    provider.createCredentialEntry(S3_ENCRYPTION_ALGORITHM,
+        NEW_ALGORITHM_KEY_GLOBAL.toCharArray());
+    provider.createCredentialEntry(S3_ENCRYPTION_KEY,
+        "global s3 encryption key".toCharArray());
+    provider.createCredentialEntry(
+        FS_S3A_BUCKET_PREFIX + "b." + SERVER_SIDE_ENCRYPTION_ALGORITHM,
+        OLD_ALGORITHM_KEY_BUCKET.toCharArray());
+    final String bucketKey = "bucket-server-side-encryption-key";
+    provider.createCredentialEntry(
+        FS_S3A_BUCKET_PREFIX + "b." + SERVER_SIDE_ENCRYPTION_KEY,
+        bucketKey.toCharArray());
+    provider.flush();
+
+    // Get the encryption method and verify that the per-bucket value of
+    // the old key takes priority.
+    final EncryptionSecrets secrets = S3AUtils.buildEncryptionSecrets("b", conf);
+    Assertions.assertThat(secrets.getEncryptionMethod().getMethod())
+        .describedAs("buildEncryptionSecrets() encryption algorithm resolved to %s", secrets)
+        .isEqualTo(OLD_ALGORITHM_KEY_BUCKET);
+
+    Assertions.assertThat(secrets.getEncryptionKey())
+        .describedAs("buildEncryptionSecrets() encryption key resolved to %s", secrets)
+        .isEqualTo(bucketKey);
+  }
+}
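
For readers who want to try the JCEKS flow the new test exercises, here is a
minimal standalone sketch: it stores an encryption key in a local Java keystore
and resolves it through the bucket-aware lookup added by this patch. The
keystore path, key value and bucket name are illustrative placeholders, not
values from the patch.

```java
import java.io.File;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.S3AUtils;
import org.apache.hadoop.security.ProviderUtils;
import org.apache.hadoop.security.alias.CredentialProvider;
import org.apache.hadoop.security.alias.CredentialProviderFactory;

public class JceksEncryptionKeyDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration(false);
    // point the credential provider path at a scratch local JCEKS file
    URI jks = ProviderUtils.nestURIForLocalJavaKeyStoreProvider(
        new File("/tmp/demo.jks").toURI());
    conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH,
        jks.toString());

    // store the key inside the keystore rather than in plain XML
    CredentialProvider provider =
        CredentialProviderFactory.getProviders(conf).get(0);
    provider.createCredentialEntry("fs.s3a.encryption.key",
        "demo-kms-key-id".toCharArray());
    provider.flush();

    // the bucket-aware lookup resolves through the JCEKS store
    System.out.println(S3AUtils.getS3EncryptionKey("demo-bucket", conf));
  }
}
```
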
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/delegation/ITestSessionDelegationInFileystem.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/delegation/ITestSessionDelegationInFileystem.java
index b3fc5de..6aed9e7 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/delegation/ITestSessionDelegationInFileystem.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/delegation/ITestSessionDelegationInFileystem.java
@@ -66,8 +66,10 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeSessionTestsEnabled;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.unsetHadoopCredentialProviders;
+import static org.apache.hadoop.fs.s3a.S3AUtils.getS3EncryptionKey;
 import static org.apache.hadoop.fs.s3a.auth.delegation.DelegationConstants.*;
 import static org.apache.hadoop.fs.s3a.auth.delegation.DelegationTokenIOException.TOKEN_MISMATCH;
 import static org.apache.hadoop.fs.s3a.auth.delegation.MiniKerberizedHadoopCluster.ALICE;
@@ -146,7 +148,7 @@ public class ITestSessionDelegationInFileystem extends AbstractDelegationIT {
     String s3EncryptionMethod =
         conf.getTrimmed(Constants.S3_ENCRYPTION_ALGORITHM,
             S3AEncryptionMethods.SSE_KMS.getMethod());
-    String s3EncryptionKey = conf.getTrimmed(Constants.S3_ENCRYPTION_KEY, "");
+    String s3EncryptionKey = getS3EncryptionKey(getTestBucketName(conf), conf);
     removeBaseAndBucketOverrides(conf,
         DELEGATION_TOKEN_BINDING,
         Constants.S3_ENCRYPTION_ALGORITHM,
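
The one-line change in this hunk is more than a rename. A plain
`conf.getTrimmed()` only ever sees the global XML value, while
`getS3EncryptionKey()` also consults per-bucket overrides and any configured
credential providers. A short sketch of the difference, with made-up bucket
and key names:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.S3AUtils;

public class KeyLookupDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration(false);
    conf.set("fs.s3a.encryption.key", "global-key-arn");
    conf.set("fs.s3a.bucket.nightly.encryption.key", "nightly-key-arn");

    // raw lookup: always the global value -> "global-key-arn"
    System.out.println(conf.getTrimmed("fs.s3a.encryption.key", ""));
    // bucket-aware lookup: the per-bucket value wins -> "nightly-key-arn"
    System.out.println(S3AUtils.getS3EncryptionKey("nightly", conf));
  }
}
```
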
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesEncryption.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesEncryption.java
index 9325feb..0448f9d 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesEncryption.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesEncryption.java
@@ -20,15 +20,17 @@ package org.apache.hadoop.fs.s3a.scale;
 
 import java.io.IOException;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.Constants;
 import org.apache.hadoop.fs.s3a.EncryptionTestUtils;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 
-import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
 import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.SSE_KMS;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionNotSet;
+import static org.apache.hadoop.fs.s3a.S3AUtils.getS3EncryptionKey;
 
 /**
  * Class to test SSE_KMS encryption settings for huge files.
@@ -58,13 +60,13 @@ public class ITestS3AHugeFilesEncryption extends AbstractSTestS3AHugeFiles {
   @Override
   protected boolean isEncrypted(S3AFileSystem fileSystem) {
     Configuration c = new Configuration();
-    return c.get(S3_ENCRYPTION_KEY) != null;
+    return StringUtils.isNotBlank(getS3EncryptionKey(getTestBucketName(c), c));
   }
 
   @Override
   protected void assertEncrypted(Path hugeFile) throws IOException {
     Configuration c = new Configuration();
-    String kmsKey = c.get(S3_ENCRYPTION_KEY);
+    String kmsKey = getS3EncryptionKey(getTestBucketName(c), c);
     EncryptionTestUtils.assertEncrypted(getFileSystem(), hugeFile,
             SSE_KMS, kmsKey);
   }



[hadoop] 02/04: HADOOP-17817.HADOOP-17823. S3A to raise IOE if both S3-CSE and S3Guard enabled (#3239)

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit abb367aec622c95b3be14e78ba618f3f78cfd4d4
Author: Mehakmeet Singh <me...@gmail.com>
AuthorDate: Wed Jul 28 20:04:43 2021 +0530

    HADOOP-17817.HADOOP-17823. S3A to raise IOE if both S3-CSE and S3Guard enabled (#3239)
    
    S3A S3Guard tests to skip if S3-CSE are enabled (#3263)
    
        Follow on to
        * HADOOP-13887. Encrypt S3A data client-side with AWS SDK (S3-CSE)
    
        If the S3A bucket is set up to use S3-CSE encryption, all tests which turn
        on S3Guard are skipped, so they don't raise any exceptions about
        incompatible configurations.
    
    Contributed by Mehakmeet Singh
    
    Change-Id: I9f4188109b56a1f4e5a31fae265d980c5795db1e
---
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java    |  4 +++
 .../hadoop/fs/s3a/impl/InternalConstants.java      |  5 +++
 .../site/markdown/tools/hadoop-aws/encryption.md   |  1 +
 .../tools/hadoop-aws/troubleshooting_s3a.md        | 25 +++++++++++++++
 .../fs/contract/s3a/ITestS3AContractSeek.java      |  3 +-
 .../apache/hadoop/fs/contract/s3a/S3AContract.java | 19 +++++++++++
 .../apache/hadoop/fs/s3a/AbstractS3AMockTest.java  |  2 ++
 .../hadoop/fs/s3a/ITestS3AFSMainOperations.java    | 10 +++++-
 .../hadoop/fs/s3a/ITestS3GuardListConsistency.java |  2 +-
 .../org/apache/hadoop/fs/s3a/S3ATestUtils.java     | 37 ++++++++++++++++++++++
 .../ITestS3AFileContextCreateMkdir.java            |  6 ++++
 11 files changed, 111 insertions(+), 3 deletions(-)

diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 5373c43..9dedc51 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -217,6 +217,7 @@ import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletionIg
 import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isObjectNotFound;
 import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isUnknownBucket;
 import static org.apache.hadoop.fs.s3a.impl.InternalConstants.CSE_PADDING_LENGTH;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.CSE_S3GUARD_INCOMPATIBLE;
 import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT;
 import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DELETE_CONSIDERED_IDEMPOTENT;
 import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404;
@@ -545,6 +546,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       if (hasMetadataStore()) {
         LOG.debug("Using metadata store {}, authoritative store={}, authoritative path={}",
             getMetadataStore(), allowAuthoritativeMetadataStore, allowAuthoritativePaths);
+        if (isCSEEnabled) {
+          throw new PathIOException(uri.toString(), CSE_S3GUARD_INCOMPATIBLE);
+        }
       }
 
       // LOG if S3Guard is disabled on the warn level set in config
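
What a client sees when the new check fires, as a small reproduction sketch.
The bucket name and key ID are placeholders, it assumes the DynamoDB metadata
store is otherwise configured correctly, and, since this commit predates the
key rename, the old property names apply:

```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.PathIOException;

public class CseWithS3GuardDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.s3a.server-side-encryption-algorithm", "CSE-KMS");
    conf.set("fs.s3a.server-side-encryption.key", "<KMS_KEY_ID>"); // placeholder
    conf.set("fs.s3a.metadatastore.impl",
        "org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore");
    try {
      FileSystem.get(URI.create("s3a://example-bucket/"), conf);
    } catch (PathIOException e) {
      // `s3a://example-bucket': S3-CSE cannot be used with S3Guard
      System.err.println(e.getMessage());
    }
  }
}
```
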
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
index 51b1bf6..4dd52d9 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
@@ -134,4 +134,9 @@ public final class InternalConstants {
    */
   public static final int CSE_PADDING_LENGTH = 16;
 
+  /**
+   * Error message to indicate S3-CSE is incompatible with S3Guard.
+   */
+  public static final String CSE_S3GUARD_INCOMPATIBLE = "S3-CSE cannot be "
+      + "used with S3Guard";
 }
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/encryption.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/encryption.md
index 888ed8e..5fa6a30 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/encryption.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/encryption.md
@@ -601,6 +601,7 @@ clients where S3-CSE has not been enabled.
 
 ### Limitations
 
+- S3Guard is not supported with S3-CSE.
 - Performance will be reduced. All encrypt/decrypt is now being done on the
  client.
 - Writing files may be slower, as only a single block can be encrypted and
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
index 6cdb492..33dd165 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
@@ -1435,6 +1435,31 @@ The user trying to use the KMS Key ID should have the right permissions to acces
 If not, then add permission(or IAM role) in "Key users" section by selecting the
 AWS-KMS CMK Key on AWS console.
 
+### S3-CSE cannot be used with S3Guard
+
+S3-CSE is not supported on S3Guard-enabled buckets.
+```
+org.apache.hadoop.fs.PathIOException: `s3a://test-bucket': S3-CSE cannot be used with S3Guard
+    at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:543)
+    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3460)
+    at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:172)
+    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3565)
+    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3512)
+    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:539)
+    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:366)
+    at org.apache.hadoop.fs.shell.PathData.expandAsGlob(PathData.java:342)
+    at org.apache.hadoop.fs.shell.Command.expandArgument(Command.java:252)
+    at org.apache.hadoop.fs.shell.Command.expandArguments(Command.java:235)
+    at org.apache.hadoop.fs.shell.FsCommand.processRawArguments(FsCommand.java:105)
+    at org.apache.hadoop.fs.shell.Command.run(Command.java:179)
+    at org.apache.hadoop.fs.FsShell.run(FsShell.java:327)
+    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:81)
+    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:95)
+    at org.apache.hadoop.fs.FsShell.main(FsShell.java:390)
+```
+To keep using S3Guard, disable S3-CSE; to keep using S3-CSE, disable S3Guard.
+
 ### <a name="not_all_bytes_were_read"></a> Message appears in logs "Not all bytes were read from the S3ObjectInputStream"
 
 
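Returning to the new `S3-CSE cannot be used with S3Guard` entry: the
per-bucket configuration mechanism gives a finer-grained way out than a
global switch. A sketch, with an assumed bucket name:

```java
import org.apache.hadoop.conf.Configuration;

public class ResolveCseS3GuardClash {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // option 1: keep S3-CSE, switch just this bucket to the null metadata store
    conf.set("fs.s3a.bucket.example-bucket.metadatastore.impl",
        "org.apache.hadoop.fs.s3a.s3guard.NullMetadataStore");
    // option 2: keep S3Guard and clear the bucket's encryption algorithm instead
    // conf.unset("fs.s3a.bucket.example-bucket.server-side-encryption-algorithm");
  }
}
```
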
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractSeek.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractSeek.java
index 1713624..329b940 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractSeek.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractSeek.java
@@ -143,7 +143,8 @@ public class ITestS3AContractSeek extends AbstractContractSeekTest {
   public void teardown() throws Exception {
     super.teardown();
     S3AFileSystem fs = getFileSystem();
-    if (fs.getConf().getBoolean(FS_S3A_IMPL_DISABLE_CACHE, false)) {
+    if (fs != null && fs.getConf().getBoolean(FS_S3A_IMPL_DISABLE_CACHE,
+        false)) {
       fs.close();
     }
   }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/S3AContract.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/S3AContract.java
index 0d3dd4c..695a4a2 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/S3AContract.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/S3AContract.java
@@ -18,12 +18,17 @@
 
 package org.apache.hadoop.fs.contract.s3a;
 
+import java.io.IOException;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIOException;
 import org.apache.hadoop.fs.contract.AbstractBondedFSContract;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.S3ATestUtils;
 
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeSkipIfS3GuardAndS3CSEIOE;
+
 /**
  * The contract of S3A: only enabled if the test bucket is provided.
  */
@@ -63,6 +68,20 @@ public class S3AContract extends AbstractBondedFSContract {
     }
   }
 
+  /**
+   * Initialize the contract, converting the failure into a test skip
+   * if S3-CSE and S3Guard are both enabled.
+   */
+  @Override
+  public void init() throws IOException {
+    try {
+      super.init();
+    } catch (PathIOException ioe) {
+      // Skip the tests if S3-CSE and S3Guard are both enabled.
+      maybeSkipIfS3GuardAndS3CSEIOE(ioe);
+    }
+  }
+
   @Override
   public String getScheme() {
     return "s3a";
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java
index 6afdd76..afc444f 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java
@@ -58,6 +58,8 @@ public abstract class AbstractS3AMockTest {
     Configuration conf = createConfiguration();
     fs = new S3AFileSystem();
     URI uri = URI.create(FS_S3A + "://" + BUCKET);
+    // unset the S3-CSE property from the config to avoid a PathIOException.
+    conf.unset(SERVER_SIDE_ENCRYPTION_ALGORITHM);
     fs.initialize(uri, conf);
     s3 = fs.getAmazonS3ClientForTesting("mocking");
   }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java
index 511aa0f..6669e84 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java
@@ -33,6 +33,7 @@ import static org.apache.hadoop.fs.s3a.S3ATestUtils.createTestPath;
  */
 public class ITestS3AFSMainOperations extends FSMainOperationsBaseTest {
 
+  private S3AContract contract;
 
   public ITestS3AFSMainOperations() {
     super(createTestPath(
@@ -41,12 +42,19 @@ public class ITestS3AFSMainOperations extends FSMainOperationsBaseTest {
 
   @Override
   protected FileSystem createFileSystem() throws Exception {
-    S3AContract contract = new S3AContract(new Configuration());
+    contract = new S3AContract(new Configuration());
     contract.init();
     return contract.getTestFileSystem();
   }
 
   @Override
+  public void tearDown() throws Exception {
+    if (contract.getTestFileSystem() != null) {
+      super.tearDown();
+    }
+  }
+
+  @Override
   @Ignore("Permissions not supported")
   public void testListStatusThrowsExceptionForUnreadableDir() {
   }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
index 2475b54..0d54719 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
@@ -77,7 +77,7 @@ public class ITestS3GuardListConsistency extends AbstractS3ATestBase {
 
   @Override
   public void teardown() throws Exception {
-    if (getFileSystem()
+    if (getFileSystem() != null && getFileSystem()
         .getAmazonS3Client() instanceof InconsistentAmazonS3Client) {
       clearInconsistency(getFileSystem());
     }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
index 6c51c73..70986d8 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIOException;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.s3a.auth.MarshalledCredentialBinding;
@@ -37,6 +38,7 @@ import org.apache.hadoop.fs.s3a.commit.CommitConstants;
 
 import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
 import org.apache.hadoop.fs.s3a.impl.ContextAccessors;
+import org.apache.hadoop.fs.s3a.impl.InternalConstants;
 import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
 import org.apache.hadoop.fs.s3a.impl.StoreContext;
 import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder;
@@ -186,6 +188,8 @@ public final class S3ATestUtils {
     // make this whole class not run by default
     Assume.assumeTrue("No test filesystem in " + TEST_FS_S3A_NAME,
         liveTest);
+    // Skip if S3Guard and S3-CSE are enabled.
+    skipIfS3GuardAndS3CSEEnabled(conf);
     // patch in S3Guard options
     maybeEnableS3Guard(conf);
     S3AFileSystem fs1 = new S3AFileSystem();
@@ -229,6 +233,8 @@ public final class S3ATestUtils {
     // make this whole class not run by default
     Assume.assumeTrue("No test filesystem in " + TEST_FS_S3A_NAME,
         liveTest);
+    // Skip if S3Guard and S3-CSE are enabled.
+    skipIfS3GuardAndS3CSEEnabled(conf);
     // patch in S3Guard options
     maybeEnableS3Guard(conf);
     FileContext fc = FileContext.getFileContext(testURI, conf);
@@ -236,6 +242,37 @@ public final class S3ATestUtils {
   }
 
   /**
+   * Skip if S3Guard and S3-CSE are enabled together.
+   *
+   * @param conf Test Configuration.
+   */
+  private static void skipIfS3GuardAndS3CSEEnabled(Configuration conf) {
+    String encryptionMethod =
+        conf.getTrimmed(SERVER_SIDE_ENCRYPTION_ALGORITHM, "");
+    String metaStore = conf.getTrimmed(S3_METADATA_STORE_IMPL, "");
+    if (encryptionMethod.equals(S3AEncryptionMethods.CSE_KMS.getMethod()) &&
+        !metaStore.equals(S3GUARD_METASTORE_NULL)) {
+      skip("Skipped if CSE is enabled with S3Guard.");
+    }
+  }
+
+  /**
+   * Skip the test if the PathIOException was raised because of the
+   * S3-CSE and S3Guard incompatibility; otherwise rethrow it.
+   *
+   * @param ioe PathIOException being examined.
+   * @throws PathIOException rethrown if it does not relate to the S3-CSE
+   *                         and S3Guard incompatibility.
+   */
+  public static void maybeSkipIfS3GuardAndS3CSEIOE(PathIOException ioe)
+      throws PathIOException {
+    if (ioe.toString().contains(InternalConstants.CSE_S3GUARD_INCOMPATIBLE)) {
+      skip("Skipping since CSE is enabled with S3Guard.");
+    }
+    throw ioe;
+  }
+
+  /**
    * Get a long test property.
    * <ol>
    *   <li>Look up configuration value (which can pick up core-default.xml),
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextCreateMkdir.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextCreateMkdir.java
index 4b8d4bb..dcc9da9 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextCreateMkdir.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextCreateMkdir.java
@@ -32,4 +32,10 @@ public class ITestS3AFileContextCreateMkdir
     super.setUp();
   }
 
+  @Override
+  public void tearDown() throws Exception {
+    if (fc != null) {
+      super.tearDown();
+    }
+  }
 }



[hadoop] 03/04: HADOOP-17871. S3A CSE: minor tuning (#3412)

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 769059c2f5f45a1d696cd0bbea7eeab18a3bbdbd
Author: Mehakmeet Singh <me...@gmail.com>
AuthorDate: Thu Sep 16 02:59:22 2021 +0530

    HADOOP-17871. S3A CSE: minor tuning (#3412)
    
    This migrates the fs.s3a server-side encryption configuration options
    to names which cover client-side encryption too.
    
    fs.s3a.server-side-encryption-algorithm becomes fs.s3a.encryption.algorithm
    fs.s3a.server-side-encryption.key becomes fs.s3a.encryption.key
    
    The existing keys remain valid, simply deprecated and remapped
    to the new names. If you want server-side encryption options
    to be picked up regardless of Hadoop version, use
    the old keys.
    
    (the old key also works for CSE, though as no version of Hadoop
    with CSE support has shipped without this remapping, it's less
    relevant)
    
    Contributed by: Mehakmeet Singh
    
    Change-Id: I51804b21b287dbce18864f0a6ad17126aba2b281
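
The remapping leans on Hadoop's standard `Configuration` deprecation support,
registered in the `S3AFileSystem` diff below. A minimal sketch of the
mechanism, using the property names from this commit (the demo class itself
is illustrative):

```java
import org.apache.hadoop.conf.Configuration;

public class DeprecationDemo {
  public static void main(String[] args) {
    Configuration.addDeprecations(new Configuration.DeprecationDelta[] {
        new Configuration.DeprecationDelta(
            "fs.s3a.server-side-encryption-algorithm",
            "fs.s3a.encryption.algorithm"),
        new Configuration.DeprecationDelta(
            "fs.s3a.server-side-encryption.key",
            "fs.s3a.encryption.key")
    });
    Configuration conf = new Configuration(false);
    // a value set under the old name ...
    conf.set("fs.s3a.server-side-encryption-algorithm", "SSE-KMS");
    // ... is visible under the new one
    System.out.println(conf.get("fs.s3a.encryption.algorithm")); // SSE-KMS
  }
}
```
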
---
 .../hadoop/fs/CommonConfigurationKeysPublic.java   |   2 +
 .../src/main/resources/core-default.xml            |  24 ++--
 .../java/org/apache/hadoop/fs/s3a/Constants.java   |  26 +++-
 .../hadoop/fs/s3a/DefaultS3ClientFactory.java      |  15 ++-
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java    |  11 +-
 .../java/org/apache/hadoop/fs/s3a/S3AUtils.java    |  10 +-
 .../apache/hadoop/fs/s3a/s3guard/S3GuardTool.java  |   4 +-
 .../site/markdown/tools/hadoop-aws/encryption.md   |  44 +++----
 .../src/site/markdown/tools/hadoop-aws/index.md    |  32 +++--
 .../src/site/markdown/tools/hadoop-aws/testing.md  |  11 +-
 .../tools/hadoop-aws/troubleshooting_s3a.md        |  12 +-
 .../apache/hadoop/fs/s3a/AbstractS3AMockTest.java  |   2 +-
 .../hadoop/fs/s3a/AbstractTestS3AEncryption.java   |  11 +-
 .../apache/hadoop/fs/s3a/EncryptionTestUtils.java  |   4 +-
 .../fs/s3a/ITestS3AClientSideEncryption.java       |   8 +-
 .../fs/s3a/ITestS3AClientSideEncryptionKms.java    |   7 +-
 .../s3a/ITestS3AEncryptionAlgorithmValidation.java | 142 ++++++++++-----------
 .../hadoop/fs/s3a/ITestS3AEncryptionSSEC.java      |  19 ++-
 .../fs/s3a/ITestS3AEncryptionSSEKMSDefaultKey.java |   2 +-
 .../ITestS3AEncryptionSSEKMSUserDefinedKey.java    |  18 +--
 .../hadoop/fs/s3a/ITestS3AEncryptionSSES3.java     |   2 +-
 .../ITestS3AEncryptionWithDefaultS3Settings.java   |  23 ++--
 .../hadoop/fs/s3a/ITestS3AMiscOperations.java      |   5 +
 .../org/apache/hadoop/fs/s3a/S3ATestUtils.java     |  18 ++-
 .../apache/hadoop/fs/s3a/TestSSEConfiguration.java |  11 +-
 .../ITestSessionDelegationInFileystem.java         |  15 ++-
 .../fileContext/ITestS3AFileContextStatistics.java |   8 +-
 .../fs/s3a/scale/AbstractSTestS3AHugeFiles.java    |   3 +-
 .../fs/s3a/scale/ITestS3AHugeFilesEncryption.java  |  23 +---
 .../s3a/scale/ITestS3AHugeFilesSSECDiskBlocks.java |  12 +-
 30 files changed, 296 insertions(+), 228 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
index f58df5e..1fa84c7 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
@@ -973,6 +973,8 @@ public class CommonConfigurationKeysPublic {
           "ssl.keystore.pass$",
           "fs.s3.*[Ss]ecret.?[Kk]ey",
           "fs.s3a.*.server-side-encryption.key",
+          "fs.s3a.encryption.algorithm",
+          "fs.s3a.encryption.key",
           "fs.azure\\.account.key.*",
           "credential$",
           "oauth.*secret",
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 08da7b6..1e0e0b9 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -667,6 +667,8 @@
       ssl.keystore.pass$
       fs.s3a.server-side-encryption.key
       fs.s3a.*.server-side-encryption.key
+      fs.s3a.encryption.algorithm
+      fs.s3a.encryption.key
       fs.s3a.secret.key
       fs.s3a.*.secret.key
       fs.s3a.session.key
@@ -1569,20 +1571,22 @@
 </property>
 
 <property>
-  <name>fs.s3a.server-side-encryption-algorithm</name>
-  <description>Specify a server-side encryption algorithm for s3a: file system.
-    Unset by default.  It supports the following values: 'AES256' (for SSE-S3),
-    'SSE-KMS' and 'SSE-C'.
+  <name>fs.s3a.encryption.algorithm</name>
+  <description>Specify a server-side encryption or client-side
+    encryption algorithm for s3a: file system. Unset by default. It supports the
+    following values: 'AES256' (for SSE-S3), 'SSE-KMS', 'SSE-C', and 'CSE-KMS'.
   </description>
 </property>
 
 <property>
-  <name>fs.s3a.server-side-encryption.key</name>
-  <description>Specific encryption key to use if fs.s3a.server-side-encryption-algorithm
-    has been set to 'SSE-KMS' or 'SSE-C'. In the case of SSE-C, the value of this property
-    should be the Base64 encoded key. If you are using SSE-KMS and leave this property empty,
-    you'll be using your default's S3 KMS key, otherwise you should set this property to
-    the specific KMS key id.
+  <name>fs.s3a.encryption.key</name>
+  <description>Specific encryption key to use if fs.s3a.encryption.algorithm
+    has been set to 'SSE-KMS', 'SSE-C' or 'CSE-KMS'. In the case of SSE-C,
+    the value of this property should be the Base64 encoded key. If you are
+    using SSE-KMS and leave this property empty, you'll be using your account's
+    default S3 KMS key; otherwise, set this property to the specific KMS key
+    ID. In the case of 'CSE-KMS', this value must be the AWS KMS key ID
+    generated from the AWS console.
   </description>
 </property>
 
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index 8bd0291..56ea7d0 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -421,11 +421,12 @@ public final class Constants {
   public static final long DEFAULT_PURGE_EXISTING_MULTIPART_AGE = 86400;
 
   /**
-   * s3 server-side encryption or s3 client side encryption method, see
+   * s3 server-side encryption, see
    * {@link S3AEncryptionMethods} for valid options.
    *
    * {@value}
    */
+  @Deprecated
   public static final String SERVER_SIDE_ENCRYPTION_ALGORITHM =
       "fs.s3a.server-side-encryption-algorithm";
 
@@ -449,10 +450,33 @@ public final class Constants {
    * May be set within a JCEKS file.
    * Value: "{@value}".
    */
+  @Deprecated
   public static final String SERVER_SIDE_ENCRYPTION_KEY =
       "fs.s3a.server-side-encryption.key";
 
   /**
+   * Set the S3 server-side encryption (SSE) or S3 client-side encryption (CSE)
+   * algorithm. Check {@link S3AEncryptionMethods} for valid options.
+   * <br>
+   * value: {@value}
+   */
+  public static final String S3_ENCRYPTION_ALGORITHM =
+      "fs.s3a.encryption.algorithm";
+
+  /**
+   * Set the S3-SSE or S3-CSE encryption key if required.
+   * <br>
+   * <i>Note:</i>
+   *   <ul>
+   *     <li>In case of S3-CSE this value needs to be set for CSE to work.</li>
+   *     <li>In case of S3-SSE follow {@link #SERVER_SIDE_ENCRYPTION_KEY}</li>
+   *   </ul>
+   * value: {@value}
+   */
+  public static final String S3_ENCRYPTION_KEY =
+      "fs.s3a.encryption.key";
+
+  /**
    * List of custom Signers. The signer class will be loaded, and the signer
    * name will be associated with this signer class in the S3 SDK.
    * Examples
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
index 2abef63..441ae70 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
@@ -58,8 +58,8 @@ import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION;
 import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_CENTRAL_REGION;
 import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING;
 import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING_DEFAULT;
-import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
-import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_ALGORITHM;
+import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
 import static org.apache.hadoop.fs.s3a.S3AUtils.translateException;
 
 /**
@@ -93,6 +93,9 @@ public class DefaultS3ClientFactory extends Configured
       "S3A filesystem client is using"
           + " the SDK region resolution chain.";
 
+  /** Log, exactly once, that the AWS SDK's S3-CSE warnings can be ignored. */
+  private static final LogExactlyOnce IGNORE_CSE_WARN = new LogExactlyOnce(LOG);
+
   /**
    * Create the client by preparing the AwsConf configuration
    * and then invoking {@code buildAmazonS3Client()}.
@@ -125,7 +128,7 @@ public class DefaultS3ClientFactory extends Configured
 
     try {
       if (S3AEncryptionMethods.getMethod(S3AUtils.
-          lookupPassword(conf, SERVER_SIDE_ENCRYPTION_ALGORITHM, null))
+          lookupPassword(conf, S3_ENCRYPTION_ALGORITHM, null))
           .equals(S3AEncryptionMethods.CSE_KMS)) {
         return buildAmazonS3EncryptionClient(
             awsConf,
@@ -162,10 +165,10 @@ public class DefaultS3ClientFactory extends Configured
 
     //CSE-KMS Method
     String kmsKeyId = S3AUtils.lookupPassword(conf,
-        SERVER_SIDE_ENCRYPTION_KEY, null);
+        S3_ENCRYPTION_KEY, null);
     // Check if kmsKeyID is not null
     Preconditions.checkArgument(kmsKeyId != null, "CSE-KMS method "
-        + "requires KMS key ID. Use " + SERVER_SIDE_ENCRYPTION_KEY
+        + "requires KMS key ID. Use " + S3_ENCRYPTION_KEY
         + " property to set it. ");
 
     EncryptionMaterialsProvider materialsProvider =
@@ -191,6 +194,8 @@ public class DefaultS3ClientFactory extends Configured
     }
     builder.withCryptoConfiguration(cryptoConfigurationV2);
     client = builder.build();
+    IGNORE_CSE_WARN.info("S3 client-side encryption enabled: Ignore S3-CSE "
+        + "Warnings.");
 
     return client;
   }
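
`LogExactlyOnce` is an S3A utility class; for readers outside the codebase,
a minimal sketch of the idiom it implements (this is an illustration, not
Hadoop's actual implementation):

```java
import java.util.concurrent.atomic.AtomicBoolean;

import org.slf4j.Logger;

/** Log a message at INFO no more than once per process. */
final class LogOnce {
  private final AtomicBoolean logged = new AtomicBoolean(false);
  private final Logger log;

  LogOnce(Logger log) {
    this.log = log;
  }

  void info(String format, Object... args) {
    // compareAndSet guarantees exactly one winner even under concurrency
    if (logged.compareAndSet(false, true)) {
      log.info(format, args);
    }
  }
}
```
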
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 9dedc51..a6b9862 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -373,7 +373,13 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
     Configuration.DeprecationDelta[] deltas = {
         new Configuration.DeprecationDelta(
             FS_S3A_COMMITTER_STAGING_ABORT_PENDING_UPLOADS,
-            FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS)
+            FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS),
+        new Configuration.DeprecationDelta(
+            SERVER_SIDE_ENCRYPTION_ALGORITHM,
+            S3_ENCRYPTION_ALGORITHM),
+        new Configuration.DeprecationDelta(
+            SERVER_SIDE_ENCRYPTION_KEY,
+            S3_ENCRYPTION_KEY)
     };
 
     if (deltas.length > 0) {
@@ -436,7 +442,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       initializeStatisticsBinding();
       // If CSE-KMS method is set then CSE is enabled.
       isCSEEnabled = S3AUtils.lookupPassword(conf,
-          SERVER_SIDE_ENCRYPTION_ALGORITHM, null) != null;
+          Constants.S3_ENCRYPTION_ALGORITHM, "")
+          .equals(S3AEncryptionMethods.CSE_KMS.getMethod());
       LOG.debug("Client Side Encryption enabled: {}", isCSEEnabled);
       setCSEGauge();
       // Username is the current user at the time the FS was instantiated.
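
The `isCSEEnabled` change above is a behavioural fix, not just a rename:
previously any configured encryption algorithm set the flag, so a bucket using
only server-side encryption was treated as a CSE client. In effect (a sketch
of the two checks; the wrapper class is illustrative):

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;
import org.apache.hadoop.fs.s3a.S3AUtils;

final class CseCheck {
  // before: any configured algorithm implied CSE
  static boolean oldCheck(Configuration conf) throws IOException {
    return S3AUtils.lookupPassword(conf,
        "fs.s3a.server-side-encryption-algorithm", null) != null;
  }

  // after: only an exact CSE-KMS match enables the client-side path
  static boolean newCheck(Configuration conf) throws IOException {
    return S3AUtils.lookupPassword(conf, "fs.s3a.encryption.algorithm", "")
        .equals(S3AEncryptionMethods.CSE_KMS.getMethod());
  }
}
```
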
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
index 29fbac3..b5dc56f 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
@@ -125,14 +125,14 @@ public final class S3AUtils {
   public static final String SSE_C_NO_KEY_ERROR =
       S3AEncryptionMethods.SSE_C.getMethod()
           + " is enabled but no encryption key was declared in "
-          + SERVER_SIDE_ENCRYPTION_KEY;
+          + Constants.S3_ENCRYPTION_KEY;
   /**
    * Encryption SSE-S3 is used but the caller also set an encryption key.
    */
   public static final String SSE_S3_WITH_KEY_ERROR =
       S3AEncryptionMethods.SSE_S3.getMethod()
           + " is enabled but an encryption key was set in "
-          + SERVER_SIDE_ENCRYPTION_KEY;
+          + Constants.S3_ENCRYPTION_KEY;
   public static final String EOF_MESSAGE_IN_XML_PARSER
       = "Failed to sanitize XML document destined for handler class";
 
@@ -1581,9 +1581,9 @@ public final class S3AUtils {
   public static String getS3EncryptionKey(String bucket,
       Configuration conf) {
     try {
-      return lookupPassword(bucket, conf, SERVER_SIDE_ENCRYPTION_KEY);
+      return lookupPassword(bucket, conf, Constants.S3_ENCRYPTION_KEY);
     } catch (IOException e) {
-      LOG.error("Cannot retrieve " + SERVER_SIDE_ENCRYPTION_KEY, e);
+      LOG.error("Cannot retrieve " + Constants.S3_ENCRYPTION_KEY, e);
       return "";
     }
   }
@@ -1603,7 +1603,7 @@ public final class S3AUtils {
       Configuration conf) throws IOException {
     S3AEncryptionMethods encryptionMethod = S3AEncryptionMethods.getMethod(
         lookupPassword(bucket, conf,
-            SERVER_SIDE_ENCRYPTION_ALGORITHM));
+            Constants.S3_ENCRYPTION_ALGORITHM));
     String encryptionKey = getS3EncryptionKey(bucket, conf);
     int encryptionKeyLen =
         StringUtils.isBlank(encryptionKey) ? 0 : encryptionKey.length();
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
index ef47564..b0b3a8c 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
@@ -39,6 +39,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import com.amazonaws.services.s3.model.MultipartUpload;
+
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
 import org.slf4j.Logger;
@@ -54,6 +55,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FilterFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.Constants;
 import org.apache.hadoop.fs.s3a.MultipartUtils;
 import org.apache.hadoop.fs.s3a.S3AFileStatus;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
@@ -1348,7 +1350,7 @@ public abstract class S3GuardTool extends Configured implements Tool,
           ENDPOINT,
           StringUtils.isNotEmpty(endpoint) ? endpoint : "(unset)");
       String encryption =
-          printOption(out, "\tEncryption", SERVER_SIDE_ENCRYPTION_ALGORITHM,
+          printOption(out, "\tEncryption", Constants.S3_ENCRYPTION_ALGORITHM,
               "none");
       printOption(out, "\tInput seek policy", INPUT_FADVISE, INPUT_FADV_NORMAL);
       printOption(out, "\tChange Detection Source", CHANGE_DETECT_SOURCE,
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/encryption.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/encryption.md
index 5fa6a30..ccdfeed 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/encryption.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/encryption.md
@@ -146,12 +146,12 @@ There is no extra cost for storing data with this option.
 ### Enabling SSE-S3
 
 To write S3-SSE encrypted files, the value of
-`fs.s3a.server-side-encryption-algorithm` must be set to that of
+`fs.s3a.encryption.algorithm` must be set to that of
 the encryption mechanism used in `core-site`; currently only `AES256` is supported.
 
 ```xml
 <property>
-  <name>fs.s3a.server-side-encryption-algorithm</name>
+  <name>fs.s3a.encryption.algorithm</name>
   <value>AES256</value>
 </property>
 ```
@@ -177,7 +177,7 @@ The AWS KMS [can be used encrypt data on S3uploaded data](http://docs.aws.amazon
 
 When uploading data encrypted with SSE-KMS, the sequence is as follows.
 
-1. The S3A client must declare a specific CMK in the property `fs.s3a.server-side-encryption.key`, or leave
+1. The S3A client must declare a specific CMK in the property `fs.s3a.encryption.key`, or leave
 it blank to use the default configured for that region.
 
 1. The S3A client uploads all the data as normal, now including encryption information.
@@ -221,32 +221,32 @@ they can be increased through the AWS console.
 
 ### Enabling SSE-KMS
 
-To enable SSE-KMS, the property `fs.s3a.server-side-encryption-algorithm` must be set to `SSE-KMS` in `core-site`:
+To enable SSE-KMS, the property `fs.s3a.encryption.algorithm` must be set to `SSE-KMS` in `core-site`:
 
 ```xml
 <property>
-  <name>fs.s3a.server-side-encryption-algorithm</name>
+  <name>fs.s3a.encryption.algorithm</name>
   <value>SSE-KMS</value>
 </property>
 ```
 
-The ID of the specific key used to encrypt the data should also be set in the property `fs.s3a.server-side-encryption.key`:
+The ID of the specific key used to encrypt the data should also be set in the property `fs.s3a.encryption.key`:
 
 ```xml
 <property>
-  <name>fs.s3a.server-side-encryption.key</name>
+  <name>fs.s3a.encryption.key</name>
   <value>arn:aws:kms:us-west-2:360379543683:key/071a86ff-8881-4ba0-9230-95af6d01ca01</value>
 </property>
 ```
 
 Organizations may define a default key in the Amazon KMS; if a default key is set,
-then it will be used whenever SSE-KMS encryption is chosen and the value of `fs.s3a.server-side-encryption.key` is empty.
+then it will be used whenever SSE-KMS encryption is chosen and the value of `fs.s3a.encryption.key` is empty.
 
 ### the S3A `fs.s3a.encryption.key` key only affects created files
 
-With SSE-KMS, the S3A client option `fs.s3a.server-side-encryption.key` sets the
+With SSE-KMS, the S3A client option `fs.s3a.encryption.key` sets the
 key to be used when new files are created. When reading files, this key,
-and indeed the value of `fs.s3a.server-side-encryption-algorithme` is ignored:
+and indeed the value of `fs.s3a.encryption.algorithm` is ignored:
 S3 will attempt to retrieve the key and decrypt the file based on the create-time settings.
 
 This means that
@@ -270,18 +270,18 @@ directory listings do not fail with "Bad Request" errors.
 
 ### Enabling SSE-C
 
-To use SSE-C, the configuration option `fs.s3a.server-side-encryption-algorithm`
+To use SSE-C, the configuration option `fs.s3a.encryption.algorithm`
 must be set to `SSE-C`, and a base-64 encoding of the key placed in
-`fs.s3a.server-side-encryption.key`.
+`fs.s3a.encryption.key`.
 
 ```xml
 <property>
-  <name>fs.s3a.server-side-encryption-algorithm</name>
+  <name>fs.s3a.encryption.algorithm</name>
   <value>SSE-C</value>
 </property>
 
 <property>
-  <name>fs.s3a.server-side-encryption.key</name>
+  <name>fs.s3a.encryption.key</name>
   <value>SGVscCwgSSdtIHRyYXBwZWQgaW5zaWRlIGEgYmFzZS02NC1jb2RlYyE=</value>
 </property>
 ```
@@ -290,11 +290,11 @@ All clients must share this same key.
 
 ### The `fs.s3a.encryption.key` value is used to read and write data
 
-With SSE-C, the S3A client option `fs.s3a.server-side-encryption.key` sets the
+With SSE-C, the S3A client option `fs.s3a.encryption.key` sets the
 key to be used for both reading *and* writing data.
 
 When reading any file written with SSE-C, the same key must be set
-in the property `fs.s3a.server-side-encryption.key`.
+in the property `fs.s3a.encryption.key`.
 
 This is unlike SSE-S3 and SSE-KMS, where the information needed to
 decode data is kept in AWS infrastructure.
@@ -618,8 +618,8 @@ clients where S3-CSE has not been enabled.
 - Generate an AWS KMS Key ID from AWS console for your bucket, with same
  region as the storage bucket.
 - If already created, [view the kms key ID by these steps.](https://docs.aws.amazon.com/kms/latest/developerguide/find-cmk-id-arn.html)
-- Set `fs.s3a.server-side-encryption-algorithm=CSE-KMS`.
-- Set `fs.s3a.server-side-encryption.key=<KMS_KEY_ID>`.
+- Set `fs.s3a.encryption.algorithm=CSE-KMS`.
+- Set `fs.s3a.encryption.key=<KMS_KEY_ID>`.
 
 KMS_KEY_ID:
 
@@ -634,18 +634,18 @@ For example:
 - Alias name: `alias/ExampleAlias`
 - Alias ARN: `arn:aws:kms:us-east-2:111122223333:alias/ExampleAlias`
 
-*Note:* If `fs.s3a.server-side-encryption-algorithm=CSE-KMS` is set,
-`fs.s3a.server-side-encryption.key=<KMS_KEY_ID>` property must be set for
+*Note:* If `fs.s3a.encryption.algorithm=CSE-KMS` is set,
+`fs.s3a.encryption.key=<KMS_KEY_ID>` property must be set for
 S3-CSE to work.
 
 ```xml
 <property>
-     <name>fs.s3a.server-side-encryption-algorithm</name>
+     <name>fs.s3a.encryption.algorithm</name>
      <value>CSE-KMS</value>
  </property>
 
  <property>
-     <name>fs.s3a.server-side-encryption.key</name>
+     <name>fs.s3a.encryption.key</name>
      <value>${KMS_KEY_ID}</value>
  </property>
 ```
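
To make the documentation above concrete, an end-to-end sketch of writing a
file through S3A with CSE-KMS enabled. The bucket name and key ID are
placeholders to substitute with your own:

```java
import java.net.URI;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CseKmsWriteDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.s3a.encryption.algorithm", "CSE-KMS");
    conf.set("fs.s3a.encryption.key", "<KMS_KEY_ID>"); // placeholder
    FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"), conf);
    try (FSDataOutputStream out = fs.create(new Path("/demo/hello.txt"))) {
      // bytes are encrypted in the client before the PUT reaches S3
      out.write("encrypted before upload".getBytes(StandardCharsets.UTF_8));
    }
  }
}
```
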
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
index 8b963c3..f4f7144 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
@@ -647,7 +647,7 @@ files in local or Hadoop filesystems, and including them in requests.
 
 The S3A configuration options with sensitive data
 (`fs.s3a.secret.key`, `fs.s3a.access.key`,  `fs.s3a.session.token`
-and `fs.s3a.server-side-encryption.key`) can
+and `fs.s3a.encryption.key`) can
 have their data saved to a binary file stored, with the values being read in
 when the S3A filesystem URL is used for data access. The reference to this
 credential provider then declared in the Hadoop configuration.
@@ -663,8 +663,8 @@ stores.
 fs.s3a.access.key
 fs.s3a.secret.key
 fs.s3a.session.token
-fs.s3a.server-side-encryption.key
-fs.s3a.server-side-encryption-algorithm
+fs.s3a.encryption.key
+fs.s3a.encryption.algorithm
 ```
 
 The first three are for authentication; the final two for
@@ -969,20 +969,23 @@ options are covered in [Testing](./testing.md).
 </property>
 
 <property>
-  <name>fs.s3a.server-side-encryption-algorithm</name>
-  <description>Specify a server-side encryption algorithm for s3a: file system.
-    Unset by default. It supports the following values: 'AES256' (for SSE-S3), 'SSE-KMS'
-     and 'SSE-C'
+  <name>fs.s3a.encryption.algorithm</name>
+  <description>Specify a server-side encryption or client-side
+     encryption algorithm for s3a: file system. Unset by default. It supports the
+     following values: 'AES256' (for SSE-S3), 'SSE-KMS', 'SSE-C', and 'CSE-KMS'.
   </description>
 </property>
 
 <property>
-    <name>fs.s3a.server-side-encryption.key</name>
-    <description>Specific encryption key to use if fs.s3a.server-side-encryption-algorithm
-    has been set to 'SSE-KMS' or 'SSE-C'. In the case of SSE-C, the value of this property
-    should be the Base64 encoded key. If you are using SSE-KMS and leave this property empty,
-    you'll be using your default's S3 KMS key, otherwise you should set this property to
-    the specific KMS key id.</description>
+    <name>fs.s3a.encryption.key</name>
+    <description>Specific encryption key to use if fs.s3a.encryption.algorithm
+     has been set to 'SSE-KMS', 'SSE-C' or 'CSE-KMS'. In the case of SSE-C,
+     the value of this property should be the Base64 encoded key. If you are
+     using SSE-KMS and leave this property empty, you'll be using your account's
+     default S3 KMS key; otherwise, set this property to the specific KMS key
+     ID. In the case of 'CSE-KMS', this value must be the AWS KMS key ID
+     generated from the AWS console.
+    </description>
 </property>
 
 <property>
@@ -1436,7 +1439,8 @@ Consider a JCEKS file with six keys:
 ```
 fs.s3a.access.key
 fs.s3a.secret.key
-fs.s3a.server-side-encryption-algorithm
+fs.s3a.encryption.algorithm
+fs.s3a.encryption.key
 fs.s3a.bucket.nightly.access.key
 fs.s3a.bucket.nightly.secret.key
 fs.s3a.bucket.nightly.session.token
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md
index 69f589ba..e0a90a2 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md
@@ -150,7 +150,7 @@ Example:
 ### <a name="encryption"></a> Configuring S3a Encryption
 
 For S3a encryption tests to run correctly, the
-`fs.s3a.server-side-encryption.key` must be configured in the s3a contract xml
+`fs.s3a.encryption.key` must be configured in the s3a contract xml
 file or `auth-keys.xml` file with a AWS KMS encryption key arn as this value is
 different for each AWS KMS. Please note this KMS key should be created in the
 same region as your S3 bucket. Otherwise, you may get `KMS.NotFoundException`.
@@ -159,13 +159,13 @@ Example:
 
 ```xml
 <property>
-  <name>fs.s3a.server-side-encryption.key</name>
+  <name>fs.s3a.encryption.key</name>
   <value>arn:aws:kms:us-west-2:360379543683:key/071a86ff-8881-4ba0-9230-95af6d01ca01</value>
 </property>
 ```
 
 You can also force all the tests to run with a specific SSE encryption method
-by configuring the property `fs.s3a.server-side-encryption-algorithm` in the s3a
+by configuring the property `fs.s3a.encryption.algorithm` in the s3a
 contract file.
 
 ### <a name="default_encyption"></a> Default Encryption
@@ -1466,7 +1466,7 @@ as it may take a couple of SDK updates before it is ready.
 1. Do a clean build and rerun all the `hadoop-aws` tests, with and without the `-Ds3guard -Ddynamo` options.
   This includes the `-Pscale` set, with a role defined for the assumed role tests.
   in `fs.s3a.assumed.role.arn` for testing assumed roles,
-  and `fs.s3a.server-side-encryption.key` for encryption, for full coverage.
+  and `fs.s3a.encryption.key` for encryption, for full coverage.
   If you can, scale up the scale tests.
 1. Run the `ILoadTest*` load tests from your IDE or via maven through
       `mvn verify -Dtest=skip -Dit.test=ILoadTest\*`  ; look for regressions in performance
@@ -1482,6 +1482,9 @@ as it may take a couple of SDK updates before it is ready.
   Examine the `target/dependencies.txt` file to verify that no new
   artifacts have unintentionally been declared as dependencies
   of the shaded `aws-java-sdk-bundle` artifact.
+1. Run the full AWS test suite with S3 client-side encryption enabled by
+   setting `fs.s3a.encryption.algorithm` to 'CSE-KMS' and the AWS KMS
+   key ID in `fs.s3a.encryption.key`.
 
 ### Basic command line regression testing
 
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
index 33dd165..6f55d9e 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
@@ -1209,12 +1209,12 @@ KMS key ID is required for CSE-KMS to encrypt data, not providing one leads
 ```
 2021-07-07 11:33:04,550 WARN fs.FileSystem: Failed to initialize fileystem
 s3a://ap-south-cse/: java.lang.IllegalArgumentException: CSE-KMS
-method requires KMS key ID. Use fs.s3a.server-side-encryption.key property to set it.
--ls: CSE-KMS method requires KMS key ID. Use fs.s3a.server-side-encryption.key property to
+method requires KMS key ID. Use fs.s3a.encryption.key property to set it.
+-ls: CSE-KMS method requires KMS key ID. Use fs.s3a.encryption.key property to
  set it.
 ```
 
-set `fs.s3a.server-side-encryption.key=<KMS_KEY_ID>` generated through AWS console.
+Set `fs.s3a.encryption.key=<KMS_KEY_ID>`, with the key ID generated through the AWS console.
 
 ### `com.amazonaws.services.kms.model.IncorrectKeyException` The key ID in the request does not identify a CMK that can perform this operation.
 
@@ -1354,7 +1354,7 @@ work.
 
 ### com.amazonaws.services.kms.model.NotFoundException: Invalid keyId
 
-If the value in `fs.s3a.server-side-encryption.key` property, does not exist
+If the value in the `fs.s3a.encryption.key` property does not exist
 /valid in AWS KMS CMK(Customer managed keys), then this error would be seen.
 
 ```
@@ -1390,7 +1390,7 @@ Caused by: com.amazonaws.services.kms.model.NotFoundException: Invalid keyId abc
     ... 49 more
 ```
 
-Check if `fs.s3a.server-side-encryption.key` is set correctly and matches the
+Check if `fs.s3a.encryption.key` is set correctly and matches the
 same on AWS console.
 
 ### com.amazonaws.services.kms.model.AWSKMSException: User: <User_ARN> is not authorized to perform : kms :GenerateDataKey on resource: <KEY_ID>
@@ -1431,7 +1431,7 @@ User: arn:aws:iam::152813717728:user/<user> is not authorized to perform: kms:Ge
 ```
 
 The user trying to use the KMS Key ID should have the right permissions to access
-(encrypt/decrypt) using the AWS KMS Key used via `fs.s3a.server-side-encryption.key`.
+(encrypt/decrypt) using the AWS KMS Key used via `fs.s3a.encryption.key`.
 If not, then add permission(or IAM role) in "Key users" section by selecting the
 AWS-KMS CMK Key on AWS console.
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java
index afc444f..5765fe4 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java
@@ -59,7 +59,7 @@ public abstract class AbstractS3AMockTest {
     fs = new S3AFileSystem();
     URI uri = URI.create(FS_S3A + "://" + BUCKET);
     // unset the S3-CSE property from the config to avoid a PathIOException.
-    conf.unset(SERVER_SIDE_ENCRYPTION_ALGORITHM);
+    conf.unset(Constants.S3_ENCRYPTION_ALGORITHM);
     fs.initialize(uri, conf);
     s3 = fs.getAmazonS3ClientForTesting("mocking");
   }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractTestS3AEncryption.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractTestS3AEncryption.java
index 7704fa8..8e3208c 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractTestS3AEncryption.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractTestS3AEncryption.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
+import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_ALGORITHM;
+import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
 import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
@@ -52,17 +54,20 @@ public abstract class AbstractTestS3AEncryption extends AbstractS3ATestBase {
   /**
    * This removes the encryption settings from the
    * configuration and then sets the
-   * fs.s3a.server-side-encryption-algorithm value to
+   * fs.s3a.encryption.algorithm value to
    * be that of {@code getSSEAlgorithm()}.
    * Called in {@code createConfiguration()}.
    * @param conf configuration to patch.
    */
+  @SuppressWarnings("deprecation")
   protected void patchConfigurationEncryptionSettings(
       final Configuration conf) {
     removeBaseAndBucketOverrides(conf,
+        S3_ENCRYPTION_ALGORITHM,
+        S3_ENCRYPTION_KEY,
         SERVER_SIDE_ENCRYPTION_ALGORITHM,
         SERVER_SIDE_ENCRYPTION_KEY);
-    conf.set(SERVER_SIDE_ENCRYPTION_ALGORITHM,
+    conf.set(S3_ENCRYPTION_ALGORITHM,
             getSSEAlgorithm().getMethod());
   }
 
@@ -159,7 +164,7 @@ public abstract class AbstractTestS3AEncryption extends AbstractS3ATestBase {
   protected void assertEncrypted(Path path) throws IOException {
     //S3 will return full arn of the key, so specify global arn in properties
     String kmsKeyArn = this.getConfiguration().
-        getTrimmed(SERVER_SIDE_ENCRYPTION_KEY);
+        getTrimmed(S3_ENCRYPTION_KEY);
     S3AEncryptionMethods algorithm = getSSEAlgorithm();
     EncryptionTestUtils.assertEncrypted(getFileSystem(),
             path,
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/EncryptionTestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/EncryptionTestUtils.java
index 4fc03e0..4013e9d 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/EncryptionTestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/EncryptionTestUtils.java
@@ -27,7 +27,7 @@ import org.apache.commons.net.util.Base64;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
-import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
@@ -51,7 +51,7 @@ public final class EncryptionTestUtils {
    */
   public static String convertKeyToMd5(FileSystem fs) {
     String base64Key = fs.getConf().getTrimmed(
-        SERVER_SIDE_ENCRYPTION_KEY
+        S3_ENCRYPTION_KEY
     );
     byte[] key = Base64.decodeBase64(base64Key);
     byte[] md5 =  DigestUtils.md5(key);
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java
index 880f032..bb052ed 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java
@@ -43,8 +43,8 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.rm;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyFileContents;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
 import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE;
-import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
-import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_ALGORITHM;
+import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBool;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
@@ -203,8 +203,8 @@ public abstract class ITestS3AClientSideEncryption extends AbstractS3ATestBase {
     Path encryptedFilePath = path(getMethodName() + "cse");
 
     // Initialize a CSE disabled FS.
-    cseDisabledConf.unset(SERVER_SIDE_ENCRYPTION_ALGORITHM);
-    cseDisabledConf.unset(SERVER_SIDE_ENCRYPTION_KEY);
+    cseDisabledConf.unset(S3_ENCRYPTION_ALGORITHM);
+    cseDisabledConf.unset(S3_ENCRYPTION_KEY);
     cseDisabledFS.initialize(getFileSystem().getUri(),
         cseDisabledConf);
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryptionKms.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryptionKms.java
index 35f648f..085c0f9 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryptionKms.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryptionKms.java
@@ -28,8 +28,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.impl.HeaderProcessing;
 
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionNotSet;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionTestsDisabled;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfKmsKeyIdIsNotSet;
 
 /**
  * Testing the S3 CSE - KMS method.
@@ -55,7 +55,8 @@ public class ITestS3AClientSideEncryptionKms
   @Override
   protected void maybeSkipTest() {
     skipIfEncryptionTestsDisabled(getConfiguration());
-    skipIfKmsKeyIdIsNotSet(getConfiguration());
+    // skip the test if CSE-KMS is not configured or the KMS key is not set.
+    skipIfEncryptionNotSet(getConfiguration(), S3AEncryptionMethods.CSE_KMS);
   }
 
   @Override
@@ -71,7 +72,7 @@ public class ITestS3AClientSideEncryptionKms
     // Assert that the content encryption algo for KMS is present in the
     // materials description and that the KMS key ID isn't.
     String keyId =
-        getConfiguration().get(Constants.SERVER_SIDE_ENCRYPTION_KEY);
+        getConfiguration().get(Constants.S3_ENCRYPTION_KEY);
     Assertions.assertThat(processHeader(fsXAttrs,
         xAttrPrefix + Headers.MATERIALS_DESCRIPTION))
         .describedAs("Materials Description should contain the content "
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionAlgorithmValidation.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionAlgorithmValidation.java
index 40fa0cb..7e6aeb2 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionAlgorithmValidation.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionAlgorithmValidation.java
@@ -42,23 +42,23 @@ public class ITestS3AEncryptionAlgorithmValidation
   public void testEncryptionAlgorithmSetToDES() throws Throwable {
     //skip tests if they aren't enabled
     assumeEnabled();
-    intercept(IOException.class, "Unknown Server Side algorithm DES", () -> {
-
-        Configuration conf = super.createConfiguration();
-        //DES is an invalid encryption algorithm
-        conf.set(Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM, "DES");
-        S3AContract contract = (S3AContract) createContract(conf);
-        contract.init();
-        //extract the test FS
-        FileSystem fileSystem = contract.getTestFileSystem();
-        assertNotNull("null filesystem", fileSystem);
-        URI fsURI = fileSystem.getUri();
-        LOG.info("Test filesystem = {} implemented by {}", fsURI, fileSystem);
-        assertEquals("wrong filesystem of " + fsURI,
-            contract.getScheme(), fsURI.getScheme());
-        fileSystem.initialize(fsURI, conf);
-        throw new Exception("Do not reach here");
-      });
+    intercept(IOException.class, "Unknown encryption algorithm DES", () -> {
+
+      Configuration conf = super.createConfiguration();
+      //DES is an invalid encryption algorithm
+      conf.set(Constants.S3_ENCRYPTION_ALGORITHM, "DES");
+      S3AContract contract = (S3AContract) createContract(conf);
+      contract.init();
+      //extract the test FS
+      FileSystem fileSystem = contract.getTestFileSystem();
+      assertNotNull("null filesystem", fileSystem);
+      URI fsURI = fileSystem.getUri();
+      LOG.info("Test filesystem = {} implemented by {}", fsURI, fileSystem);
+      assertEquals("wrong filesystem of " + fsURI,
+          contract.getScheme(), fsURI.getScheme());
+      fileSystem.initialize(fsURI, conf);
+      return fileSystem;
+    });
   }
 
   @Test
@@ -67,25 +67,25 @@ public class ITestS3AEncryptionAlgorithmValidation
     //skip tests if they aren't enabled
     assumeEnabled();
     intercept(IllegalArgumentException.class, "The value of property " +
-        Constants.SERVER_SIDE_ENCRYPTION_KEY + " must not be null", () -> {
-
-        Configuration conf = super.createConfiguration();
-        //SSE-C must be configured with an encryption key
-        conf.set(Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM,
-            S3AEncryptionMethods.SSE_C.getMethod());
-        conf.set(Constants.SERVER_SIDE_ENCRYPTION_KEY, null);
-        S3AContract contract = (S3AContract) createContract(conf);
-        contract.init();
-        //extract the test FS
-        FileSystem fileSystem = contract.getTestFileSystem();
-        assertNotNull("null filesystem", fileSystem);
-        URI fsURI = fileSystem.getUri();
-        LOG.info("Test filesystem = {} implemented by {}", fsURI, fileSystem);
-        assertEquals("wrong filesystem of " + fsURI,
-            contract.getScheme(), fsURI.getScheme());
-        fileSystem.initialize(fsURI, conf);
-        throw new Exception("Do not reach here");
-      });
+        Constants.S3_ENCRYPTION_KEY + " must not be null", () -> {
+
+      Configuration conf = super.createConfiguration();
+      //SSE-C must be configured with an encryption key
+      conf.set(Constants.S3_ENCRYPTION_ALGORITHM,
+          S3AEncryptionMethods.SSE_C.getMethod());
+      conf.set(Constants.S3_ENCRYPTION_KEY, null);
+      S3AContract contract = (S3AContract) createContract(conf);
+      contract.init();
+      //extract the test FS
+      FileSystem fileSystem = contract.getTestFileSystem();
+      assertNotNull("null filesystem", fileSystem);
+      URI fsURI = fileSystem.getUri();
+      LOG.info("Test filesystem = {} implemented by {}", fsURI, fileSystem);
+      assertEquals("wrong filesystem of " + fsURI,
+          contract.getScheme(), fsURI.getScheme());
+      fileSystem.initialize(fsURI, conf);
+      return fileSystem;
+    });
   }
 
   @Test
@@ -93,23 +93,23 @@ public class ITestS3AEncryptionAlgorithmValidation
     Throwable {
     intercept(IOException.class, S3AUtils.SSE_C_NO_KEY_ERROR, () -> {
 
-        Configuration conf = super.createConfiguration();
-        //SSE-C must be configured with an encryption key
-        conf.set(Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM,
-            S3AEncryptionMethods.SSE_C.getMethod());
-        conf.set(Constants.SERVER_SIDE_ENCRYPTION_KEY, "");
-        S3AContract contract = (S3AContract) createContract(conf);
-        contract.init();
-        //extract the test FS
-        FileSystem fileSystem = contract.getTestFileSystem();
-        assertNotNull("null filesystem", fileSystem);
-        URI fsURI = fileSystem.getUri();
-        LOG.info("Test filesystem = {} implemented by {}", fsURI, fileSystem);
-        assertEquals("wrong filesystem of " + fsURI,
-            contract.getScheme(), fsURI.getScheme());
-        fileSystem.initialize(fsURI, conf);
-        throw new Exception("Do not reach here");
-      });
+      Configuration conf = super.createConfiguration();
+      //SSE-C must be configured with an encryption key
+      conf.set(Constants.S3_ENCRYPTION_ALGORITHM,
+          S3AEncryptionMethods.SSE_C.getMethod());
+      conf.set(Constants.S3_ENCRYPTION_KEY, "");
+      S3AContract contract = (S3AContract) createContract(conf);
+      contract.init();
+      //extract the test FS
+      FileSystem fileSystem = contract.getTestFileSystem();
+      assertNotNull("null filesystem", fileSystem);
+      URI fsURI = fileSystem.getUri();
+      LOG.info("Test filesystem = {} implemented by {}", fsURI, fileSystem);
+      assertEquals("wrong filesystem of " + fsURI,
+          contract.getScheme(), fsURI.getScheme());
+      fileSystem.initialize(fsURI, conf);
+      return fileSystem;
+    });
   }
 
   @Test
@@ -119,24 +119,24 @@ public class ITestS3AEncryptionAlgorithmValidation
     assumeEnabled();
     intercept(IOException.class, S3AUtils.SSE_S3_WITH_KEY_ERROR, () -> {
 
-        Configuration conf = super.createConfiguration();
-        //SSE-S3 cannot be configured with an encryption key
-        conf.set(Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM,
-            S3AEncryptionMethods.SSE_S3.getMethod());
-        conf.set(Constants.SERVER_SIDE_ENCRYPTION_KEY,
-            "4niV/jPK5VFRHY+KNb6wtqYd4xXyMgdJ9XQJpcQUVbs=");
-        S3AContract contract = (S3AContract) createContract(conf);
-        contract.init();
-        //extract the test FS
-        FileSystem fileSystem = contract.getTestFileSystem();
-        assertNotNull("null filesystem", fileSystem);
-        URI fsURI = fileSystem.getUri();
-        LOG.info("Test filesystem = {} implemented by {}", fsURI, fileSystem);
-        assertEquals("wrong filesystem of " + fsURI,
-            contract.getScheme(), fsURI.getScheme());
-        fileSystem.initialize(fsURI, conf);
-        throw new Exception("Do not reach here");
-      });
+      Configuration conf = super.createConfiguration();
+      //SSE-S3 cannot be configured with an encryption key
+      conf.set(Constants.S3_ENCRYPTION_ALGORITHM,
+          S3AEncryptionMethods.SSE_S3.getMethod());
+      conf.set(Constants.S3_ENCRYPTION_KEY,
+          "4niV/jPK5VFRHY+KNb6wtqYd4xXyMgdJ9XQJpcQUVbs=");
+      S3AContract contract = (S3AContract) createContract(conf);
+      contract.init();
+      //extract the test FS
+      FileSystem fileSystem = contract.getTestFileSystem();
+      assertNotNull("null filesystem", fileSystem);
+      URI fsURI = fileSystem.getUri();
+      LOG.info("Test filesystem = {} implemented by {}", fsURI, fileSystem);
+      assertEquals("wrong filesystem of " + fsURI,
+          contract.getScheme(), fsURI.getScheme());
+      fileSystem.initialize(fsURI, conf);
+      return fileSystem;
+    });
   }
 
   /**
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java
index 852b49a..ff46e98 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java
@@ -42,6 +42,8 @@ import static org.apache.hadoop.fs.s3a.Constants.DIRECTORY_MARKER_POLICY_DELETE;
 import static org.apache.hadoop.fs.s3a.Constants.DIRECTORY_MARKER_POLICY_KEEP;
 import static org.apache.hadoop.fs.s3a.Constants.ETAG_CHECKSUM_ENABLED;
 import static org.apache.hadoop.fs.s3a.Constants.S3_METADATA_STORE_IMPL;
+import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_ALGORITHM;
+import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
 import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
@@ -105,6 +107,7 @@ public class ITestS3AEncryptionSSEC extends AbstractTestS3AEncryption {
     this.keepMarkers = keepMarkers;
   }
 
+  @SuppressWarnings("deprecation")
   @Override
   protected Configuration createConfiguration() {
     Configuration conf = super.createConfiguration();
@@ -121,15 +124,17 @@ public class ITestS3AEncryptionSSEC extends AbstractTestS3AEncryption {
     removeBaseAndBucketOverrides(bucketName, conf,
         DIRECTORY_MARKER_POLICY,
         ETAG_CHECKSUM_ENABLED,
+        S3_ENCRYPTION_ALGORITHM,
+        S3_ENCRYPTION_KEY,
         SERVER_SIDE_ENCRYPTION_ALGORITHM,
         SERVER_SIDE_ENCRYPTION_KEY);
     conf.set(DIRECTORY_MARKER_POLICY,
         keepMarkers
             ? DIRECTORY_MARKER_POLICY_KEEP
             : DIRECTORY_MARKER_POLICY_DELETE);
-    conf.set(SERVER_SIDE_ENCRYPTION_ALGORITHM,
+    conf.set(S3_ENCRYPTION_ALGORITHM,
         getSSEAlgorithm().getMethod());
-    conf.set(SERVER_SIDE_ENCRYPTION_KEY, KEY_1);
+    conf.set(S3_ENCRYPTION_KEY, KEY_1);
     conf.setBoolean(ETAG_CHECKSUM_ENABLED, true);
     return conf;
   }
@@ -251,8 +256,8 @@ public class ITestS3AEncryptionSSEC extends AbstractTestS3AEncryption {
     fsKeyB.listFiles(pathABC, false);
 
     Configuration conf = this.createConfiguration();
-    conf.unset(SERVER_SIDE_ENCRYPTION_ALGORITHM);
-    conf.unset(SERVER_SIDE_ENCRYPTION_KEY);
+    conf.unset(S3_ENCRYPTION_ALGORITHM);
+    conf.unset(S3_ENCRYPTION_KEY);
 
     S3AContract contract = (S3AContract) createContract(conf);
     contract.init();
@@ -286,8 +291,8 @@ public class ITestS3AEncryptionSSEC extends AbstractTestS3AEncryption {
 
     //Now try it with an unencrypted filesystem.
     Configuration conf = createConfiguration();
-    conf.unset(SERVER_SIDE_ENCRYPTION_ALGORITHM);
-    conf.unset(SERVER_SIDE_ENCRYPTION_KEY);
+    conf.unset(S3_ENCRYPTION_ALGORITHM);
+    conf.unset(S3_ENCRYPTION_KEY);
 
     S3AContract contract = (S3AContract) createContract(conf);
     contract.init();
@@ -385,7 +390,7 @@ public class ITestS3AEncryptionSSEC extends AbstractTestS3AEncryption {
   private S3AFileSystem createNewFileSystemWithSSECKey(String sseCKey) throws
       IOException {
     Configuration conf = this.createConfiguration();
-    conf.set(SERVER_SIDE_ENCRYPTION_KEY, sseCKey);
+    conf.set(S3_ENCRYPTION_KEY, sseCKey);
 
     S3AContract contract = (S3AContract) createContract(conf);
     contract.init();
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSDefaultKey.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSDefaultKey.java
index 855c3b7..68ab5bd 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSDefaultKey.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSDefaultKey.java
@@ -40,7 +40,7 @@ public class ITestS3AEncryptionSSEKMSDefaultKey
   @Override
   protected Configuration createConfiguration() {
     Configuration conf = super.createConfiguration();
-    conf.set(Constants.SERVER_SIDE_ENCRYPTION_KEY, "");
+    conf.set(Constants.S3_ENCRYPTION_KEY, "");
     return conf;
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSUserDefinedKey.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSUserDefinedKey.java
index 3a8cf7a..c281ae1 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSUserDefinedKey.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSUserDefinedKey.java
@@ -18,13 +18,11 @@
 
 package org.apache.hadoop.fs.s3a;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 
-import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
-import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
-import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
 import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.SSE_KMS;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionNotSet;
 
 /**
  * Concrete class that extends {@link AbstractTestS3AEncryption}
@@ -38,15 +36,11 @@ public class ITestS3AEncryptionSSEKMSUserDefinedKey
   protected Configuration createConfiguration() {
     // get the KMS key for this test.
     Configuration c = new Configuration();
-    String kmsKey = c.get(SERVER_SIDE_ENCRYPTION_KEY);
-    if (StringUtils.isBlank(kmsKey) || !c.get(SERVER_SIDE_ENCRYPTION_ALGORITHM)
-        .equals(S3AEncryptionMethods.CSE_KMS.name())) {
-      skip(SERVER_SIDE_ENCRYPTION_KEY + " is not set for " +
-          SSE_KMS.getMethod() + " or CSE-KMS algorithm is used instead of "
-          + "SSE-KMS");
-    }
+    String kmsKey = c.get(S3_ENCRYPTION_KEY);
+    // skip the test if SSE-KMS is not configured or the KMS key is not set.
+    skipIfEncryptionNotSet(c, getSSEAlgorithm());
     Configuration conf = super.createConfiguration();
-    conf.set(SERVER_SIDE_ENCRYPTION_KEY, kmsKey);
+    conf.set(S3_ENCRYPTION_KEY, kmsKey);
     return conf;
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSES3.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSES3.java
index 33a252a..93b8814 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSES3.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSES3.java
@@ -32,7 +32,7 @@ public class ITestS3AEncryptionSSES3 extends AbstractTestS3AEncryption {
     S3ATestUtils.disableFilesystemCaching(conf);
     //must specify encryption key as empty because SSE-S3 does not allow it,
     //nor can it be null.
-    conf.set(Constants.SERVER_SIDE_ENCRYPTION_KEY, "");
+    conf.set(Constants.S3_ENCRYPTION_KEY, "");
     return conf;
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionWithDefaultS3Settings.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionWithDefaultS3Settings.java
index c7a62a3..0f48825 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionWithDefaultS3Settings.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionWithDefaultS3Settings.java
@@ -33,11 +33,12 @@ import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
-import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
-import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_ALGORITHM;
+import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
 import static org.apache.hadoop.fs.s3a.EncryptionTestUtils.AWS_KMS_SSE_ALGORITHM;
 import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.SSE_KMS;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionNotSet;
 
 /**
  * Concrete class that extends {@link AbstractTestS3AEncryption}
@@ -56,21 +57,15 @@ public class ITestS3AEncryptionWithDefaultS3Settings extends
     // get the KMS key for this test.
     S3AFileSystem fs = getFileSystem();
     Configuration c = fs.getConf();
-    String kmsKey = c.get(SERVER_SIDE_ENCRYPTION_KEY);
-    if (StringUtils.isBlank(kmsKey) || !c.get(SERVER_SIDE_ENCRYPTION_ALGORITHM)
-        .equals(S3AEncryptionMethods.CSE_KMS.name())) {
-      skip(SERVER_SIDE_ENCRYPTION_KEY + " is not set for " +
-          SSE_KMS.getMethod() + " or CSE-KMS algorithm is used instead of "
-          + "SSE-KMS");
-    }
+    skipIfEncryptionNotSet(c, getSSEAlgorithm());
   }
 
   @Override
   protected void patchConfigurationEncryptionSettings(
       final Configuration conf) {
     removeBaseAndBucketOverrides(conf,
-        SERVER_SIDE_ENCRYPTION_ALGORITHM);
-    conf.set(SERVER_SIDE_ENCRYPTION_ALGORITHM,
+        S3_ENCRYPTION_ALGORITHM);
+    conf.set(S3_ENCRYPTION_ALGORITHM,
             getSSEAlgorithm().getMethod());
   }
 
@@ -94,7 +89,7 @@ public class ITestS3AEncryptionWithDefaultS3Settings extends
   protected void assertEncrypted(Path path) throws IOException {
     S3AFileSystem fs = getFileSystem();
     Configuration c = fs.getConf();
-    String kmsKey = c.getTrimmed(SERVER_SIDE_ENCRYPTION_KEY);
+    String kmsKey = c.getTrimmed(S3_ENCRYPTION_KEY);
     EncryptionTestUtils.assertEncrypted(fs, path, SSE_KMS, kmsKey);
   }
 
@@ -142,7 +137,7 @@ public class ITestS3AEncryptionWithDefaultS3Settings extends
 
     // fs2 conf will always use SSE-KMS
     Configuration fs2Conf = new Configuration(fs.getConf());
-    fs2Conf.set(SERVER_SIDE_ENCRYPTION_ALGORITHM,
+    fs2Conf.set(S3_ENCRYPTION_ALGORITHM,
         S3AEncryptionMethods.SSE_KMS.getMethod());
     try (FileSystem kmsFS = FileSystem.newInstance(fs.getUri(), fs2Conf)) {
       Path targetDir = path("target");
@@ -150,7 +145,7 @@ public class ITestS3AEncryptionWithDefaultS3Settings extends
       ContractTestUtils.rename(kmsFS, src, targetDir);
       Path renamedFile = new Path(targetDir, src.getName());
       ContractTestUtils.verifyFileContents(fs, renamedFile, data);
-      String kmsKey = fs2Conf.getTrimmed(SERVER_SIDE_ENCRYPTION_KEY);
+      String kmsKey = fs2Conf.getTrimmed(S3_ENCRYPTION_KEY);
       // we assert that the renamed file has picked up the KMS key of our FS
       EncryptionTestUtils.assertEncrypted(fs, renamedFile, SSE_KMS, kmsKey);
     }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java
index daefa78..b4d2527 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java
@@ -46,6 +46,8 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.assertHasPathCapab
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertLacksPathCapabilities;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
+import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_ALGORITHM;
+import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
 import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
@@ -68,10 +70,13 @@ public class ITestS3AMiscOperations extends AbstractS3ATestBase {
     enableChecksums(true);
   }
 
+  @SuppressWarnings("deprecation")
   @Override
   protected Configuration createConfiguration() {
     final Configuration conf = super.createConfiguration();
     removeBaseAndBucketOverrides(conf,
+        S3_ENCRYPTION_ALGORITHM,
+        S3_ENCRYPTION_KEY,
         SERVER_SIDE_ENCRYPTION_ALGORITHM,
         SERVER_SIDE_ENCRYPTION_KEY);
     return conf;
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
index 70986d8..49b7f77 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
@@ -248,7 +248,7 @@ public final class S3ATestUtils {
    */
   private static void skipIfS3GuardAndS3CSEEnabled(Configuration conf) {
     String encryptionMethod =
-        conf.getTrimmed(SERVER_SIDE_ENCRYPTION_ALGORITHM, "");
+        conf.getTrimmed(Constants.S3_ENCRYPTION_ALGORITHM, "");
     String metaStore = conf.getTrimmed(S3_METADATA_STORE_IMPL, "");
     if (encryptionMethod.equals(S3AEncryptionMethods.CSE_KMS.getMethod()) &&
         !metaStore.equals(S3GUARD_METASTORE_NULL)) {
@@ -1533,14 +1533,20 @@ public final class S3ATestUtils {
   }
 
   /**
-   * Skip a test if CSE KMS key id is not set.
+   * Skip a test if encryption algorithm or encryption key is not set.
    *
    * @param configuration configuration to probe.
    */
-  public static void skipIfKmsKeyIdIsNotSet(Configuration configuration) {
-    if (configuration.get(
-        SERVER_SIDE_ENCRYPTION_KEY) == null) {
-      skip("AWS KMS key id is not set");
+  public static void skipIfEncryptionNotSet(Configuration configuration,
+      S3AEncryptionMethods s3AEncryptionMethod) {
+    // skip if the S3 encryption algorithm is not set to the desired method
+    // or the AWS encryption key is not set.
+    if (!configuration.getTrimmed(S3_ENCRYPTION_ALGORITHM, "")
+        .equals(s3AEncryptionMethod.getMethod())
+        || configuration.get(Constants.S3_ENCRYPTION_KEY) == null) {
+      skip(S3_ENCRYPTION_KEY + " is not set for " + s3AEncryptionMethod
+          .getMethod() + " or " + S3_ENCRYPTION_ALGORITHM + " is not set to "
+          + s3AEncryptionMethod.getMethod());
     }
   }
 
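The helper added above tightens the old KMS-only probe: a test now skips
unless both the algorithm and the key match. A minimal usage sketch, with
a hypothetical SSE-KMS guard (the helper and enum come from this patch):

    // Skip unless fs.s3a.encryption.algorithm is "SSE-KMS"
    // and fs.s3a.encryption.key is set in the test configuration.
    @Override
    protected void maybeSkipTest() {
      skipIfEncryptionNotSet(getConfiguration(),
          S3AEncryptionMethods.SSE_KMS);
    }
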
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestSSEConfiguration.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestSSEConfiguration.java
index 273cf8b..6985fa4 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestSSEConfiguration.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestSSEConfiguration.java
@@ -104,10 +104,10 @@ public class TestSSEConfiguration extends Assert {
     // set up conf to have a cred provider
     final Configuration conf = confWithProvider();
     String key = "provisioned";
-    setProviderOption(conf, SERVER_SIDE_ENCRYPTION_KEY, key);
+    setProviderOption(conf, Constants.S3_ENCRYPTION_KEY, key);
     // let's set the password in config and ensure that it uses the credential
     // provider provisioned value instead.
-    conf.set(SERVER_SIDE_ENCRYPTION_KEY, "keyInConfObject");
+    conf.set(Constants.S3_ENCRYPTION_KEY, "keyInConfObject");
 
     String sseKey = getS3EncryptionKey(BUCKET, conf);
     assertNotNull("Proxy password should not retrun null.", sseKey);
@@ -178,17 +178,20 @@ public class TestSSEConfiguration extends Assert {
    * @param key key, may be null
    * @return the new config.
    */
+  @SuppressWarnings("deprecation")
   private Configuration buildConf(String algorithm, String key) {
     Configuration conf = emptyConf();
     if (algorithm != null) {
-      conf.set(SERVER_SIDE_ENCRYPTION_ALGORITHM, algorithm);
+      conf.set(Constants.S3_ENCRYPTION_ALGORITHM, algorithm);
     } else {
       conf.unset(SERVER_SIDE_ENCRYPTION_ALGORITHM);
+      conf.unset(Constants.S3_ENCRYPTION_ALGORITHM);
     }
     if (key != null) {
-      conf.set(SERVER_SIDE_ENCRYPTION_KEY, key);
+      conf.set(Constants.S3_ENCRYPTION_KEY, key);
     } else {
       conf.unset(SERVER_SIDE_ENCRYPTION_KEY);
+      conf.unset(Constants.S3_ENCRYPTION_KEY);
     }
     return conf;
   }
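
For reference, a short sketch of how buildConf() might be exercised in a
test case (values illustrative only):

    // Algorithm plus key: sets the new fs.s3a.encryption.* options.
    Configuration withKey = buildConf("SSE-C", "someBase64EncodedKey=");
    // Nulls: unsets both the deprecated and the new options,
    // yielding an encryption-free configuration.
    Configuration unencrypted = buildConf(null, null);
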
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/delegation/ITestSessionDelegationInFileystem.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/delegation/ITestSessionDelegationInFileystem.java
index 126f8b3..b3fc5de 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/delegation/ITestSessionDelegationInFileystem.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/delegation/ITestSessionDelegationInFileystem.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.s3a.AWSCredentialProviderList;
+import org.apache.hadoop.fs.s3a.Constants;
 import org.apache.hadoop.fs.s3a.DefaultS3ClientFactory;
 import org.apache.hadoop.fs.s3a.Invoker;
 import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;
@@ -135,6 +136,7 @@ public class ITestSessionDelegationInFileystem extends AbstractDelegationIT {
     return SESSION_TOKEN_KIND;
   }
 
+  @SuppressWarnings("deprecation")
   @Override
   protected Configuration createConfiguration() {
     Configuration conf = super.createConfiguration();
@@ -142,11 +144,13 @@ public class ITestSessionDelegationInFileystem extends AbstractDelegationIT {
     assumeSessionTestsEnabled(conf);
     disableFilesystemCaching(conf);
     String s3EncryptionMethod =
-        conf.getTrimmed(SERVER_SIDE_ENCRYPTION_ALGORITHM,
+        conf.getTrimmed(Constants.S3_ENCRYPTION_ALGORITHM,
             S3AEncryptionMethods.SSE_KMS.getMethod());
-    String s3EncryptionKey = conf.getTrimmed(SERVER_SIDE_ENCRYPTION_KEY, "");
+    String s3EncryptionKey = conf.getTrimmed(Constants.S3_ENCRYPTION_KEY, "");
     removeBaseAndBucketOverrides(conf,
         DELEGATION_TOKEN_BINDING,
+        Constants.S3_ENCRYPTION_ALGORITHM,
+        Constants.S3_ENCRYPTION_KEY,
         SERVER_SIDE_ENCRYPTION_ALGORITHM,
         SERVER_SIDE_ENCRYPTION_KEY);
     conf.set(HADOOP_SECURITY_AUTHENTICATION,
@@ -155,9 +159,9 @@ public class ITestSessionDelegationInFileystem extends AbstractDelegationIT {
     conf.set(AWS_CREDENTIALS_PROVIDER, " ");
     // switch to CSE-KMS(if specified) else SSE-KMS.
     if (conf.getBoolean(KEY_ENCRYPTION_TESTS, true)) {
-      conf.set(SERVER_SIDE_ENCRYPTION_ALGORITHM, s3EncryptionMethod);
+      conf.set(Constants.S3_ENCRYPTION_ALGORITHM, s3EncryptionMethod);
       // KMS key ID a must if CSE-KMS is being tested.
-      conf.set(SERVER_SIDE_ENCRYPTION_KEY, s3EncryptionKey);
+      conf.set(Constants.S3_ENCRYPTION_KEY, s3EncryptionKey);
     }
     // set the YARN RM up for YARN tests.
     conf.set(YarnConfiguration.RM_PRINCIPAL, YARN_RM);
@@ -310,6 +314,7 @@ public class ITestSessionDelegationInFileystem extends AbstractDelegationIT {
    * Create a FS with a delegated token, verify it works as a filesystem,
    * and that you can pick up the same DT from that FS too.
    */
+  @SuppressWarnings("deprecation")
   @Test
   public void testDelegatedFileSystem() throws Throwable {
     describe("Delegation tokens can be passed to a new filesystem;"
@@ -348,6 +353,8 @@ public class ITestSessionDelegationInFileystem extends AbstractDelegationIT {
     // this is to simulate better a remote deployment.
     removeBaseAndBucketOverrides(bucket, conf,
         ACCESS_KEY, SECRET_KEY, SESSION_TOKEN,
+        Constants.S3_ENCRYPTION_ALGORITHM,
+        Constants.S3_ENCRYPTION_KEY,
         SERVER_SIDE_ENCRYPTION_ALGORITHM,
         SERVER_SIDE_ENCRYPTION_KEY,
         DELEGATION_TOKEN_ROLE_ARN,
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java
index 20ffd0f..ca8ce15 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java
@@ -32,8 +32,8 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 
-import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
-import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_ALGORITHM;
+import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
 import static org.apache.hadoop.fs.s3a.S3ATestConstants.KMS_KEY_GENERATION_REQUEST_PARAMS_BYTES_WRITTEN;
 import static org.apache.hadoop.fs.s3a.impl.InternalConstants.CSE_PADDING_LENGTH;
 
@@ -86,9 +86,9 @@ public class ITestS3AFileContextStatistics extends FCStatisticsBaseTest {
   protected void verifyWrittenBytes(FileSystem.Statistics stats) {
     //No extra bytes are written
     long expectedBlockSize = blockSize;
-    if (conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM, "")
+    if (conf.get(S3_ENCRYPTION_ALGORITHM, "")
         .equals(S3AEncryptionMethods.CSE_KMS.getMethod())) {
-      String keyId = conf.get(SERVER_SIDE_ENCRYPTION_KEY, "");
+      String keyId = conf.get(S3_ENCRYPTION_KEY, "");
       // Adding padding length and KMS key generation bytes written.
       expectedBlockSize += CSE_PADDING_LENGTH + keyId.getBytes().length +
           KMS_KEY_GENERATION_REQUEST_PARAMS_BYTES_WRITTEN;
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
index 3e00917..2dd92bb 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.Constants;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.S3ATestUtils;
 import org.apache.hadoop.fs.s3a.Statistic;
@@ -402,7 +403,7 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
   public void test_040_PositionedReadHugeFile() throws Throwable {
     assumeHugeFileExists();
     final String encryption = getConf().getTrimmed(
-        SERVER_SIDE_ENCRYPTION_ALGORITHM);
+        Constants.S3_ENCRYPTION_ALGORITHM);
     boolean encrypted = encryption != null;
     if (encrypted) {
       LOG.info("File is encrypted with algorithm {}", encryption);
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesEncryption.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesEncryption.java
index be51b2f..9325feb 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesEncryption.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesEncryption.java
@@ -20,22 +20,19 @@ package org.apache.hadoop.fs.s3a.scale;
 
 import java.io.IOException;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.Constants;
 import org.apache.hadoop.fs.s3a.EncryptionTestUtils;
-import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 
-import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
-import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
-import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
 import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.SSE_KMS;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionNotSet;
 
 /**
  * Class to test SSE_KMS encryption settings for huge files.
- * Tests will only run if value of {@link Constants#SERVER_SIDE_ENCRYPTION_KEY}
+ * Tests will only run if value of {@link Constants#S3_ENCRYPTION_KEY}
  * is set in the configuration. The testing bucket must be configured with this
  * same key else test might fail.
  */
@@ -44,13 +41,7 @@ public class ITestS3AHugeFilesEncryption extends AbstractSTestS3AHugeFiles {
   @Override
   public void setup() throws Exception {
     Configuration c = new Configuration();
-    String kmsKey = c.get(SERVER_SIDE_ENCRYPTION_KEY);
-    if (StringUtils.isBlank(kmsKey) || !c.get(SERVER_SIDE_ENCRYPTION_ALGORITHM)
-        .equals(S3AEncryptionMethods.CSE_KMS.name())) {
-      skip(SERVER_SIDE_ENCRYPTION_KEY + " is not set for " +
-          SSE_KMS.getMethod() + " or CSE-KMS algorithm is used instead of "
-          + "SSE-KMS");
-    }
+    skipIfEncryptionNotSet(c, SSE_KMS);
     super.setup();
   }
 
@@ -61,19 +52,19 @@ public class ITestS3AHugeFilesEncryption extends AbstractSTestS3AHugeFiles {
 
   /**
   * @param fileSystem filesystem instance (unused: a new Configuration
   *                   is probed instead)
-   * @return true if {@link Constants#SERVER_SIDE_ENCRYPTION_KEY} is set
+   * @return true if {@link Constants#S3_ENCRYPTION_KEY} is set
    * in the config.
    */
   @Override
   protected boolean isEncrypted(S3AFileSystem fileSystem) {
     Configuration c = new Configuration();
-    return c.get(SERVER_SIDE_ENCRYPTION_KEY) != null;
+    return c.get(S3_ENCRYPTION_KEY) != null;
   }
 
   @Override
   protected void assertEncrypted(Path hugeFile) throws IOException {
     Configuration c = new Configuration();
-    String kmsKey = c.get(SERVER_SIDE_ENCRYPTION_KEY);
+    String kmsKey = c.get(S3_ENCRYPTION_KEY);
     EncryptionTestUtils.assertEncrypted(getFileSystem(), hugeFile,
             SSE_KMS, kmsKey);
   }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesSSECDiskBlocks.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesSSECDiskBlocks.java
index a1c5c3f..d2e04d9 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesSSECDiskBlocks.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesSSECDiskBlocks.java
@@ -23,6 +23,8 @@ import org.apache.hadoop.fs.s3a.Constants;
 import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;
 import org.apache.hadoop.fs.s3a.S3ATestUtils;
 
+import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_ALGORITHM;
+import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
 import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
@@ -45,15 +47,17 @@ public class ITestS3AHugeFilesSSECDiskBlocks
     skipIfEncryptionTestsDisabled(getConfiguration());
   }
 
+  @SuppressWarnings("deprecation")
   @Override
   protected Configuration createScaleConfiguration() {
     Configuration conf = super.createScaleConfiguration();
-    removeBaseAndBucketOverrides(conf, SERVER_SIDE_ENCRYPTION_KEY,
-           SERVER_SIDE_ENCRYPTION_ALGORITHM);
+    removeBaseAndBucketOverrides(conf, S3_ENCRYPTION_KEY,
+        S3_ENCRYPTION_ALGORITHM, SERVER_SIDE_ENCRYPTION_ALGORITHM,
+        SERVER_SIDE_ENCRYPTION_KEY);
     S3ATestUtils.disableFilesystemCaching(conf);
-    conf.set(Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM,
+    conf.set(Constants.S3_ENCRYPTION_ALGORITHM,
         getSSEAlgorithm().getMethod());
-    conf.set(Constants.SERVER_SIDE_ENCRYPTION_KEY, KEY_1);
+    conf.set(Constants.S3_ENCRYPTION_KEY, KEY_1);
     return conf;
   }
 

[hadoop] 01/04: HADOOP-13887. Support S3 client side encryption (S3-CSE) using AWS-SDK (#2706)

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit aee975a136a07cd25977a453977c8c63b0c070dc
Author: Mehakmeet Singh <me...@gmail.com>
AuthorDate: Tue Jul 27 15:38:51 2021 +0530

    HADOOP-13887. Support S3 client side encryption (S3-CSE) using AWS-SDK (#2706)
    
    This (big!) patch adds support for client side encryption in AWS S3,
    with keys managed by AWS-KMS.
    
    Read the documentation in encryption.md very, very carefully before
    use and consider it unstable.
    
    S3-CSE is enabled in the existing configuration option
    "fs.s3a.server-side-encryption-algorithm":
    
    fs.s3a.server-side-encryption-algorithm=CSE-KMS
    fs.s3a.server-side-encryption.key=<KMS_KEY_ID>
    
    You cannot enable CSE and SSE in the same client, although
    you can still enable a default SSE option in the S3 console.
    
    * Filesystem list/get status operations subtract 16 bytes from the length
      of all files >= 16 bytes long to compensate for the padding which CSE
      adds.
    * The SDK always warns that the specific algorithm chosen is
      deprecated. That algorithm is required for ranged GET requests
      (i.e. random IO) to work, so ignore the warning.
    * Unencrypted files CANNOT BE READ.
      The entire bucket SHOULD be encrypted with S3-CSE.
    * Uploading files may be a bit slower as blocks are now
      written sequentially.
    * The Multipart Upload API is disabled when S3-CSE is active.
    
    Contributed by Mehakmeet Singh
    
    Change-Id: Ie1a27a036a39db66a67e9c6d33bc78d54ea708a0
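
    As a worked example, the same options can be set programmatically;
    this is a sketch only, with a placeholder bucket and KMS key ARN:

        // Placeholder values: substitute a real bucket and KMS key ID/ARN.
        Configuration conf = new Configuration();
        conf.set("fs.s3a.server-side-encryption-algorithm", "CSE-KMS");
        conf.set("fs.s3a.server-side-encryption.key",
            "arn:aws:kms:eu-west-1:123456789012:key/example-key-id");
        FileSystem fs = FileSystem.get(new URI("s3a://example-bucket/"), conf);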
---
 .../AbstractContractMultipartUploaderTest.java     |   3 +
 .../java/org/apache/hadoop/fs/s3a/Constants.java   |   7 +-
 .../hadoop/fs/s3a/DefaultS3ClientFactory.java      | 135 +++++++--
 .../java/org/apache/hadoop/fs/s3a/Listing.java     |   9 +-
 .../apache/hadoop/fs/s3a/S3ABlockOutputStream.java |  43 ++-
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java    |  83 +++++-
 .../java/org/apache/hadoop/fs/s3a/S3AUtils.java    |  46 ++--
 .../java/org/apache/hadoop/fs/s3a/Statistic.java   |   9 +-
 .../hadoop/fs/s3a/impl/HeaderProcessing.java       |  11 +-
 .../hadoop/fs/s3a/impl/InternalConstants.java      |   8 +
 .../apache/hadoop/fs/s3a/impl/RenameOperation.java |   2 +-
 .../apache/hadoop/fs/s3a/impl/StoreContext.java    |  15 +-
 .../hadoop/fs/s3a/impl/StoreContextBuilder.java    |  16 +-
 .../site/markdown/tools/hadoop-aws/encryption.md   | 233 +++++++++++++++-
 .../tools/hadoop-aws/troubleshooting_s3a.md        | 278 +++++++++++++++++++
 .../apache/hadoop/fs/s3a/AbstractS3ATestBase.java  |   9 +
 .../apache/hadoop/fs/s3a/EncryptionTestUtils.java  |   2 +-
 .../fs/s3a/ITestS3AClientSideEncryption.java       | 301 +++++++++++++++++++++
 .../fs/s3a/ITestS3AClientSideEncryptionKms.java    |  95 +++++++
 .../ITestS3AEncryptionSSEKMSUserDefinedKey.java    |   9 +-
 .../ITestS3AEncryptionWithDefaultS3Settings.java   |   6 +-
 .../hadoop/fs/s3a/ITestS3AInconsistency.java       |   9 +
 .../hadoop/fs/s3a/ITestS3AMiscOperations.java      |   3 +-
 .../hadoop/fs/s3a/ITestS3GuardListConsistency.java |   6 +-
 .../org/apache/hadoop/fs/s3a/S3ATestConstants.java |  13 +
 .../org/apache/hadoop/fs/s3a/S3ATestUtils.java     |  12 +
 .../hadoop/fs/s3a/TestS3AInputStreamRetry.java     |   2 +-
 .../apache/hadoop/fs/s3a/TestSSEConfiguration.java |   6 +-
 .../hadoop/fs/s3a/auth/ITestCustomSigner.java      |  34 ++-
 .../ITestSessionDelegationInFileystem.java         |  23 +-
 .../hadoop/fs/s3a/commit/AbstractCommitITest.java  |   9 +-
 .../fileContext/ITestS3AFileContextStatistics.java |  33 ++-
 .../fs/s3a/scale/ITestS3AHugeFilesEncryption.java  |   8 +-
 .../s3a/scale/ITestS3AHugeFilesSSECDiskBlocks.java |   2 +-
 .../s3a/scale/ITestS3AInputStreamPerformance.java  |   5 +
 35 files changed, 1370 insertions(+), 115 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java
index 90e12a8..3e754e4 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java
@@ -89,6 +89,9 @@ public abstract class AbstractContractMultipartUploaderTest extends
 
     final FileSystem fs = getFileSystem();
     Path testPath = getContract().getTestPath();
+    Assume.assumeTrue("Multipart uploader is not supported",
+        fs.hasPathCapability(testPath,
+            CommonPathCapabilities.FS_MULTIPART_UPLOADER));
     uploader0 = fs.createMultipartUploader(testPath).build();
     uploader1 = fs.createMultipartUploader(testPath).build();
   }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index e51d8ab..8bd0291 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -420,7 +420,12 @@ public final class Constants {
       "fs.s3a.multipart.purge.age";
   public static final long DEFAULT_PURGE_EXISTING_MULTIPART_AGE = 86400;
 
-  // s3 server-side encryption, see S3AEncryptionMethods for valid options
+  /**
+   * s3 server-side encryption or s3 client side encryption method, see
+   * {@link S3AEncryptionMethods} for valid options.
+   *
+   * {@value}
+   */
   public static final String SERVER_SIDE_ENCRYPTION_ALGORITHM =
       "fs.s3a.server-side-encryption-algorithm";
 
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
index 7dc920c..2abef63 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
@@ -25,13 +25,23 @@ import com.amazonaws.ClientConfiguration;
 import com.amazonaws.SdkClientException;
 import com.amazonaws.client.builder.AwsClientBuilder;
 import com.amazonaws.handlers.RequestHandler2;
+import com.amazonaws.regions.RegionUtils;
 import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Builder;
 import com.amazonaws.services.s3.AmazonS3Client;
 import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.AmazonS3EncryptionClientV2Builder;
+import com.amazonaws.services.s3.AmazonS3EncryptionV2;
 import com.amazonaws.services.s3.S3ClientOptions;
 import com.amazonaws.services.s3.internal.ServiceUtils;
+import com.amazonaws.services.s3.model.CryptoConfigurationV2;
+import com.amazonaws.services.s3.model.CryptoMode;
+import com.amazonaws.services.s3.model.CryptoRangeGetMode;
+import com.amazonaws.services.s3.model.EncryptionMaterialsProvider;
+import com.amazonaws.services.s3.model.KMSEncryptionMaterialsProvider;
 import com.amazonaws.util.AwsHostNameUtils;
 import com.amazonaws.util.RuntimeHttpUtils;
+import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,6 +58,8 @@ import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION;
 import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_CENTRAL_REGION;
 import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING;
 import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING_DEFAULT;
+import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
+import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
 import static org.apache.hadoop.fs.s3a.S3AUtils.translateException;
 
 /**
@@ -112,9 +124,17 @@ public class DefaultS3ClientFactory extends Configured
     }
 
     try {
-      return buildAmazonS3Client(
-          awsConf,
-          parameters);
+      if (S3AEncryptionMethods.getMethod(S3AUtils.
+          lookupPassword(conf, SERVER_SIDE_ENCRYPTION_ALGORITHM, null))
+          .equals(S3AEncryptionMethods.CSE_KMS)) {
+        return buildAmazonS3EncryptionClient(
+            awsConf,
+            parameters);
+      } else {
+        return buildAmazonS3Client(
+            awsConf,
+            parameters);
+      }
     } catch (SdkClientException e) {
       // SDK refused to build.
       throw translateException("creating AWS S3 client", uri.toString(), e);
@@ -122,6 +142,60 @@ public class DefaultS3ClientFactory extends Configured
   }
 
   /**
+   * Create an {@link AmazonS3} client of type
+   * {@link AmazonS3EncryptionV2} if CSE is enabled.
+   *
+   * @param awsConf    AWS configuration.
+   * @param parameters parameters.
+   *
+   * @return new AmazonS3 client.
+   * @throws IOException if lookupPassword() has any problem.
+   */
+  protected AmazonS3 buildAmazonS3EncryptionClient(
+      final ClientConfiguration awsConf,
+      final S3ClientCreationParameters parameters) throws IOException {
+
+    AmazonS3 client;
+    AmazonS3EncryptionClientV2Builder builder =
+        new AmazonS3EncryptionClientV2Builder();
+    Configuration conf = getConf();
+
+    //CSE-KMS Method
+    String kmsKeyId = S3AUtils.lookupPassword(conf,
+        SERVER_SIDE_ENCRYPTION_KEY, null);
+    // Check if kmsKeyID is not null
+    Preconditions.checkArgument(kmsKeyId != null, "CSE-KMS method "
+        + "requires KMS key ID. Use " + SERVER_SIDE_ENCRYPTION_KEY
+        + " property to set it. ");
+
+    EncryptionMaterialsProvider materialsProvider =
+        new KMSEncryptionMaterialsProvider(kmsKeyId);
+    builder.withEncryptionMaterialsProvider(materialsProvider);
+    //Configure basic params of a S3 builder.
+    configureBasicParams(builder, awsConf, parameters);
+
+    // Configuring endpoint.
+    AmazonS3EncryptionClientV2Builder.EndpointConfiguration epr
+        = createEndpointConfiguration(parameters.getEndpoint(),
+        awsConf, getConf().getTrimmed(AWS_REGION));
+    configureEndpoint(builder, epr);
+
+    // Create cryptoConfig.
+    CryptoConfigurationV2 cryptoConfigurationV2 =
+        new CryptoConfigurationV2(CryptoMode.AuthenticatedEncryption)
+            .withRangeGetMode(CryptoRangeGetMode.ALL);
+    if (epr != null) {
+      cryptoConfigurationV2
+          .withAwsKmsRegion(RegionUtils.getRegion(epr.getSigningRegion()));
+      LOG.debug("KMS region used: {}", cryptoConfigurationV2.getAwsKmsRegion());
+    }
+    builder.withCryptoConfiguration(cryptoConfigurationV2);
+    client = builder.build();
+
+    return client;
+  }
+
+  /**
    * Use the Builder API to create an AWS S3 client.
    * <p>
    * This has a more complex endpoint configuration mechanism
@@ -137,33 +211,60 @@ public class DefaultS3ClientFactory extends Configured
       final ClientConfiguration awsConf,
       final S3ClientCreationParameters parameters) {
     AmazonS3ClientBuilder b = AmazonS3Client.builder();
-    b.withCredentials(parameters.getCredentialSet());
-    b.withClientConfiguration(awsConf);
-    b.withPathStyleAccessEnabled(parameters.isPathStyleAccess());
+    configureBasicParams(b, awsConf, parameters);
+
+    // endpoint set up is a PITA
+    AwsClientBuilder.EndpointConfiguration epr
+        = createEndpointConfiguration(parameters.getEndpoint(),
+        awsConf, getConf().getTrimmed(AWS_REGION));
+    configureEndpoint(b, epr);
+    final AmazonS3 client = b.build();
+    return client;
+  }
+
+  /**
+   * A method to configure basic AmazonS3Builder parameters.
+   *
+   * @param builder    Instance of AmazonS3Builder used.
+   * @param awsConf    ClientConfiguration used.
+   * @param parameters Parameters used to set in the builder.
+   */
+  private void configureBasicParams(AmazonS3Builder builder,
+      ClientConfiguration awsConf, S3ClientCreationParameters parameters) {
+    builder.withCredentials(parameters.getCredentialSet());
+    builder.withClientConfiguration(awsConf);
+    builder.withPathStyleAccessEnabled(parameters.isPathStyleAccess());
 
     if (parameters.getMetrics() != null) {
-      b.withMetricsCollector(
+      builder.withMetricsCollector(
           new AwsStatisticsCollector(parameters.getMetrics()));
     }
     if (parameters.getRequestHandlers() != null) {
-      b.withRequestHandlers(
+      builder.withRequestHandlers(
           parameters.getRequestHandlers().toArray(new RequestHandler2[0]));
     }
     if (parameters.getMonitoringListener() != null) {
-      b.withMonitoringListener(parameters.getMonitoringListener());
+      builder.withMonitoringListener(parameters.getMonitoringListener());
     }
 
-    // endpoint set up is a PITA
-    AwsClientBuilder.EndpointConfiguration epr
-        = createEndpointConfiguration(parameters.getEndpoint(),
-        awsConf, getConf().getTrimmed(AWS_REGION));
+  }
+
+  /**
+   * A method to configure endpoint and Region for an AmazonS3Builder.
+   *
+   * @param builder Instance of AmazonS3Builder used.
+   * @param epr     EndpointConfiguration used to set in builder.
+   */
+  private void configureEndpoint(
+      AmazonS3Builder builder,
+      AmazonS3Builder.EndpointConfiguration epr) {
     if (epr != null) {
       // an endpoint binding was constructed: use it.
-      b.withEndpointConfiguration(epr);
+      builder.withEndpointConfiguration(epr);
     } else {
       // no idea what the endpoint is, so tell the SDK
       // to work it out at the cost of an extra HEAD request
-      b.withForceGlobalBucketAccessEnabled(true);
+      builder.withForceGlobalBucketAccessEnabled(true);
       // HADOOP-17771 force set the region so the build process doesn't halt.
       String region = getConf().getTrimmed(AWS_REGION, AWS_S3_CENTRAL_REGION);
       LOG.debug("fs.s3a.endpoint.region=\"{}\"", region);
@@ -171,7 +272,7 @@ public class DefaultS3ClientFactory extends Configured
         // there's either an explicit region or we have fallen back
         // to the central one.
         LOG.debug("Using default endpoint; setting region to {}", region);
-        b.setRegion(region);
+        builder.setRegion(region);
       } else {
         // no region.
         // allow this if people really want it; it is OK to rely on this
@@ -180,8 +281,6 @@ public class DefaultS3ClientFactory extends Configured
         LOG.debug(SDK_REGION_CHAIN_IN_USE);
       }
     }
-    final AmazonS3 client = b.build();
-    return client;
   }
 
   /**
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
index 113e6f4..5a4f551 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
@@ -90,6 +90,7 @@ import static org.apache.hadoop.util.functional.RemoteIterators.remoteIteratorFr
 public class Listing extends AbstractStoreOperation {
 
   private static final Logger LOG = S3AFileSystem.LOG;
+  private final boolean isCSEEnabled;
 
   static final FileStatusAcceptor ACCEPT_ALL_BUT_S3N =
       new AcceptAllButS3nDirs();
@@ -97,9 +98,10 @@ public class Listing extends AbstractStoreOperation {
   private final ListingOperationCallbacks listingOperationCallbacks;
 
   public Listing(ListingOperationCallbacks listingOperationCallbacks,
-                 StoreContext storeContext) {
+      StoreContext storeContext) {
     super(storeContext);
     this.listingOperationCallbacks = listingOperationCallbacks;
+    this.isCSEEnabled = storeContext.isCSEEnabled();
   }
 
   /**
@@ -687,7 +689,7 @@ public class Listing extends AbstractStoreOperation {
           S3AFileStatus status = createFileStatus(keyPath, summary,
                   listingOperationCallbacks.getDefaultBlockSize(keyPath),
                   getStoreContext().getUsername(),
-              summary.getETag(), null);
+              summary.getETag(), null, isCSEEnabled);
           LOG.debug("Adding: {}", status);
           stats.add(status);
           added++;
@@ -961,7 +963,7 @@ public class Listing extends AbstractStoreOperation {
     public boolean accept(Path keyPath, S3ObjectSummary summary) {
       return !keyPath.equals(qualifiedPath)
           && !summary.getKey().endsWith(S3N_FOLDER_SUFFIX)
-          && !objectRepresentsDirectory(summary.getKey(), summary.getSize());
+          && !objectRepresentsDirectory(summary.getKey());
     }
 
     /**
@@ -1049,6 +1051,7 @@ public class Listing extends AbstractStoreOperation {
     }
   }
 
+  @SuppressWarnings("unchecked")
   public static RemoteIterator<LocatedFileStatus> toLocatedFileStatusIterator(
       RemoteIterator<? extends LocatedFileStatus> iterator) {
     return (RemoteIterator < LocatedFileStatus >) iterator;
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
index 5ba39aa..ddb933d 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
@@ -155,6 +155,9 @@ class S3ABlockOutputStream extends OutputStream implements
   private static final LogExactlyOnce WARN_ON_SYNCABLE =
       new LogExactlyOnce(LOG);
 
+  /** Is client side encryption enabled? */
+  private final boolean isCSEEnabled;
+
   /**
    * An S3A output stream which uploads partitions in a separate pool of
    * threads; different {@link S3ADataBlocks.BlockFactory}
@@ -189,6 +192,7 @@ class S3ABlockOutputStream extends OutputStream implements
       LOG.debug("Put tracker requests multipart upload");
       initMultipartUpload();
     }
+    this.isCSEEnabled = builder.isCSEEnabled;
   }
 
   /**
@@ -307,29 +311,34 @@ class S3ABlockOutputStream extends OutputStream implements
       // of capacity
       // Trigger an upload then process the remainder.
       LOG.debug("writing more data than block has capacity -triggering upload");
-      uploadCurrentBlock();
+      uploadCurrentBlock(false);
       // tail recursion is mildly expensive, but given buffer sizes must be MB.
       // it's unlikely to recurse very deeply.
       this.write(source, offset + written, len - written);
     } else {
-      if (remainingCapacity == 0) {
+      if (remainingCapacity == 0 && !isCSEEnabled) {
         // the whole buffer is done, trigger an upload
-        uploadCurrentBlock();
+        uploadCurrentBlock(false);
       }
     }
   }
 
   /**
    * Start an asynchronous upload of the current block.
+   *
+   * @param isLast true if the part being uploaded is the last one and
+   *               client side encryption is enabled.
    * @throws IOException Problems opening the destination for upload,
-   * initializing the upload, or if a previous operation has failed.
+   *                     initializing the upload, or if a previous operation
+   *                     has failed.
    */
-  private synchronized void uploadCurrentBlock() throws IOException {
+  private synchronized void uploadCurrentBlock(boolean isLast)
+      throws IOException {
     Preconditions.checkState(hasActiveBlock(), "No active block");
     LOG.debug("Writing block # {}", blockCount);
     initMultipartUpload();
     try {
-      multiPartUpload.uploadBlockAsync(getActiveBlock());
+      multiPartUpload.uploadBlockAsync(getActiveBlock(), isLast);
       bytesSubmitted += getActiveBlock().dataSize();
     } finally {
       // set the block to null, so the next write will create a new block.
@@ -389,8 +398,9 @@ class S3ABlockOutputStream extends OutputStream implements
         // PUT the final block
         if (hasBlock &&
             (block.hasData() || multiPartUpload.getPartsSubmitted() == 0)) {
-          //send last part
-          uploadCurrentBlock();
+          // send the last part, flagging it as such; the flag must be
+          // set to true when client side encryption is in use.
+          uploadCurrentBlock(true);
         }
         // wait for the partial uploads to finish
         final List<PartETag> partETags =
@@ -760,7 +770,8 @@ class S3ABlockOutputStream extends OutputStream implements
      * @throws IOException upload failure
      * @throws PathIOException if too many blocks were written
      */
-    private void uploadBlockAsync(final S3ADataBlocks.DataBlock block)
+    private void uploadBlockAsync(final S3ADataBlocks.DataBlock block,
+        Boolean isLast)
         throws IOException {
       LOG.debug("Queueing upload of {} for upload {}", block, uploadId);
       Preconditions.checkNotNull(uploadId, "Null uploadId");
@@ -781,6 +792,7 @@ class S3ABlockOutputStream extends OutputStream implements
             uploadData.getUploadStream(),
             uploadData.getFile(),
             0L);
+        request.setLastPart(isLast);
       } catch (SdkBaseException aws) {
         // catch and translate
         IOException e = translateException("upload", key, aws);
@@ -1042,6 +1054,9 @@ class S3ABlockOutputStream extends OutputStream implements
     /** Should Syncable calls be downgraded? */
     private boolean downgradeSyncableExceptions;
 
+    /** Is client side encryption enabled? */
+    private boolean isCSEEnabled;
+
     private BlockOutputStreamBuilder() {
     }
 
@@ -1157,5 +1172,15 @@ class S3ABlockOutputStream extends OutputStream implements
       downgradeSyncableExceptions = value;
       return this;
     }
+
+    /**
+     * Set whether client side encryption is enabled.
+     * @param value true if client side encryption is enabled
+     * @return the builder
+     */
+    public BlockOutputStreamBuilder withCSEEnabled(boolean value) {
+      isCSEEnabled = value;
+      return this;
+    }
   }
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index e0dfb56..5373c43 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -52,6 +52,7 @@ import com.amazonaws.AmazonClientException;
 import com.amazonaws.AmazonServiceException;
 import com.amazonaws.SdkBaseException;
 import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.Headers;
 import com.amazonaws.services.s3.model.CannedAccessControlList;
 import com.amazonaws.services.s3.model.CopyObjectRequest;
 import com.amazonaws.services.s3.model.DeleteObjectsRequest;
@@ -215,6 +216,7 @@ import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
 import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletionIgnoringExceptions;
 import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isObjectNotFound;
 import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isUnknownBucket;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.CSE_PADDING_LENGTH;
 import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT;
 import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DELETE_CONSIDERED_IDEMPOTENT;
 import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404;
@@ -359,6 +361,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   private AuditManagerS3A auditManager =
       AuditIntegration.stubAuditManager();
 
+  /**
+   * Is this S3A FS instance using S3 client side encryption?
+   */
+  private boolean isCSEEnabled;
+
   /** Add any deprecated keys. */
   @SuppressWarnings("deprecation")
   private static void addDeprecatedKeys() {
@@ -421,12 +428,16 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       // DT Bindings may override this
       setEncryptionSecrets(new EncryptionSecrets(
           getEncryptionAlgorithm(bucket, conf),
-          getServerSideEncryptionKey(bucket, getConf())));
+          getS3EncryptionKey(bucket, getConf())));
 
       invoker = new Invoker(new S3ARetryPolicy(getConf()), onRetry);
       instrumentation = new S3AInstrumentation(uri);
       initializeStatisticsBinding();
-
+      // If the CSE-KMS method is set then CSE is enabled.
+      isCSEEnabled = S3AEncryptionMethods.CSE_KMS.getMethod().equals(
+          S3AUtils.lookupPassword(conf,
+              SERVER_SIDE_ENCRYPTION_ALGORITHM, null));
+      LOG.debug("Client Side Encryption enabled: {}", isCSEEnabled);
+      setCSEGauge();
       // Username is the current user at the time the FS was instantiated.
       owner = UserGroupInformation.getCurrentUser();
       username = owner.getShortUserName();
@@ -515,6 +526,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       blockFactory = S3ADataBlocks.createFactory(this, blockOutputBuffer);
       blockOutputActiveBlocks = intOption(conf,
           FAST_UPLOAD_ACTIVE_BLOCKS, DEFAULT_FAST_UPLOAD_ACTIVE_BLOCKS, 1);
+      // If CSE is enabled, do multipart uploads serially.
+      if (isCSEEnabled) {
+        blockOutputActiveBlocks = 1;
+      }
       LOG.debug("Using S3ABlockOutputStream with buffer = {}; block={};" +
               " queue limit={}",
           blockOutputBuffer, partSize, blockOutputActiveBlocks);
@@ -559,7 +574,22 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       stopAllServices();
       throw e;
     }
+  }
 
+  /**
+   * Set the client side encryption gauge to 1 if CSE is enabled,
+   * and to 0 otherwise.
+   */
+  private void setCSEGauge() {
+    IOStatisticsStore ioStatisticsStore =
+        (IOStatisticsStore) getIOStatistics();
+    if (isCSEEnabled) {
+      ioStatisticsStore
+          .setGauge(CLIENT_SIDE_ENCRYPTION_ENABLED.getSymbol(), 1L);
+    } else {
+      ioStatisticsStore
+          .setGauge(CLIENT_SIDE_ENCRYPTION_ENABLED.getSymbol(), 0L);
+    }
   }
 
   /**
@@ -1162,7 +1192,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
    * Get the encryption algorithm of this endpoint.
    * @return the encryption algorithm.
    */
-  public S3AEncryptionMethods getServerSideEncryptionAlgorithm() {
+  public S3AEncryptionMethods getS3EncryptionAlgorithm() {
     return encryptionSecrets.getEncryptionMethod();
   }
 
@@ -1503,7 +1533,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
     return new S3ObjectAttributes(bucket,
         f,
         pathToKey(f),
-        getServerSideEncryptionAlgorithm(),
+        getS3EncryptionAlgorithm(),
         encryptionSecrets.getEncryptionKey(),
         eTag,
         versionId,
@@ -1628,7 +1658,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
         .withDowngradeSyncableExceptions(
             getConf().getBoolean(
                 DOWNGRADE_SYNCABLE_EXCEPTIONS,
-                DOWNGRADE_SYNCABLE_EXCEPTIONS_DEFAULT));
+                DOWNGRADE_SYNCABLE_EXCEPTIONS_DEFAULT))
+        .withCSEEnabled(isCSEEnabled);
     return new FSDataOutputStream(
         new S3ABlockOutputStream(builder),
         null);
@@ -3680,7 +3711,14 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
         // look for the simple file
         ObjectMetadata meta = getObjectMetadata(key);
         LOG.debug("Found exact file: normal file {}", key);
-        return new S3AFileStatus(meta.getContentLength(),
+        long contentLength = meta.getContentLength();
+        // check if CSE is enabled, then strip padded length.
+        if (isCSEEnabled
+            && meta.getUserMetaDataOf(Headers.CRYPTO_CEK_ALGORITHM) != null
+            && contentLength >= CSE_PADDING_LENGTH) {
+          contentLength -= CSE_PADDING_LENGTH;
+        }
+        return new S3AFileStatus(contentLength,
             dateToLong(meta.getLastModified()),
             path,
             getDefaultBlockSize(path),
@@ -4291,7 +4329,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
         key, length, eTag, versionId);
     Path p = keyToQualifiedPath(key);
     Preconditions.checkArgument(length >= 0, "content length is negative");
-    final boolean isDir = objectRepresentsDirectory(key, length);
+    final boolean isDir = objectRepresentsDirectory(key);
     // kick off an async delete
     CompletableFuture<?> deletion;
     if (!keepDirectoryMarkers(p)) {
@@ -4471,9 +4509,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       sb.append(", blockSize=").append(getDefaultBlockSize());
     }
     sb.append(", multiPartThreshold=").append(multiPartThreshold);
-    if (getServerSideEncryptionAlgorithm() != null) {
-      sb.append(", serverSideEncryptionAlgorithm='")
-          .append(getServerSideEncryptionAlgorithm())
+    if (getS3EncryptionAlgorithm() != null) {
+      sb.append(", s3EncryptionAlgorithm='")
+          .append(getS3EncryptionAlgorithm())
           .append('\'');
     }
     if (blockFactory != null) {
@@ -4499,6 +4537,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
           .append(getInstrumentation().toString())
           .append("}");
     }
+    sb.append(", ClientSideEncryption=").append(isCSEEnabled);
     sb.append('}');
     return sb.toString();
   }
@@ -5109,8 +5148,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       return isMagicCommitEnabled();
 
     case SelectConstants.S3_SELECT_CAPABILITY:
-      // select is only supported if enabled
-      return SelectBinding.isSelectEnabled(getConf());
+      // select is only supported if enabled and client side encryption is
+      // disabled.
+      return !isCSEEnabled && SelectBinding.isSelectEnabled(getConf());
 
     case CommonPathCapabilities.FS_CHECKSUMS:
       // capability depends on FS configuration
@@ -5118,8 +5158,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
           ETAG_CHECKSUM_ENABLED_DEFAULT);
 
     case CommonPathCapabilities.ABORTABLE_STREAM:
-    case CommonPathCapabilities.FS_MULTIPART_UPLOADER:
       return true;
+    case CommonPathCapabilities.FS_MULTIPART_UPLOADER:
+      // client side encryption doesn't support multipart uploader.
+      return !isCSEEnabled;
 
     // this client is safe to use with buckets
     // containing directory markers anywhere in
@@ -5255,7 +5297,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
    */
   private void requireSelectSupport(final Path source) throws
       UnsupportedOperationException {
-    if (!SelectBinding.isSelectEnabled(getConf())) {
+    if (isCSEEnabled || !SelectBinding.isSelectEnabled(getConf())) {
 
       throw new UnsupportedOperationException(
           SelectConstants.SELECT_UNSUPPORTED);
@@ -5378,6 +5420,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   public S3AMultipartUploaderBuilder createMultipartUploader(
       final Path basePath)
       throws IOException {
+    if (isCSEEnabled) {
+      throw new UnsupportedOperationException("Multi-part uploader not "
+          + "supported for Client side encryption.");
+    }
     final Path path = makeQualified(basePath);
     try (AuditSpan span = entryPoint(MULTIPART_UPLOAD_INSTANTIATED, path)) {
       StoreContext ctx = createStoreContext();
@@ -5416,6 +5462,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
         .setContextAccessors(new ContextAccessorsImpl())
         .setTimeProvider(getTtlTimeProvider())
         .setAuditor(getAuditor())
+        .setEnableCSE(isCSEEnabled)
         .build();
   }
 
@@ -5485,4 +5532,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       return S3AFileSystem.this.getRequestFactory();
     }
   }
+
+  /**
+   * Query whether client side encryption is enabled.
+   * @return true if CSE is enabled.
+   */
+  public boolean isCSEEnabled() {
+    return isCSEEnabled;
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
index 8a181f2..29fbac3 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
@@ -87,6 +87,7 @@ import java.util.concurrent.TimeUnit;
 import static org.apache.commons.lang3.StringUtils.isEmpty;
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isUnknownBucket;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.CSE_PADDING_LENGTH;
 import static org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport.translateDeleteException;
 import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
 import static org.apache.hadoop.util.functional.RemoteIterators.filteringRemoteIterator;
@@ -521,6 +522,7 @@ public final class S3AUtils {
    * @param owner owner of the file
    * @param eTag S3 object eTag or null if unavailable
    * @param versionId S3 object versionId or null if unavailable
+   * @param isCSEEnabled is client side encryption enabled?
    * @return a status entry
    */
   public static S3AFileStatus createFileStatus(Path keyPath,
@@ -528,10 +530,15 @@ public final class S3AUtils {
       long blockSize,
       String owner,
       String eTag,
-      String versionId) {
+      String versionId,
+      boolean isCSEEnabled) {
     long size = summary.getSize();
+    // check if cse is enabled; strip out constant padding length.
+    if (isCSEEnabled && size >= CSE_PADDING_LENGTH) {
+      size -= CSE_PADDING_LENGTH;
+    }
     return createFileStatus(keyPath,
-        objectRepresentsDirectory(summary.getKey(), size),
+        objectRepresentsDirectory(summary.getKey()),
         size, summary.getLastModified(), blockSize, owner, eTag, versionId);
   }
 
@@ -572,14 +579,11 @@ public final class S3AUtils {
   /**
    * Predicate: does the object represent a directory?.
    * @param name object name
-   * @param size object size
    * @return true if it meets the criteria for being an object
    */
-  public static boolean objectRepresentsDirectory(final String name,
-      final long size) {
+  public static boolean objectRepresentsDirectory(final String name) {
     return !name.isEmpty()
-        && name.charAt(name.length() - 1) == '/'
-        && size == 0L;
+        && name.charAt(name.length() - 1) == '/';
   }
 
   /**
@@ -1564,7 +1568,7 @@ public final class S3AUtils {
   }
 
   /**
-   * Get any SSE key from a configuration/credential provider.
+   * Get any SSE/CSE key from a configuration/credential provider.
    * This operation handles the case where the option has been
    * set in the provider or configuration to the option
    * {@code OLD_S3A_SERVER_SIDE_ENCRYPTION_KEY}.
@@ -1574,7 +1578,7 @@ public final class S3AUtils {
    * @return the encryption key or ""
    * @throws IllegalArgumentException bad arguments.
    */
-  public static String getServerSideEncryptionKey(String bucket,
+  public static String getS3EncryptionKey(String bucket,
       Configuration conf) {
     try {
       return lookupPassword(bucket, conf, SERVER_SIDE_ENCRYPTION_KEY);
@@ -1585,7 +1589,7 @@ public final class S3AUtils {
   }
 
   /**
-   * Get the server-side encryption algorithm.
+   * Get the server-side encryption or client side encryption algorithm.
    * This includes validation of the configuration, checking the state of
    * the encryption key given the chosen algorithm.
    *
@@ -1597,22 +1601,23 @@ public final class S3AUtils {
    */
   public static S3AEncryptionMethods getEncryptionAlgorithm(String bucket,
       Configuration conf) throws IOException {
-    S3AEncryptionMethods sse = S3AEncryptionMethods.getMethod(
+    S3AEncryptionMethods encryptionMethod = S3AEncryptionMethods.getMethod(
         lookupPassword(bucket, conf,
             SERVER_SIDE_ENCRYPTION_ALGORITHM));
-    String sseKey = getServerSideEncryptionKey(bucket, conf);
-    int sseKeyLen = StringUtils.isBlank(sseKey) ? 0 : sseKey.length();
-    String diagnostics = passwordDiagnostics(sseKey, "key");
-    switch (sse) {
+    String encryptionKey = getS3EncryptionKey(bucket, conf);
+    int encryptionKeyLen =
+        StringUtils.isBlank(encryptionKey) ? 0 : encryptionKey.length();
+    String diagnostics = passwordDiagnostics(encryptionKey, "key");
+    switch (encryptionMethod) {
     case SSE_C:
       LOG.debug("Using SSE-C with {}", diagnostics);
-      if (sseKeyLen == 0) {
+      if (encryptionKeyLen == 0) {
         throw new IOException(SSE_C_NO_KEY_ERROR);
       }
       break;
 
     case SSE_S3:
-      if (sseKeyLen != 0) {
+      if (encryptionKeyLen != 0) {
         throw new IOException(SSE_S3_WITH_KEY_ERROR
             + " (" + diagnostics + ")");
       }
@@ -1623,12 +1628,17 @@ public final class S3AUtils {
           diagnostics);
       break;
 
+    case CSE_KMS:
+      LOG.debug("Using CSE-KMS with {}",
+          diagnostics);
+      break;
+
     case NONE:
     default:
       LOG.debug("Data is unencrypted");
       break;
     }
-    return sse;
+    return encryptionMethod;
   }
 
   /**
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
index 7890e2d..fb28a40 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
@@ -577,7 +577,14 @@ public enum Statistic {
   AUDIT_REQUEST_EXECUTION(
       AuditStatisticNames.AUDIT_REQUEST_EXECUTION,
       "AWS request made",
-      TYPE_COUNTER);
+      TYPE_COUNTER),
+
+  /* Client side encryption gauge */
+  CLIENT_SIDE_ENCRYPTION_ENABLED(
+      "client_side_encryption_enabled",
+      "gauge to indicate if client side encryption is enabled",
+      TYPE_GAUGE
+  );
 
 
   /**
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/HeaderProcessing.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/HeaderProcessing.java
index 8c39aa4..17394b7 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/HeaderProcessing.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/HeaderProcessing.java
@@ -301,8 +301,15 @@ public class HeaderProcessing extends AbstractStoreOperation {
         md.getContentEncoding());
     maybeSetHeader(headers, XA_CONTENT_LANGUAGE,
         md.getContentLanguage());
-    maybeSetHeader(headers, XA_CONTENT_LENGTH,
-        md.getContentLength());
+    // If CSE is enabled, use the unencrypted content length.
+    if (md.getUserMetaDataOf(Headers.CRYPTO_CEK_ALGORITHM) != null
+        && md.getUserMetaDataOf(Headers.UNENCRYPTED_CONTENT_LENGTH) != null) {
+      maybeSetHeader(headers, XA_CONTENT_LENGTH,
+          md.getUserMetaDataOf(Headers.UNENCRYPTED_CONTENT_LENGTH));
+    } else {
+      maybeSetHeader(headers, XA_CONTENT_LENGTH,
+          md.getContentLength());
+    }
     maybeSetHeader(headers, XA_CONTENT_MD5,
         md.getContentMD5());
     maybeSetHeader(headers, XA_CONTENT_RANGE,
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
index cf962b8..51b1bf6 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
@@ -126,4 +126,12 @@ public final class InternalConstants {
    */
   public static final String AWS_REGION_SYSPROP = "aws.region";
 
+  /**
+   * S3 client side encryption adds a constant padding of 16 bytes to the
+   * content length (at present, as only one content encryption algorithm
+   * is supported). Subtract this from the reported length when listing,
+   * provided the object is at least this long.
+   */
+  public static final int CSE_PADDING_LENGTH = 16;
+
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RenameOperation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RenameOperation.java
index 7b13d0d..6d0f49a 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RenameOperation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RenameOperation.java
@@ -638,7 +638,7 @@ public class RenameOperation extends ExecutingStoreOperation<Long> {
       copyResult = callbacks.copyFile(srcKey, destinationKey,
           srcAttributes, readContext);
     }
-    if (objectRepresentsDirectory(srcKey, len)) {
+    if (objectRepresentsDirectory(srcKey)) {
       renameTracker.directoryMarkerCopied(
           sourceFile,
           destination,
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContext.java
index ac29780..d60e1a2 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContext.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContext.java
@@ -129,6 +129,9 @@ public class StoreContext implements ActiveThreadSpanSource<AuditSpan> {
   /** Operation Auditor. */
   private final AuditSpanSource<AuditSpanS3A> auditor;
 
+  /** Is client side encryption enabled? */
+  private final boolean isCSEEnabled;
+
   /**
    * Instantiate.
    */
@@ -150,7 +153,8 @@ public class StoreContext implements ActiveThreadSpanSource<AuditSpan> {
       final boolean useListV1,
       final ContextAccessors contextAccessors,
       final ITtlTimeProvider timeProvider,
-      final AuditSpanSource<AuditSpanS3A> auditor) {
+      final AuditSpanSource<AuditSpanS3A> auditor,
+      final boolean isCSEEnabled) {
     this.fsURI = fsURI;
     this.bucket = bucket;
     this.configuration = configuration;
@@ -172,6 +176,7 @@ public class StoreContext implements ActiveThreadSpanSource<AuditSpan> {
     this.contextAccessors = contextAccessors;
     this.timeProvider = timeProvider;
     this.auditor = auditor;
+    this.isCSEEnabled = isCSEEnabled;
   }
 
   public URI getFsURI() {
@@ -429,4 +434,12 @@ public class StoreContext implements ActiveThreadSpanSource<AuditSpan> {
   public RequestFactory getRequestFactory() {
     return contextAccessors.getRequestFactory();
   }
+
+  /**
+   * Query whether the store context has client side encryption enabled.
+   * @return true if CSE is enabled.
+   */
+  public boolean isCSEEnabled() {
+    return isCSEEnabled;
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContextBuilder.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContextBuilder.java
index 468af1b..d4021a7 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContextBuilder.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContextBuilder.java
@@ -73,6 +73,8 @@ public class StoreContextBuilder {
 
   private AuditSpanSource<AuditSpanS3A> auditor;
 
+  private boolean isCSEEnabled;
+
   public StoreContextBuilder setFsURI(final URI fsURI) {
     this.fsURI = fsURI;
     return this;
@@ -180,6 +182,17 @@ public class StoreContextBuilder {
     return this;
   }
 
+  /**
+   * Set whether client side encryption is enabled.
+   * @param value true if client side encryption is enabled.
+   * @return builder instance.
+   */
+  public StoreContextBuilder setEnableCSE(
+      boolean value) {
+    isCSEEnabled = value;
+    return this;
+  }
+
   @SuppressWarnings("deprecation")
   public StoreContext build() {
     return new StoreContext(fsURI,
@@ -199,6 +212,7 @@ public class StoreContextBuilder {
         useListV1,
         contextAccessors,
         timeProvider,
-        auditor);
+        auditor,
+        isCSEEnabled);
   }
 }
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/encryption.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/encryption.md
index d869705..888ed8e 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/encryption.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/encryption.md
@@ -21,9 +21,13 @@
 ## <a name="introduction"></a> Introduction
 
 The S3A filesystem client supports Amazon S3's Server Side Encryption
-for at-rest data encryption.
-You should to read up on the [AWS documentation](https://docs.aws.amazon.com/AmazonS3/latest/dev/serv-side-encryption.html)
-for S3 Server Side Encryption for up to date information on the encryption mechanisms.
+and Client Side Encryption for encrypting data at-rest.
+
+
+For up to date information on the encryption mechanisms, read:
+
+* [Protecting data using server-side encryption](https://docs.aws.amazon.com/AmazonS3/latest/dev/serv-side-encryption.html)
+* [Protecting data using client-side encryption](https://docs.aws.amazon.com/AmazonS3/latest/userguide/UsingClientSideEncryption.html)
 
 
 
@@ -32,18 +36,21 @@ Any new file written will be encrypted with this encryption configuration.
 When the S3A client reads a file, S3 will attempt to decrypt it using the mechanism
 and keys with which the file was encrypted.
 
-* It is **NOT** advised to mix and match encryption types in a bucket
+* It is **NOT** advised to mix and match encryption types in a bucket.
 * It is much simpler and safer to encrypt with just one type and key per bucket.
 * You can use AWS bucket policies to mandate encryption rules for a bucket.
 * You can use S3A per-bucket configuration to ensure that S3A clients use encryption
 policies consistent with the mandated rules.
-* You can use S3 Default Encryption to encrypt data without needing to
+* You can use S3 Default Encryption, set up in the AWS console, to encrypt data without needing to
 set anything in the client.
 * Changing the encryption options on the client does not change how existing
 files were encrypted, except when the files are renamed.
-* For all mechanisms other than SSE-C, clients do not need any configuration
+* For all mechanisms other than SSE-C and CSE-KMS, clients do not need any configuration
 options set in order to read encrypted data: it is all automatically handled
 in S3 itself.
+* Encryption options and secrets are collected by [S3A Delegation Tokens](delegation_tokens.html) and passed to workers during job submission.
+* Encryption options and secrets MAY be stored in JCEKS files or any other Hadoop credential provider service.
+  This allows for more secure storage than XML files, including password protection of the secrets; see the sketch below.
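+
+A minimal sketch, assuming a JCEKS keystore has already been created with
+`hadoop credential create`, of how such a secret is resolved; the provider
+path URL here is a placeholder:
+
+```java
+import org.apache.hadoop.conf.Configuration;
+
+public class ReadEncryptionKey {
+  public static void main(String[] args) throws Exception {
+    Configuration conf = new Configuration();
+    // point the credential provider chain at a (placeholder) JCEKS store
+    conf.set("hadoop.security.credential.provider.path",
+        "jceks://hdfs@namenode/secrets/s3a.jceks");
+    // getPassword() consults credential providers before XML values;
+    // this is broadly how S3AUtils.lookupPassword() resolves secrets.
+    char[] key = conf.getPassword("fs.s3a.server-side-encryption.key");
+    System.out.println(key == null ? "unset" : "key found");
+  }
+}
+```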
 
 ## <a name="encryption_types"></a>How data is encrypted
 
@@ -53,8 +60,7 @@ to encrypt the data as it saved to S3. It remains encrypted on S3 until deleted:
 clients cannot change the encryption attributes of an object once uploaded.
 
 The Amazon AWS SDK also offers client-side encryption, in which all the encoding
-and decoding of data is performed on the client. This is *not* supported by
-the S3A client.
+and decoding of data is performed on the client.
 
 The server-side "SSE" encryption is performed with symmetric AES256 encryption;
 S3 offers different mechanisms for actually defining the key to use.
@@ -70,6 +76,53 @@ by Amazon's Key Management Service, a key referenced by name in the uploading cl
 * SSE-C : the client specifies an actual base64 encoded AES-256 key to be used
 to encrypt and decrypt the data.
 
+Encryption options
+
+|  type | encryption | config on write | config on read |
+|-------|---------|-----------------|----------------|
+| `SSE-S3` | server side, AES256 | encryption algorithm | none |
+| `SSE-KMS` | server side, KMS key | key used to encrypt/decrypt | none |
+| `SSE-C` | server side, custom key | encryption algorithm and secret | encryption algorithm and secret |
+| `CSE-KMS` | client side, KMS key | encryption algorithm and key ID | encryption algorithm |
+
+With server-side encryption, the data is uploaded to S3 unencrypted (but wrapped by the HTTPS
+encryption channel).
+The data is encrypted in the S3 store and decrypted when it's being retrieved.
+
+A server side algorithm can be enabled by default for a bucket, so that
+whenever data is uploaded unencrypted a default encryption algorithm is added.
+When data is encrypted with S3-SSE or SSE-KMS it is transparent to all clients
+downloading the data.
+SSE-C is different in that every client must know the secret key needed to decrypt the data.
+
+Working with SSE-C data is harder because every client must be configured to
+use the algorithm and supply the key. In particular, it is very hard to mix
+SSE-C encrypted objects in the same S3 bucket with objects encrypted with
+other algorithms or with unencrypted objects; the S3A client (and other
+applications) get very confused.
+
+KMS-based key encryption is powerful as access to a key can be restricted to
+specific users/IAM roles. However, use of the key is billed and can be
+throttled. Furthermore as a client seeks around a file, the KMS key *may* be
+used multiple times.
+
+S3 Client side encryption (CSE-KMS) is an experimental feature added in July
+2021.
+
+This encrypts the data on the client, before transmitting to S3, where it is
+stored encrypted. The data is unencrypted after downloading when it is being
+read back.
+
+In CSE-KMS, the ID of an AWS-KMS key is provided to the S3A client;
+the client communicates with AWS-KMS to request a new encryption key, which
+KMS returns along with the same key encrypted with the KMS key.
+The S3 client encrypts the payload *and* attaches the KMS-encrypted version
+of the key as a header to the object.
+
+When downloading data, this header is extracted, passed to AWS KMS, and,
+if the client has the appropriate permissions, the symmetric key is
+retrieved.
+This key is then used to decode the data.
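+
+A minimal sketch, assuming AWS credentials and region come from the default
+chains, of how such an encryption client can be built with the v1 AWS SDK;
+`kmsKeyId` is a placeholder for the CMK used to wrap the data keys:
+
+```java
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3EncryptionClientV2;
+import com.amazonaws.services.s3.model.CryptoConfigurationV2;
+import com.amazonaws.services.s3.model.CryptoMode;
+import com.amazonaws.services.s3.model.CryptoRangeGetMode;
+import com.amazonaws.services.s3.model.KMSEncryptionMaterialsProvider;
+
+public class CseClientSketch {
+  static AmazonS3 buildCseClient(String kmsKeyId) {
+    return AmazonS3EncryptionClientV2.encryptionBuilder()
+        // every object gets a fresh data key, wrapped by this KMS key
+        .withEncryptionMaterialsProvider(
+            new KMSEncryptionMaterialsProvider(kmsKeyId))
+        // AuthenticatedEncryption plus ranged GETs: both are needed
+        // for seek() to work against encrypted objects
+        .withCryptoConfiguration(
+            new CryptoConfigurationV2(CryptoMode.AuthenticatedEncryption)
+                .withRangeGetMode(CryptoRangeGetMode.ALL))
+        .build();
+  }
+}
+```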
 
 ## <a name="sse-s3"></a> S3 Default Encryption
 
@@ -110,7 +163,7 @@ To learn more, refer to
 [Protecting Data Using Server-Side Encryption with Amazon S3-Managed Encryption Keys (SSE-S3) in AWS documentation](http://docs.aws.amazon.com/AmazonS3/latest/dev/UsingServerSideEncryption.html).
 
 
-### <a name="sse-kms"></a> SSE-KMS: Amazon S3-KMS Managed Encryption Keys
+### <a name="sse-kms"></a> SSE-KMS: Server-Encryption with KMS Managed Encryption Keys
 
 
 Amazon offers a pay-per-use key management service, [AWS KMS](https://aws.amazon.com/documentation/kms/).
@@ -419,7 +472,82 @@ the data, so guaranteeing that access to thee data can be read by everyone
 granted access to that key, and nobody without access to it.
 
 
-###<a name="changing-encryption"></a> Use rename() to encrypt files with new keys
+### <a name="fattr"></a> Using `hadoop fs -getfattr` to view encryption information.
+
+The S3A client retrieves all HTTP headers from an object and returns
+them in the "XAttr" list of attributed, prefixed with `header.`.
+This makes them retrievable in the `getXAttr()` API calls, which
+is available on the command line through the `hadoop fs -getfattr -d` command.
+
+This makes viewing the encryption headers of a file straightforward.
+
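+Programmatically, the same headers can be read through the Hadoop
+`FileSystem.getXAttrs()` call; a minimal sketch, with a placeholder
+bucket and path:
+
+```java
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class PrintObjectHeaders {
+  public static void main(String[] args) throws Exception {
+    Path file = new Path("s3a://example-bucket/file2");
+    FileSystem fs = FileSystem.get(
+        URI.create("s3a://example-bucket/"), new Configuration());
+    // each S3 object header comes back as an XAttr named "header.<name>"
+    Map<String, byte[]> xattrs = fs.getXAttrs(file);
+    xattrs.forEach((name, value) ->
+        System.out.println(name + "=" + new String(value, StandardCharsets.UTF_8)));
+  }
+}
+```
+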
+Here is an example of the operation invoked on a file where the client is using CSE-KMS:
+```
+bin/hadoop fs -getfattr -d s3a://test-london/file2
+
+2021-07-14 12:59:01,554 [main] WARN  s3.AmazonS3EncryptionClientV2 (AmazonS3EncryptionClientV2.java:warnOnLegacyCryptoMode(409)) - The S3 Encryption Client is configured to read encrypted data with legacy encryption modes through the CryptoMode setting. If you don't have objects encrypted with these legacy modes, you should disable support for them to enhance security. See https://docs.aws.amazon.com/general/latest/gr/aws_sdk_cryptography.html
+2021-07-14 12:59:01,558 [main] WARN  s3.AmazonS3EncryptionClientV2 (AmazonS3EncryptionClientV2.java:warnOnRangeGetsEnabled(401)) - The S3 Encryption Client is configured to support range get requests. Range gets do not provide authenticated encryption properties even when used with an authenticated mode (AES-GCM). See https://docs.aws.amazon.com/general/latest/gr/aws_sdk_cryptography.html
+# file: s3a://test-london/file2
+header.Content-Length="0"
+header.Content-Type="application/octet-stream"
+header.ETag="63b3f4bd6758712c98f1be86afad095a"
+header.Last-Modified="Wed Jul 14 12:56:06 BST 2021"
+header.x-amz-cek-alg="AES/GCM/NoPadding"
+header.x-amz-iv="ZfrgtxvcR41yNVkw"
+header.x-amz-key-v2="AQIDAHjZrfEIkNG24/xy/pqPPLcLJulg+O5pLgbag/745OH4VQFuiRnSS5sOfqXJyIa4FOvNAAAAfjB8BgkqhkiG9w0BBwagbzBtAgEAMGgGCSqGSIb3DQEHATAeBglghkgBZQMEAS4wEQQM7b5GZzD4eAGTC6gaAgEQgDte9Et/dhR7LAMU/NaPUZSgXWN5pRnM4hGHBiRNEVrDM+7+iotSTKxFco+VdBAzwFZfHjn0DOoSZEukNw=="
+header.x-amz-matdesc="{"aws:x-amz-cek-alg":"AES/GCM/NoPadding"}"
+header.x-amz-server-side-encryption="AES256"
+header.x-amz-tag-len="128"
+header.x-amz-unencrypted-content-length="0"
+header.x-amz-version-id="zXccFCB9eICszFgqv_paG1pzaUKY09Xa"
+header.x-amz-wrap-alg="kms+context"
+```
+
+Analysis
+
+1. The WARN messages are the AWS SDK warning that, because the S3A client uses
+an encryption mode which `seek()` requires, the SDK considers it less
+secure than the most recent algorithm(s). They can be ignored.
+
+* `header.x-amz-server-side-encryption="AES256"` : the file has been encrypted with S3-SSE. This is set up as the S3 default encryption,
+so even when CSE is enabled, the data is doubly encrypted.
+* `header.x-amz-cek-alg="AES/GCM/NoPadding`: client-side encrypted with the `"AES/GCM/NoPadding` algorithm.
+* `header.x-amz-iv="ZfrgtxvcR41yNVkw"`: the initialization vector used when encrypting this object.
+* `header.x-amz-key-v2="AQIDAHjZrfEIkNG24/xy/pqPPLcLJulg+O5pLgbag/745OH4VQFuiRnSS5sOfqXJyIa4FOvNAAAAfjB8BgkqhkiG9w0BBwagbzBtAgEAMGgGCSqGSIb3DQEHATAeBglghkgBZQMEAS4wEQQM7b5GZzD4eAGTC6gaAgEQgDte9Et/dhR7LAMU/NaPUZSgXWN5pRnM4hGHBiRNEVrDM+7+iotSTKxFco+VdBAzwFZfHjn0DOoSZEukNw=="`: the KMS-encrypted copy of the content encryption key.
+* `header.x-amz-unencrypted-content-length="0"`: this is the length of the unencrypted data. The S3A client *DOES NOT* use this header;
+  it always removes 16 bytes from non-empty files when declaring the length.
+* `header.x-amz-wrap-alg="kms+context"`: the algorithm used to encrypt the CSE key.
+* `header.x-amz-version-id="zXccFCB9eICszFgqv_paG1pzaUKY09Xa"`: the bucket is versioned; this is the version ID.
+
+And a directory encrypted with S3-SSE only:
+
+```
+bin/hadoop fs -getfattr -d s3a://test-london/user/stevel/target/test/data/sOCOsNgEjv
+
+# file: s3a://test-london/user/stevel/target/test/data/sOCOsNgEjv
+header.Content-Length="0"
+header.Content-Type="application/x-directory"
+header.ETag="d41d8cd98f00b204e9800998ecf8427e"
+header.Last-Modified="Tue Jul 13 20:12:07 BST 2021"
+header.x-amz-server-side-encryption="AES256"
+header.x-amz-version-id="KcDOVmznIagWx3gP1HlDqcZvm1mFWZ2a"
+```
+
+A file with no-encryption (on a bucket without versioning but with intelligent tiering):
+
+```
+bin/hadoop fs -getfattr -d s3a://landsat-pds/scene_list.gz
+
+# file: s3a://landsat-pds/scene_list.gz
+header.Content-Length="45603307"
+header.Content-Type="application/octet-stream"
+header.ETag="39c34d489777a595b36d0af5726007db"
+header.Last-Modified="Wed Aug 29 01:45:15 BST 2018"
+header.x-amz-storage-class="INTELLIGENT_TIERING"
+header.x-amz-version-id="null"
+```
+
+###<a name="changing-encryption"></a> Use `rename()` to encrypt files with new keys
 
 The encryption of an object is set when it is uploaded. If you want to encrypt
 an unencrypted file, or change the SEE-KMS key of a file, the only way to do
@@ -435,6 +563,91 @@ for reading as for writing, and you must supply that key for reading. There
 you need to copy one bucket to a different bucket, one with a different key.
 Use `distCp`for this, with per-bucket encryption policies.
 
+## <a name="cse"></a> Amazon S3 Client Side Encryption
+
+### Introduction
+Amazon S3 Client Side Encryption (S3-CSE) is used to encrypt data on the
+client side before transmitting it to S3 storage. The same encrypted data
+is transmitted back to the client when reading, and is then
+decrypted on the client side.
+
+S3-CSE uses `AmazonS3EncryptionClientV2` as the AmazonS3 client; the
+encryption and decryption are done by the AWS SDK. As of July 2021, only the
+CSE-KMS method is supported.
+
+A key reason this feature (HADOOP-13887) has been unavailable for a long time
+is that the AWS S3 client pads uploaded objects with a 16 byte footer. This
+meant that files were shorter when being read than when listed
+through any of the list API calls or `getFileStatus()`. This broke many
+applications, including anything seeking near the end of a file to read a
+footer, as ORC and Parquet do.
+
+There is now a workaround: compensate for the footer in listings when CSE is enabled.
+
+- When listing files and directories, 16 bytes are subtracted from the length
+of all non-empty objects (those of 16 bytes or more).
+- Directory markers MAY be longer than 0 bytes.
+
+This "appears" to work; secondly it does in the testing as of July 2021. However
+, the length of files when listed through the S3A client is now going to be
+shorter than the length of files listed with other clients -including S3A
+clients where S3-CSE has not been enabled.
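+
+A minimal sketch (not the connector's actual code paths) of the
+compensation applied when building a file status under CSE:
+
+```java
+public final class CsePadding {
+  /** Fixed AES-GCM footer added by the AWS encryption client. */
+  public static final int CSE_PADDING_LENGTH = 16;
+
+  /**
+   * Strip the CSE footer from a reported object length.
+   * Objects shorter than the padding (e.g. directory markers)
+   * are returned unchanged.
+   */
+  public static long unpaddedLength(long reportedLength, boolean cseEnabled) {
+    if (cseEnabled && reportedLength >= CSE_PADDING_LENGTH) {
+      return reportedLength - CSE_PADDING_LENGTH;
+    }
+    return reportedLength;
+  }
+}
+```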
+
+### Features
+
+- Supports client side encryption with keys managed in AWS KMS.
+- encryption settings propagated into jobs through any issued delegation tokens.
+- encryption information stored as headers in the uploaded object.
+
+### Limitations
+
+- Performance will be reduced. All encrypt/decrypt is now being done on the
+ client.
+- Writing files may be slower, as only a single block can be encrypted and
+ uploaded at a time.
+- Multipart Uploader API disabled.
+- S3 Select is not supported.
+- Multipart uploads are serial, and the part size must be a multiple of 16
+ bytes.
+- The maximum message size in bytes that can be encrypted under this mode is
+ 2^36-32, or ~64GB, due to the security limitation of AES/GCM as recommended by
+ NIST.
+
+### Setup
+- Generate an AWS KMS key ID in the AWS console for your bucket, in the same
+ region as the storage bucket.
+- If one is already created, [view the KMS key ID by following these steps.](https://docs.aws.amazon.com/kms/latest/developerguide/find-cmk-id-arn.html)
+- Set `fs.s3a.server-side-encryption-algorithm=CSE-KMS`.
+- Set `fs.s3a.server-side-encryption.key=<KMS_KEY_ID>`.
+
+KMS_KEY_ID:
+
+Identifies the symmetric CMK that encrypts the data key.
+To specify a CMK, use its key ID, key ARN, alias name, or alias ARN. When
+using an alias name, prefix it with "alias/". To specify a CMK in a
+different AWS account, you must use the key ARN or alias ARN.
+
+For example:
+- Key ID: `1234abcd-12ab-34cd-56ef-1234567890ab`
+- Key ARN: `arn:aws:kms:us-east-2:111122223333:key/1234abcd-12ab-34cd-56ef-1234567890ab`
+- Alias name: `alias/ExampleAlias`
+- Alias ARN: `arn:aws:kms:us-east-2:111122223333:alias/ExampleAlias`
+
+*Note:* If `fs.s3a.server-side-encryption-algorithm=CSE-KMS` is set, the
+`fs.s3a.server-side-encryption.key=<KMS_KEY_ID>` property must also be set for
+S3-CSE to work.
+
+```xml
+<property>
+  <name>fs.s3a.server-side-encryption-algorithm</name>
+  <value>CSE-KMS</value>
+</property>
+
+<property>
+  <name>fs.s3a.server-side-encryption.key</name>
+  <value>${KMS_KEY_ID}</value>
+</property>
+```
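+
+The same options can be set programmatically; a sketch, reusing the example
+key ARN given earlier and a placeholder bucket name:
+
+```java
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+
+public class CseKmsSetup {
+  public static void main(String[] args) throws Exception {
+    Configuration conf = new Configuration();
+    conf.set("fs.s3a.server-side-encryption-algorithm", "CSE-KMS");
+    conf.set("fs.s3a.server-side-encryption.key",
+        "arn:aws:kms:us-east-2:111122223333:key/1234abcd-12ab-34cd-56ef-1234567890ab");
+    // all writes through this filesystem instance are now CSE-KMS encrypted
+    FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"), conf);
+    System.out.println(fs.getUri());
+  }
+}
+```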
 
 ## <a name="troubleshooting"></a> Troubleshooting Encryption
 
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
index 30047ed..6cdb492 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
@@ -1157,6 +1157,284 @@ is used, no encryption is specified, or the SSE-C specified is incorrect.
 2. A directory is encrypted with a SSE-C keyA and the user is trying to move a
 file using configured SSE-C keyB into that structure.
 
+## <a name="client-side-encryption"></a> S3 Client Side Encryption
+
+### Instruction file not found for S3 object
+
+Reading an unencrypted file fails when it is read through a CSE-enabled client.
+```
+java.lang.SecurityException: Instruction file not found for S3 object with bucket name: ap-south-cse, key: unencryptedData.txt
+    at com.amazonaws.services.s3.internal.crypto.v2.S3CryptoModuleAE.decipher(S3CryptoModuleAE.java:190)
+    at com.amazonaws.services.s3.internal.crypto.v2.S3CryptoModuleAE.getObjectSecurely(S3CryptoModuleAE.java:136)
+    at com.amazonaws.services.s3.AmazonS3EncryptionClientV2.getObject(AmazonS3EncryptionClientV2.java:241)
+    at org.apache.hadoop.fs.s3a.S3AFileSystem$InputStreamCallbacksImpl.getObject(S3AFileSystem.java:1462)
+    at org.apache.hadoop.fs.s3a.S3AInputStream.lambda$reopen$0(S3AInputStream.java:217)
+    at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:117)
+    at org.apache.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:216)
+    at org.apache.hadoop.fs.s3a.S3AInputStream.lambda$lazySeek$1(S3AInputStream.java:382)
+    at org.apache.hadoop.fs.s3a.Invoker.lambda$maybeRetry$3(Invoker.java:230)
+    at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:117)
+    at org.apache.hadoop.fs.s3a.Invoker.lambda$maybeRetry$5(Invoker.java:354)
+    at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:414)
+    at org.apache.hadoop.fs.s3a.Invoker.maybeRetry(Invoker.java:350)
+    at org.apache.hadoop.fs.s3a.Invoker.maybeRetry(Invoker.java:228)
+    at org.apache.hadoop.fs.s3a.Invoker.maybeRetry(Invoker.java:272)
+    at org.apache.hadoop.fs.s3a.S3AInputStream.lazySeek(S3AInputStream.java:374)
+    at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:493)
+    at java.io.DataInputStream.read(DataInputStream.java:100)
+    at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:94)
+    at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:68)
+    at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:129)
+    at org.apache.hadoop.fs.shell.Display$Cat.printToStdout(Display.java:101)
+    at org.apache.hadoop.fs.shell.Display$Cat.processPath(Display.java:96)
+    at org.apache.hadoop.fs.shell.Command.processPathInternal(Command.java:370)
+    at org.apache.hadoop.fs.shell.Command.processPaths(Command.java:333)
+    at org.apache.hadoop.fs.shell.Command.processPathArgument(Command.java:306)
+    at org.apache.hadoop.fs.shell.Command.processArgument(Command.java:288)
+    at org.apache.hadoop.fs.shell.Command.processArguments(Command.java:272)
+    at org.apache.hadoop.fs.shell.FsCommand.processRawArguments(FsCommand.java:121)
+    at org.apache.hadoop.fs.shell.Command.run(Command.java:179)
+    at org.apache.hadoop.fs.FsShell.run(FsShell.java:327)
+    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:81)
+    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:95)
+    at org.apache.hadoop.fs.FsShell.main(FsShell.java:390)
+```
+A CSE-enabled client can only read encrypted data.
+
+### CSE-KMS method requires KMS key ID
+
+A KMS key ID is required for CSE-KMS to encrypt data; not providing one leads
+ to failure.
+
+```
+2021-07-07 11:33:04,550 WARN fs.FileSystem: Failed to initialize fileystem
+s3a://ap-south-cse/: java.lang.IllegalArgumentException: CSE-KMS
+method requires KMS key ID. Use fs.s3a.server-side-encryption.key property to set it.
+-ls: CSE-KMS method requires KMS key ID. Use fs.s3a.server-side-encryption.key property to
+ set it.
+```
+
+Set `fs.s3a.server-side-encryption.key` to a `<KMS_KEY_ID>` generated through the AWS console.
+
+### `com.amazonaws.services.kms.model.IncorrectKeyException` The key ID in the request does not identify a CMK that can perform this operation.
+
+The KMS key ID used to PUT (encrypt) the data must be the one used to GET the
+data.
+```
+cat: open s3a://ap-south-cse/encryptedData.txt at 0 on
+s3a://ap-south-cse/encryptedData.txt:
+com.amazonaws.services.kms.model.IncorrectKeyException: The key ID in the
+request does not identify a CMK that can perform this operation. (Service: AWSKMS;
+Status Code: 400; ErrorCode: IncorrectKeyException;
+Request ID: da21aa8a-f00d-467c-94a0-32b627d32bc0; Proxy: null):IncorrectKeyException:
+The key ID in the request does not identify a CMK that can perform this
+operation. (Service: AWSKMS ; Status Code: 400; Error Code: IncorrectKeyException;
+Request ID: da21aa8a-f00d-467c-94a0-32b627d32bc0; Proxy: null)
+```
+Use the same KMS key ID used to upload data to download and read it as well.
+
+### `com.amazonaws.services.kms.model.NotFoundException` key/<KMS_KEY_ID> does not exist
+
+Using a KMS key ID from a different region than the bucket used to store data
+ leads to failure while uploading.
+
+```
+mkdir: PUT 0-byte object  on testmkdir:
+com.amazonaws.services.kms.model.NotFoundException: Key
+'arn:aws:kms:ap-south-1:152813717728:key/<KMS_KEY_ID>'
+does not exist (Service: AWSKMS; Status Code: 400; Error Code: NotFoundException;
+Request ID: 279db85d-864d-4a38-9acd-d892adb504c0; Proxy: null):NotFoundException:
+Key 'arn:aws:kms:ap-south-1:152813717728:key/<KMS_KEY_ID>'
+does not exist(Service: AWSKMS; Status Code: 400; Error Code: NotFoundException;
+Request ID: 279db85d-864d-4a38-9acd-d892adb504c0; Proxy: null)
+```
+When generating the KMS key ID, make sure to generate it in the same region
+ as your bucket.
+
+### Unable to perform range get request: Range get support has been disabled
+
+If range gets are not supported for a CSE algorithm, or are disabled:
+```
+java.lang.SecurityException: Unable to perform range get request: Range get support has been disabled. See https://docs.aws.amazon.com/general/latest/gr/aws_sdk_cryptography.html
+
+    at com.amazonaws.services.s3.internal.crypto.v2.S3CryptoModuleAE.assertCanGetPartialObject(S3CryptoModuleAE.java:446)
+    at com.amazonaws.services.s3.internal.crypto.v2.S3CryptoModuleAE.getObjectSecurely(S3CryptoModuleAE.java:117)
+    at com.amazonaws.services.s3.AmazonS3EncryptionClientV2.getObject(AmazonS3EncryptionClientV2.java:241)
+    at org.apache.hadoop.fs.s3a.S3AFileSystem$InputStreamCallbacksImpl.getObject(S3AFileSystem.java:1462)
+    at org.apache.hadoop.fs.s3a.S3AInputStream.lambda$reopen$0(S3AInputStream.java:217)
+    at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:117)
+    at org.apache.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:216)
+    at org.apache.hadoop.fs.s3a.S3AInputStream.lambda$lazySeek$1(S3AInputStream.java:382)
+    at org.apache.hadoop.fs.s3a.Invoker.lambda$maybeRetry$3(Invoker.java:230)
+    at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:117)
+    at org.apache.hadoop.fs.s3a.Invoker.lambda$maybeRetry$5(Invoker.java:354)
+    at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:414)
+    at org.apache.hadoop.fs.s3a.Invoker.maybeRetry(Invoker.java:350)
+    at org.apache.hadoop.fs.s3a.Invoker.maybeRetry(Invoker.java:228)
+    at org.apache.hadoop.fs.s3a.Invoker.maybeRetry(Invoker.java:272)
+    at org.apache.hadoop.fs.s3a.S3AInputStream.lazySeek(S3AInputStream.java:374)
+    at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:408)
+    at java.io.DataInputStream.readByte(DataInputStream.java:265)
+```
+Range gets must be enabled for CSE to work.
+
+### WARNING: Range gets do not provide authenticated encryption properties even when used with an authenticated mode (AES-GCM).
+
+The S3 Encryption Client is configured to support range get requests. This
+ warning is shown every time S3-CSE is used.
+```
+2021-07-14 12:54:09,525 [main] WARN  s3.AmazonS3EncryptionClientV2
+(AmazonS3EncryptionClientV2.java:warnOnRangeGetsEnabled(401)) - The S3
+Encryption Client is configured to support range get requests. Range gets do
+not provide authenticated encryption properties even when used with an
+authenticated mode (AES-GCM). See https://docs.aws.amazon.com/general/latest
+/gr/aws_sdk_cryptography.html
+```
+This warning can be ignored, since range gets must be enabled for S3-CSE to
+read data.
+
+### WARNING: If you don't have objects encrypted with these legacy modes, you should disable support for them to enhance security.
+
+The S3 Encryption Client is configured to read encrypted data with legacy
+encryption modes through the CryptoMode setting, so this warning is shown
+for every S3-CSE request.
+
+```
+2021-07-14 12:54:09,519 [main] WARN  s3.AmazonS3EncryptionClientV2
+(AmazonS3EncryptionClientV2.java:warnOnLegacyCryptoMode(409)) - The S3
+Encryption Client is configured to read encrypted data with legacy
+encryption modes through the CryptoMode setting. If you don't have objects
+encrypted with these legacy modes, you should disable support for them to
+enhance security. See https://docs.aws.amazon.com/general/latest/gr/aws_sdk_cryptography.html
+```
+This warning can be ignored, since this CryptoMode setting
+(`CryptoMode.AuthenticatedEncryption`) is required for range gets to work.
+
+### com.amazonaws.services.kms.model.InvalidKeyUsageException: You cannot generate a data key with an asymmetric CMK
+
+If you generated an asymmetric CMK in the AWS console, then CSE-KMS won't be
+able to generate a unique data key for encryption.
+
+```
+Caused by: com.amazonaws.services.kms.model.InvalidKeyUsageException:
+You cannot generate a data key with an asymmetric CMK
+(Service: AWSKMS; Status Code: 400; Error Code: InvalidKeyUsageException; Request ID: 93609c15-e490-4035-8390-f4396f0d90bf; Proxy: null)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1403)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1372)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
+    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
+    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
+    at com.amazonaws.services.kms.AWSKMSClient.doInvoke(AWSKMSClient.java:7223)
+    at com.amazonaws.services.kms.AWSKMSClient.invoke(AWSKMSClient.java:7190)
+    at com.amazonaws.services.kms.AWSKMSClient.invoke(AWSKMSClient.java:7179)
+    at com.amazonaws.services.kms.AWSKMSClient.executeGenerateDataKey(AWSKMSClient.java:3482)
+    at com.amazonaws.services.kms.AWSKMSClient.generateDataKey(AWSKMSClient.java:3451)
+    at com.amazonaws.services.s3.internal.crypto.v2.S3CryptoModuleBase.buildContentCryptoMaterial(S3CryptoModuleBase.java:533)
+    at com.amazonaws.services.s3.internal.crypto.v2.S3CryptoModuleBase.newContentCryptoMaterial(S3CryptoModuleBase.java:481)
+    at com.amazonaws.services.s3.internal.crypto.v2.S3CryptoModuleBase.createContentCryptoMaterial(S3CryptoModuleBase.java:447)
+    at com.amazonaws.services.s3.internal.crypto.v2.S3CryptoModuleBase.putObjectUsingMetadata(S3CryptoModuleBase.java:160)
+    at com.amazonaws.services.s3.internal.crypto.v2.S3CryptoModuleBase.putObjectSecurely(S3CryptoModuleBase.java:156)
+    at com.amazonaws.services.s3.AmazonS3EncryptionClientV2.putObject(AmazonS3EncryptionClientV2.java:236)
+    at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$putObjectDirect$17(S3AFileSystem.java:2792)
+    at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfSupplier(IOStatisticsBinding.java:604)
+    at org.apache.hadoop.fs.s3a.S3AFileSystem.putObjectDirect(S3AFileSystem.java:2789)
+    at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$createEmptyObject$33(S3AFileSystem.java:4440)
+    at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:117)
+    ... 49 more
+```
+
+Generate a Symmetric Key in the same region as your S3 storage for CSE-KMS to
+work.
+
+### com.amazonaws.services.kms.model.NotFoundException: Invalid keyId
+
+If the value of the `fs.s3a.server-side-encryption.key` property does not
+identify a valid AWS KMS CMK (customer managed key), then this error is seen.
+
+```
+Caused by: com.amazonaws.services.kms.model.NotFoundException: Invalid keyId abc
+(Service: AWSKMS; Status Code: 400; Error Code: NotFoundException; Request ID: 9d53552a-3d1b-47c8-984c-9a599d5c2391; Proxy: null)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1403)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1372)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
+    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
+    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
+    at com.amazonaws.services.kms.AWSKMSClient.doInvoke(AWSKMSClient.java:7223)
+    at com.amazonaws.services.kms.AWSKMSClient.invoke(AWSKMSClient.java:7190)
+    at com.amazonaws.services.kms.AWSKMSClient.invoke(AWSKMSClient.java:7179)
+    at com.amazonaws.services.kms.AWSKMSClient.executeGenerateDataKey(AWSKMSClient.java:3482)
+    at com.amazonaws.services.kms.AWSKMSClient.generateDataKey(AWSKMSClient.java:3451)
+    at com.amazonaws.services.s3.internal.crypto.v2.S3CryptoModuleBase.buildContentCryptoMaterial(S3CryptoModuleBase.java:533)
+    at com.amazonaws.services.s3.internal.crypto.v2.S3CryptoModuleBase.newContentCryptoMaterial(S3CryptoModuleBase.java:481)
+    at com.amazonaws.services.s3.internal.crypto.v2.S3CryptoModuleBase.createContentCryptoMaterial(S3CryptoModuleBase.java:447)
+    at com.amazonaws.services.s3.internal.crypto.v2.S3CryptoModuleBase.putObjectUsingMetadata(S3CryptoModuleBase.java:160)
+    at com.amazonaws.services.s3.internal.crypto.v2.S3CryptoModuleBase.putObjectSecurely(S3CryptoModuleBase.java:156)
+    at com.amazonaws.services.s3.AmazonS3EncryptionClientV2.putObject(AmazonS3EncryptionClientV2.java:236)
+    at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$putObjectDirect$17(S3AFileSystem.java:2792)
+    at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfSupplier(IOStatisticsBinding.java:604)
+    at org.apache.hadoop.fs.s3a.S3AFileSystem.putObjectDirect(S3AFileSystem.java:2789)
+    at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$createEmptyObject$33(S3AFileSystem.java:4440)
+    at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:117)
+    ... 49 more
+```
+
+Check that `fs.s3a.server-side-encryption.key` is set correctly and matches
+the key ID or ARN shown in the AWS console.
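+
+Remember that a per-bucket setting overrides the base one; with a placeholder
+bucket name, a bucket-specific key such as the following takes precedence over
+the global property, so check both:
+
+```
+<property>
+  <name>fs.s3a.bucket.example-bucket.server-side-encryption.key</name>
+  <value>arn:aws:kms:us-west-2:111122223333:key/example-key-id</value>
+</property>
+```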
+
+### com.amazonaws.services.kms.model.AWSKMSException: User: <User_ARN> is not authorized to perform: kms:GenerateDataKey on resource: <KEY_ID>
+
+The user does not have permission to use the specified AWS KMS key.
+
+```
+Caused by: com.amazonaws.services.kms.model.AWSKMSException:
+User: arn:aws:iam::152813717728:user/<user> is not authorized to perform: kms:GenerateDataKey on resource: <key_ID>
+(Service: AWSKMS; Status Code: 400; Error Code: AccessDeniedException; Request ID: 4ded9f1f-b245-4213-87fc-16cba7a1c4b9; Proxy: null)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1403)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1372)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
+    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
+    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
+    at com.amazonaws.services.kms.AWSKMSClient.doInvoke(AWSKMSClient.java:7223)
+    at com.amazonaws.services.kms.AWSKMSClient.invoke(AWSKMSClient.java:7190)
+    at com.amazonaws.services.kms.AWSKMSClient.invoke(AWSKMSClient.java:7179)
+    at com.amazonaws.services.kms.AWSKMSClient.executeGenerateDataKey(AWSKMSClient.java:3482)
+    at com.amazonaws.services.kms.AWSKMSClient.generateDataKey(AWSKMSClient.java:3451)
+    at com.amazonaws.services.s3.internal.crypto.v2.S3CryptoModuleBase.buildContentCryptoMaterial(S3CryptoModuleBase.java:533)
+    at com.amazonaws.services.s3.internal.crypto.v2.S3CryptoModuleBase.newContentCryptoMaterial(S3CryptoModuleBase.java:481)
+    at com.amazonaws.services.s3.internal.crypto.v2.S3CryptoModuleBase.createContentCryptoMaterial(S3CryptoModuleBase.java:447)
+    at com.amazonaws.services.s3.internal.crypto.v2.S3CryptoModuleBase.putObjectUsingMetadata(S3CryptoModuleBase.java:160)
+    at com.amazonaws.services.s3.internal.crypto.v2.S3CryptoModuleBase.putObjectSecurely(S3CryptoModuleBase.java:156)
+    at com.amazonaws.services.s3.AmazonS3EncryptionClientV2.putObject(AmazonS3EncryptionClientV2.java:236)
+    at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$putObjectDirect$17(S3AFileSystem.java:2792)
+    at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfSupplier(IOStatisticsBinding.java:604)
+    at org.apache.hadoop.fs.s3a.S3AFileSystem.putObjectDirect(S3AFileSystem.java:2789)
+    at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$createEmptyObject$33(S3AFileSystem.java:4440)
+    at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:117)
+    ... 49 more
+```
+
+The user trying to use the KMS key must have permission to access
+(encrypt/decrypt with) the AWS KMS key set in `fs.s3a.server-side-encryption.key`.
+If not, add the user (or their IAM role) to the "Key users" section of the
+CMK's page in the AWS console.
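+
+As a sketch, the key-policy statement the console adds for key users looks
+like the following; the account ID and user ARN here are placeholders:
+
+```
+{
+  "Sid": "Allow use of the key",
+  "Effect": "Allow",
+  "Principal": {
+    "AWS": "arn:aws:iam::111122223333:user/example-user"
+  },
+  "Action": [
+    "kms:Encrypt",
+    "kms:Decrypt",
+    "kms:ReEncrypt*",
+    "kms:GenerateDataKey*",
+    "kms:DescribeKey"
+  ],
+  "Resource": "*"
+}
+```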
+
 ### <a name="not_all_bytes_were_read"></a> Message appears in logs "Not all bytes were read from the S3ObjectInputStream"
 
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3ATestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3ATestBase.java
index 4e5c452..2429191 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3ATestBase.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3ATestBase.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.store.audit.AuditSpanSource;
 import org.apache.hadoop.io.IOUtils;
 
 import org.junit.AfterClass;
+import org.junit.Assume;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -259,4 +260,12 @@ public abstract class AbstractS3ATestBase extends AbstractFSContractTestBase
 
     return source.createSpan(getMethodName(), null, null);
   }
+
+  /**
+   * Skip a test if S3 client-side encryption is enabled.
+   */
+  public void skipIfClientSideEncryption() {
+    Assume.assumeTrue("Skipping test if CSE is enabled",
+        !getFileSystem().isCSEEnabled());
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/EncryptionTestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/EncryptionTestUtils.java
index f9cfc04..4fc03e0 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/EncryptionTestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/EncryptionTestUtils.java
@@ -51,7 +51,7 @@ public final class EncryptionTestUtils {
    */
   public static String convertKeyToMd5(FileSystem fs) {
     String base64Key = fs.getConf().getTrimmed(
-            SERVER_SIDE_ENCRYPTION_KEY
+        SERVER_SIDE_ENCRYPTION_KEY
     );
     byte[] key = Base64.decodeBase64(base64Key);
     byte[] md5 =  DigestUtils.md5(key);
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java
new file mode 100644
index 0000000..880f032
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.statistics.IOStatisticAssertions;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.rm;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyFileContents;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
+import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBool;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Tests to verify S3 Client-Side Encryption (CSE).
+ */
+public abstract class ITestS3AClientSideEncryption extends AbstractS3ATestBase {
+
+  private static final List<Integer> SIZES =
+      new ArrayList<>(Arrays.asList(0, 1, 255, 4095));
+
+  private static final int BIG_FILE_SIZE = 15 * 1024 * 1024;
+  private static final int SMALL_FILE_SIZE = 1024;
+
+  /**
+   * Testing S3 CSE on different file sizes.
+   */
+  @Test
+  public void testEncryption() throws Throwable {
+    describe("Test to verify client-side encryption for different file sizes.");
+    for (int size : SIZES) {
+      validateEncryptionForFileSize(size);
+    }
+  }
+
+  /**
+   * Testing the S3 client side encryption over rename operation.
+   */
+  @Test
+  public void testEncryptionOverRename() throws Throwable {
+    describe("Test for AWS CSE on Rename Operation.");
+    maybeSkipTest();
+    S3AFileSystem fs = getFileSystem();
+    Path src = path(getMethodName());
+    byte[] data = dataset(SMALL_FILE_SIZE, 'a', 'z');
+    writeDataset(fs, src, data, data.length, SMALL_FILE_SIZE,
+        true, false);
+
+    ContractTestUtils.verifyFileContents(fs, src, data);
+    Path dest = path(src.getName() + "-copy");
+    fs.rename(src, dest);
+    ContractTestUtils.verifyFileContents(fs, dest, data);
+    assertEncrypted(dest);
+  }
+
+  /**
+   * Test to verify that we get the same content lengths of files in S3 CSE
+   * using listStatus and listFiles on the parent directory.
+   */
+  @Test
+  public void testDirectoryListingFileLengths() throws IOException {
+    describe("Test to verify directory listing calls gives correct content "
+        + "lengths");
+    maybeSkipTest();
+    S3AFileSystem fs = getFileSystem();
+    Path parentDir = path(getMethodName());
+
+    // Creating files in the parent directory that will be used to assert
+    // content length.
+    for (int i : SIZES) {
+      Path child = new Path(parentDir, getMethodName() + i);
+      writeThenReadFile(child, i);
+    }
+
+    // Getting the content lengths of files inside the directory via FileStatus.
+    List<Integer> fileLengthDirListing = new ArrayList<>();
+    for (FileStatus fileStatus : fs.listStatus(parentDir)) {
+      fileLengthDirListing.add((int) fileStatus.getLen());
+    }
+    // Assert the file length we got against expected file length for
+    // ListStatus.
+    Assertions.assertThat(fileLengthDirListing)
+        .describedAs("File lengths aren't the same "
+            + "as expected from FileStatus dir. listing")
+        .containsExactlyInAnyOrderElementsOf(SIZES);
+
+    // Getting the content lengths of files inside the directory via ListFiles.
+    RemoteIterator<LocatedFileStatus> listDir = fs.listFiles(parentDir, true);
+    List<Integer> fileLengthListLocated = new ArrayList<>();
+    while (listDir.hasNext()) {
+      LocatedFileStatus fileStatus = listDir.next();
+      fileLengthListLocated.add((int) fileStatus.getLen());
+    }
+    // Assert the file length we got against expected file length for
+    // LocatedFileStatus.
+    Assertions.assertThat(fileLengthListLocated)
+        .describedAs("File lengths isn't same "
+            + "as expected from LocatedFileStatus dir. listing")
+        .containsExactlyInAnyOrderElementsOf(SIZES);
+
+  }
+
+  /**
+   * Test to verify multipart upload through S3ABlockOutputStream and to
+   * verify the contents of the uploaded file.
+   */
+  @Test
+  public void testBigFilePutAndGet() throws IOException {
+    maybeSkipTest();
+    assume("Scale test disabled: to enable set property " +
+        KEY_SCALE_TESTS_ENABLED, getTestPropertyBool(
+        getConfiguration(),
+        KEY_SCALE_TESTS_ENABLED,
+        DEFAULT_SCALE_TESTS_ENABLED));
+    S3AFileSystem fs = getFileSystem();
+    Path filePath = path(getMethodName());
+    byte[] fileContent = dataset(BIG_FILE_SIZE, 'a', 26);
+    int offsetSeek = fileContent[BIG_FILE_SIZE - 4];
+
+    // PUT a 15MB file using CSE to force a multipart upload.
+    createFile(fs, filePath, true, fileContent);
+    LOG.info("Multi-part upload successful...");
+
+    try (FSDataInputStream in = fs.open(filePath)) {
+      // Verify random IO.
+      in.seek(BIG_FILE_SIZE - 4);
+      assertEquals("Byte at a specific position not equal to actual byte",
+          offsetSeek, in.read());
+      in.seek(0);
+      assertEquals("Byte at a specific position not equal to actual byte",
+          'a', in.read());
+
+      // Verify seek-read between two multipart blocks.
+      in.seek(MULTIPART_MIN_SIZE - 1);
+      int byteBeforeBlockEnd = fileContent[MULTIPART_MIN_SIZE];
+      assertEquals("Byte before multipart block end mismatch",
+          byteBeforeBlockEnd - 1, in.read());
+      assertEquals("Byte at multipart end mismatch",
+          byteBeforeBlockEnd, in.read());
+      assertEquals("Byte after multipart end mismatch",
+          byteBeforeBlockEnd + 1, in.read());
+
+      // Verify end of file seek read.
+      in.seek(BIG_FILE_SIZE + 1);
+      assertEquals("Byte at eof mismatch",
+          -1, in.read());
+
+      // Verify full read.
+      in.readFully(0, fileContent);
+      verifyFileContents(fs, filePath, fileContent);
+    }
+  }
+
+  /**
+   * Testing how unencrypted and encrypted data behave when read through
+   * CSE-enabled and CSE-disabled FS instances respectively.
+   */
+  @Test
+  public void testEncryptionEnabledAndDisabledFS() throws Exception {
+    maybeSkipTest();
+    S3AFileSystem cseDisabledFS = new S3AFileSystem();
+    Configuration cseDisabledConf = getConfiguration();
+    S3AFileSystem cseEnabledFS = getFileSystem();
+    Path unEncryptedFilePath = path(getMethodName());
+    Path encryptedFilePath = path(getMethodName() + "cse");
+
+    // Initialize a CSE disabled FS.
+    cseDisabledConf.unset(SERVER_SIDE_ENCRYPTION_ALGORITHM);
+    cseDisabledConf.unset(SERVER_SIDE_ENCRYPTION_KEY);
+    cseDisabledFS.initialize(getFileSystem().getUri(),
+        cseDisabledConf);
+
+    // Verifying both FS instances using an IOStat gauge.
+    IOStatistics cseDisabledIOStats = cseDisabledFS.getIOStatistics();
+    IOStatistics cseEnabledIOStatistics = cseEnabledFS.getIOStatistics();
+    IOStatisticAssertions.assertThatStatisticGauge(cseDisabledIOStats,
+        Statistic.CLIENT_SIDE_ENCRYPTION_ENABLED.getSymbol()).isEqualTo(0L);
+    IOStatisticAssertions.assertThatStatisticGauge(cseEnabledIOStatistics,
+        Statistic.CLIENT_SIDE_ENCRYPTION_ENABLED.getSymbol()).isEqualTo(1L);
+
+    // Unencrypted data written to a path.
+    try (FSDataOutputStream out = cseDisabledFS.create(unEncryptedFilePath)) {
+      out.write(new byte[SMALL_FILE_SIZE]);
+    }
+
+    // A CSE-enabled FS reading unencrypted data will raise an exception.
+    try (FSDataInputStream in = cseEnabledFS.open(unEncryptedFilePath)) {
+      FileStatus encryptedFSFileStatus =
+          cseEnabledFS.getFileStatus(unEncryptedFilePath);
+      assertEquals("Mismatch in content length bytes", SMALL_FILE_SIZE,
+          encryptedFSFileStatus.getLen());
+
+      intercept(SecurityException.class, "",
+          "SecurityException should be thrown",
+          () -> {
+            in.read(new byte[SMALL_FILE_SIZE]);
+            return "Exception should be raised if unencrypted data is read by "
+                + "a CSE enabled FS";
+          });
+    }
+
+    // Encrypted data written to a path.
+    try (FSDataOutputStream out = cseEnabledFS.create(encryptedFilePath)) {
+      out.write('a');
+    }
+
+    // CSE disabled FS tries to read encrypted data.
+    try (FSDataInputStream in = cseDisabledFS.open(encryptedFilePath)) {
+      FileStatus unEncryptedFSFileStatus =
+          cseDisabledFS.getFileStatus(encryptedFilePath);
+      // Due to padding and encryption, the object length a CSE-disabled FS
+      // reads shouldn't equal the length of the content written.
+      assertNotEquals("Mismatch in content length", 1,
+          unEncryptedFSFileStatus.getLen());
+      Assertions.assertThat(in.read())
+          .describedAs("Encrypted data shouldn't be equal to actual content "
+              + "without deciphering")
+          .isNotEqualTo('a');
+    }
+  }
+
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    S3ATestUtils.removeBaseAndBucketOverrides(conf, Constants.MULTIPART_SIZE,
+        Constants.MIN_MULTIPART_THRESHOLD);
+    // To force multipart put and get with small files, set the
+    // threshold and part size to 5MB.
+    conf.set(Constants.MULTIPART_SIZE,
+        String.valueOf(MULTIPART_MIN_SIZE));
+    conf.set(Constants.MIN_MULTIPART_THRESHOLD,
+        String.valueOf(MULTIPART_MIN_SIZE));
+    return conf;
+  }
+
+  /**
+   * Method to validate CSE for different file sizes.
+   *
+   * @param len length of the file.
+   */
+  protected void validateEncryptionForFileSize(int len) throws IOException {
+    maybeSkipTest();
+    describe("Create an encrypted file of size " + len);
+    // Creating a unique path by adding file length in file name.
+    Path path = writeThenReadFile(getMethodName() + len, len);
+    assertEncrypted(path);
+    rm(getFileSystem(), path, false, false);
+  }
+
+  /**
+   * Skip tests if certain conditions are met.
+   */
+  protected abstract void maybeSkipTest();
+
+  /**
+   * Assert that a path references an encrypted blob.
+   *
+   * @param path path
+   * @throws IOException on a failure
+   */
+  protected abstract void assertEncrypted(Path path) throws IOException;
+
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryptionKms.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryptionKms.java
new file mode 100644
index 0000000..35f648f
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryptionKms.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.util.Map;
+
+import com.amazonaws.services.s3.Headers;
+import org.assertj.core.api.Assertions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.impl.HeaderProcessing;
+
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionTestsDisabled;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfKmsKeyIdIsNotSet;
+
+/**
+ * Testing the S3 CSE - KMS method.
+ */
+public class ITestS3AClientSideEncryptionKms
+    extends ITestS3AClientSideEncryption {
+
+  private static final String KMS_KEY_WRAP_ALGO = "kms+context";
+  private static final String KMS_CONTENT_ENCRYPTION_ALGO = "AES/GCM/NoPadding";
+
+  /**
+   * Creating custom configs for KMS testing.
+   *
+   * @return Configuration.
+   */
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    S3ATestUtils.disableFilesystemCaching(conf);
+    return conf;
+  }
+
+  @Override
+  protected void maybeSkipTest() {
+    skipIfEncryptionTestsDisabled(getConfiguration());
+    skipIfKmsKeyIdIsNotSet(getConfiguration());
+  }
+
+  @Override
+  protected void assertEncrypted(Path path) throws IOException {
+    Map<String, byte[]> fsXAttrs = getFileSystem().getXAttrs(path);
+    String xAttrPrefix = "header.";
+
+    // Assert KeyWrap Algo
+    assertEquals("Key wrap algo isn't same as expected", KMS_KEY_WRAP_ALGO,
+        processHeader(fsXAttrs,
+            xAttrPrefix + Headers.CRYPTO_KEYWRAP_ALGORITHM));
+
+    // Assert that the content encryption algo for KMS is present in the
+    // materials description and that the KMS key ID isn't.
+    String keyId =
+        getConfiguration().get(Constants.SERVER_SIDE_ENCRYPTION_KEY);
+    Assertions.assertThat(processHeader(fsXAttrs,
+        xAttrPrefix + Headers.MATERIALS_DESCRIPTION))
+        .describedAs("Materials Description should contain the content "
+            + "encryption algo and should not contain the KMS keyID.")
+        .contains(KMS_CONTENT_ENCRYPTION_ALGO)
+        .doesNotContain(keyId);
+  }
+
+  /**
+   * Decode the value of an FS xAttribute header key.
+   *
+   * @param fsXAttrs  Map of String (key) and byte[] (value) representing the
+   *                  fs xAttrs.
+   * @param headerKey Key of the header we are trying to process.
+   * @return String value of the header key provided.
+   */
+  private String processHeader(Map<String, byte[]> fsXAttrs,
+      String headerKey) {
+    return HeaderProcessing.decodeBytes(fsXAttrs.get(headerKey));
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSUserDefinedKey.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSUserDefinedKey.java
index 67c403e..3a8cf7a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSUserDefinedKey.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSUserDefinedKey.java
@@ -22,6 +22,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
+import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
 import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
 import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.SSE_KMS;
 
@@ -38,9 +39,11 @@ public class ITestS3AEncryptionSSEKMSUserDefinedKey
     // get the KMS key for this test.
     Configuration c = new Configuration();
     String kmsKey = c.get(SERVER_SIDE_ENCRYPTION_KEY);
-    if (StringUtils.isBlank(kmsKey)){
-      skip(SERVER_SIDE_ENCRYPTION_KEY+ " is not set for " +
-          SSE_KMS.getMethod());
+    if (StringUtils.isBlank(kmsKey) || !c.get(SERVER_SIDE_ENCRYPTION_ALGORITHM)
+        .equals(S3AEncryptionMethods.CSE_KMS.name())) {
+      skip(SERVER_SIDE_ENCRYPTION_KEY + " is not set for " +
+          SSE_KMS.getMethod() + " or CSE-KMS algorithm is used instead of "
+          + "SSE-KMS");
     }
     Configuration conf = super.createConfiguration();
     conf.set(SERVER_SIDE_ENCRYPTION_KEY, kmsKey);
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionWithDefaultS3Settings.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionWithDefaultS3Settings.java
index c5ef65f..c7a62a3 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionWithDefaultS3Settings.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionWithDefaultS3Settings.java
@@ -57,9 +57,11 @@ public class ITestS3AEncryptionWithDefaultS3Settings extends
     S3AFileSystem fs = getFileSystem();
     Configuration c = fs.getConf();
     String kmsKey = c.get(SERVER_SIDE_ENCRYPTION_KEY);
-    if (StringUtils.isBlank(kmsKey)) {
+    if (StringUtils.isBlank(kmsKey) || !c.get(SERVER_SIDE_ENCRYPTION_ALGORITHM)
+        .equals(S3AEncryptionMethods.CSE_KMS.name())) {
       skip(SERVER_SIDE_ENCRYPTION_KEY + " is not set for " +
-          SSE_KMS.getMethod());
+          SSE_KMS.getMethod() + " or CSE-KMS algorithm is used instead of "
+          + "SSE-KMS");
     }
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInconsistency.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInconsistency.java
index b0e1a40..a49998d 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInconsistency.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInconsistency.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.s3a.s3guard.NullMetadataStore;
 import org.apache.hadoop.test.LambdaTestUtils;
 
 import org.junit.Assume;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.FileNotFoundException;
@@ -63,6 +64,14 @@ public class ITestS3AInconsistency extends AbstractS3ATestBase {
   /** By using a power of 2 for the initial time, the total is a shift left. */
   private static final int TOTAL_RETRY_DELAY = INITIAL_RETRY << RETRIES;
 
+  /**
+   * Skip this test if S3 client-side encryption is enabled.
+   */
+  @Before
+  public void setUp() {
+    skipIfClientSideEncryption();
+  }
+
   @Override
   protected Configuration createConfiguration() {
     Configuration conf = super.createConfiguration();
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java
index 496226c..daefa78 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java
@@ -187,6 +187,7 @@ public class ITestS3AMiscOperations extends AbstractS3ATestBase {
    */
   private void assumeNoDefaultEncryption() throws IOException {
     try {
+      skipIfClientSideEncryption();
       Assume.assumeThat(getDefaultEncryption(), nullValue());
     } catch (AccessDeniedException e) {
       // if the user can't check the default encryption, assume that it is
@@ -254,7 +255,7 @@ public class ITestS3AMiscOperations extends AbstractS3ATestBase {
   }
 
   private S3AEncryptionMethods encryptionAlgorithm() {
-    return getFileSystem().getServerSideEncryptionAlgorithm();
+    return getFileSystem().getS3EncryptionAlgorithm();
   }
 
   @Test
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
index 09f66df..2475b54 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
@@ -70,13 +70,17 @@ public class ITestS3GuardListConsistency extends AbstractS3ATestBase {
     invoker = new Invoker(new S3ARetryPolicy(getConfiguration()),
         Invoker.NO_OP
     );
+    skipIfClientSideEncryption();
     Assume.assumeTrue("No metadata store in test filesystem",
         getFileSystem().hasMetadataStore());
   }
 
   @Override
   public void teardown() throws Exception {
-    clearInconsistency(getFileSystem());
+    if (getFileSystem()
+        .getAmazonS3Client() instanceof InconsistentAmazonS3Client) {
+      clearInconsistency(getFileSystem());
+    }
     super.teardown();
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
index 2c66243..945d3f0 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
@@ -234,4 +234,17 @@ public interface S3ATestConstants {
    * every test case.
    */
   String DIRECTORY_MARKER_AUDIT = "fs.s3a.directory.marker.audit";
+
+  /**
+   * Number of extra bytes written when client-side encryption with KMS is
+   * enabled for a test. It accounts for the "EncryptionContext", which holds
+   * the content encryption algorithm, e.g.
+   * "aws:x-amz-cek-alg":"AES/GCM/NoPadding", and the "KeySpec", which
+   * specifies the data key length, e.g. AES_256 for a 256-bit symmetric key.
+   *
+   * Tests asserting on bytesWritten can use this constant.
+   */
+  int KMS_KEY_GENERATION_REQUEST_PARAMS_BYTES_WRITTEN = 94;
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
index 9f945e4..6c51c73 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
@@ -1495,4 +1495,16 @@ public final class S3ATestUtils {
         probes);
   }
 
+  /**
+   * Skip a test if CSE KMS key id is not set.
+   *
+   * @param configuration configuration to probe.
+   */
+  public static void skipIfKmsKeyIdIsNotSet(Configuration configuration) {
+    if (configuration.get(
+        SERVER_SIDE_ENCRYPTION_KEY) == null) {
+      skip("AWS KMS key id is not set");
+    }
+  }
+
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java
index beeb667..a1e56c3 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java
@@ -96,7 +96,7 @@ public class TestS3AInputStreamRetry extends AbstractS3AMockTest {
         fs.getBucket(),
         path,
         fs.pathToKey(path),
-        fs.getServerSideEncryptionAlgorithm(),
+        fs.getS3EncryptionAlgorithm(),
         new EncryptionSecrets().getEncryptionKey(),
         eTag,
         versionId,
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestSSEConfiguration.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestSSEConfiguration.java
index a664a8b..273cf8b 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestSSEConfiguration.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestSSEConfiguration.java
@@ -86,14 +86,14 @@ public class TestSSEConfiguration extends Assert {
   public void testSSEEmptyKey() {
     // test the internal logic of the test setup code
     Configuration c = buildConf(SSE_C.getMethod(), "");
-    assertEquals("", getServerSideEncryptionKey(BUCKET, c));
+    assertEquals("", getS3EncryptionKey(BUCKET, c));
   }
 
   @Test
   public void testSSEKeyNull() throws Throwable {
     // test the internal logic of the test setup code
     final Configuration c = buildConf(SSE_C.getMethod(), null);
-    assertEquals("", getServerSideEncryptionKey(BUCKET, c));
+    assertEquals("", getS3EncryptionKey(BUCKET, c));
 
     intercept(IOException.class, SSE_C_NO_KEY_ERROR,
         () -> getEncryptionAlgorithm(BUCKET, c));
@@ -109,7 +109,7 @@ public class TestSSEConfiguration extends Assert {
     // provider provisioned value instead.
     conf.set(SERVER_SIDE_ENCRYPTION_KEY, "keyInConfObject");
 
-    String sseKey = getServerSideEncryptionKey(BUCKET, conf);
+    String sseKey = getS3EncryptionKey(BUCKET, conf);
     assertNotNull("Proxy password should not retrun null.", sseKey);
     assertEquals("Proxy password override did NOT work.", key, sseKey);
   }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestCustomSigner.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestCustomSigner.java
index 72af175..22ce1e9 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestCustomSigner.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestCustomSigner.java
@@ -26,6 +26,7 @@ import java.util.Objects;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.amazonaws.SignableRequest;
+import com.amazonaws.auth.AWS4Signer;
 import com.amazonaws.auth.AWSCredentials;
 import com.amazonaws.auth.Signer;
 import com.amazonaws.services.s3.internal.AWSS3V4Signer;
@@ -169,6 +170,20 @@ public class ITestCustomSigner extends AbstractS3ATestBase {
       LOG.info("Creating Signer #{}", c);
     }
 
+    /**
+     * Method to sign the incoming request with credentials.
+     *
+     * NOTE: with client-side encryption, a "GenerateDataKey" POST request
+     * goes to the AWS KMS service rather than to S3, and this used to break
+     * the test. For that request the endpoint has the form
+     * "kms.[REGION].amazonaws.com", and the parsed bucket-name becomes "kms".
+     * AWSS3V4Signer can't be used for the KMS service: it adds the header
+     * "x-amz-content-sha256:UNSIGNED-PAYLOAD", which triggers a 400 bad
+     * request because the signature calculated by the service doesn't match
+     * what we sent.
+     * @param request the request to sign.
+     * @param credentials credentials used to sign the request.
+     */
     @Override
     public void sign(SignableRequest<?> request, AWSCredentials credentials) {
       int c = INVOCATION_COUNT.incrementAndGet();
@@ -182,10 +197,21 @@ public class ITestCustomSigner extends AbstractS3ATestBase {
       } catch (IOException e) {
         throw new RuntimeException("Failed to get current Ugi", e);
       }
-      AWSS3V4Signer realSigner = new AWSS3V4Signer();
-      realSigner.setServiceName("s3");
-      realSigner.setRegionName(lastStoreValue.conf.get(TEST_REGION_KEY));
-      realSigner.sign(request, credentials);
+      if (bucketName.equals("kms")) {
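+        // KMS requests are signed with the generic AWS4Signer for "kms".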
+        AWS4Signer realKMSSigner = new AWS4Signer();
+        realKMSSigner.setServiceName("kms");
+        if (lastStoreValue != null) {
+          realKMSSigner.setRegionName(lastStoreValue.conf.get(TEST_REGION_KEY));
+        }
+        realKMSSigner.sign(request, credentials);
+      } else {
+        AWSS3V4Signer realSigner = new AWSS3V4Signer();
+        realSigner.setServiceName("s3");
+        if (lastStoreValue != null) {
+          realSigner.setRegionName(lastStoreValue.conf.get(TEST_REGION_KEY));
+        }
+        realSigner.sign(request, credentials);
+      }
     }
 
     public static int getInstantiationCount() {
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/delegation/ITestSessionDelegationInFileystem.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/delegation/ITestSessionDelegationInFileystem.java
index 34e355b..126f8b3 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/delegation/ITestSessionDelegationInFileystem.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/delegation/ITestSessionDelegationInFileystem.java
@@ -141,6 +141,10 @@ public class ITestSessionDelegationInFileystem extends AbstractDelegationIT {
     // disable if assume role opts are off
     assumeSessionTestsEnabled(conf);
     disableFilesystemCaching(conf);
+    String s3EncryptionMethod =
+        conf.getTrimmed(SERVER_SIDE_ENCRYPTION_ALGORITHM,
+            S3AEncryptionMethods.SSE_KMS.getMethod());
+    String s3EncryptionKey = conf.getTrimmed(SERVER_SIDE_ENCRYPTION_KEY, "");
     removeBaseAndBucketOverrides(conf,
         DELEGATION_TOKEN_BINDING,
         SERVER_SIDE_ENCRYPTION_ALGORITHM,
@@ -149,10 +153,11 @@ public class ITestSessionDelegationInFileystem extends AbstractDelegationIT {
         UserGroupInformation.AuthenticationMethod.KERBEROS.name());
     enableDelegationTokens(conf, getDelegationBinding());
     conf.set(AWS_CREDENTIALS_PROVIDER, " ");
-    // switch to SSE_S3.
+    // switch to CSE-KMS (if specified), else SSE-KMS.
     if (conf.getBoolean(KEY_ENCRYPTION_TESTS, true)) {
-      conf.set(SERVER_SIDE_ENCRYPTION_ALGORITHM,
-          S3AEncryptionMethods.SSE_S3.getMethod());
+      conf.set(SERVER_SIDE_ENCRYPTION_ALGORITHM, s3EncryptionMethod);
+      // A KMS key ID is required if CSE-KMS is being tested.
+      conf.set(SERVER_SIDE_ENCRYPTION_KEY, s3EncryptionKey);
     }
     // set the YARN RM up for YARN tests.
     conf.set(YarnConfiguration.RM_PRINCIPAL, YARN_RM);
@@ -361,10 +366,10 @@ public class ITestSessionDelegationInFileystem extends AbstractDelegationIT {
       assertBoundToDT(delegatedFS, tokenKind);
       if (encryptionTestEnabled()) {
         assertNotNull("Encryption propagation failed",
-            delegatedFS.getServerSideEncryptionAlgorithm());
+            delegatedFS.getS3EncryptionAlgorithm());
         assertEquals("Encryption propagation failed",
-            fs.getServerSideEncryptionAlgorithm(),
-            delegatedFS.getServerSideEncryptionAlgorithm());
+            fs.getS3EncryptionAlgorithm(),
+            delegatedFS.getS3EncryptionAlgorithm());
       }
       verifyRestrictedPermissions(delegatedFS);
 
@@ -396,10 +401,10 @@ public class ITestSessionDelegationInFileystem extends AbstractDelegationIT {
       assertBoundToDT(secondDelegate, tokenKind);
       if (encryptionTestEnabled()) {
         assertNotNull("Encryption propagation failed",
-            secondDelegate.getServerSideEncryptionAlgorithm());
+            secondDelegate.getS3EncryptionAlgorithm());
         assertEquals("Encryption propagation failed",
-            fs.getServerSideEncryptionAlgorithm(),
-            secondDelegate.getServerSideEncryptionAlgorithm());
+            fs.getS3EncryptionAlgorithm(),
+            secondDelegate.getS3EncryptionAlgorithm());
       }
       ContractTestUtils.assertDeleted(secondDelegate, testPath, true);
       assertNotNull("unbounded DT",
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java
index faef79c..012b6b9 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java
@@ -25,7 +25,6 @@ import java.util.stream.Collectors;
 
 import com.amazonaws.services.s3.AmazonS3;
 import org.assertj.core.api.Assertions;
-import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -179,11 +178,9 @@ public abstract class AbstractCommitITest extends AbstractS3ATestBase {
     if (useInconsistentClient()) {
       AmazonS3 client = getFileSystem()
           .getAmazonS3ClientForTesting("fault injection");
-      Assert.assertTrue(
-          "AWS client is not inconsistent, even though the test requirees it "
-          + client,
-          client instanceof InconsistentAmazonS3Client);
-      inconsistentClient = (InconsistentAmazonS3Client) client;
+      if (client instanceof InconsistentAmazonS3Client) {
+        inconsistentClient = (InconsistentAmazonS3Client) client;
+      }
     }
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java
index 5dc8be0..20ffd0f 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java
@@ -15,6 +15,7 @@ package org.apache.hadoop.fs.s3a.fileContext;
 
 import java.net.URI;
 
+import com.amazonaws.services.s3.model.CryptoStorageMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -23,6 +24,7 @@ import org.apache.hadoop.fs.FCStatisticsBaseTest;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;
 import org.apache.hadoop.fs.s3a.S3ATestUtils;
 import org.apache.hadoop.fs.s3a.auth.STSClientFactory;
 
@@ -30,6 +32,11 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 
+import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
+import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
+import static org.apache.hadoop.fs.s3a.S3ATestConstants.KMS_KEY_GENERATION_REQUEST_PARAMS_BYTES_WRITTEN;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.CSE_PADDING_LENGTH;
+
 /**
  * S3a implementation of FCStatisticsBaseTest.
  */
@@ -39,10 +46,11 @@ public class ITestS3AFileContextStatistics extends FCStatisticsBaseTest {
       LoggerFactory.getLogger(STSClientFactory.class);
 
   private Path testRootPath;
+  private Configuration conf;
 
   @Before
   public void setUp() throws Exception {
-    Configuration conf = new Configuration();
+    conf = new Configuration();
     fc = S3ATestUtils.createTestFileContext(conf);
     testRootPath = fileContextTestHelper.getTestRootPath(fc, "test");
     fc.mkdir(testRootPath,
@@ -62,10 +70,31 @@ public class ITestS3AFileContextStatistics extends FCStatisticsBaseTest {
     Assert.assertEquals(2 * blockSize, stats.getBytesRead());
   }
 
+  /**
+   * A method to verify the bytes written.
+   * <br>
+   * NOTE: if client-side encryption is enabled, the expected bytes written
+   * increase by 16 (data padding) + the byte length of the key ID set + 94
+   * (KMS key generation) when the {@link CryptoStorageMode} is
+   * ObjectMetadata (the default). If the crypto storage mode is
+   * InstructionFile, additional bytes must be added, as that file is stored
+   * separately and also counts towards bytes written.
+   *
+   * @param stats Filesystem statistics.
+   */
   @Override
   protected void verifyWrittenBytes(FileSystem.Statistics stats) {
     //No extra bytes are written
-    Assert.assertEquals(blockSize, stats.getBytesWritten());
+    long expectedBlockSize = blockSize;
+    if (conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM, "")
+        .equals(S3AEncryptionMethods.CSE_KMS.getMethod())) {
+      String keyId = conf.get(SERVER_SIDE_ENCRYPTION_KEY, "");
+      // Adding padding length and KMS key generation bytes written.
+      expectedBlockSize += CSE_PADDING_LENGTH + keyId.getBytes().length +
+          KMS_KEY_GENERATION_REQUEST_PARAMS_BYTES_WRITTEN;
+    }
+    Assert.assertEquals("Mismatch in bytes written", expectedBlockSize,
+        stats.getBytesWritten());
   }
 
   @Override
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesEncryption.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesEncryption.java
index 9b63430..be51b2f 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesEncryption.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesEncryption.java
@@ -25,9 +25,11 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.Constants;
 import org.apache.hadoop.fs.s3a.EncryptionTestUtils;
+import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
+import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
 import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
 import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.SSE_KMS;
 
@@ -43,9 +45,11 @@ public class ITestS3AHugeFilesEncryption extends AbstractSTestS3AHugeFiles {
   public void setup() throws Exception {
     Configuration c = new Configuration();
     String kmsKey = c.get(SERVER_SIDE_ENCRYPTION_KEY);
-    if (StringUtils.isBlank(kmsKey)) {
+    if (StringUtils.isBlank(kmsKey) || !c.get(SERVER_SIDE_ENCRYPTION_ALGORITHM)
+        .equals(S3AEncryptionMethods.CSE_KMS.name())) {
       skip(SERVER_SIDE_ENCRYPTION_KEY + " is not set for " +
-              SSE_KMS.getMethod());
+          SSE_KMS.getMethod() + " or CSE-KMS algorithm is used instead of "
+          + "SSE-KMS");
     }
     super.setup();
   }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesSSECDiskBlocks.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesSSECDiskBlocks.java
index a8635ea..a1c5c3f 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesSSECDiskBlocks.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesSSECDiskBlocks.java
@@ -49,7 +49,7 @@ public class ITestS3AHugeFilesSSECDiskBlocks
   protected Configuration createScaleConfiguration() {
     Configuration conf = super.createScaleConfiguration();
     removeBaseAndBucketOverrides(conf, SERVER_SIDE_ENCRYPTION_KEY,
-            SERVER_SIDE_ENCRYPTION_ALGORITHM);
+           SERVER_SIDE_ENCRYPTION_ALGORITHM);
     S3ATestUtils.disableFilesystemCaching(conf);
     conf.set(Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM,
         getSSEAlgorithm().getMethod());
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java
index 4d1096f..27798e2 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java
@@ -242,6 +242,7 @@ public class ITestS3AInputStreamPerformance extends S3AScaleTestBase {
 
   @Test
   public void testTimeToOpenAndReadWholeFileBlocks() throws Throwable {
+    skipIfClientSideEncryption();
     requireCSVTestData();
     int blockSize = _1MB;
     describe("Open the test file %s and read it in blocks of size %d",
@@ -348,6 +349,7 @@ public class ITestS3AInputStreamPerformance extends S3AScaleTestBase {
   @Test
   public void testReadWithNormalPolicy() throws Throwable {
     describe("Read big blocks with a big readahead");
+    skipIfClientSideEncryption();
     executeSeekReadSequence(BIG_BLOCK_SIZE, BIG_BLOCK_SIZE * 2,
         S3AInputPolicy.Normal);
     assertStreamOpenedExactlyOnce();
@@ -356,6 +358,7 @@ public class ITestS3AInputStreamPerformance extends S3AScaleTestBase {
   @Test
   public void testDecompressionSequential128K() throws Throwable {
     describe("Decompress with a 128K readahead");
+    skipIfClientSideEncryption();
     executeDecompression(128 * _1KB, S3AInputPolicy.Sequential);
     assertStreamOpenedExactlyOnce();
   }
@@ -458,6 +461,7 @@ public class ITestS3AInputStreamPerformance extends S3AScaleTestBase {
 
   @Test
   public void testRandomIORandomPolicy() throws Throwable {
+    skipIfClientSideEncryption();
     executeRandomIO(S3AInputPolicy.Random, (long) RANDOM_IO_SEQUENCE.length);
     assertEquals("streams aborted in " + streamStatistics,
         0, streamStatistics.getAborted());
@@ -465,6 +469,7 @@ public class ITestS3AInputStreamPerformance extends S3AScaleTestBase {
 
   @Test
   public void testRandomIONormalPolicy() throws Throwable {
+    skipIfClientSideEncryption();
     long expectedOpenCount = RANDOM_IO_SEQUENCE.length;
     executeRandomIO(S3AInputPolicy.Normal, expectedOpenCount);
     assertEquals("streams aborted in " + streamStatistics,

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org