You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2022/12/14 06:55:30 UTC

[pinot] branch master updated: Ability to initialize S3PinotFs with serverSideEncryption properties when passing client directly (#9988)

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d0cbc19de Ability to initialize S3PinotFs with serverSideEncryption properties when passing client directly (#9988)
2d0cbc19de is described below

commit 2d0cbc19dec47d3aa9228abd85ffba3a9abe821a
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Tue Dec 13 22:55:24 2022 -0800

    Ability to initialize S3PinotFs with serverSideEncryption properties when passing client directly (#9988)
    
    * Initialize S3PinotFs serverSideEncryption properties when using init method that takes S3Client directly
    
    * Remve unused
---
 .../apache/pinot/plugin/filesystem/S3PinotFS.java  | 79 ++++++++++++++--------
 1 file changed, 50 insertions(+), 29 deletions(-)

diff --git a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
index 511f0c50b1..99cf60a89c 100644
--- a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
+++ b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
@@ -35,6 +35,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Consumer;
+import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.filesystem.BasePinotFS;
@@ -98,31 +99,9 @@ public class S3PinotFS extends BasePinotFS {
     String region = config.getProperty(REGION);
     _disableAcl = config.getProperty(DISABLE_ACL_CONFIG_KEY, DEFAULT_DISABLE_ACL);
     String serverSideEncryption = config.getProperty(SERVER_SIDE_ENCRYPTION_CONFIG_KEY);
-    if (serverSideEncryption != null) {
-      try {
-        _serverSideEncryption = ServerSideEncryption.valueOf(serverSideEncryption);
-      } catch (Exception e) {
-        throw new UnsupportedOperationException(String
-            .format("Unknown value '%s' for S3PinotFS config: 'serverSideEncryption'. Supported values are: %s",
-                serverSideEncryption, Arrays.toString(ServerSideEncryption.knownValues().toArray())));
-      }
-      switch (_serverSideEncryption) {
-        case AWS_KMS:
-          _ssekmsKeyId = config.getProperty(SSE_KMS_KEY_ID_CONFIG_KEY);
-          if (_ssekmsKeyId == null) {
-            throw new UnsupportedOperationException(
-                "Missing required config: 'sseKmsKeyId' when AWS_KMS is used for server side encryption");
-          }
-          _ssekmsEncryptionContext = config.getProperty(SSE_KMS_ENCRYPTION_CONTEXT_CONFIG_KEY);
-          break;
-        case AES256:
-          // Todo: Support AES256.
-        default:
-          throw new UnsupportedOperationException("Unsupported server side encryption: " + _serverSideEncryption);
-      }
-    }
-    AwsCredentialsProvider awsCredentialsProvider;
+    setServerSideEncryption(serverSideEncryption, config);
 
+    AwsCredentialsProvider awsCredentialsProvider;
     try {
       if (!isNullOrEmpty(config.getProperty(ACCESS_KEY)) && !isNullOrEmpty(config.getProperty(SECRET_KEY))) {
         String accessKey = config.getProperty(ACCESS_KEY);
@@ -149,10 +128,53 @@ public class S3PinotFS extends BasePinotFS {
     }
   }
 
+  /**
+   * Initialized the _s3Client directly with provided client.
+   * This initialization method will not initialize the server side encryption
+   * @param s3Client s3Client to initialize with
+   */
   public void init(S3Client s3Client) {
     _s3Client = s3Client;
   }
 
+  /**
+   * Initialize the _s3Client directly with provided client, along with additional server side encryption related props
+   * @param s3Client s3Client to initialize with
+   * @param serverSideEncryption the server side encryption string e.g. AWS_KMS is the only supported on as of now
+   * @param serverSideEncryptionConfig properties specific to provided server side encryption type
+   */
+  public void init(S3Client s3Client, String serverSideEncryption, PinotConfiguration serverSideEncryptionConfig) {
+    _s3Client = s3Client;
+    setServerSideEncryption(serverSideEncryption, serverSideEncryptionConfig);
+  }
+
+  private void setServerSideEncryption(@Nullable String serverSideEncryption,
+      PinotConfiguration serverSideEncryptionConfig) {
+    if (serverSideEncryption != null) {
+      try {
+        _serverSideEncryption = ServerSideEncryption.valueOf(serverSideEncryption);
+      } catch (Exception e) {
+        throw new UnsupportedOperationException(
+            String.format("Unknown value '%s' for S3PinotFS config: 'serverSideEncryption'. Supported values are: %s",
+                serverSideEncryption, Arrays.toString(ServerSideEncryption.knownValues().toArray())));
+      }
+      switch (_serverSideEncryption) {
+        case AWS_KMS:
+          _ssekmsKeyId = serverSideEncryptionConfig.getProperty(SSE_KMS_KEY_ID_CONFIG_KEY);
+          if (_ssekmsKeyId == null) {
+            throw new UnsupportedOperationException(
+                "Missing required config: 'sseKmsKeyId' when AWS_KMS is used for server side encryption");
+          }
+          _ssekmsEncryptionContext = serverSideEncryptionConfig.getProperty(SSE_KMS_ENCRYPTION_CONTEXT_CONFIG_KEY);
+          break;
+        case AES256:
+          // Todo: Support AES256.
+        default:
+          throw new UnsupportedOperationException("Unsupported server side encryption: " + _serverSideEncryption);
+      }
+    }
+  }
+
   boolean isNullOrEmpty(String target) {
     return target == null || target.isEmpty();
   }
@@ -318,9 +340,8 @@ public class S3PinotFS extends BasePinotFS {
     try {
       if (isDirectory(segmentUri)) {
         if (!forceDelete) {
-          Preconditions
-              .checkState(isEmptyDirectory(segmentUri), "ForceDelete flag is not set and directory '%s' is not empty",
-                  segmentUri);
+          Preconditions.checkState(isEmptyDirectory(segmentUri),
+              "ForceDelete flag is not set and directory '%s' is not empty", segmentUri);
         }
         String prefix = normalizeToDirectoryPrefix(segmentUri);
         ListObjectsV2Response listObjectsV2Response;
@@ -456,8 +477,8 @@ public class S3PinotFS extends BasePinotFS {
     ImmutableList.Builder<FileMetadata> listBuilder = ImmutableList.builder();
     visitFiles(fileUri, recursive, s3Object -> {
       if (!s3Object.key().equals(fileUri.getPath())) {
-        FileMetadata.Builder fileBuilder = new FileMetadata.Builder()
-            .setFilePath(S3_SCHEME + fileUri.getHost() + DELIMITER + getNormalizedFileKey(s3Object))
+        FileMetadata.Builder fileBuilder = new FileMetadata.Builder().setFilePath(
+                S3_SCHEME + fileUri.getHost() + DELIMITER + getNormalizedFileKey(s3Object))
             .setLastModifiedTime(s3Object.lastModified().toEpochMilli()).setLength(s3Object.size())
             .setIsDirectory(s3Object.key().endsWith(DELIMITER));
         listBuilder.add(fileBuilder.build());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org