You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2022/12/14 06:55:30 UTC
[pinot] branch master updated: Ability to initialize S3PinotFs with serverSideEncryption properties when passing client directly (#9988)
This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 2d0cbc19de Ability to initialize S3PinotFs with serverSideEncryption properties when passing client directly (#9988)
2d0cbc19de is described below
commit 2d0cbc19dec47d3aa9228abd85ffba3a9abe821a
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Tue Dec 13 22:55:24 2022 -0800
Ability to initialize S3PinotFs with serverSideEncryption properties when passing client directly (#9988)
* Initialize S3PinotFs serverSideEncryption properties when using init method that takes S3Client directly
* Remove unused
---
.../apache/pinot/plugin/filesystem/S3PinotFS.java | 79 ++++++++++++++--------
1 file changed, 50 insertions(+), 29 deletions(-)
diff --git a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
index 511f0c50b1..99cf60a89c 100644
--- a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
+++ b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
@@ -35,6 +35,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
+import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.BasePinotFS;
@@ -98,31 +99,9 @@ public class S3PinotFS extends BasePinotFS {
String region = config.getProperty(REGION);
_disableAcl = config.getProperty(DISABLE_ACL_CONFIG_KEY, DEFAULT_DISABLE_ACL);
String serverSideEncryption = config.getProperty(SERVER_SIDE_ENCRYPTION_CONFIG_KEY);
- if (serverSideEncryption != null) {
- try {
- _serverSideEncryption = ServerSideEncryption.valueOf(serverSideEncryption);
- } catch (Exception e) {
- throw new UnsupportedOperationException(String
- .format("Unknown value '%s' for S3PinotFS config: 'serverSideEncryption'. Supported values are: %s",
- serverSideEncryption, Arrays.toString(ServerSideEncryption.knownValues().toArray())));
- }
- switch (_serverSideEncryption) {
- case AWS_KMS:
- _ssekmsKeyId = config.getProperty(SSE_KMS_KEY_ID_CONFIG_KEY);
- if (_ssekmsKeyId == null) {
- throw new UnsupportedOperationException(
- "Missing required config: 'sseKmsKeyId' when AWS_KMS is used for server side encryption");
- }
- _ssekmsEncryptionContext = config.getProperty(SSE_KMS_ENCRYPTION_CONTEXT_CONFIG_KEY);
- break;
- case AES256:
- // Todo: Support AES256.
- default:
- throw new UnsupportedOperationException("Unsupported server side encryption: " + _serverSideEncryption);
- }
- }
- AwsCredentialsProvider awsCredentialsProvider;
+ setServerSideEncryption(serverSideEncryption, config);
+ AwsCredentialsProvider awsCredentialsProvider;
try {
if (!isNullOrEmpty(config.getProperty(ACCESS_KEY)) && !isNullOrEmpty(config.getProperty(SECRET_KEY))) {
String accessKey = config.getProperty(ACCESS_KEY);
@@ -149,10 +128,53 @@ public class S3PinotFS extends BasePinotFS {
}
}
+ /**
+ * Initializes the _s3Client directly with the provided client.
+ * This initialization method will not initialize the server side encryption
+ * @param s3Client s3Client to initialize with
+ */
public void init(S3Client s3Client) {
_s3Client = s3Client;
}
+ /**
+ * Initialize the _s3Client directly with provided client, along with additional server side encryption related props
+ * @param s3Client s3Client to initialize with
+ * @param serverSideEncryption the server side encryption string, e.g. AWS_KMS, which is the only supported one as of now
+ * @param serverSideEncryptionConfig properties specific to provided server side encryption type
+ */
+ public void init(S3Client s3Client, String serverSideEncryption, PinotConfiguration serverSideEncryptionConfig) {
+ _s3Client = s3Client;
+ setServerSideEncryption(serverSideEncryption, serverSideEncryptionConfig);
+ }
+
+ private void setServerSideEncryption(@Nullable String serverSideEncryption,
+ PinotConfiguration serverSideEncryptionConfig) {
+ if (serverSideEncryption != null) {
+ try {
+ _serverSideEncryption = ServerSideEncryption.valueOf(serverSideEncryption);
+ } catch (Exception e) {
+ throw new UnsupportedOperationException(
+ String.format("Unknown value '%s' for S3PinotFS config: 'serverSideEncryption'. Supported values are: %s",
+ serverSideEncryption, Arrays.toString(ServerSideEncryption.knownValues().toArray())));
+ }
+ switch (_serverSideEncryption) {
+ case AWS_KMS:
+ _ssekmsKeyId = serverSideEncryptionConfig.getProperty(SSE_KMS_KEY_ID_CONFIG_KEY);
+ if (_ssekmsKeyId == null) {
+ throw new UnsupportedOperationException(
+ "Missing required config: 'sseKmsKeyId' when AWS_KMS is used for server side encryption");
+ }
+ _ssekmsEncryptionContext = serverSideEncryptionConfig.getProperty(SSE_KMS_ENCRYPTION_CONTEXT_CONFIG_KEY);
+ break;
+ case AES256:
+ // Todo: Support AES256.
+ default:
+ throw new UnsupportedOperationException("Unsupported server side encryption: " + _serverSideEncryption);
+ }
+ }
+ }
+
boolean isNullOrEmpty(String target) {
return target == null || target.isEmpty();
}
@@ -318,9 +340,8 @@ public class S3PinotFS extends BasePinotFS {
try {
if (isDirectory(segmentUri)) {
if (!forceDelete) {
- Preconditions
- .checkState(isEmptyDirectory(segmentUri), "ForceDelete flag is not set and directory '%s' is not empty",
- segmentUri);
+ Preconditions.checkState(isEmptyDirectory(segmentUri),
+ "ForceDelete flag is not set and directory '%s' is not empty", segmentUri);
}
String prefix = normalizeToDirectoryPrefix(segmentUri);
ListObjectsV2Response listObjectsV2Response;
@@ -456,8 +477,8 @@ public class S3PinotFS extends BasePinotFS {
ImmutableList.Builder<FileMetadata> listBuilder = ImmutableList.builder();
visitFiles(fileUri, recursive, s3Object -> {
if (!s3Object.key().equals(fileUri.getPath())) {
- FileMetadata.Builder fileBuilder = new FileMetadata.Builder()
- .setFilePath(S3_SCHEME + fileUri.getHost() + DELIMITER + getNormalizedFileKey(s3Object))
+ FileMetadata.Builder fileBuilder = new FileMetadata.Builder().setFilePath(
+ S3_SCHEME + fileUri.getHost() + DELIMITER + getNormalizedFileKey(s3Object))
.setLastModifiedTime(s3Object.lastModified().toEpochMilli()).setLength(s3Object.size())
.setIsDirectory(s3Object.key().endsWith(DELIMITER));
listBuilder.add(fileBuilder.build());
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org