Posted to common-issues@hadoop.apache.org by GitBox <gi...@apache.org> on 2021/07/07 17:32:50 UTC

[GitHub] [hadoop] mukund-thakur commented on a change in pull request #2706: HADOOP-13887. Support S3 client side encryption (S3-CSE) using AWS-SDK

mukund-thakur commented on a change in pull request #2706:
URL: https://github.com/apache/hadoop/pull/2706#discussion_r665189654



##########
File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
##########
@@ -307,29 +311,31 @@ public synchronized void write(byte[] source, int offset, int len)
       // of capacity
       // Trigger an upload then process the remainder.
       LOG.debug("writing more data than block has capacity -triggering upload");
-      uploadCurrentBlock();
+      uploadCurrentBlock(false);
       // tail recursion is mildly expensive, but given buffer sizes must be MB.
       // it's unlikely to recurse very deeply.
       this.write(source, offset + written, len - written);
     } else {
-      if (remainingCapacity == 0) {
+      if (remainingCapacity == 0 && !isCSEEnabled) {
         // the whole buffer is done, trigger an upload
-        uploadCurrentBlock();
+        uploadCurrentBlock(false);
       }
     }
   }
 
   /**
    * Start an asynchronous upload of the current block.
+   * @param isLast true, if part being uploaded is last and client side

Review comment:
      Should this be 'and' or 'or'? The javadoc sentence reads ambiguously as written.
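
      A possible rewording, assuming the flag is only meaningful when client-side
      encryption is enabled (illustrative wording, not the patch's final text):

          /**
           * Start an asynchronous upload of the current block.
           * @param isLast true if this is the last part of the stream
           *               and client-side encryption is enabled, so the
           *               upload must be marked as the final one.
           */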

##########
File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
##########
@@ -94,9 +105,81 @@ public AmazonS3 createS3Client(
       awsConf.setUserAgentSuffix(parameters.getUserAgentSuffix());
     }
 
-    return buildAmazonS3Client(
-        awsConf,
-        parameters);
+    if (conf.get(CLIENT_SIDE_ENCRYPTION_METHOD) == null) {
+      return buildAmazonS3Client(
+          awsConf,
+          parameters);
+    } else {
+      return newAmazonS3EncryptionClient(
+          awsConf,
+          parameters);
+    }
+  }
+
+  /**
+   * Create an {@link AmazonS3} client of type
+   * {@link AmazonS3EncryptionV2} if CSE is enabled.
+   *
+   * @param awsConf    AWS configuration.
+   * @param parameters parameters
+   *
+   * @return new AmazonS3 client.
+   */
+  protected AmazonS3 newAmazonS3EncryptionClient(
+      final ClientConfiguration awsConf,
+      final S3ClientCreationParameters parameters){
+
+    AmazonS3 client;
+    AmazonS3EncryptionClientV2Builder builder =
+        new AmazonS3EncryptionClientV2Builder();
+    Configuration conf = getConf();
+
+    //CSE-KMS Method
+    String kmsKeyId = conf.get(CLIENT_SIDE_ENCRYPTION_KMS_KEY_ID);
+    // Check if kmsKeyID is not null
+    Preconditions.checkArgument(kmsKeyId != null, "CSE-KMS method "
+        + "requires KMS key ID. Use " + CLIENT_SIDE_ENCRYPTION_KMS_KEY_ID
+        + " property to set it. ");
+
+    EncryptionMaterialsProvider materialsProvider =
+        new KMSEncryptionMaterialsProvider(kmsKeyId);
+
+    builder.withEncryptionMaterialsProvider(materialsProvider);
+    builder.withCredentials(parameters.getCredentialSet())
+        .withClientConfiguration(awsConf)
+        .withPathStyleAccessEnabled(parameters.isPathStyleAccess());
+
+    // if metrics are not null, then add in the builder.
+    if (parameters.getMetrics() != null) {
+      LOG.debug("Creating Amazon client with AWS metrics");
+      builder.withMetricsCollector(
+          new AwsStatisticsCollector(parameters.getMetrics()));
+    }
+
+    // Create cryptoConfig
+    CryptoConfigurationV2 cryptoConfigurationV2 =
+        new CryptoConfigurationV2(CryptoMode.AuthenticatedEncryption)
+            .withRangeGetMode(CryptoRangeGetMode.ALL);
+
+    // Setting the endpoint and KMS region in cryptoConfig
+    AmazonS3EncryptionClientV2Builder.EndpointConfiguration epr
+        = createEndpointConfiguration(parameters.getEndpoint(), awsConf, getConf().getTrimmed(AWS_REGION));
+    if (epr != null) {
+      LOG.debug(
+          "Building the AmazonS3 Encryption client with endpoint configs");
+      builder.withEndpointConfiguration(epr);
+      cryptoConfigurationV2
+          .withAwsKmsRegion(RegionUtils.getRegion(epr.getSigningRegion()));
+      LOG.debug("KMS region used: {}",
+          cryptoConfigurationV2.getAwsKmsRegion());
+    } else {
+      // forcefully look for the region; extra HEAD call required.
+      builder.setForceGlobalBucketAccessEnabled(true);
+    }
+    builder.withCryptoConfiguration(cryptoConfigurationV2);

Review comment:
      Yes, I agree with refactoring the common builder setup into shared methods, as Steve suggested.
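
      For reference, a rough sketch of the kind of shared helper that refactor
      could produce. The method name and generics are hypothetical, and it
      assumes the imports already present in DefaultS3ClientFactory:

          /**
           * Hypothetical helper applying the options common to both the
           * plain and the encryption client builders.
           */
          private <B extends AmazonS3Builder<B, ?>> B configureClientBuilder(
              B builder,
              ClientConfiguration awsConf,
              S3ClientCreationParameters parameters) {
            // credentials, client configuration and path-style access are
            // identical for both client types
            builder.withCredentials(parameters.getCredentialSet())
                .withClientConfiguration(awsConf)
                .withPathStyleAccessEnabled(parameters.isPathStyleAccess());
            // metrics collection is also common to both client types
            if (parameters.getMetrics() != null) {
              builder.withMetricsCollector(
                  new AwsStatisticsCollector(parameters.getMetrics()));
            }
            return builder;
          }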

##########
File path: hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.rm;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyFileContents;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE;
+
+/**
+ * Tests to verify S3 Client-Side Encryption (CSE).
+ */
+public abstract class ITestS3AClientSideEncryption extends AbstractS3ATestBase {
+
+  private static final List<Integer> SIZES =
+      new ArrayList<>(Arrays.asList(0, 1, 255, 4095));
+
+  private static final int BIG_FILE_SIZE = 15 * 1024 * 1024;
+
+  /**
+   * Testing S3 CSE on different file sizes.
+   */
+  @Test
+  public void testEncryption() throws Throwable {
+    describe("Test to verify client-side encryption for different file sizes.");
+    for (int size : SIZES) {
+      validateEncryptionForFilesize(size);
+    }
+  }
+
+  /**
+   * Testing the S3 client side encryption over rename operation.
+   */
+  @Test
+  public void testEncryptionOverRename() throws Throwable {
+    describe("Test for AWS CSE on Rename Operation.");
+    skipTest();
+    S3AFileSystem fs = getFileSystem();
+    Path src = path(getMethodName());
+    byte[] data = dataset(1024, 'a', 'z');
+    LOG.info("Region used: {}", fs.getAmazonS3Client().getRegionName());
+    writeDataset(fs, src, data, data.length, 1024 * 1024,
+        true, false);
+
+    //ContractTestUtils.verifyFileContents(fs, src, data);
+    Path dest = path(src.getName() + "-copy");
+    fs.rename(src, dest);
+    ContractTestUtils.verifyFileContents(fs, dest, data);
+    assertEncrypted(dest);
+  }
+
+  /**
+   * Test to verify if we get same content length of files in S3 CSE using
+   * listStatus and listFiles on the parent directory.
+   */
+  @Test
+  public void testDirectoryListingFileLengths() throws IOException {
+    describe("Test to verify directory listing calls gives correct content "
+        + "lengths");
+    skipTest();
+    S3AFileSystem fs = getFileSystem();
+    Path parentDir = path(getMethodName());
+
+    // Creating files in the parent directory that will be used to assert
+    // content length.
+    for (int i : SIZES) {
+      Path child = new Path(parentDir, getMethodName() + i);
+      writeThenReadFile(child, i);
+    }
+
+    // Getting the content lengths of files inside the directory via FileStatus.
+    List<Integer> fileLengthDirListing = new ArrayList<>();
+    for (FileStatus fileStatus : fs.listStatus(parentDir)) {
+      fileLengthDirListing.add((int) fileStatus.getLen());
+    }
+    // Assert the file length we got against expected file length for
+    // ListStatus.
+    Assertions.assertThat(fileLengthDirListing)
+        .describedAs("File lengths isn't same "
+            + "as expected from FileStatus dir. listing")
+        .containsExactlyInAnyOrderElementsOf(SIZES);
+
+
+    // Getting the content lengths of files inside the directory via ListFiles.
+    RemoteIterator<LocatedFileStatus> listDir = fs.listFiles(parentDir, true);
+    List<Integer> fileLengthListLocated = new ArrayList<>();
+    while(listDir.hasNext()) {
+      LocatedFileStatus fileStatus = listDir.next();
+      fileLengthListLocated.add((int) fileStatus.getLen());
+    }
+    // Assert the file length we got against expected file length for
+    // LocatedFileStatus.
+    Assertions.assertThat(fileLengthListLocated)
+        .describedAs("File lengths isn't same "
+            + "as expected from LocatedFileStatus dir. listing")
+        .containsExactlyInAnyOrderElementsOf(SIZES);
+
+  }
+
+  /**
+   * Test to verify multipart upload through S3ABlockOutputStream and
+   * verifying the contents of the uploaded file.
+   */
+  @Test
+  public void testBigFilePutAndGet() throws IOException {
+    skipTest();
+    S3AFileSystem fs = getFileSystem();
+    Path filePath = path(getMethodName());
+    byte[] fileContent = new byte[BIG_FILE_SIZE];
+    // PUT a 15MB file using CSE to simulate multipart in CSE.
+    createFile(fs, filePath, true, fileContent);
+    LOG.info("Multi-part upload successful...");
+
+    try(FSDataInputStream in = fs.open(filePath)) {
+      in.seek(BIG_FILE_SIZE);
+      in.seek(0);
+      in.readFully(0, fileContent);
+
+      verifyFileContents(fs, filePath, fileContent);
+    }
+  }
+
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+
+    // To simulate multi part put and get in small files, we'll set the
+    // threshold and part size to 5MB.
+    conf.set(Constants.MULTIPART_SIZE, String.valueOf(MULTIPART_MIN_SIZE));
+    conf.set(Constants.MIN_MULTIPART_THRESHOLD, String.valueOf(MULTIPART_MIN_SIZE));
+    return conf;
+  }
+
+  /**
+   * Method to validate CSE for different file sizes.
+   *
+   * @param len length of the file.
+   */
+  protected void validateEncryptionForFilesize(int len) throws IOException {
+    skipTest();
+    describe("Create an encrypted file of size " + len);
+    // Creating a unique path by adding file length in file name.
+    Path path = writeThenReadFile(getMethodName() + len, len);
+    assertEncrypted(path);
+    rm(getFileSystem(), path, false, false);

Review comment:
       Do we have to explicitly do this? Won't this be done as a part of test cleanup and teardown?
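
      If so, the explicit delete could simply be dropped. A minimal sketch,
      assuming the base test class removes the method's test directory in
      teardown (as the contract tests usually do):

          protected void validateEncryptionForFilesize(int len) throws IOException {
            skipTest();
            describe("Create an encrypted file of size " + len);
            // Creating a unique path by adding file length in file name.
            Path path = writeThenReadFile(getMethodName() + len, len);
            assertEncrypted(path);
            // no explicit rm(): test teardown deletes the test directory,
            // so per-file cleanup here would be redundant.
          }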

##########
File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
##########
@@ -90,16 +90,18 @@
 public class Listing extends AbstractStoreOperation {
 
   private static final Logger LOG = S3AFileSystem.LOG;
+  private final boolean isCSEEnabled;
 
   static final FileStatusAcceptor ACCEPT_ALL_BUT_S3N =
       new AcceptAllButS3nDirs();
 
   private final ListingOperationCallbacks listingOperationCallbacks;
 
   public Listing(ListingOperationCallbacks listingOperationCallbacks,
-                 StoreContext storeContext) {
+                 StoreContext storeContext, boolean isCSEEnabled) {

Review comment:
       Move isCSEEnabled to storeContext?
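
      Something like this, assuming StoreContext grows an isCSEEnabled()
      accessor (name illustrative), which would keep the constructor
      signature unchanged:

          public Listing(ListingOperationCallbacks listingOperationCallbacks,
                         StoreContext storeContext) {
            super(storeContext);
            this.listingOperationCallbacks = listingOperationCallbacks;
            // hypothetical accessor on StoreContext
            this.isCSEEnabled = storeContext.isCSEEnabled();
          }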

##########
File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
##########
@@ -94,9 +105,81 @@ public AmazonS3 createS3Client(
       awsConf.setUserAgentSuffix(parameters.getUserAgentSuffix());
     }
 
-    return buildAmazonS3Client(
-        awsConf,
-        parameters);
+    if (conf.get(CLIENT_SIDE_ENCRYPTION_METHOD) == null) {
+      return buildAmazonS3Client(
+          awsConf,
+          parameters);
+    } else {
+      return newAmazonS3EncryptionClient(

Review comment:
       nit: buildAmazonS3EncryptionClient




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


