You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2021/10/05 10:46:26 UTC

[hadoop] 01/04: HADOOP-13887. Support S3 client side encryption (S3-CSE) using AWS-SDK (#2706)

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit aee975a136a07cd25977a453977c8c63b0c070dc
Author: Mehakmeet Singh <me...@gmail.com>
AuthorDate: Tue Jul 27 15:38:51 2021 +0530

    HADOOP-13887. Support S3 client side encryption (S3-CSE) using AWS-SDK (#2706)
    
    This (big!) patch adds support for client side encryption in AWS S3,
    with keys managed by AWS-KMS.
    
    Read the documentation in encryption.md very, very carefully before
    use and consider it unstable.
    
    S3-CSE is enabled in the existing configuration option
    "fs.s3a.server-side-encryption-algorithm":
    
    fs.s3a.server-side-encryption-algorithm=CSE-KMS
    fs.s3a.server-side-encryption.key=<KMS_KEY_ID>
    
    You cannot enable CSE and SSE in the same client, although
    you can still enable a default SSE option in the S3 console.
    
    * Filesystem list/get status operations subtract 16 bytes from the length
      of all files >= 16 bytes long to compensate for the padding which CSE
      adds.
    * The SDK always warns about the specific algorithm chosen being
      deprecated. It is critical to use this algorithm for ranged
      GET requests to work (i.e. random IO). Ignore the warning.
    * Unencrypted files CANNOT BE READ.
      The entire bucket SHOULD be encrypted with S3-CSE.
    * Uploading files may be a bit slower as blocks are now
      written sequentially.
    * The Multipart Upload API is disabled when S3-CSE is active.
    
    Contributed by Mehakmeet Singh
    
    Change-Id: Ie1a27a036a39db66a67e9c6d33bc78d54ea708a0
---
 .../AbstractContractMultipartUploaderTest.java     |   3 +
 .../java/org/apache/hadoop/fs/s3a/Constants.java   |   7 +-
 .../hadoop/fs/s3a/DefaultS3ClientFactory.java      | 135 +++++++--
 .../java/org/apache/hadoop/fs/s3a/Listing.java     |   9 +-
 .../apache/hadoop/fs/s3a/S3ABlockOutputStream.java |  43 ++-
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java    |  83 +++++-
 .../java/org/apache/hadoop/fs/s3a/S3AUtils.java    |  46 ++--
 .../java/org/apache/hadoop/fs/s3a/Statistic.java   |   9 +-
 .../hadoop/fs/s3a/impl/HeaderProcessing.java       |  11 +-
 .../hadoop/fs/s3a/impl/InternalConstants.java      |   8 +
 .../apache/hadoop/fs/s3a/impl/RenameOperation.java |   2 +-
 .../apache/hadoop/fs/s3a/impl/StoreContext.java    |  15 +-
 .../hadoop/fs/s3a/impl/StoreContextBuilder.java    |  16 +-
 .../site/markdown/tools/hadoop-aws/encryption.md   | 233 +++++++++++++++-
 .../tools/hadoop-aws/troubleshooting_s3a.md        | 278 +++++++++++++++++++
 .../apache/hadoop/fs/s3a/AbstractS3ATestBase.java  |   9 +
 .../apache/hadoop/fs/s3a/EncryptionTestUtils.java  |   2 +-
 .../fs/s3a/ITestS3AClientSideEncryption.java       | 301 +++++++++++++++++++++
 .../fs/s3a/ITestS3AClientSideEncryptionKms.java    |  95 +++++++
 .../ITestS3AEncryptionSSEKMSUserDefinedKey.java    |   9 +-
 .../ITestS3AEncryptionWithDefaultS3Settings.java   |   6 +-
 .../hadoop/fs/s3a/ITestS3AInconsistency.java       |   9 +
 .../hadoop/fs/s3a/ITestS3AMiscOperations.java      |   3 +-
 .../hadoop/fs/s3a/ITestS3GuardListConsistency.java |   6 +-
 .../org/apache/hadoop/fs/s3a/S3ATestConstants.java |  13 +
 .../org/apache/hadoop/fs/s3a/S3ATestUtils.java     |  12 +
 .../hadoop/fs/s3a/TestS3AInputStreamRetry.java     |   2 +-
 .../apache/hadoop/fs/s3a/TestSSEConfiguration.java |   6 +-
 .../hadoop/fs/s3a/auth/ITestCustomSigner.java      |  34 ++-
 .../ITestSessionDelegationInFileystem.java         |  23 +-
 .../hadoop/fs/s3a/commit/AbstractCommitITest.java  |   9 +-
 .../fileContext/ITestS3AFileContextStatistics.java |  33 ++-
 .../fs/s3a/scale/ITestS3AHugeFilesEncryption.java  |   8 +-
 .../s3a/scale/ITestS3AHugeFilesSSECDiskBlocks.java |   2 +-
 .../s3a/scale/ITestS3AInputStreamPerformance.java  |   5 +
 35 files changed, 1370 insertions(+), 115 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java
index 90e12a8..3e754e4 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java
@@ -89,6 +89,9 @@ public abstract class AbstractContractMultipartUploaderTest extends
 
     final FileSystem fs = getFileSystem();
     Path testPath = getContract().getTestPath();
+    Assume.assumeTrue("Multipart uploader is not supported",
+        fs.hasPathCapability(testPath,
+            CommonPathCapabilities.FS_MULTIPART_UPLOADER));
     uploader0 = fs.createMultipartUploader(testPath).build();
     uploader1 = fs.createMultipartUploader(testPath).build();
   }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index e51d8ab..8bd0291 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -420,7 +420,12 @@ public final class Constants {
       "fs.s3a.multipart.purge.age";
   public static final long DEFAULT_PURGE_EXISTING_MULTIPART_AGE = 86400;
 
-  // s3 server-side encryption, see S3AEncryptionMethods for valid options
+  /**
+   * s3 server-side encryption or s3 client side encryption method, see
+   * {@link S3AEncryptionMethods} for valid options.
+   *
+   * {@value}
+   */
   public static final String SERVER_SIDE_ENCRYPTION_ALGORITHM =
       "fs.s3a.server-side-encryption-algorithm";
 
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
index 7dc920c..2abef63 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
@@ -25,13 +25,23 @@ import com.amazonaws.ClientConfiguration;
 import com.amazonaws.SdkClientException;
 import com.amazonaws.client.builder.AwsClientBuilder;
 import com.amazonaws.handlers.RequestHandler2;
+import com.amazonaws.regions.RegionUtils;
 import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Builder;
 import com.amazonaws.services.s3.AmazonS3Client;
 import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.AmazonS3EncryptionClientV2Builder;
+import com.amazonaws.services.s3.AmazonS3EncryptionV2;
 import com.amazonaws.services.s3.S3ClientOptions;
 import com.amazonaws.services.s3.internal.ServiceUtils;
+import com.amazonaws.services.s3.model.CryptoConfigurationV2;
+import com.amazonaws.services.s3.model.CryptoMode;
+import com.amazonaws.services.s3.model.CryptoRangeGetMode;
+import com.amazonaws.services.s3.model.EncryptionMaterialsProvider;
+import com.amazonaws.services.s3.model.KMSEncryptionMaterialsProvider;
 import com.amazonaws.util.AwsHostNameUtils;
 import com.amazonaws.util.RuntimeHttpUtils;
+import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,6 +58,8 @@ import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION;
 import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_CENTRAL_REGION;
 import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING;
 import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING_DEFAULT;
+import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
+import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
 import static org.apache.hadoop.fs.s3a.S3AUtils.translateException;
 
 /**
@@ -112,9 +124,17 @@ public class DefaultS3ClientFactory extends Configured
     }
 
     try {
-      return buildAmazonS3Client(
-          awsConf,
-          parameters);
+      if (S3AEncryptionMethods.getMethod(S3AUtils.
+          lookupPassword(conf, SERVER_SIDE_ENCRYPTION_ALGORITHM, null))
+          .equals(S3AEncryptionMethods.CSE_KMS)) {
+        return buildAmazonS3EncryptionClient(
+            awsConf,
+            parameters);
+      } else {
+        return buildAmazonS3Client(
+            awsConf,
+            parameters);
+      }
     } catch (SdkClientException e) {
       // SDK refused to build.
       throw translateException("creating AWS S3 client", uri.toString(), e);
@@ -122,6 +142,60 @@ public class DefaultS3ClientFactory extends Configured
   }
 
   /**
+   * Create an {@link AmazonS3} client of type
+   * {@link AmazonS3EncryptionV2} if CSE is enabled.
+   *
+   * @param awsConf    AWS configuration.
+   * @param parameters parameters.
+   *
+   * @return new AmazonS3 client.
+   * @throws IOException if lookupPassword() has any problem.
+   */
+  protected AmazonS3 buildAmazonS3EncryptionClient(
+      final ClientConfiguration awsConf,
+      final S3ClientCreationParameters parameters) throws IOException {
+
+    AmazonS3 client;
+    AmazonS3EncryptionClientV2Builder builder =
+        new AmazonS3EncryptionClientV2Builder();
+    Configuration conf = getConf();
+
+    //CSE-KMS Method
+    String kmsKeyId = S3AUtils.lookupPassword(conf,
+        SERVER_SIDE_ENCRYPTION_KEY, null);
+    // Check if kmsKeyID is not null
+    Preconditions.checkArgument(kmsKeyId != null, "CSE-KMS method "
+        + "requires KMS key ID. Use " + SERVER_SIDE_ENCRYPTION_KEY
+        + " property to set it. ");
+
+    EncryptionMaterialsProvider materialsProvider =
+        new KMSEncryptionMaterialsProvider(kmsKeyId);
+    builder.withEncryptionMaterialsProvider(materialsProvider);
+    //Configure basic params of a S3 builder.
+    configureBasicParams(builder, awsConf, parameters);
+
+    // Configuring endpoint.
+    AmazonS3EncryptionClientV2Builder.EndpointConfiguration epr
+        = createEndpointConfiguration(parameters.getEndpoint(),
+        awsConf, getConf().getTrimmed(AWS_REGION));
+    configureEndpoint(builder, epr);
+
+    // Create cryptoConfig.
+    CryptoConfigurationV2 cryptoConfigurationV2 =
+        new CryptoConfigurationV2(CryptoMode.AuthenticatedEncryption)
+            .withRangeGetMode(CryptoRangeGetMode.ALL);
+    if (epr != null) {
+      cryptoConfigurationV2
+          .withAwsKmsRegion(RegionUtils.getRegion(epr.getSigningRegion()));
+      LOG.debug("KMS region used: {}", cryptoConfigurationV2.getAwsKmsRegion());
+    }
+    builder.withCryptoConfiguration(cryptoConfigurationV2);
+    client = builder.build();
+
+    return client;
+  }
+
+  /**
    * Use the Builder API to create an AWS S3 client.
    * <p>
    * This has a more complex endpoint configuration mechanism
@@ -137,33 +211,60 @@ public class DefaultS3ClientFactory extends Configured
       final ClientConfiguration awsConf,
       final S3ClientCreationParameters parameters) {
     AmazonS3ClientBuilder b = AmazonS3Client.builder();
-    b.withCredentials(parameters.getCredentialSet());
-    b.withClientConfiguration(awsConf);
-    b.withPathStyleAccessEnabled(parameters.isPathStyleAccess());
+    configureBasicParams(b, awsConf, parameters);
+
+    // endpoint set up is a PITA
+    AwsClientBuilder.EndpointConfiguration epr
+        = createEndpointConfiguration(parameters.getEndpoint(),
+        awsConf, getConf().getTrimmed(AWS_REGION));
+    configureEndpoint(b, epr);
+    final AmazonS3 client = b.build();
+    return client;
+  }
+
+  /**
+   * A method to configure basic AmazonS3Builder parameters.
+   *
+   * @param builder    Instance of AmazonS3Builder used.
+   * @param awsConf    ClientConfiguration used.
+   * @param parameters Parameters used to set in the builder.
+   */
+  private void configureBasicParams(AmazonS3Builder builder,
+      ClientConfiguration awsConf, S3ClientCreationParameters parameters) {
+    builder.withCredentials(parameters.getCredentialSet());
+    builder.withClientConfiguration(awsConf);
+    builder.withPathStyleAccessEnabled(parameters.isPathStyleAccess());
 
     if (parameters.getMetrics() != null) {
-      b.withMetricsCollector(
+      builder.withMetricsCollector(
           new AwsStatisticsCollector(parameters.getMetrics()));
     }
     if (parameters.getRequestHandlers() != null) {
-      b.withRequestHandlers(
+      builder.withRequestHandlers(
           parameters.getRequestHandlers().toArray(new RequestHandler2[0]));
     }
     if (parameters.getMonitoringListener() != null) {
-      b.withMonitoringListener(parameters.getMonitoringListener());
+      builder.withMonitoringListener(parameters.getMonitoringListener());
     }
 
-    // endpoint set up is a PITA
-    AwsClientBuilder.EndpointConfiguration epr
-        = createEndpointConfiguration(parameters.getEndpoint(),
-        awsConf, getConf().getTrimmed(AWS_REGION));
+  }
+
+  /**
+   * A method to configure endpoint and Region for an AmazonS3Builder.
+   *
+   * @param builder Instance of AmazonS3Builder used.
+   * @param epr     EndpointConfiguration used to set in builder.
+   */
+  private void configureEndpoint(
+      AmazonS3Builder builder,
+      AmazonS3Builder.EndpointConfiguration epr) {
     if (epr != null) {
       // an endpoint binding was constructed: use it.
-      b.withEndpointConfiguration(epr);
+      builder.withEndpointConfiguration(epr);
     } else {
       // no idea what the endpoint is, so tell the SDK
       // to work it out at the cost of an extra HEAD request
-      b.withForceGlobalBucketAccessEnabled(true);
+      builder.withForceGlobalBucketAccessEnabled(true);
       // HADOOP-17771 force set the region so the build process doesn't halt.
       String region = getConf().getTrimmed(AWS_REGION, AWS_S3_CENTRAL_REGION);
       LOG.debug("fs.s3a.endpoint.region=\"{}\"", region);
@@ -171,7 +272,7 @@ public class DefaultS3ClientFactory extends Configured
         // there's either an explicit region or we have fallen back
         // to the central one.
         LOG.debug("Using default endpoint; setting region to {}", region);
-        b.setRegion(region);
+        builder.setRegion(region);
       } else {
         // no region.
         // allow this if people really want it; it is OK to rely on this
@@ -180,8 +281,6 @@ public class DefaultS3ClientFactory extends Configured
         LOG.debug(SDK_REGION_CHAIN_IN_USE);
       }
     }
-    final AmazonS3 client = b.build();
-    return client;
   }
 
   /**
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
index 113e6f4..5a4f551 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
@@ -90,6 +90,7 @@ import static org.apache.hadoop.util.functional.RemoteIterators.remoteIteratorFr
 public class Listing extends AbstractStoreOperation {
 
   private static final Logger LOG = S3AFileSystem.LOG;
+  private final boolean isCSEEnabled;
 
   static final FileStatusAcceptor ACCEPT_ALL_BUT_S3N =
       new AcceptAllButS3nDirs();
@@ -97,9 +98,10 @@ public class Listing extends AbstractStoreOperation {
   private final ListingOperationCallbacks listingOperationCallbacks;
 
   public Listing(ListingOperationCallbacks listingOperationCallbacks,
-                 StoreContext storeContext) {
+      StoreContext storeContext) {
     super(storeContext);
     this.listingOperationCallbacks = listingOperationCallbacks;
+    this.isCSEEnabled = storeContext.isCSEEnabled();
   }
 
   /**
@@ -687,7 +689,7 @@ public class Listing extends AbstractStoreOperation {
           S3AFileStatus status = createFileStatus(keyPath, summary,
                   listingOperationCallbacks.getDefaultBlockSize(keyPath),
                   getStoreContext().getUsername(),
-              summary.getETag(), null);
+              summary.getETag(), null, isCSEEnabled);
           LOG.debug("Adding: {}", status);
           stats.add(status);
           added++;
@@ -961,7 +963,7 @@ public class Listing extends AbstractStoreOperation {
     public boolean accept(Path keyPath, S3ObjectSummary summary) {
       return !keyPath.equals(qualifiedPath)
           && !summary.getKey().endsWith(S3N_FOLDER_SUFFIX)
-          && !objectRepresentsDirectory(summary.getKey(), summary.getSize());
+          && !objectRepresentsDirectory(summary.getKey());
     }
 
     /**
@@ -1049,6 +1051,7 @@ public class Listing extends AbstractStoreOperation {
     }
   }
 
+  @SuppressWarnings("unchecked")
   public static RemoteIterator<LocatedFileStatus> toLocatedFileStatusIterator(
       RemoteIterator<? extends LocatedFileStatus> iterator) {
     return (RemoteIterator < LocatedFileStatus >) iterator;
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
index 5ba39aa..ddb933d 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
@@ -155,6 +155,9 @@ class S3ABlockOutputStream extends OutputStream implements
   private static final LogExactlyOnce WARN_ON_SYNCABLE =
       new LogExactlyOnce(LOG);
 
+  /** is client side encryption enabled? */
+  private final boolean isCSEEnabled;
+
   /**
    * An S3A output stream which uploads partitions in a separate pool of
    * threads; different {@link S3ADataBlocks.BlockFactory}
@@ -189,6 +192,7 @@ class S3ABlockOutputStream extends OutputStream implements
       LOG.debug("Put tracker requests multipart upload");
       initMultipartUpload();
     }
+    this.isCSEEnabled = builder.isCSEEnabled;
   }
 
   /**
@@ -307,29 +311,34 @@ class S3ABlockOutputStream extends OutputStream implements
       // of capacity
       // Trigger an upload then process the remainder.
       LOG.debug("writing more data than block has capacity -triggering upload");
-      uploadCurrentBlock();
+      uploadCurrentBlock(false);
       // tail recursion is mildly expensive, but given buffer sizes must be MB.
       // it's unlikely to recurse very deeply.
       this.write(source, offset + written, len - written);
     } else {
-      if (remainingCapacity == 0) {
+      if (remainingCapacity == 0 && !isCSEEnabled) {
         // the whole buffer is done, trigger an upload
-        uploadCurrentBlock();
+        uploadCurrentBlock(false);
       }
     }
   }
 
   /**
    * Start an asynchronous upload of the current block.
+   *
+   * @param isLast true, if part being uploaded is last and client side
+   *               encryption is enabled.
    * @throws IOException Problems opening the destination for upload,
-   * initializing the upload, or if a previous operation has failed.
+   *                     initializing the upload, or if a previous operation
+   *                     has failed.
    */
-  private synchronized void uploadCurrentBlock() throws IOException {
+  private synchronized void uploadCurrentBlock(boolean isLast)
+      throws IOException {
     Preconditions.checkState(hasActiveBlock(), "No active block");
     LOG.debug("Writing block # {}", blockCount);
     initMultipartUpload();
     try {
-      multiPartUpload.uploadBlockAsync(getActiveBlock());
+      multiPartUpload.uploadBlockAsync(getActiveBlock(), isLast);
       bytesSubmitted += getActiveBlock().dataSize();
     } finally {
       // set the block to null, so the next write will create a new block.
@@ -389,8 +398,9 @@ class S3ABlockOutputStream extends OutputStream implements
         // PUT the final block
         if (hasBlock &&
             (block.hasData() || multiPartUpload.getPartsSubmitted() == 0)) {
-          //send last part
-          uploadCurrentBlock();
+          // send last part and set the value of isLastPart to true.
+          // Necessary to set this "true" in case of client side encryption.
+          uploadCurrentBlock(true);
         }
         // wait for the partial uploads to finish
         final List<PartETag> partETags =
@@ -760,7 +770,8 @@ class S3ABlockOutputStream extends OutputStream implements
      * @throws IOException upload failure
      * @throws PathIOException if too many blocks were written
      */
-    private void uploadBlockAsync(final S3ADataBlocks.DataBlock block)
+    private void uploadBlockAsync(final S3ADataBlocks.DataBlock block,
+        final boolean isLast)
         throws IOException {
       LOG.debug("Queueing upload of {} for upload {}", block, uploadId);
       Preconditions.checkNotNull(uploadId, "Null uploadId");
@@ -781,6 +792,7 @@ class S3ABlockOutputStream extends OutputStream implements
             uploadData.getUploadStream(),
             uploadData.getFile(),
             0L);
+        request.setLastPart(isLast);
       } catch (SdkBaseException aws) {
         // catch and translate
         IOException e = translateException("upload", key, aws);
@@ -1042,6 +1054,9 @@ class S3ABlockOutputStream extends OutputStream implements
     /** Should Syncable calls be downgraded? */
     private boolean downgradeSyncableExceptions;
 
+    /** is Client side Encryption enabled? */
+    private boolean isCSEEnabled;
+
     private BlockOutputStreamBuilder() {
     }
 
@@ -1157,5 +1172,15 @@ class S3ABlockOutputStream extends OutputStream implements
       downgradeSyncableExceptions = value;
       return this;
     }
+
+    /**
+     * Set builder value.
+     * @param value new value
+     * @return the builder
+     */
+    public BlockOutputStreamBuilder withCSEEnabled(boolean value) {
+      isCSEEnabled = value;
+      return this;
+    }
   }
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index e0dfb56..5373c43 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -52,6 +52,7 @@ import com.amazonaws.AmazonClientException;
 import com.amazonaws.AmazonServiceException;
 import com.amazonaws.SdkBaseException;
 import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.Headers;
 import com.amazonaws.services.s3.model.CannedAccessControlList;
 import com.amazonaws.services.s3.model.CopyObjectRequest;
 import com.amazonaws.services.s3.model.DeleteObjectsRequest;
@@ -215,6 +216,7 @@ import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
 import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletionIgnoringExceptions;
 import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isObjectNotFound;
 import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isUnknownBucket;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.CSE_PADDING_LENGTH;
 import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT;
 import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DELETE_CONSIDERED_IDEMPOTENT;
 import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404;
@@ -359,6 +361,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   private AuditManagerS3A auditManager =
       AuditIntegration.stubAuditManager();
 
+  /**
+   * Is this S3A FS instance using S3 client side encryption?
+   */
+  private boolean isCSEEnabled;
+
   /** Add any deprecated keys. */
   @SuppressWarnings("deprecation")
   private static void addDeprecatedKeys() {
@@ -421,12 +428,17 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       // DT Bindings may override this
       setEncryptionSecrets(new EncryptionSecrets(
           getEncryptionAlgorithm(bucket, conf),
-          getServerSideEncryptionKey(bucket, getConf())));
+          getS3EncryptionKey(bucket, getConf())));
 
       invoker = new Invoker(new S3ARetryPolicy(getConf()), onRetry);
       instrumentation = new S3AInstrumentation(uri);
       initializeStatisticsBinding();
-
+      // CSE is enabled only when the configured method is CSE-KMS.
+      isCSEEnabled = S3AEncryptionMethods.CSE_KMS == S3AEncryptionMethods
+          .getMethod(S3AUtils.lookupPassword(conf,
+              SERVER_SIDE_ENCRYPTION_ALGORITHM, null));
+      LOG.debug("Client Side Encryption enabled: {}", isCSEEnabled);
+      setCSEGauge();
       // Username is the current user at the time the FS was instantiated.
       owner = UserGroupInformation.getCurrentUser();
       username = owner.getShortUserName();
@@ -515,6 +526,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       blockFactory = S3ADataBlocks.createFactory(this, blockOutputBuffer);
       blockOutputActiveBlocks = intOption(conf,
           FAST_UPLOAD_ACTIVE_BLOCKS, DEFAULT_FAST_UPLOAD_ACTIVE_BLOCKS, 1);
+      // If CSE is enabled, do multipart uploads serially.
+      if (isCSEEnabled) {
+        blockOutputActiveBlocks = 1;
+      }
       LOG.debug("Using S3ABlockOutputStream with buffer = {}; block={};" +
               " queue limit={}",
           blockOutputBuffer, partSize, blockOutputActiveBlocks);
@@ -559,7 +574,22 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       stopAllServices();
       throw e;
     }
+  }
 
+  /**
+   * Set the client side encryption gauge to 0 or 1, indicating if CSE is
+   * enabled through the gauge or not.
+   */
+  private void setCSEGauge() {
+    IOStatisticsStore ioStatisticsStore =
+        (IOStatisticsStore) getIOStatistics();
+    if (isCSEEnabled) {
+      ioStatisticsStore
+          .setGauge(CLIENT_SIDE_ENCRYPTION_ENABLED.getSymbol(), 1L);
+    } else {
+      ioStatisticsStore
+          .setGauge(CLIENT_SIDE_ENCRYPTION_ENABLED.getSymbol(), 0L);
+    }
   }
 
   /**
@@ -1162,7 +1192,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
    * Get the encryption algorithm of this endpoint.
    * @return the encryption algorithm.
    */
-  public S3AEncryptionMethods getServerSideEncryptionAlgorithm() {
+  public S3AEncryptionMethods getS3EncryptionAlgorithm() {
     return encryptionSecrets.getEncryptionMethod();
   }
 
@@ -1503,7 +1533,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
     return new S3ObjectAttributes(bucket,
         f,
         pathToKey(f),
-        getServerSideEncryptionAlgorithm(),
+        getS3EncryptionAlgorithm(),
         encryptionSecrets.getEncryptionKey(),
         eTag,
         versionId,
@@ -1628,7 +1658,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
         .withDowngradeSyncableExceptions(
             getConf().getBoolean(
                 DOWNGRADE_SYNCABLE_EXCEPTIONS,
-                DOWNGRADE_SYNCABLE_EXCEPTIONS_DEFAULT));
+                DOWNGRADE_SYNCABLE_EXCEPTIONS_DEFAULT))
+        .withCSEEnabled(isCSEEnabled);
     return new FSDataOutputStream(
         new S3ABlockOutputStream(builder),
         null);
@@ -3680,7 +3711,14 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
         // look for the simple file
         ObjectMetadata meta = getObjectMetadata(key);
         LOG.debug("Found exact file: normal file {}", key);
-        return new S3AFileStatus(meta.getContentLength(),
+        long contentLength = meta.getContentLength();
+        // check if CSE is enabled, then strip padded length.
+        if (isCSEEnabled
+            && meta.getUserMetaDataOf(Headers.CRYPTO_CEK_ALGORITHM) != null
+            && contentLength >= CSE_PADDING_LENGTH) {
+          contentLength -= CSE_PADDING_LENGTH;
+        }
+        return new S3AFileStatus(contentLength,
             dateToLong(meta.getLastModified()),
             path,
             getDefaultBlockSize(path),
@@ -4291,7 +4329,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
         key, length, eTag, versionId);
     Path p = keyToQualifiedPath(key);
     Preconditions.checkArgument(length >= 0, "content length is negative");
-    final boolean isDir = objectRepresentsDirectory(key, length);
+    final boolean isDir = objectRepresentsDirectory(key);
     // kick off an async delete
     CompletableFuture<?> deletion;
     if (!keepDirectoryMarkers(p)) {
@@ -4471,9 +4509,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       sb.append(", blockSize=").append(getDefaultBlockSize());
     }
     sb.append(", multiPartThreshold=").append(multiPartThreshold);
-    if (getServerSideEncryptionAlgorithm() != null) {
-      sb.append(", serverSideEncryptionAlgorithm='")
-          .append(getServerSideEncryptionAlgorithm())
+    if (getS3EncryptionAlgorithm() != null) {
+      sb.append(", s3EncryptionAlgorithm='")
+          .append(getS3EncryptionAlgorithm())
           .append('\'');
     }
     if (blockFactory != null) {
@@ -4499,6 +4537,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
           .append(getInstrumentation().toString())
           .append("}");
     }
+    sb.append(", ClientSideEncryption=").append(isCSEEnabled);
     sb.append('}');
     return sb.toString();
   }
@@ -5109,8 +5148,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       return isMagicCommitEnabled();
 
     case SelectConstants.S3_SELECT_CAPABILITY:
-      // select is only supported if enabled
-      return SelectBinding.isSelectEnabled(getConf());
+      // select is only supported if enabled and client side encryption is
+      // disabled.
+      return !isCSEEnabled && SelectBinding.isSelectEnabled(getConf());
 
     case CommonPathCapabilities.FS_CHECKSUMS:
       // capability depends on FS configuration
@@ -5118,8 +5158,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
           ETAG_CHECKSUM_ENABLED_DEFAULT);
 
     case CommonPathCapabilities.ABORTABLE_STREAM:
-    case CommonPathCapabilities.FS_MULTIPART_UPLOADER:
       return true;
+    case CommonPathCapabilities.FS_MULTIPART_UPLOADER:
+      // client side encryption doesn't support multipart uploader.
+      return !isCSEEnabled;
 
     // this client is safe to use with buckets
     // containing directory markers anywhere in
@@ -5255,7 +5297,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
    */
   private void requireSelectSupport(final Path source) throws
       UnsupportedOperationException {
-    if (!SelectBinding.isSelectEnabled(getConf())) {
+    // S3 Select is rejected under client side encryption or when disabled.
+    if (isCSEEnabled || !SelectBinding.isSelectEnabled(getConf())) {
 
       throw new UnsupportedOperationException(
           SelectConstants.SELECT_UNSUPPORTED);
@@ -5378,6 +5420,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   public S3AMultipartUploaderBuilder createMultipartUploader(
       final Path basePath)
       throws IOException {
+    if(isCSEEnabled) {
+      throw new UnsupportedOperationException("Multi-part uploader not "
+          + "supported for Client side encryption.");
+    }
     final Path path = makeQualified(basePath);
     try (AuditSpan span = entryPoint(MULTIPART_UPLOAD_INSTANTIATED, path)) {
       StoreContext ctx = createStoreContext();
@@ -5416,6 +5462,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
         .setContextAccessors(new ContextAccessorsImpl())
         .setTimeProvider(getTtlTimeProvider())
         .setAuditor(getAuditor())
+        .setEnableCSE(isCSEEnabled)
         .build();
   }
 
@@ -5485,4 +5532,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       return S3AFileSystem.this.getRequestFactory();
     }
   }
+
+  /**
+   * a method to know if Client side encryption is enabled or not.
+   * @return a boolean stating if CSE is enabled.
+   */
+  public boolean isCSEEnabled() {
+    return isCSEEnabled;
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
index 8a181f2..29fbac3 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
@@ -87,6 +87,7 @@ import java.util.concurrent.TimeUnit;
 import static org.apache.commons.lang3.StringUtils.isEmpty;
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isUnknownBucket;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.CSE_PADDING_LENGTH;
 import static org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport.translateDeleteException;
 import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
 import static org.apache.hadoop.util.functional.RemoteIterators.filteringRemoteIterator;
@@ -521,6 +522,7 @@ public final class S3AUtils {
    * @param owner owner of the file
    * @param eTag S3 object eTag or null if unavailable
    * @param versionId S3 object versionId or null if unavailable
+   * @param isCSEEnabled is client side encryption enabled?
    * @return a status entry
    */
   public static S3AFileStatus createFileStatus(Path keyPath,
@@ -528,10 +530,15 @@ public final class S3AUtils {
       long blockSize,
       String owner,
       String eTag,
-      String versionId) {
+      String versionId,
+      boolean isCSEEnabled) {
     long size = summary.getSize();
+    // check if cse is enabled; strip out constant padding length.
+    if (isCSEEnabled && size >= CSE_PADDING_LENGTH) {
+      size -= CSE_PADDING_LENGTH;
+    }
     return createFileStatus(keyPath,
-        objectRepresentsDirectory(summary.getKey(), size),
+        objectRepresentsDirectory(summary.getKey()),
         size, summary.getLastModified(), blockSize, owner, eTag, versionId);
   }
 
@@ -572,14 +579,11 @@ public final class S3AUtils {
   /**
    * Predicate: does the object represent a directory?.
    * @param name object name
-   * @param size object size
    * @return true if it meets the criteria for being an object
    */
-  public static boolean objectRepresentsDirectory(final String name,
-      final long size) {
+  public static boolean objectRepresentsDirectory(final String name) {
     return !name.isEmpty()
-        && name.charAt(name.length() - 1) == '/'
-        && size == 0L;
+        && name.charAt(name.length() - 1) == '/';
   }
 
   /**
@@ -1564,7 +1568,7 @@ public final class S3AUtils {
   }
 
   /**
-   * Get any SSE key from a configuration/credential provider.
+   * Get any SSE/CSE key from a configuration/credential provider.
    * This operation handles the case where the option has been
    * set in the provider or configuration to the option
    * {@code OLD_S3A_SERVER_SIDE_ENCRYPTION_KEY}.
@@ -1574,7 +1578,7 @@ public final class S3AUtils {
    * @return the encryption key or ""
    * @throws IllegalArgumentException bad arguments.
    */
-  public static String getServerSideEncryptionKey(String bucket,
+  public static String getS3EncryptionKey(String bucket,
       Configuration conf) {
     try {
       return lookupPassword(bucket, conf, SERVER_SIDE_ENCRYPTION_KEY);
@@ -1585,7 +1589,7 @@ public final class S3AUtils {
   }
 
   /**
-   * Get the server-side encryption algorithm.
+   * Get the server-side encryption or client side encryption algorithm.
    * This includes validation of the configuration, checking the state of
    * the encryption key given the chosen algorithm.
    *
@@ -1597,22 +1601,23 @@ public final class S3AUtils {
    */
   public static S3AEncryptionMethods getEncryptionAlgorithm(String bucket,
       Configuration conf) throws IOException {
-    S3AEncryptionMethods sse = S3AEncryptionMethods.getMethod(
+    S3AEncryptionMethods encryptionMethod = S3AEncryptionMethods.getMethod(
         lookupPassword(bucket, conf,
             SERVER_SIDE_ENCRYPTION_ALGORITHM));
-    String sseKey = getServerSideEncryptionKey(bucket, conf);
-    int sseKeyLen = StringUtils.isBlank(sseKey) ? 0 : sseKey.length();
-    String diagnostics = passwordDiagnostics(sseKey, "key");
-    switch (sse) {
+    String encryptionKey = getS3EncryptionKey(bucket, conf);
+    int encryptionKeyLen =
+        StringUtils.isBlank(encryptionKey) ? 0 : encryptionKey.length();
+    String diagnostics = passwordDiagnostics(encryptionKey, "key");
+    switch (encryptionMethod) {
     case SSE_C:
       LOG.debug("Using SSE-C with {}", diagnostics);
-      if (sseKeyLen == 0) {
+      if (encryptionKeyLen == 0) {
         throw new IOException(SSE_C_NO_KEY_ERROR);
       }
       break;
 
     case SSE_S3:
-      if (sseKeyLen != 0) {
+      if (encryptionKeyLen != 0) {
         throw new IOException(SSE_S3_WITH_KEY_ERROR
             + " (" + diagnostics + ")");
       }
@@ -1623,12 +1628,17 @@ public final class S3AUtils {
           diagnostics);
       break;
 
+    case CSE_KMS:
+      LOG.debug("Using CSE-KMS with {}",
+          diagnostics);
+      break;
+
     case NONE:
     default:
       LOG.debug("Data is unencrypted");
       break;
     }
-    return sse;
+    return encryptionMethod;
   }
 
   /**
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
index 7890e2d..fb28a40 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
@@ -577,7 +577,14 @@ public enum Statistic {
   AUDIT_REQUEST_EXECUTION(
       AuditStatisticNames.AUDIT_REQUEST_EXECUTION,
       "AWS request made",
-      TYPE_COUNTER);
+      TYPE_COUNTER),
+
+  /* Client side encryption gauge */
+  CLIENT_SIDE_ENCRYPTION_ENABLED(
+      "client_side_encryption_enabled",
+      "gauge to indicate if client side encryption is enabled",
+      TYPE_GAUGE
+  );
 
 
   /**
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/HeaderProcessing.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/HeaderProcessing.java
index 8c39aa4..17394b7 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/HeaderProcessing.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/HeaderProcessing.java
@@ -301,8 +301,15 @@ public class HeaderProcessing extends AbstractStoreOperation {
         md.getContentEncoding());
     maybeSetHeader(headers, XA_CONTENT_LANGUAGE,
         md.getContentLanguage());
-    maybeSetHeader(headers, XA_CONTENT_LENGTH,
-        md.getContentLength());
+    // If CSE is enabled, use the unencrypted content length.
+    if (md.getUserMetaDataOf(Headers.CRYPTO_CEK_ALGORITHM) != null
+        && md.getUserMetaDataOf(Headers.UNENCRYPTED_CONTENT_LENGTH) != null) {
+      maybeSetHeader(headers, XA_CONTENT_LENGTH,
+          md.getUserMetaDataOf(Headers.UNENCRYPTED_CONTENT_LENGTH));
+    } else {
+      maybeSetHeader(headers, XA_CONTENT_LENGTH,
+          md.getContentLength());
+    }
     maybeSetHeader(headers, XA_CONTENT_MD5,
         md.getContentMD5());
     maybeSetHeader(headers, XA_CONTENT_RANGE,
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
index cf962b8..51b1bf6 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
@@ -126,4 +126,12 @@ public final class InternalConstants {
    */
   public static final String AWS_REGION_SYSPROP = "aws.region";
 
+  /**
+   * S3 client side encryption adds padding to the content length of constant
+   * length of 16 bytes (at the moment, since we have only 1 content
+   * encryption algorithm). Use this to subtract while listing the content
+   * length when certain conditions are met.
+   */
+  public static final int CSE_PADDING_LENGTH = 16;
+
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RenameOperation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RenameOperation.java
index 7b13d0d..6d0f49a 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RenameOperation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RenameOperation.java
@@ -638,7 +638,7 @@ public class RenameOperation extends ExecutingStoreOperation<Long> {
       copyResult = callbacks.copyFile(srcKey, destinationKey,
           srcAttributes, readContext);
     }
-    if (objectRepresentsDirectory(srcKey, len)) {
+    if (objectRepresentsDirectory(srcKey)) {
       renameTracker.directoryMarkerCopied(
           sourceFile,
           destination,
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContext.java
index ac29780..d60e1a2 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContext.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContext.java
@@ -129,6 +129,9 @@ public class StoreContext implements ActiveThreadSpanSource<AuditSpan> {
   /** Operation Auditor. */
   private final AuditSpanSource<AuditSpanS3A> auditor;
 
+  /** Is client side encryption enabled? */
+  private final boolean isCSEEnabled;
+
   /**
    * Instantiate.
    */
@@ -150,7 +153,8 @@ public class StoreContext implements ActiveThreadSpanSource<AuditSpan> {
       final boolean useListV1,
       final ContextAccessors contextAccessors,
       final ITtlTimeProvider timeProvider,
-      final AuditSpanSource<AuditSpanS3A> auditor) {
+      final AuditSpanSource<AuditSpanS3A> auditor,
+      final boolean isCSEEnabled) {
     this.fsURI = fsURI;
     this.bucket = bucket;
     this.configuration = configuration;
@@ -172,6 +176,7 @@ public class StoreContext implements ActiveThreadSpanSource<AuditSpan> {
     this.contextAccessors = contextAccessors;
     this.timeProvider = timeProvider;
     this.auditor = auditor;
+    this.isCSEEnabled = isCSEEnabled;
   }
 
   public URI getFsURI() {
@@ -429,4 +434,12 @@ public class StoreContext implements ActiveThreadSpanSource<AuditSpan> {
   public RequestFactory getRequestFactory() {
     return contextAccessors.getRequestFactory();
   }
+
+  /**
+   * Return whether the store context has client side encryption enabled.
+   * @return boolean indicating if CSE is enabled or not.
+   */
+  public boolean isCSEEnabled() {
+    return isCSEEnabled;
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContextBuilder.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContextBuilder.java
index 468af1b..d4021a7 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContextBuilder.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContextBuilder.java
@@ -73,6 +73,8 @@ public class StoreContextBuilder {
 
   private AuditSpanSource<AuditSpanS3A> auditor;
 
+  private boolean isCSEEnabled;
+
   public StoreContextBuilder setFsURI(final URI fsURI) {
     this.fsURI = fsURI;
     return this;
@@ -180,6 +182,17 @@ public class StoreContextBuilder {
     return this;
   }
 
+  /**
+   * Set whether client side encryption is enabled.
+   * @param value value indicating if client side encryption is enabled or not.
+   * @return builder instance.
+   */
+  public StoreContextBuilder setEnableCSE(
+      boolean value) {
+    isCSEEnabled = value;
+    return this;
+  }
+
   @SuppressWarnings("deprecation")
   public StoreContext build() {
     return new StoreContext(fsURI,
@@ -199,6 +212,7 @@ public class StoreContextBuilder {
         useListV1,
         contextAccessors,
         timeProvider,
-        auditor);
+        auditor,
+        isCSEEnabled);
   }
 }
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/encryption.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/encryption.md
index d869705..888ed8e 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/encryption.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/encryption.md
@@ -21,9 +21,13 @@
 ## <a name="introduction"></a> Introduction
 
 The S3A filesystem client supports Amazon S3's Server Side Encryption
-for at-rest data encryption.
-You should to read up on the [AWS documentation](https://docs.aws.amazon.com/AmazonS3/latest/dev/serv-side-encryption.html)
-for S3 Server Side Encryption for up to date information on the encryption mechanisms.
+and Client Side Encryption for encrypting data at-rest.
+
+
+For up to date information on the encryption mechanisms, read:
+
+* [Protecting data using server-side encryption](https://docs.aws.amazon.com/AmazonS3/latest/dev/serv-side-encryption.html)
+* [Protecting data using client-side encryption](https://docs.aws.amazon.com/AmazonS3/latest/userguide/UsingClientSideEncryption.html)
 
 
 
@@ -32,18 +36,21 @@ Any new file written will be encrypted with this encryption configuration.
 When the S3A client reads a file, S3 will attempt to decrypt it using the mechanism
 and keys with which the file was encrypted.
 
-* It is **NOT** advised to mix and match encryption types in a bucket
+* It is **NOT** advised to mix and match encryption types in a bucket.
 * It is much simpler and safer to encrypt with just one type and key per bucket.
 * You can use AWS bucket policies to mandate encryption rules for a bucket.
 * You can use S3A per-bucket configuration to ensure that S3A clients use encryption
 policies consistent with the mandated rules.
-* You can use S3 Default Encryption to encrypt data without needing to
+* You can use S3 Default Encryption in AWS console to encrypt data without needing to
 set anything in the client.
 * Changing the encryption options on the client does not change how existing
 files were encrypted, except when the files are renamed.
-* For all mechanisms other than SSE-C, clients do not need any configuration
+* For all mechanisms other than SSE-C and CSE-KMS, clients do not need any configuration
 options set in order to read encrypted data: it is all automatically handled
 in S3 itself.
+* Encryption options and secrets are collected by [S3A Delegation Tokens](delegation_tokens.html) and passed to workers during job submission.
+* Encryption options and secrets MAY be stored in JCEKS files or any other Hadoop credential provider service.
+  This allows for more secure storage than XML files, including password protection of the secrets.
 
 ## <a name="encryption_types"></a>How data is encrypted
 
@@ -53,8 +60,7 @@ to encrypt the data as it saved to S3. It remains encrypted on S3 until deleted:
 clients cannot change the encryption attributes of an object once uploaded.
 
 The Amazon AWS SDK also offers client-side encryption, in which all the encoding
-and decoding of data is performed on the client. This is *not* supported by
-the S3A client.
+and decoding of data is performed on the client.
 
 The server-side "SSE" encryption is performed with symmetric AES256 encryption;
 S3 offers different mechanisms for actually defining the key to use.
@@ -70,6 +76,53 @@ by Amazon's Key Management Service, a key referenced by name in the uploading cl
 * SSE-C : the client specifies an actual base64 encoded AES-256 key to be used
 to encrypt and decrypt the data.
 
+Encryption options
+
+|  type | encryption | config on write | config on read |
+|-------|---------|-----------------|----------------|
+| `SSE-S3` | server side, AES256 | encryption algorithm | none |
+| `SSE-KMS` | server side, KMS key | key used to encrypt/decrypt | none |
+| `SSE-C` | server side, custom key | encryption algorithm and secret | encryption algorithm and secret |
+| `CSE-KMS` | client side, KMS key | encryption algorithm and key ID | encryption algorithm |
+
+With server-side encryption, the data is uploaded to S3 unencrypted (but wrapped by the HTTPS
+encryption channel).
+The data is encrypted in the S3 store and decrypted when it's being retrieved.
+
+A server side algorithm can be enabled by default for a bucket, so that
+whenever data is uploaded unencrypted a default encryption algorithm is added.
+When data is encrypted with SSE-S3 or SSE-KMS it is transparent to all clients
+downloading the data.
+SSE-C is different in that every client must know the secret key needed to decrypt the data.
+
+Working with SSE-C data is harder because every client must be configured to
+use the algorithm and supply the key. In particular, it is very hard to mix
+SSE-C encrypted objects in the same S3 bucket with objects encrypted with
+other algorithms or unencrypted; The S3A client (and other applications) get
+very confused.
+
+KMS-based key encryption is powerful as access to a key can be restricted to
+specific users/IAM roles. However, use of the key is billed and can be
+throttled. Furthermore as a client seeks around a file, the KMS key *may* be
+used multiple times.
+
+S3 Client side encryption (CSE-KMS) is an experimental feature added in July
+2021.
+
+This encrypts the data on the client, before transmitting to S3, where it is
+stored encrypted. The data is decrypted after downloading when it is being
+read back.
+
+In CSE-KMS, the ID of an AWS-KMS key is provided to the S3A client;
+the client communicates with AWS-KMS to request a new encryption key, which
+KMS returns along with the same key encrypted with the KMS key.
+The S3 client encrypts the payload *and* attaches the KMS-encrypted version
+of the key as a header to the object.
+
+When downloading data, this header is extracted, passed to AWS KMS, and,
+if the client has the appropriate permissions, the symmetric key is
+retrieved.
+This key is then used to decode the data.
 
 ## <a name="sse-s3"></a> S3 Default Encryption
 
@@ -110,7 +163,7 @@ To learn more, refer to
 [Protecting Data Using Server-Side Encryption with Amazon S3-Managed Encryption Keys (SSE-S3) in AWS documentation](http://docs.aws.amazon.com/AmazonS3/latest/dev/UsingServerSideEncryption.html).
 
 
-### <a name="sse-kms"></a> SSE-KMS: Amazon S3-KMS Managed Encryption Keys
+### <a name="sse-kms"></a> SSE-KMS: Server-Encryption with KMS Managed Encryption Keys
 
 
 Amazon offers a pay-per-use key management service, [AWS KMS](https://aws.amazon.com/documentation/kms/).
@@ -419,7 +472,82 @@ the data, so guaranteeing that access to thee data can be read by everyone
 granted access to that key, and nobody without access to it.
 
 
-###<a name="changing-encryption"></a> Use rename() to encrypt files with new keys
+### <a name="fattr"></a> Using `hadoop fs -getfattr` to view encryption information.
+
+The S3A client retrieves all HTTP headers from an object and returns
+them in the "XAttr" list of attributes, prefixed with `header.`.
+This makes them retrievable in the `getXAttr()` API calls, which
+is available on the command line through the `hadoop fs -getfattr -d` command.
+
+This makes viewing the encryption headers of a file straightforward.
+
+Here is an example of the operation invoked on a file where the client is using CSE-KMS:
+```
+bin/hadoop fs -getfattr -d s3a://test-london/file2
+
+2021-07-14 12:59:01,554 [main] WARN  s3.AmazonS3EncryptionClientV2 (AmazonS3EncryptionClientV2.java:warnOnLegacyCryptoMode(409)) - The S3 Encryption Client is configured to read encrypted data with legacy encryption modes through the CryptoMode setting. If you don't have objects encrypted with these legacy modes, you should disable support for them to enhance security. See https://docs.aws.amazon.com/general/latest/gr/aws_sdk_cryptography.html
+2021-07-14 12:59:01,558 [main] WARN  s3.AmazonS3EncryptionClientV2 (AmazonS3EncryptionClientV2.java:warnOnRangeGetsEnabled(401)) - The S3 Encryption Client is configured to support range get requests. Range gets do not provide authenticated encryption properties even when used with an authenticated mode (AES-GCM). See https://docs.aws.amazon.com/general/latest/gr/aws_sdk_cryptography.html
+# file: s3a://test-london/file2
+header.Content-Length="0"
+header.Content-Type="application/octet-stream"
+header.ETag="63b3f4bd6758712c98f1be86afad095a"
+header.Last-Modified="Wed Jul 14 12:56:06 BST 2021"
+header.x-amz-cek-alg="AES/GCM/NoPadding"
+header.x-amz-iv="ZfrgtxvcR41yNVkw"
+header.x-amz-key-v2="AQIDAHjZrfEIkNG24/xy/pqPPLcLJulg+O5pLgbag/745OH4VQFuiRnSS5sOfqXJyIa4FOvNAAAAfjB8BgkqhkiG9w0BBwagbzBtAgEAMGgGCSqGSIb3DQEHATAeBglghkgBZQMEAS4wEQQM7b5GZzD4eAGTC6gaAgEQgDte9Et/dhR7LAMU/NaPUZSgXWN5pRnM4hGHBiRNEVrDM+7+iotSTKxFco+VdBAzwFZfHjn0DOoSZEukNw=="
+header.x-amz-matdesc="{"aws:x-amz-cek-alg":"AES/GCM/NoPadding"}"
+header.x-amz-server-side-encryption="AES256"
+header.x-amz-tag-len="128"
+header.x-amz-unencrypted-content-length="0"
+header.x-amz-version-id="zXccFCB9eICszFgqv_paG1pzaUKY09Xa"
+header.x-amz-wrap-alg="kms+context"
+```
+
+Analysis
+
+1. The WARN messages are the AWS SDK warning that because the S3A client uses
+an encryption algorithm which seek() requires, the SDK considers it less
+secure than the most recent algorithm(s). Ignore.
+
+* `header.x-amz-server-side-encryption="AES256"` : the file has been encrypted with S3-SSE. This is set up as the S3 default encryption,
+so even when CSE is enabled, the data is doubly encrypted.
+* `header.x-amz-cek-alg="AES/GCM/NoPadding"`: client-side encrypted with the `AES/GCM/NoPadding` algorithm.
+* `header.x-amz-iv="ZfrgtxvcR41yNVkw"`:
+* `header.x-amz-key-v2="AQIDAHjZrfEIkNG24/xy/pqPPLcLJulg+O5pLgbag/745OH4VQFuiRnSS5sOfqXJyIa4FOvNAAAAfjB8BgkqhkiG9w0BBwagbzBtAgEAMGgGCSqGSIb3DQEHATAeBglghkgBZQMEAS4wEQQM7b5GZzD4eAGTC6gaAgEQgDte9Et/dhR7LAMU/NaPUZSgXWN5pRnM4hGHBiRNEVrDM+7+iotSTKxFco+VdBAzwFZfHjn0DOoSZEukNw=="`:
+* `header.x-amz-unencrypted-content-length="0"`: this is the length of the unencrypted data. The S3A client *DOES NOT* use this header;
+  it always removes 16 bytes from non-empty files when declaring the length.
+* `header.x-amz-wrap-alg="kms+context"`: the algorithm used to encrypt the CSE key
+* `header.x-amz-version-id="zXccFCB9eICszFgqv_paG1pzaUKY09Xa"`: the bucket is versioned; this is the version ID.
+
+And a directory encrypted with S3-SSE only:
+
+```
+bin/hadoop fs -getfattr -d s3a://test-london/user/stevel/target/test/data/sOCOsNgEjv
+
+# file: s3a://test-london/user/stevel/target/test/data/sOCOsNgEjv
+header.Content-Length="0"
+header.Content-Type="application/x-directory"
+header.ETag="d41d8cd98f00b204e9800998ecf8427e"
+header.Last-Modified="Tue Jul 13 20:12:07 BST 2021"
+header.x-amz-server-side-encryption="AES256"
+header.x-amz-version-id="KcDOVmznIagWx3gP1HlDqcZvm1mFWZ2a"
+```
+
+A file with no-encryption (on a bucket without versioning but with intelligent tiering):
+
+```
+bin/hadoop fs -getfattr -d s3a://landsat-pds/scene_list.gz
+
+# file: s3a://landsat-pds/scene_list.gz
+header.Content-Length="45603307"
+header.Content-Type="application/octet-stream"
+header.ETag="39c34d489777a595b36d0af5726007db"
+header.Last-Modified="Wed Aug 29 01:45:15 BST 2018"
+header.x-amz-storage-class="INTELLIGENT_TIERING"
+header.x-amz-version-id="null"
+```
+
+### <a name="changing-encryption"></a> Use `rename()` to encrypt files with new keys
 
 The encryption of an object is set when it is uploaded. If you want to encrypt
 an unencrypted file, or change the SEE-KMS key of a file, the only way to do
@@ -435,6 +563,91 @@ for reading as for writing, and you must supply that key for reading. There
 you need to copy one bucket to a different bucket, one with a different key.
 Use `distCp`for this, with per-bucket encryption policies.
 
+## <a name="cse"></a> Amazon S3 Client Side Encryption
+
+### Introduction
+Amazon S3 Client Side Encryption (S3-CSE) is used to encrypt data on the
+client-side and then transmit it over to S3 storage. The same encrypted data
+is then transmitted back to the client while reading and then
+decrypted on the client-side.
+
+S3-CSE uses `AmazonS3EncryptionClientV2.java` as the AmazonS3 client. The
+encryption and decryption is done by the AWS SDK. As of July 2021, only the
+CSE-KMS method is supported.
+
+A key reason this feature (HADOOP-13887) has been unavailable for a long time
+is that the AWS S3 client pads uploaded objects with a 16 byte footer. This
+meant that files were shorter when being read than when they were listed
+through any of the list API calls/getFileStatus(), which broke many
+applications, including anything seeking near the end of a file to read a
+footer, as ORC and Parquet do.
+
+There is now a workaround: compensate for the footer in listings when CSE is enabled.
+
+- When listing files and directories, 16 bytes are subtracted from the length
+of all non-empty objects (greater than or equal to 16 bytes).
+- Directory markers MAY be longer than 0 bytes.
+
+This "appears" to work; it does in the testing as of July 2021. However,
+the length of files when listed through the S3A client is now going to be
+shorter than the length of files listed with other clients -including S3A
+clients where S3-CSE has not been enabled.
+
+### Features
+
+- Supports client side encryption with keys managed in AWS KMS.
+- encryption settings propagated into jobs through any issued delegation tokens.
+- encryption information stored as headers in the uploaded object.
+
+### Limitations
+
+- Performance will be reduced. All encrypt/decrypt is now being done on the
+ client.
+- Writing files may be slower, as only a single block can be encrypted and
+ uploaded at a time.
+- Multipart Uploader API disabled.
+- S3 Select is not supported.
+- Multipart uploads would be serial, and partSize must be a multiple of 16
+ bytes.
+- maximum message size in bytes that can be encrypted under this mode is
+ 2^36-32, or ~64G, due to the security limitation of AES/GCM as recommended by
+ NIST.
+
+### Setup
+- Generate an AWS KMS Key ID from AWS console for your bucket, with same
+ region as the storage bucket.
+- If already created, [view the kms key ID by these steps.](https://docs.aws.amazon.com/kms/latest/developerguide/find-cmk-id-arn.html)
+- Set `fs.s3a.server-side-encryption-algorithm=CSE-KMS`.
+- Set `fs.s3a.server-side-encryption.key=<KMS_KEY_ID>`.
+
+KMS_KEY_ID:
+
+Identifies the symmetric CMK that encrypts the data key.
+To specify a CMK, use its key ID, key ARN, alias name, or alias ARN. When
+using an alias name, prefix it with "alias/". To specify a CMK in a
+different AWS account, you must use the key ARN or alias ARN.
+
+For example:
+- Key ID: `1234abcd-12ab-34cd-56ef-1234567890ab`
+- Key ARN: `arn:aws:kms:us-east-2:111122223333:key/1234abcd-12ab-34cd-56ef-1234567890ab`
+- Alias name: `alias/ExampleAlias`
+- Alias ARN: `arn:aws:kms:us-east-2:111122223333:alias/ExampleAlias`
+
+*Note:* If `fs.s3a.server-side-encryption-algorithm=CSE-KMS` is set,
+`fs.s3a.server-side-encryption.key=<KMS_KEY_ID>` property must be set for
+S3-CSE to work.
+
+```xml
+<property>
+     <name>fs.s3a.server-side-encryption-algorithm</name>
+     <value>CSE-KMS</value>
+ </property>
+
+ <property>
+     <name>fs.s3a.server-side-encryption.key</name>
+     <value>${KMS_KEY_ID}</value>
+ </property>
+```
 
 ## <a name="troubleshooting"></a> Troubleshooting Encryption
 
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
index 30047ed..6cdb492 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
@@ -1157,6 +1157,284 @@ is used, no encryption is specified, or the SSE-C specified is incorrect.
 2. A directory is encrypted with a SSE-C keyA and the user is trying to move a
 file using configured SSE-C keyB into that structure.
 
+## <a name="client-side-encryption"></a> S3 Client Side Encryption
+
+### Instruction file not found for S3 object
+
+Reading an unencrypted file fails when it is read through a CSE enabled client.
+```
+java.lang.SecurityException: Instruction file not found for S3 object with bucket name: ap-south-cse, key: unencryptedData.txt
+    at com.amazonaws.services.s3.internal.crypto.v2.S3CryptoModuleAE.decipher(S3CryptoModuleAE.java:190)
+    at com.amazonaws.services.s3.internal.crypto.v2.S3CryptoModuleAE.getObjectSecurely(S3CryptoModuleAE.java:136)
+    at com.amazonaws.services.s3.AmazonS3EncryptionClientV2.getObject(AmazonS3EncryptionClientV2.java:241)
+    at org.apache.hadoop.fs.s3a.S3AFileSystem$InputStreamCallbacksImpl.getObject(S3AFileSystem.java:1462)
+    at org.apache.hadoop.fs.s3a.S3AInputStream.lambda$reopen$0(S3AInputStream.java:217)
+    at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:117)
+    at org.apache.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:216)
+    at org.apache.hadoop.fs.s3a.S3AInputStream.lambda$lazySeek$1(S3AInputStream.java:382)
+    at org.apache.hadoop.fs.s3a.Invoker.lambda$maybeRetry$3(Invoker.java:230)
+    at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:117)
+    at org.apache.hadoop.fs.s3a.Invoker.lambda$maybeRetry$5(Invoker.java:354)
+    at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:414)
+    at org.apache.hadoop.fs.s3a.Invoker.maybeRetry(Invoker.java:350)
+    at org.apache.hadoop.fs.s3a.Invoker.maybeRetry(Invoker.java:228)
+    at org.apache.hadoop.fs.s3a.Invoker.maybeRetry(Invoker.java:272)
+    at org.apache.hadoop.fs.s3a.S3AInputStream.lazySeek(S3AInputStream.java:374)
+    at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:493)
+    at java.io.DataInputStream.read(DataInputStream.java:100)
+    at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:94)
+    at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:68)
+    at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:129)
+    at org.apache.hadoop.fs.shell.Display$Cat.printToStdout(Display.java:101)
+    at org.apache.hadoop.fs.shell.Display$Cat.processPath(Display.java:96)
+    at org.apache.hadoop.fs.shell.Command.processPathInternal(Command.java:370)
+    at org.apache.hadoop.fs.shell.Command.processPaths(Command.java:333)
+    at org.apache.hadoop.fs.shell.Command.processPathArgument(Command.java:306)
+    at org.apache.hadoop.fs.shell.Command.processArgument(Command.java:288)
+    at org.apache.hadoop.fs.shell.Command.processArguments(Command.java:272)
+    at org.apache.hadoop.fs.shell.FsCommand.processRawArguments(FsCommand.java:121)
+    at org.apache.hadoop.fs.shell.Command.run(Command.java:179)
+    at org.apache.hadoop.fs.FsShell.run(FsShell.java:327)
+    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:81)
+    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:95)
+    at org.apache.hadoop.fs.FsShell.main(FsShell.java:390)
+```
+A CSE enabled client should only be used to read encrypted data.
+
+### CSE-KMS method requires KMS key ID
+
+A KMS key ID is required for CSE-KMS to encrypt data; not providing one leads
+ to failure.
+
+```
+2021-07-07 11:33:04,550 WARN fs.FileSystem: Failed to initialize fileystem
+s3a://ap-south-cse/: java.lang.IllegalArgumentException: CSE-KMS
+method requires KMS key ID. Use fs.s3a.server-side-encryption.key property to set it.
+-ls: CSE-KMS method requires KMS key ID. Use fs.s3a.server-side-encryption.key property to
+ set it.
+```
+
+Set `fs.s3a.server-side-encryption.key` to the `<KMS_KEY_ID>` generated through the AWS console.
+
+### `com.amazonaws.services.kms.model.IncorrectKeyException` The key ID in the request does not identify a CMK that can perform this operation.
+
+The KMS key ID used to PUT (encrypt) the data must be the one used to GET the
+data.
+ ```
+cat: open s3a://ap-south-cse/encryptedData.txt at 0 on
+s3a://ap-south-cse/encryptedData.txt:
+com.amazonaws.services.kms.model.IncorrectKeyException: The key ID in the
+request does not identify a CMK that can perform this operation. (Service: AWSKMS;
+Status Code: 400; ErrorCode: IncorrectKeyException;
+Request ID: da21aa8a-f00d-467c-94a0-32b627d32bc0; Proxy: null):IncorrectKeyException:
+The key ID in the request does not identify a CMK that can perform this
+operation. (Service: AWSKMS ; Status Code: 400; Error Code: IncorrectKeyException;
+Request ID: da21aa8a-f00d-467c-94a0-32b627d32bc0; Proxy: null)
+```
+Use the same KMS key ID used to upload data to download and read it as well.
+
+### `com.amazonaws.services.kms.model.NotFoundException` key/<KMS_KEY_ID> does not exist
+
+Using a KMS key ID from a different region than the bucket used to store data
+ would lead to failure while uploading.
+
+```
+mkdir: PUT 0-byte object  on testmkdir:
+com.amazonaws.services.kms.model.NotFoundException: Key
+'arn:aws:kms:ap-south-1:152813717728:key/<KMS_KEY_ID>'
+does not exist (Service: AWSKMS; Status Code: 400; Error Code: NotFoundException;
+Request ID: 279db85d-864d-4a38-9acd-d892adb504c0; Proxy: null):NotFoundException:
+Key 'arn:aws:kms:ap-south-1:152813717728:key/<KMS_KEY_ID>'
+does not exist(Service: AWSKMS; Status Code: 400; Error Code: NotFoundException;
+Request ID: 279db85d-864d-4a38-9acd-d892adb504c0; Proxy: null)
+```
+While generating the KMS Key ID make sure to generate it in the same region
+ as your bucket.
+
+### Unable to perform range get request: Range get support has been disabled
+
+If Range get is not supported for a CSE algorithm or is disabled:
+```
+java.lang.SecurityException: Unable to perform range get request: Range get support has been disabled. See https://docs.aws.amazon.com/general/latest/gr/aws_sdk_cryptography.html
+
+    at com.amazonaws.services.s3.internal.crypto.v2.S3CryptoModuleAE.assertCanGetPartialObject(S3CryptoModuleAE.java:446)
+    at com.amazonaws.services.s3.internal.crypto.v2.S3CryptoModuleAE.getObjectSecurely(S3CryptoModuleAE.java:117)
+    at com.amazonaws.services.s3.AmazonS3EncryptionClientV2.getObject(AmazonS3EncryptionClientV2.java:241)
+    at org.apache.hadoop.fs.s3a.S3AFileSystem$InputStreamCallbacksImpl.getObject(S3AFileSystem.java:1462)
+    at org.apache.hadoop.fs.s3a.S3AInputStream.lambda$reopen$0(S3AInputStream.java:217)
+    at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:117)
+    at org.apache.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:216)
+    at org.apache.hadoop.fs.s3a.S3AInputStream.lambda$lazySeek$1(S3AInputStream.java:382)
+    at org.apache.hadoop.fs.s3a.Invoker.lambda$maybeRetry$3(Invoker.java:230)
+    at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:117)
+    at org.apache.hadoop.fs.s3a.Invoker.lambda$maybeRetry$5(Invoker.java:354)
+    at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:414)
+    at org.apache.hadoop.fs.s3a.Invoker.maybeRetry(Invoker.java:350)
+    at org.apache.hadoop.fs.s3a.Invoker.maybeRetry(Invoker.java:228)
+    at org.apache.hadoop.fs.s3a.Invoker.maybeRetry(Invoker.java:272)
+    at org.apache.hadoop.fs.s3a.S3AInputStream.lazySeek(S3AInputStream.java:374)
+    at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:408)
+    at java.io.DataInputStream.readByte(DataInputStream.java:265)
+```
+Range gets must be enabled for CSE to work.
+
+### WARNING: Range gets do not provide authenticated encryption properties even when used with an authenticated mode (AES-GCM).
+
+The S3 Encryption Client is configured to support range get requests. This
+ warning is shown every time S3-CSE is used.
+```
+2021-07-14 12:54:09,525 [main] WARN  s3.AmazonS3EncryptionClientV2
+(AmazonS3EncryptionClientV2.java:warnOnRangeGetsEnabled(401)) - The S3
+Encryption Client is configured to support range get requests. Range gets do
+not provide authenticated encryption properties even when used with an
+authenticated mode (AES-GCM). See https://docs.aws.amazon.com/general/latest
+/gr/aws_sdk_cryptography.html
+```
+This warning can be ignored, since range gets must be enabled for S3-CSE to
+get data.
+
+### WARNING: If you don't have objects encrypted with these legacy modes, you should disable support for them to enhance security.
+
+The S3 Encryption Client is configured to read encrypted data with legacy
+encryption modes through the CryptoMode setting, so this warning is shown
+for every S3-CSE request.
+
+```
+2021-07-14 12:54:09,519 [main] WARN  s3.AmazonS3EncryptionClientV2
+(AmazonS3EncryptionClientV2.java:warnOnLegacyCryptoMode(409)) - The S3
+Encryption Client is configured to read encrypted data with legacy
+encryption modes through the CryptoMode setting. If you don't have objects
+encrypted with these legacy modes, you should disable support for them to
+enhance security. See https://docs.aws.amazon.com/general/latest/gr/aws_sdk_cryptography.html
+```
+This can be ignored, since this CryptoMode setting
+(CryptoMode.AuthenticatedEncryption) is required for range gets to work.
+
+### com.amazonaws.services.kms.model.InvalidKeyUsageException: You cannot generate a data key with an asymmetric CMK
+
+If you generated an asymmetric CMK from the AWS console then CSE-KMS won't be
+able to generate a unique data key for encryption.
+
+```
+Caused by: com.amazonaws.services.kms.model.InvalidKeyUsageException:
+You cannot generate a data key with an asymmetric CMK
+(Service: AWSKMS; Status Code: 400; Error Code: InvalidKeyUsageException; Request ID: 93609c15-e490-4035-8390-f4396f0d90bf; Proxy: null)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1403)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1372)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
+    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
+    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
+    at com.amazonaws.services.kms.AWSKMSClient.doInvoke(AWSKMSClient.java:7223)
+    at com.amazonaws.services.kms.AWSKMSClient.invoke(AWSKMSClient.java:7190)
+    at com.amazonaws.services.kms.AWSKMSClient.invoke(AWSKMSClient.java:7179)
+    at com.amazonaws.services.kms.AWSKMSClient.executeGenerateDataKey(AWSKMSClient.java:3482)
+    at com.amazonaws.services.kms.AWSKMSClient.generateDataKey(AWSKMSClient.java:3451)
+    at com.amazonaws.services.s3.internal.crypto.v2.S3CryptoModuleBase.buildContentCryptoMaterial(S3CryptoModuleBase.java:533)
+    at com.amazonaws.services.s3.internal.crypto.v2.S3CryptoModuleBase.newContentCryptoMaterial(S3CryptoModuleBase.java:481)
+    at com.amazonaws.services.s3.internal.crypto.v2.S3CryptoModuleBase.createContentCryptoMaterial(S3CryptoModuleBase.java:447)
+    at com.amazonaws.services.s3.internal.crypto.v2.S3CryptoModuleBase.putObjectUsingMetadata(S3CryptoModuleBase.java:160)
+    at com.amazonaws.services.s3.internal.crypto.v2.S3CryptoModuleBase.putObjectSecurely(S3CryptoModuleBase.java:156)
+    at com.amazonaws.services.s3.AmazonS3EncryptionClientV2.putObject(AmazonS3EncryptionClientV2.java:236)
+    at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$putObjectDirect$17(S3AFileSystem.java:2792)
+    at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfSupplier(IOStatisticsBinding.java:604)
+    at org.apache.hadoop.fs.s3a.S3AFileSystem.putObjectDirect(S3AFileSystem.java:2789)
+    at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$createEmptyObject$33(S3AFileSystem.java:4440)
+    at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:117)
+    ... 49 more
+```
+
+Generate a Symmetric Key in the same region as your S3 storage for CSE-KMS to
+work.
+
+### com.amazonaws.services.kms.model.NotFoundException: Invalid keyId
+
+If the value of the `fs.s3a.server-side-encryption.key` property does not
+exist or is not valid in AWS KMS CMK (Customer managed keys), this error is seen.
+
+```
+Caused by: com.amazonaws.services.kms.model.NotFoundException: Invalid keyId abc
+(Service: AWSKMS; Status Code: 400; Error Code: NotFoundException; Request ID: 9d53552a-3d1b-47c8-984c-9a599d5c2391; Proxy: null)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1403)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1372)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
+    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
+    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
+    at com.amazonaws.services.kms.AWSKMSClient.doInvoke(AWSKMSClient.java:7223)
+    at com.amazonaws.services.kms.AWSKMSClient.invoke(AWSKMSClient.java:7190)
+    at com.amazonaws.services.kms.AWSKMSClient.invoke(AWSKMSClient.java:7179)
+    at com.amazonaws.services.kms.AWSKMSClient.executeGenerateDataKey(AWSKMSClient.java:3482)
+    at com.amazonaws.services.kms.AWSKMSClient.generateDataKey(AWSKMSClient.java:3451)
+    at com.amazonaws.services.s3.internal.crypto.v2.S3CryptoModuleBase.buildContentCryptoMaterial(S3CryptoModuleBase.java:533)
+    at com.amazonaws.services.s3.internal.crypto.v2.S3CryptoModuleBase.newContentCryptoMaterial(S3CryptoModuleBase.java:481)
+    at com.amazonaws.services.s3.internal.crypto.v2.S3CryptoModuleBase.createContentCryptoMaterial(S3CryptoModuleBase.java:447)
+    at com.amazonaws.services.s3.internal.crypto.v2.S3CryptoModuleBase.putObjectUsingMetadata(S3CryptoModuleBase.java:160)
+    at com.amazonaws.services.s3.internal.crypto.v2.S3CryptoModuleBase.putObjectSecurely(S3CryptoModuleBase.java:156)
+    at com.amazonaws.services.s3.AmazonS3EncryptionClientV2.putObject(AmazonS3EncryptionClientV2.java:236)
+    at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$putObjectDirect$17(S3AFileSystem.java:2792)
+    at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfSupplier(IOStatisticsBinding.java:604)
+    at org.apache.hadoop.fs.s3a.S3AFileSystem.putObjectDirect(S3AFileSystem.java:2789)
+    at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$createEmptyObject$33(S3AFileSystem.java:4440)
+    at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:117)
+    ... 49 more
+```
+
+Check that `fs.s3a.server-side-encryption.key` is set correctly and matches
+the key shown on the AWS console.
+
+### com.amazonaws.services.kms.model.AWSKMSException: User: <User_ARN> is not authorized to perform: kms:GenerateDataKey on resource: <KEY_ID>
+
+User doesn't have authorization to the specific AWS KMS Key ID.
+```
+Caused by: com.amazonaws.services.kms.model.AWSKMSException:
+User: arn:aws:iam::152813717728:user/<user> is not authorized to perform: kms:GenerateDataKey on resource: <key_ID>
+(Service: AWSKMS; Status Code: 400; Error Code: AccessDeniedException; Request ID: 4ded9f1f-b245-4213-87fc-16cba7a1c4b9; Proxy: null)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1403)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1372)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
+    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
+    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
+    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
+    at com.amazonaws.services.kms.AWSKMSClient.doInvoke(AWSKMSClient.java:7223)
+    at com.amazonaws.services.kms.AWSKMSClient.invoke(AWSKMSClient.java:7190)
+    at com.amazonaws.services.kms.AWSKMSClient.invoke(AWSKMSClient.java:7179)
+    at com.amazonaws.services.kms.AWSKMSClient.executeGenerateDataKey(AWSKMSClient.java:3482)
+    at com.amazonaws.services.kms.AWSKMSClient.generateDataKey(AWSKMSClient.java:3451)
+    at com.amazonaws.services.s3.internal.crypto.v2.S3CryptoModuleBase.buildContentCryptoMaterial(S3CryptoModuleBase.java:533)
+    at com.amazonaws.services.s3.internal.crypto.v2.S3CryptoModuleBase.newContentCryptoMaterial(S3CryptoModuleBase.java:481)
+    at com.amazonaws.services.s3.internal.crypto.v2.S3CryptoModuleBase.createContentCryptoMaterial(S3CryptoModuleBase.java:447)
+    at com.amazonaws.services.s3.internal.crypto.v2.S3CryptoModuleBase.putObjectUsingMetadata(S3CryptoModuleBase.java:160)
+    at com.amazonaws.services.s3.internal.crypto.v2.S3CryptoModuleBase.putObjectSecurely(S3CryptoModuleBase.java:156)
+    at com.amazonaws.services.s3.AmazonS3EncryptionClientV2.putObject(AmazonS3EncryptionClientV2.java:236)
+    at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$putObjectDirect$17(S3AFileSystem.java:2792)
+    at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfSupplier(IOStatisticsBinding.java:604)
+    at org.apache.hadoop.fs.s3a.S3AFileSystem.putObjectDirect(S3AFileSystem.java:2789)
+    at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$createEmptyObject$33(S3AFileSystem.java:4440)
+    at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:117)
+    ... 49 more
+```
+
+The user trying to use the KMS Key ID should have the right permissions to
+encrypt/decrypt with the AWS KMS Key set via `fs.s3a.server-side-encryption.key`.
+If not, add the permission (or IAM role) in the "Key users" section after
+selecting the AWS-KMS CMK Key on the AWS console.
+
 ### <a name="not_all_bytes_were_read"></a> Message appears in logs "Not all bytes were read from the S3ObjectInputStream"
 
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3ATestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3ATestBase.java
index 4e5c452..2429191 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3ATestBase.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3ATestBase.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.store.audit.AuditSpanSource;
 import org.apache.hadoop.io.IOUtils;
 
 import org.junit.AfterClass;
+import org.junit.Assume;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -259,4 +260,12 @@ public abstract class AbstractS3ATestBase extends AbstractFSContractTestBase
 
     return source.createSpan(getMethodName(), null, null);
   }
+
+  /**
+   *  Method to assume that S3 client side encryption is disabled on a test.
+   */
+  public void skipIfClientSideEncryption() {
+    Assume.assumeTrue("Skipping test if CSE is enabled",
+        !getFileSystem().isCSEEnabled());
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/EncryptionTestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/EncryptionTestUtils.java
index f9cfc04..4fc03e0 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/EncryptionTestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/EncryptionTestUtils.java
@@ -51,7 +51,7 @@ public final class EncryptionTestUtils {
    */
   public static String convertKeyToMd5(FileSystem fs) {
     String base64Key = fs.getConf().getTrimmed(
-            SERVER_SIDE_ENCRYPTION_KEY
+        SERVER_SIDE_ENCRYPTION_KEY
     );
     byte[] key = Base64.decodeBase64(base64Key);
     byte[] md5 =  DigestUtils.md5(key);
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java
new file mode 100644
index 0000000..880f032
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.statistics.IOStatisticAssertions;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.rm;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyFileContents;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
+import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBool;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Tests to verify S3 Client-Side Encryption (CSE).
+ */
+public abstract class ITestS3AClientSideEncryption extends AbstractS3ATestBase {
+
+  private static final List<Integer> SIZES =
+      new ArrayList<>(Arrays.asList(0, 1, 255, 4095));
+
+  private static final int BIG_FILE_SIZE = 15 * 1024 * 1024;
+  private static final int SMALL_FILE_SIZE = 1024;
+
+  /**
+   * Run the CSE file validation once for every length listed in SIZES.
+   */
+  @Test
+  public void testEncryption() throws Throwable {
+    describe("Test to verify client-side encryption for different file sizes.");
+    for (int idx = 0; idx < SIZES.size(); idx++) {
+      validateEncryptionForFileSize(SIZES.get(idx));
+    }
+  }
+
+  /**
+   * Verify that a file written through CSE is still readable and encrypted
+   * after being renamed; the rename itself must report success.
+   */
+  @Test
+  public void testEncryptionOverRename() throws Throwable {
+    describe("Test for AWS CSE on Rename Operation.");
+    maybeSkipTest();
+    S3AFileSystem fs = getFileSystem();
+    Path src = path(getMethodName());
+    byte[] data = dataset(SMALL_FILE_SIZE, 'a', 'z');
+    writeDataset(fs, src, data, data.length, SMALL_FILE_SIZE, true, false);
+    verifyFileContents(fs, src, data);
+    Path dest = path(src.getName() + "-copy");
+    // rename() can report failure by returning false, so check the result.
+    assertTrue("rename to " + dest + " failed", fs.rename(src, dest));
+    verifyFileContents(fs, dest, data);
+    assertEncrypted(dest);
+  }
+
+  /**
+   * Test to verify if we get same content length of files in S3 CSE using
+   * listStatus and listFiles on the parent directory.
+   */
+  @Test
+  public void testDirectoryListingFileLengths() throws IOException {
+    describe("Test to verify directory listing calls gives correct content "
+        + "lengths");
+    maybeSkipTest();
+    S3AFileSystem fs = getFileSystem();
+    Path parentDir = path(getMethodName());
+
+    // Creating files in the parent directory that will be used to assert
+    // content length.
+    for (int i : SIZES) {
+      Path child = new Path(parentDir, getMethodName() + i);
+      writeThenReadFile(child, i);
+    }
+
+    // Getting the content lengths of files inside the directory via FileStatus.
+    List<Integer> fileLengthDirListing = new ArrayList<>();
+    for (FileStatus fileStatus : fs.listStatus(parentDir)) {
+      fileLengthDirListing.add((int) fileStatus.getLen());
+    }
+    // Assert the file length we got against expected file length for
+    // ListStatus.
+    Assertions.assertThat(fileLengthDirListing)
+        .describedAs("File lengths aren't the same "
+            + "as expected from FileStatus dir. listing")
+        .containsExactlyInAnyOrderElementsOf(SIZES);
+
+    // Getting the content lengths of files inside the directory via ListFiles.
+    RemoteIterator<LocatedFileStatus> listDir = fs.listFiles(parentDir, true);
+    List<Integer> fileLengthListLocated = new ArrayList<>();
+    while (listDir.hasNext()) {
+      LocatedFileStatus fileStatus = listDir.next();
+      fileLengthListLocated.add((int) fileStatus.getLen());
+    }
+    // Assert the file length we got against expected file length for
+    // LocatedFileStatus.
+    Assertions.assertThat(fileLengthListLocated)
+        .describedAs("File lengths isn't same "
+            + "as expected from LocatedFileStatus dir. listing")
+        .containsExactlyInAnyOrderElementsOf(SIZES);
+
+  }
+
+  /**
+   * Test to verify multipart upload through S3ABlockOutputStream, then
+   * verify random IO and a full read of the uploaded file.
+   */
+  @Test
+  public void testBigFilePutAndGet() throws IOException {
+    maybeSkipTest();
+    assume("Scale test disabled: to enable set property " +
+        KEY_SCALE_TESTS_ENABLED, getTestPropertyBool(
+        getConfiguration(),
+        KEY_SCALE_TESTS_ENABLED,
+        DEFAULT_SCALE_TESTS_ENABLED));
+    S3AFileSystem fs = getFileSystem();
+    Path filePath = path(getMethodName());
+    byte[] fileContent = dataset(BIG_FILE_SIZE, 'a', 26);
+    int offsetSeek = fileContent[BIG_FILE_SIZE - 4];
+
+    // PUT a 15MB file using CSE to force multipart in CSE.
+    createFile(fs, filePath, true, fileContent);
+    LOG.info("Multi-part upload successful...");
+
+    try (FSDataInputStream in = fs.open(filePath)) {
+      // Verify random IO.
+      in.seek(BIG_FILE_SIZE - 4);
+      assertEquals("Byte at a specific position not equal to actual byte",
+          offsetSeek, in.read());
+      in.seek(0);
+      assertEquals("Byte at a specific position not equal to actual byte",
+          'a', in.read());
+
+      // Verify seek-read across two multipart blocks, using the dataset bytes.
+      in.seek(MULTIPART_MIN_SIZE - 1);
+      assertEquals("Byte before multipart block end mismatch",
+          fileContent[MULTIPART_MIN_SIZE - 1], in.read());
+      assertEquals("Byte at multipart end mismatch",
+          fileContent[MULTIPART_MIN_SIZE], in.read());
+      assertEquals("Byte after multipart end mismatch",
+          fileContent[MULTIPART_MIN_SIZE + 1], in.read());
+
+      // Verify end of file seek read.
+      in.seek(BIG_FILE_SIZE + 1);
+      assertEquals("Byte at eof mismatch", -1, in.read());
+
+      // Verify full read, using a separate buffer to keep fileContent intact.
+      byte[] readBack = new byte[BIG_FILE_SIZE];
+      in.readFully(0, readBack);
+      assertArrayEquals("Full read mismatch", fileContent, readBack);
+      verifyFileContents(fs, filePath, fileContent);
+    }
+  }
+
+  /**
+   * Testing how unencrypted and encrypted data behaves when read through
+   * CSE enabled and disabled FS respectively.
+   */
+  @Test
+  public void testEncryptionEnabledAndDisabledFS() throws Exception {
+    maybeSkipTest();
+    // Work on a copy so the shared test configuration is left untouched.
+    Configuration cseDisabledConf = new Configuration(getConfiguration());
+    S3AFileSystem cseEnabledFS = getFileSystem();
+    Path unEncryptedFilePath = path(getMethodName());
+    Path encryptedFilePath = path(getMethodName() + "cse");
+
+    // Strip base and per-bucket encryption options to get a CSE disabled FS.
+    S3ATestUtils.removeBaseAndBucketOverrides(cseDisabledConf,
+        SERVER_SIDE_ENCRYPTION_ALGORITHM, SERVER_SIDE_ENCRYPTION_KEY);
+
+    // The CSE disabled FS is created here, so it must be closed here too.
+    try (S3AFileSystem cseDisabledFS = new S3AFileSystem()) {
+      cseDisabledFS.initialize(cseEnabledFS.getUri(), cseDisabledConf);
+
+      // Verifying both FS instances using an IOStat gauge.
+      IOStatisticAssertions.assertThatStatisticGauge(
+          cseDisabledFS.getIOStatistics(),
+          Statistic.CLIENT_SIDE_ENCRYPTION_ENABLED.getSymbol()).isEqualTo(0L);
+      IOStatisticAssertions.assertThatStatisticGauge(
+          cseEnabledFS.getIOStatistics(),
+          Statistic.CLIENT_SIDE_ENCRYPTION_ENABLED.getSymbol()).isEqualTo(1L);
+
+      // Unencrypted data written to a path.
+      try (FSDataOutputStream out =
+               cseDisabledFS.create(unEncryptedFilePath)) {
+        out.write(new byte[SMALL_FILE_SIZE]);
+      }
+
+      // A CSE enabled FS reading unencrypted data must face an exception.
+      try (FSDataInputStream in = cseEnabledFS.open(unEncryptedFilePath)) {
+        assertEquals("Mismatch in content length bytes", SMALL_FILE_SIZE,
+            cseEnabledFS.getFileStatus(unEncryptedFilePath).getLen());
+
+        intercept(SecurityException.class, "",
+            "SecurityException should be thrown",
+            () -> {
+              in.read(new byte[SMALL_FILE_SIZE]);
+              return "Exception should be raised if unencrypted data is "
+                  + "read by a CSE enabled FS";
+            });
+      }
+
+      // Encrypted data written to a path.
+      try (FSDataOutputStream out = cseEnabledFS.create(encryptedFilePath)) {
+        out.write('a');
+      }
+
+      // CSE disabled FS tries to read encrypted data.
+      try (FSDataInputStream in = cseDisabledFS.open(encryptedFilePath)) {
+        // Due to padding and encryption, content written and length
+        // shouldn't be equal to what a CSE disabled FS would read.
+        assertNotEquals("Mismatch in content length", 1,
+            cseDisabledFS.getFileStatus(encryptedFilePath).getLen());
+        Assertions.assertThat(in.read())
+            .describedAs("Encrypted data shouldn't be equal to actual "
+                + "content without deciphering")
+            .isNotEqualTo('a');
+      }
+    }
+  }
+
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    S3ATestUtils.removeBaseAndBucketOverrides(conf, Constants.MULTIPART_SIZE,
+        Constants.MIN_MULTIPART_THRESHOLD);
+    // To force multi part put and get in small files, we'll set the
+    // threshold and part size to 5MB.
+    conf.set(Constants.MULTIPART_SIZE,
+        String.valueOf(MULTIPART_MIN_SIZE));
+    conf.set(Constants.MIN_MULTIPART_THRESHOLD,
+        String.valueOf(MULTIPART_MIN_SIZE));
+    return conf;
+  }
+
+  /**
+   * Method to validate CSE for different file sizes.
+   *
+   * @param len length of the file.
+   */
+  protected void validateEncryptionForFileSize(int len) throws IOException {
+    maybeSkipTest();
+    describe("Create an encrypted file of size " + len);
+    // Creating a unique path by adding file length in file name.
+    Path path = writeThenReadFile(getMethodName() + len, len);
+    assertEncrypted(path);
+    rm(getFileSystem(), path, false, false);
+  }
+
+  /**
+   * Skip tests if certain conditions are met.
+   */
+  protected abstract void maybeSkipTest();
+
+  /**
+   * Assert that a path references an encrypted blob.
+   *
+   * @param path path
+   * @throws IOException on a failure
+   */
+  protected abstract void assertEncrypted(Path path) throws IOException;
+
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryptionKms.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryptionKms.java
new file mode 100644
index 0000000..35f648f
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryptionKms.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.util.Map;
+
+import com.amazonaws.services.s3.Headers;
+import org.assertj.core.api.Assertions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.impl.HeaderProcessing;
+
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionTestsDisabled;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfKmsKeyIdIsNotSet;
+
+/**
+ * Testing the S3 CSE - KMS method.
+ */
+public class ITestS3AClientSideEncryptionKms
+    extends ITestS3AClientSideEncryption {
+
+  private static final String KMS_KEY_WRAP_ALGO = "kms+context";
+  private static final String KMS_CONTENT_ENCRYPTION_ALGO = "AES/GCM/NoPadding";
+
+  /**
+   * Build the KMS test configuration with filesystem caching disabled.
+   * @return Configuration.
+   */
+  @Override
+  protected Configuration createConfiguration() {
+    final Configuration kmsConf = super.createConfiguration();
+    S3ATestUtils.disableFilesystemCaching(kmsConf);
+    return kmsConf;
+  }
+
+  @Override
+  protected void maybeSkipTest() {
+    skipIfEncryptionTestsDisabled(getConfiguration());
+    skipIfKmsKeyIdIsNotSet(getConfiguration());
+  }
+
+  @Override
+  protected void assertEncrypted(Path path) throws IOException {
+    final String headerPrefix = "header.";
+    final Map<String, byte[]> attributes = getFileSystem().getXAttrs(path);
+
+    // The key wrap algorithm header must carry the KMS value.
+    final String keyWrapAlgo = processHeader(attributes,
+        headerPrefix + Headers.CRYPTO_KEYWRAP_ALGORITHM);
+    assertEquals("Key wrap algo isn't same as expected", KMS_KEY_WRAP_ALGO,
+        keyWrapAlgo);
+
+    // The materials description must name the content encryption algo
+    // for KMS, and must not contain the KMS key ID.
+    final String kmsKeyId =
+        getConfiguration().get(Constants.SERVER_SIDE_ENCRYPTION_KEY);
+    final String materialsDescription = processHeader(attributes,
+        headerPrefix + Headers.MATERIALS_DESCRIPTION);
+    Assertions.assertThat(materialsDescription)
+        .describedAs("Materials Description should contain the content "
+            + "encryption algo and should not contain the KMS keyID.")
+        .contains(KMS_CONTENT_ENCRYPTION_ALGO)
+        .doesNotContain(kmsKeyId);
+  }
+
+  /**
+   * Decode the value of one filesystem xAttr header.
+   * @param fsXAttrs  xAttr names mapped to their raw byte values.
+   * @param headerKey name of the header to decode.
+   * @return String representing the value of key header provided.
+   */
+  private String processHeader(Map<String, byte[]> fsXAttrs,
+      String headerKey) {
+    final byte[] rawValue = fsXAttrs.get(headerKey);
+    return HeaderProcessing.decodeBytes(rawValue);
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSUserDefinedKey.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSUserDefinedKey.java
index 67c403e..3a8cf7a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSUserDefinedKey.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSUserDefinedKey.java
@@ -22,6 +22,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
+import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
 import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
 import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.SSE_KMS;
 
@@ -38,9 +39,11 @@ public class ITestS3AEncryptionSSEKMSUserDefinedKey
     // get the KMS key for this test.
     Configuration c = new Configuration();
     String kmsKey = c.get(SERVER_SIDE_ENCRYPTION_KEY);
-    if (StringUtils.isBlank(kmsKey)){
-      skip(SERVER_SIDE_ENCRYPTION_KEY+ " is not set for " +
-          SSE_KMS.getMethod());
+    if (StringUtils.isBlank(kmsKey) || S3AEncryptionMethods.CSE_KMS.getMethod()
+        .equals(c.getTrimmed(SERVER_SIDE_ENCRYPTION_ALGORITHM, ""))) {
+      skip(SERVER_SIDE_ENCRYPTION_KEY + " is not set for " +
+          SSE_KMS.getMethod() + " or CSE-KMS algorithm is used instead of "
+          + "SSE-KMS");
     }
     Configuration conf = super.createConfiguration();
     conf.set(SERVER_SIDE_ENCRYPTION_KEY, kmsKey);
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionWithDefaultS3Settings.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionWithDefaultS3Settings.java
index c5ef65f..c7a62a3 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionWithDefaultS3Settings.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionWithDefaultS3Settings.java
@@ -57,9 +57,11 @@ public class ITestS3AEncryptionWithDefaultS3Settings extends
     S3AFileSystem fs = getFileSystem();
     Configuration c = fs.getConf();
     String kmsKey = c.get(SERVER_SIDE_ENCRYPTION_KEY);
-    if (StringUtils.isBlank(kmsKey)) {
+    if (StringUtils.isBlank(kmsKey) || S3AEncryptionMethods.CSE_KMS.getMethod()
+        .equals(c.getTrimmed(SERVER_SIDE_ENCRYPTION_ALGORITHM, ""))) {
       skip(SERVER_SIDE_ENCRYPTION_KEY + " is not set for " +
-          SSE_KMS.getMethod());
+          SSE_KMS.getMethod() + " or CSE-KMS algorithm is used instead of "
+          + "SSE-KMS");
     }
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInconsistency.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInconsistency.java
index b0e1a40..a49998d 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInconsistency.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInconsistency.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.s3a.s3guard.NullMetadataStore;
 import org.apache.hadoop.test.LambdaTestUtils;
 
 import org.junit.Assume;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.FileNotFoundException;
@@ -63,6 +64,14 @@ public class ITestS3AInconsistency extends AbstractS3ATestBase {
   /** By using a power of 2 for the initial time, the total is a shift left. */
   private static final int TOTAL_RETRY_DELAY = INITIAL_RETRY << RETRIES;
 
+  /**
+   * Skip the tests in this suite when S3 client side encryption is enabled.
+   */
+  @Before
+  public void setUp() {
+    skipIfClientSideEncryption();
+  }
+
   @Override
   protected Configuration createConfiguration() {
     Configuration conf = super.createConfiguration();
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java
index 496226c..daefa78 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java
@@ -187,6 +187,7 @@ public class ITestS3AMiscOperations extends AbstractS3ATestBase {
    */
   private void assumeNoDefaultEncryption() throws IOException {
     try {
+      skipIfClientSideEncryption();
       Assume.assumeThat(getDefaultEncryption(), nullValue());
     } catch (AccessDeniedException e) {
       // if the user can't check the default encryption, assume that it is
@@ -254,7 +255,7 @@ public class ITestS3AMiscOperations extends AbstractS3ATestBase {
   }
 
   private S3AEncryptionMethods encryptionAlgorithm() {
-    return getFileSystem().getServerSideEncryptionAlgorithm();
+    return getFileSystem().getS3EncryptionAlgorithm();
   }
 
   @Test
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
index 09f66df..2475b54 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
@@ -70,13 +70,17 @@ public class ITestS3GuardListConsistency extends AbstractS3ATestBase {
     invoker = new Invoker(new S3ARetryPolicy(getConfiguration()),
         Invoker.NO_OP
     );
+    skipIfClientSideEncryption();
     Assume.assumeTrue("No metadata store in test filesystem",
         getFileSystem().hasMetadataStore());
   }
 
   @Override
   public void teardown() throws Exception {
-    clearInconsistency(getFileSystem());
+    if (getFileSystem()
+        .getAmazonS3Client() instanceof InconsistentAmazonS3Client) {
+      clearInconsistency(getFileSystem());
+    }
     super.teardown();
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
index 2c66243..945d3f0 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
@@ -234,4 +234,17 @@ public interface S3ATestConstants {
    * every test case.
    */
   String DIRECTORY_MARKER_AUDIT = "fs.s3a.directory.marker.audit";
+
+  /**
+   * Constant bytes being written when Client side encryption KMS is enabled
+   * for a test. These bytes written take into account "EncryptionContext",
+   * which contains the algo used, e.g.
+   * "aws:x-amz-cek-alg":"AES/GCM/NoPadding", and "KeySpec", which specifies
+   * the length of the data key, e.g. AES_256 to generate a 256-bit symmetric
+   * key.
+   *
+   * For tests using bytesWritten as an assertion this constant value can be
+   * used.
+   */
+  int KMS_KEY_GENERATION_REQUEST_PARAMS_BYTES_WRITTEN = 94;
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
index 9f945e4..6c51c73 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
@@ -1495,4 +1495,16 @@ public final class S3ATestUtils {
         probes);
   }
 
+  /**
+   * Skip a test if the KMS key id is unset, empty or only whitespace.
+   *
+   * @param configuration configuration to probe.
+   */
+  public static void skipIfKmsKeyIdIsNotSet(Configuration configuration) {
+    if (configuration.getTrimmed(
+        SERVER_SIDE_ENCRYPTION_KEY, "").isEmpty()) {
+      skip("AWS KMS key id is not set");
+    }
+  }
+
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java
index beeb667..a1e56c3 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java
@@ -96,7 +96,7 @@ public class TestS3AInputStreamRetry extends AbstractS3AMockTest {
         fs.getBucket(),
         path,
         fs.pathToKey(path),
-        fs.getServerSideEncryptionAlgorithm(),
+        fs.getS3EncryptionAlgorithm(),
         new EncryptionSecrets().getEncryptionKey(),
         eTag,
         versionId,
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestSSEConfiguration.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestSSEConfiguration.java
index a664a8b..273cf8b 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestSSEConfiguration.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestSSEConfiguration.java
@@ -86,14 +86,14 @@ public class TestSSEConfiguration extends Assert {
   public void testSSEEmptyKey() {
     // test the internal logic of the test setup code
     Configuration c = buildConf(SSE_C.getMethod(), "");
-    assertEquals("", getServerSideEncryptionKey(BUCKET, c));
+    assertEquals("", getS3EncryptionKey(BUCKET, c));
   }
 
   @Test
   public void testSSEKeyNull() throws Throwable {
     // test the internal logic of the test setup code
     final Configuration c = buildConf(SSE_C.getMethod(), null);
-    assertEquals("", getServerSideEncryptionKey(BUCKET, c));
+    assertEquals("", getS3EncryptionKey(BUCKET, c));
 
     intercept(IOException.class, SSE_C_NO_KEY_ERROR,
         () -> getEncryptionAlgorithm(BUCKET, c));
@@ -109,7 +109,7 @@ public class TestSSEConfiguration extends Assert {
     // provider provisioned value instead.
     conf.set(SERVER_SIDE_ENCRYPTION_KEY, "keyInConfObject");
 
-    String sseKey = getServerSideEncryptionKey(BUCKET, conf);
+    String sseKey = getS3EncryptionKey(BUCKET, conf);
     assertNotNull("Proxy password should not retrun null.", sseKey);
     assertEquals("Proxy password override did NOT work.", key, sseKey);
   }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestCustomSigner.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestCustomSigner.java
index 72af175..22ce1e9 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestCustomSigner.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestCustomSigner.java
@@ -26,6 +26,7 @@ import java.util.Objects;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.amazonaws.SignableRequest;
+import com.amazonaws.auth.AWS4Signer;
 import com.amazonaws.auth.AWSCredentials;
 import com.amazonaws.auth.Signer;
 import com.amazonaws.services.s3.internal.AWSS3V4Signer;
@@ -169,6 +170,20 @@ public class ITestCustomSigner extends AbstractS3ATestBase {
       LOG.info("Creating Signer #{}", c);
     }
 
+    /**
+     * Method to sign the incoming request with credentials.
+     *
+     * NOTE: In case of Client-side encryption, we do a "Generate Key" POST
+     * request to AWSKMS service rather than S3, this was causing the test to
+     * break. When this request happens, we have the endpoint in form of
+     * "kms.[REGION].amazonaws.com", and bucket-name becomes "kms". We can't
+     * use AWSS3V4Signer for AWSKMS service as it contains a header
+     * "x-amz-content-sha256:UNSIGNED-PAYLOAD", which returns a 400 bad
+     * request because the signature calculated by the service doesn't match
+     * what we sent.
+     * @param request the request to sign.
+     * @param credentials credentials used to sign the request.
+     */
     @Override
     public void sign(SignableRequest<?> request, AWSCredentials credentials) {
       int c = INVOCATION_COUNT.incrementAndGet();
@@ -182,10 +197,21 @@ public class ITestCustomSigner extends AbstractS3ATestBase {
       } catch (IOException e) {
         throw new RuntimeException("Failed to get current Ugi", e);
       }
-      AWSS3V4Signer realSigner = new AWSS3V4Signer();
-      realSigner.setServiceName("s3");
-      realSigner.setRegionName(lastStoreValue.conf.get(TEST_REGION_KEY));
-      realSigner.sign(request, credentials);
+      if (bucketName.equals("kms")) {
+        AWS4Signer realKMSSigner = new AWS4Signer();
+        realKMSSigner.setServiceName("kms");
+        if (lastStoreValue != null) {
+          realKMSSigner.setRegionName(lastStoreValue.conf.get(TEST_REGION_KEY));
+        }
+        realKMSSigner.sign(request, credentials);
+      } else {
+        AWSS3V4Signer realSigner = new AWSS3V4Signer();
+        realSigner.setServiceName("s3");
+        if (lastStoreValue != null) {
+          realSigner.setRegionName(lastStoreValue.conf.get(TEST_REGION_KEY));
+        }
+        realSigner.sign(request, credentials);
+      }
     }
 
     public static int getInstantiationCount() {
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/delegation/ITestSessionDelegationInFileystem.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/delegation/ITestSessionDelegationInFileystem.java
index 34e355b..126f8b3 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/delegation/ITestSessionDelegationInFileystem.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/delegation/ITestSessionDelegationInFileystem.java
@@ -141,6 +141,10 @@ public class ITestSessionDelegationInFileystem extends AbstractDelegationIT {
     // disable if assume role opts are off
     assumeSessionTestsEnabled(conf);
     disableFilesystemCaching(conf);
+    String s3EncryptionMethod =
+        conf.getTrimmed(SERVER_SIDE_ENCRYPTION_ALGORITHM,
+            S3AEncryptionMethods.SSE_KMS.getMethod());
+    String s3EncryptionKey = conf.getTrimmed(SERVER_SIDE_ENCRYPTION_KEY, "");
     removeBaseAndBucketOverrides(conf,
         DELEGATION_TOKEN_BINDING,
         SERVER_SIDE_ENCRYPTION_ALGORITHM,
@@ -149,10 +153,11 @@ public class ITestSessionDelegationInFileystem extends AbstractDelegationIT {
         UserGroupInformation.AuthenticationMethod.KERBEROS.name());
     enableDelegationTokens(conf, getDelegationBinding());
     conf.set(AWS_CREDENTIALS_PROVIDER, " ");
-    // switch to SSE_S3.
+    // switch to CSE-KMS(if specified) else SSE-KMS.
     if (conf.getBoolean(KEY_ENCRYPTION_TESTS, true)) {
-      conf.set(SERVER_SIDE_ENCRYPTION_ALGORITHM,
-          S3AEncryptionMethods.SSE_S3.getMethod());
+      conf.set(SERVER_SIDE_ENCRYPTION_ALGORITHM, s3EncryptionMethod);
+      // KMS key ID a must if CSE-KMS is being tested.
+      conf.set(SERVER_SIDE_ENCRYPTION_KEY, s3EncryptionKey);
     }
     // set the YARN RM up for YARN tests.
     conf.set(YarnConfiguration.RM_PRINCIPAL, YARN_RM);
@@ -361,10 +366,10 @@ public class ITestSessionDelegationInFileystem extends AbstractDelegationIT {
       assertBoundToDT(delegatedFS, tokenKind);
       if (encryptionTestEnabled()) {
         assertNotNull("Encryption propagation failed",
-            delegatedFS.getServerSideEncryptionAlgorithm());
+            delegatedFS.getS3EncryptionAlgorithm());
         assertEquals("Encryption propagation failed",
-            fs.getServerSideEncryptionAlgorithm(),
-            delegatedFS.getServerSideEncryptionAlgorithm());
+            fs.getS3EncryptionAlgorithm(),
+            delegatedFS.getS3EncryptionAlgorithm());
       }
       verifyRestrictedPermissions(delegatedFS);
 
@@ -396,10 +401,10 @@ public class ITestSessionDelegationInFileystem extends AbstractDelegationIT {
       assertBoundToDT(secondDelegate, tokenKind);
       if (encryptionTestEnabled()) {
         assertNotNull("Encryption propagation failed",
-            secondDelegate.getServerSideEncryptionAlgorithm());
+            secondDelegate.getS3EncryptionAlgorithm());
         assertEquals("Encryption propagation failed",
-            fs.getServerSideEncryptionAlgorithm(),
-            secondDelegate.getServerSideEncryptionAlgorithm());
+            fs.getS3EncryptionAlgorithm(),
+            secondDelegate.getS3EncryptionAlgorithm());
       }
       ContractTestUtils.assertDeleted(secondDelegate, testPath, true);
       assertNotNull("unbounded DT",
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java
index faef79c..012b6b9 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java
@@ -25,7 +25,6 @@ import java.util.stream.Collectors;
 
 import com.amazonaws.services.s3.AmazonS3;
 import org.assertj.core.api.Assertions;
-import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -179,11 +178,9 @@ public abstract class AbstractCommitITest extends AbstractS3ATestBase {
     if (useInconsistentClient()) {
       AmazonS3 client = getFileSystem()
           .getAmazonS3ClientForTesting("fault injection");
-      Assert.assertTrue(
-          "AWS client is not inconsistent, even though the test requirees it "
-          + client,
-          client instanceof InconsistentAmazonS3Client);
-      inconsistentClient = (InconsistentAmazonS3Client) client;
+      if (client instanceof InconsistentAmazonS3Client) {
+        inconsistentClient = (InconsistentAmazonS3Client) client;
+      }
     }
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java
index 5dc8be0..20ffd0f 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java
@@ -15,6 +15,7 @@ package org.apache.hadoop.fs.s3a.fileContext;
 
 import java.net.URI;
 
+import com.amazonaws.services.s3.model.CryptoStorageMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -23,6 +24,7 @@ import org.apache.hadoop.fs.FCStatisticsBaseTest;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;
 import org.apache.hadoop.fs.s3a.S3ATestUtils;
 import org.apache.hadoop.fs.s3a.auth.STSClientFactory;
 
@@ -30,6 +32,11 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 
+import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
+import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
+import static org.apache.hadoop.fs.s3a.S3ATestConstants.KMS_KEY_GENERATION_REQUEST_PARAMS_BYTES_WRITTEN;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.CSE_PADDING_LENGTH;
+
 /**
  * S3a implementation of FCStatisticsBaseTest.
  */
@@ -39,10 +46,11 @@ public class ITestS3AFileContextStatistics extends FCStatisticsBaseTest {
       LoggerFactory.getLogger(STSClientFactory.class);
 
   private Path testRootPath;
+  private Configuration conf;
 
   @Before
   public void setUp() throws Exception {
-    Configuration conf = new Configuration();
+    conf = new Configuration();
     fc = S3ATestUtils.createTestFileContext(conf);
     testRootPath = fileContextTestHelper.getTestRootPath(fc, "test");
     fc.mkdir(testRootPath,
@@ -62,10 +70,31 @@ public class ITestS3AFileContextStatistics extends FCStatisticsBaseTest {
     Assert.assertEquals(2 * blockSize, stats.getBytesRead());
   }
 
+  /**
+   * A method to verify the bytes written.
+   * <br>
+   * NOTE: if Client side encryption is enabled, expected bytes written
+   * should increase by 16(padding of data) + bytes for the key ID set + 94(KMS
+   * key generation) in case of storage type {@link CryptoStorageMode} as
+   * ObjectMetadata(Default). If Crypto Storage mode is instruction file then
+   * add additional bytes as that file is stored separately and would account
+   * for bytes written.
+   *
+   * @param stats Filesystem statistics.
+   */
   @Override
   protected void verifyWrittenBytes(FileSystem.Statistics stats) {
     //No extra bytes are written
-    Assert.assertEquals(blockSize, stats.getBytesWritten());
+    long expectedBlockSize = blockSize;
+    if (conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM, "")
+        .equals(S3AEncryptionMethods.CSE_KMS.getMethod())) {
+      String keyId = conf.get(SERVER_SIDE_ENCRYPTION_KEY, "");
+      // Adding padding length and KMS key generation bytes written.
+      expectedBlockSize += CSE_PADDING_LENGTH + keyId.getBytes().length +
+          KMS_KEY_GENERATION_REQUEST_PARAMS_BYTES_WRITTEN;
+    }
+    Assert.assertEquals("Mismatch in bytes written", expectedBlockSize,
+        stats.getBytesWritten());
   }
 
   @Override
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesEncryption.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesEncryption.java
index 9b63430..be51b2f 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesEncryption.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesEncryption.java
@@ -25,9 +25,11 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.Constants;
 import org.apache.hadoop.fs.s3a.EncryptionTestUtils;
+import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
+import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
 import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
 import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.SSE_KMS;
 
@@ -43,9 +45,11 @@ public class ITestS3AHugeFilesEncryption extends AbstractSTestS3AHugeFiles {
   public void setup() throws Exception {
     Configuration c = new Configuration();
     String kmsKey = c.get(SERVER_SIDE_ENCRYPTION_KEY);
-    if (StringUtils.isBlank(kmsKey)) {
+    if (StringUtils.isBlank(kmsKey) || S3AEncryptionMethods.CSE_KMS.getMethod()
+        .equals(c.getTrimmed(SERVER_SIDE_ENCRYPTION_ALGORITHM, ""))) {
       skip(SERVER_SIDE_ENCRYPTION_KEY + " is not set for " +
-              SSE_KMS.getMethod());
+          SSE_KMS.getMethod() + " or CSE-KMS algorithm is used instead of "
+          + "SSE-KMS");
     }
     super.setup();
   }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesSSECDiskBlocks.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesSSECDiskBlocks.java
index a8635ea..a1c5c3f 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesSSECDiskBlocks.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesSSECDiskBlocks.java
@@ -49,7 +49,7 @@ public class ITestS3AHugeFilesSSECDiskBlocks
   protected Configuration createScaleConfiguration() {
     Configuration conf = super.createScaleConfiguration();
     removeBaseAndBucketOverrides(conf, SERVER_SIDE_ENCRYPTION_KEY,
-            SERVER_SIDE_ENCRYPTION_ALGORITHM);
+           SERVER_SIDE_ENCRYPTION_ALGORITHM);
     S3ATestUtils.disableFilesystemCaching(conf);
     conf.set(Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM,
         getSSEAlgorithm().getMethod());
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java
index 4d1096f..27798e2 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java
@@ -242,6 +242,7 @@ public class ITestS3AInputStreamPerformance extends S3AScaleTestBase {
 
   @Test
   public void testTimeToOpenAndReadWholeFileBlocks() throws Throwable {
+    skipIfClientSideEncryption();
     requireCSVTestData();
     int blockSize = _1MB;
     describe("Open the test file %s and read it in blocks of size %d",
@@ -348,6 +349,7 @@ public class ITestS3AInputStreamPerformance extends S3AScaleTestBase {
   @Test
   public void testReadWithNormalPolicy() throws Throwable {
     describe("Read big blocks with a big readahead");
+    skipIfClientSideEncryption();
     executeSeekReadSequence(BIG_BLOCK_SIZE, BIG_BLOCK_SIZE * 2,
         S3AInputPolicy.Normal);
     assertStreamOpenedExactlyOnce();
@@ -356,6 +358,7 @@ public class ITestS3AInputStreamPerformance extends S3AScaleTestBase {
   @Test
   public void testDecompressionSequential128K() throws Throwable {
     describe("Decompress with a 128K readahead");
+    skipIfClientSideEncryption();
     executeDecompression(128 * _1KB, S3AInputPolicy.Sequential);
     assertStreamOpenedExactlyOnce();
   }
@@ -458,6 +461,7 @@ public class ITestS3AInputStreamPerformance extends S3AScaleTestBase {
 
   @Test
   public void testRandomIORandomPolicy() throws Throwable {
+    skipIfClientSideEncryption();
     executeRandomIO(S3AInputPolicy.Random, (long) RANDOM_IO_SEQUENCE.length);
     assertEquals("streams aborted in " + streamStatistics,
         0, streamStatistics.getAborted());
@@ -465,6 +469,7 @@ public class ITestS3AInputStreamPerformance extends S3AScaleTestBase {
 
   @Test
   public void testRandomIONormalPolicy() throws Throwable {
+    skipIfClientSideEncryption();
     long expectedOpenCount = RANDOM_IO_SEQUENCE.length;
     executeRandomIO(S3AInputPolicy.Normal, expectedOpenCount);
     assertEquals("streams aborted in " + streamStatistics,

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org