You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2021/02/09 23:13:53 UTC

[incubator-pinot] branch s3-kms created (now 9561db0)

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a change to branch s3-kms
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at 9561db0  Support S3 with server side encryption mode aws:kms

This branch includes the following new commits:

     new 9561db0  Support S3 with server side encryption mode aws:kms

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Support S3 with server side encryption mode aws:kms

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch s3-kms
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 9561db08ce766f1b32d6c7521629fb58b1a4fa65
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Tue Feb 9 15:12:19 2021 -0800

    Support S3 with server side encryption mode aws:kms
---
 .../apache/pinot/plugin/filesystem/S3PinotFS.java  | 132 +++++++++++++--------
 1 file changed, 82 insertions(+), 50 deletions(-)

diff --git a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
index fff5a2c..55873ec 100644
--- a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
+++ b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
@@ -20,6 +20,7 @@ package org.apache.pinot.plugin.filesystem;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
@@ -30,7 +31,7 @@ import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.util.HashMap;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import org.apache.commons.io.FileUtils;
@@ -63,6 +64,7 @@ import software.amazon.awssdk.services.s3.model.PutObjectRequest;
 import software.amazon.awssdk.services.s3.model.PutObjectResponse;
 import software.amazon.awssdk.services.s3.model.S3Exception;
 import software.amazon.awssdk.services.s3.model.S3Object;
+import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
 
 
 /**
@@ -74,22 +76,52 @@ public class S3PinotFS extends PinotFS {
   public static final String REGION = "region";
   public static final String ENDPOINT = "endpoint";
   public static final String DISABLE_ACL_CONFIG_KEY = "disableAcl";
+  public static final String SERVER_SIDE_ENCRYPTION_CONFIG_KEY = "serverSideEncryption";
+  public static final String SSE_KMS_KEY_ID_CONFIG_KEY = "ssekmsKeyId";
+  public static final String SSE_KMS_ENCRYPTION_CONTEXT_CONFIG_KEY = "ssekmsEncryptionContext";
 
   private static final Logger LOGGER = LoggerFactory.getLogger(S3PinotFS.class);
   private static final String DELIMITER = "/";
   public static final String S3_SCHEME = "s3://";
-  private static boolean DEFAULT_DISABLE_ACL = true;
+  private static final boolean DEFAULT_DISABLE_ACL = true;
   private S3Client _s3Client;
-  private boolean disableAcl = DEFAULT_DISABLE_ACL;
+  private boolean _disableAcl = DEFAULT_DISABLE_ACL;
+  private ServerSideEncryption _serverSideEncryption = null;
+  private String _ssekmsKeyId;
+  private String _ssekmsEncryptionContext;
 
   @Override
   public void init(PinotConfiguration config) {
     Preconditions.checkArgument(!isNullOrEmpty(config.getProperty(REGION)));
     String region = config.getProperty(REGION);
-    disableAcl = config.getProperty(DISABLE_ACL_CONFIG_KEY, DEFAULT_DISABLE_ACL);
+    _disableAcl = config.getProperty(DISABLE_ACL_CONFIG_KEY, DEFAULT_DISABLE_ACL);
+    String serverSideEncryption = config.getProperty(SERVER_SIDE_ENCRYPTION_CONFIG_KEY);
+    if (serverSideEncryption != null) {
+      try {
+        _serverSideEncryption = ServerSideEncryption.valueOf(serverSideEncryption);
+      } catch (Exception e) {
+        throw new UnsupportedOperationException(String
+            .format("Unknown value '%s' for S3PinotFS config: 'serverSideEncryption'. Supported values are: %s",
+                serverSideEncryption, Arrays.toString(ServerSideEncryption.knownValues().toArray())));
+      }
+      switch (_serverSideEncryption) {
+        case AWS_KMS:
+          _ssekmsKeyId = config.getProperty(SSE_KMS_KEY_ID_CONFIG_KEY);
+          if (_ssekmsKeyId == null) {
+            throw new UnsupportedOperationException(
+                "Missing required config: 'sseKmsKeyId' when AWS_KMS is used for server side encryption");
+          }
+          _ssekmsEncryptionContext = config.getProperty(SSE_KMS_ENCRYPTION_CONTEXT_CONFIG_KEY);
+          break;
+        case AES256:
+          // Todo: Support AES256.
+        default:
+          throw new UnsupportedOperationException("Unsupported server side encryption: " + _serverSideEncryption);
+      }
+    }
     AwsCredentialsProvider awsCredentialsProvider;
-    try {
 
+    try {
       if (!isNullOrEmpty(config.getProperty(ACCESS_KEY)) && !isNullOrEmpty(config.getProperty(SECRET_KEY))) {
         String accessKey = config.getProperty(ACCESS_KEY);
         String secretKey = config.getProperty(SECRET_KEY);
@@ -99,7 +131,8 @@ public class S3PinotFS extends PinotFS {
         awsCredentialsProvider = DefaultCredentialsProvider.create();
       }
 
-      S3ClientBuilder s3ClientBuilder = S3Client.builder().region(Region.of(region)).credentialsProvider(awsCredentialsProvider);
+      S3ClientBuilder s3ClientBuilder =
+          S3Client.builder().region(Region.of(region)).credentialsProvider(awsCredentialsProvider);
       if (!isNullOrEmpty(config.getProperty(ENDPOINT))) {
         String endpoint = config.getProperty(ENDPOINT);
         try {
@@ -248,14 +281,7 @@ public class S3PinotFS extends PinotFS {
       }
 
       String dstPath = sanitizePath(dstUri.getPath());
-      CopyObjectRequest.Builder copyReqBuilder = CopyObjectRequest.builder().copySource(encodedUrl)
-          .destinationBucket(dstUri.getHost()).destinationKey(dstPath);
-
-      if (!disableAcl) {
-        copyReqBuilder.acl(ObjectCannedACL.BUCKET_OWNER_FULL_CONTROL);
-      }
-
-      CopyObjectRequest copyReq = copyReqBuilder.build();
+      CopyObjectRequest copyReq = generateCopyObjectRequest(encodedUrl, dstUri, dstPath, null);
       CopyObjectResponse copyObjectResponse = _s3Client.copyObject(copyReq);
       return copyObjectResponse.sdkHttpResponse().isSuccessful();
     } catch (S3Exception e) {
@@ -275,15 +301,8 @@ public class S3PinotFS extends PinotFS {
         return true;
       }
 
-      PutObjectRequest.Builder putReqBuilder = PutObjectRequest.builder().bucket(uri.getHost()).key(path);
-
-      if (!disableAcl) {
-        putReqBuilder.acl(ObjectCannedACL.BUCKET_OWNER_FULL_CONTROL);
-      }
-
-      PutObjectRequest putObjectRequest = putReqBuilder.build();
+      PutObjectRequest putObjectRequest = generatePutObjectRequest(uri, path);
       PutObjectResponse putObjectResponse = _s3Client.putObject(putObjectRequest, RequestBody.fromBytes(new byte[0]));
-
       return putObjectResponse.sdkHttpResponse().isSuccessful();
     } catch (Throwable t) {
       throw new IOException(t);
@@ -477,13 +496,7 @@ public class S3PinotFS extends PinotFS {
     LOGGER.info("Copy {} from local to {}", srcFile.getAbsolutePath(), dstUri);
     URI base = getBase(dstUri);
     String prefix = sanitizePath(base.relativize(dstUri).getPath());
-    PutObjectRequest.Builder putReqBuilder = PutObjectRequest.builder().bucket(dstUri.getHost()).key(prefix);
-
-    if (!disableAcl) {
-      putReqBuilder.acl(ObjectCannedACL.BUCKET_OWNER_FULL_CONTROL);
-    }
-
-    PutObjectRequest putObjectRequest = putReqBuilder.build();
+    PutObjectRequest putObjectRequest = generatePutObjectRequest(dstUri, prefix);
     _s3Client.putObject(putObjectRequest, srcFile.toPath());
   }
 
@@ -496,9 +509,8 @@ public class S3PinotFS extends PinotFS {
         return true;
       }
 
-      ListObjectsV2Request listObjectsV2Request = ListObjectsV2Request
-          .builder().bucket(uri.getHost())
-          .prefix(prefix).maxKeys(2).build();
+      ListObjectsV2Request listObjectsV2Request =
+          ListObjectsV2Request.builder().bucket(uri.getHost()).prefix(prefix).maxKeys(2).build();
       ListObjectsV2Response listObjectsV2Response = _s3Client.listObjectsV2(listObjectsV2Request);
       return listObjectsV2Response.hasContents();
     } catch (NoSuchKeyException e) {
@@ -526,29 +538,14 @@ public class S3PinotFS extends PinotFS {
       }
 
       String path = sanitizePath(uri.getPath());
-      Map<String, String> mp = new HashMap<>();
-      mp.put("lastModified", String.valueOf(System.currentTimeMillis()));
-      CopyObjectRequest.Builder copyReqBuilder = CopyObjectRequest.builder().copySource(encodedUrl)
-          .destinationBucket(uri.getHost()).destinationKey(path)
-          .metadata(mp).metadataDirective(MetadataDirective.REPLACE);
-
-      if (!disableAcl) {
-        copyReqBuilder.acl(ObjectCannedACL.BUCKET_OWNER_FULL_CONTROL);
-      }
-
-      CopyObjectRequest request = copyReqBuilder.build();
+      CopyObjectRequest request = generateCopyObjectRequest(encodedUrl, uri, path,
+          ImmutableMap.of("lastModified", String.valueOf(System.currentTimeMillis())));
       _s3Client.copyObject(request);
       long newUpdateTime = getS3ObjectMetadata(uri).lastModified().toEpochMilli();
       return newUpdateTime > s3ObjectMetadata.lastModified().toEpochMilli();
     } catch (NoSuchKeyException e) {
       String path = sanitizePath(uri.getPath());
-      PutObjectRequest.Builder putReqBuilder = PutObjectRequest.builder().bucket(uri.getHost()).key(path);
-
-      if (!disableAcl) {
-        putReqBuilder.acl(ObjectCannedACL.BUCKET_OWNER_FULL_CONTROL);
-      }
-
-      PutObjectRequest putObjectRequest = putReqBuilder.build();
+      PutObjectRequest putObjectRequest = generatePutObjectRequest(uri, path);
       _s3Client.putObject(putObjectRequest, RequestBody.fromBytes(new byte[0]));
       return true;
     } catch (S3Exception e) {
@@ -556,6 +553,41 @@ public class S3PinotFS extends PinotFS {
     }
   }
 
+  /** Builds the PutObjectRequest for {@code path} in the bucket named by {@code uri}'s host. */
+  private PutObjectRequest generatePutObjectRequest(URI uri, String path) {
+    PutObjectRequest.Builder builder = PutObjectRequest.builder().bucket(uri.getHost()).key(path);
+    if (!_disableAcl) {
+      builder.acl(ObjectCannedACL.BUCKET_OWNER_FULL_CONTROL);
+    }
+    if (_serverSideEncryption != null) {
+      builder.serverSideEncryption(_serverSideEncryption);
+      builder.ssekmsKeyId(_ssekmsKeyId);
+      if (_ssekmsEncryptionContext != null) {
+        builder.ssekmsEncryptionContext(_ssekmsEncryptionContext);
+      }
+    }
+    return builder.build();
+  }
+
+  private CopyObjectRequest generateCopyObjectRequest(String copySource, URI dest, String path,
+      Map<String, String> metadata) {
+    CopyObjectRequest.Builder builder = CopyObjectRequest.builder().copySource(copySource);
+    builder.destinationBucket(dest.getHost()).destinationKey(path);  // bucket is the destination URI's host
+    if (!_disableAcl) {
+      builder.acl(ObjectCannedACL.BUCKET_OWNER_FULL_CONTROL);
+    }
+    if (metadata != null) {  // caller-supplied metadata is sent with the REPLACE directive
+      builder.metadata(metadata).metadataDirective(MetadataDirective.REPLACE);
+    }
+    if (_serverSideEncryption != null) {  // key id always accompanies the mode; context only if configured
+      builder.serverSideEncryption(_serverSideEncryption).ssekmsKeyId(_ssekmsKeyId);
+      if (_ssekmsEncryptionContext != null) {
+        builder.ssekmsEncryptionContext(_ssekmsEncryptionContext);
+      }
+    }
+    return builder.build();
+  }
+
   @Override
   public InputStream open(URI uri)
       throws IOException {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org