You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2020/11/18 07:11:54 UTC
[incubator-pinot] branch master updated: Set S3 Bucket ACL policy
from config (#6272)
This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new f40c2ac Set S3 Bucket ACL policy from config (#6272)
f40c2ac is described below
commit f40c2aca7ecbf3b9112a7bf2fff7c0c5b75247be
Author: Tanmay Krishna <ta...@razorpay.com>
AuthorDate: Wed Nov 18 12:34:00 2020 +0530
Set S3 Bucket ACL policy from config (#6272)
* [pinot-s3] Set acl based on config
* review changes
---
.../apache/pinot/plugin/filesystem/S3PinotFS.java | 89 +++++++++++++++++-----
1 file changed, 68 insertions(+), 21 deletions(-)
diff --git a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
index b07b85d..fff5a2c 100644
--- a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
+++ b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
@@ -18,6 +18,8 @@
*/
package org.apache.pinot.plugin.filesystem;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@@ -31,16 +33,11 @@ import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
import org.apache.commons.io.FileUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
@@ -61,28 +58,35 @@ import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.MetadataDirective;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
+import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.services.s3.model.S3Object;
+/**
+ * Implementation of PinotFS for AWS S3 file system
+ */
public class S3PinotFS extends PinotFS {
public static final String ACCESS_KEY = "accessKey";
public static final String SECRET_KEY = "secretKey";
public static final String REGION = "region";
public static final String ENDPOINT = "endpoint";
+ public static final String DISABLE_ACL_CONFIG_KEY = "disableAcl";
private static final Logger LOGGER = LoggerFactory.getLogger(S3PinotFS.class);
private static final String DELIMITER = "/";
public static final String S3_SCHEME = "s3://";
+ private static boolean DEFAULT_DISABLE_ACL = true;
private S3Client _s3Client;
+ private boolean disableAcl = DEFAULT_DISABLE_ACL;
@Override
public void init(PinotConfiguration config) {
Preconditions.checkArgument(!isNullOrEmpty(config.getProperty(REGION)));
String region = config.getProperty(REGION);
-
+ disableAcl = config.getProperty(DISABLE_ACL_CONFIG_KEY, DEFAULT_DISABLE_ACL);
AwsCredentialsProvider awsCredentialsProvider;
try {
@@ -170,6 +174,12 @@ public class S3PinotFS extends PinotFS {
}
}
+ /**
+ * Determines if the file exists at the given path
+ * @param uri file path
+ * @return {@code true} if the file exists in the path
+ * {@code false} otherwise
+ */
private boolean existsFile(URI uri)
throws IOException {
try {
@@ -186,6 +196,12 @@ public class S3PinotFS extends PinotFS {
}
}
+ /**
+ * Determines if a path is a directory that is empty
+ * @param uri The path under the S3 bucket
+ * @return {@code true} if the path is an empty directory,
+ * {@code false} otherwise
+ */
private boolean isEmptyDirectory(URI uri)
throws IOException {
if (!isDirectory(uri)) {
@@ -214,6 +230,13 @@ public class S3PinotFS extends PinotFS {
return isEmpty;
}
+ /**
+ * Method to copy file from source to destination.
+ * @param srcUri source path
+ * @param dstUri destination path
+ * @return {@code true} if the copy operation succeeds, i.e., the HTTP response of the copy request is successful
+ * {@code false} otherwise
+ */
private boolean copyFile(URI srcUri, URI dstUri)
throws IOException {
try {
@@ -225,10 +248,14 @@ public class S3PinotFS extends PinotFS {
}
String dstPath = sanitizePath(dstUri.getPath());
- CopyObjectRequest copyReq =
- CopyObjectRequest.builder().copySource(encodedUrl).destinationBucket(dstUri.getHost()).destinationKey(dstPath)
- .build();
+ CopyObjectRequest.Builder copyReqBuilder = CopyObjectRequest.builder().copySource(encodedUrl)
+ .destinationBucket(dstUri.getHost()).destinationKey(dstPath);
+
+ if (!disableAcl) {
+ copyReqBuilder.acl(ObjectCannedACL.BUCKET_OWNER_FULL_CONTROL);
+ }
+ CopyObjectRequest copyReq = copyReqBuilder.build();
CopyObjectResponse copyObjectResponse = _s3Client.copyObject(copyReq);
return copyObjectResponse.sdkHttpResponse().isSuccessful();
} catch (S3Exception e) {
@@ -248,8 +275,13 @@ public class S3PinotFS extends PinotFS {
return true;
}
- PutObjectRequest putObjectRequest = PutObjectRequest.builder().bucket(uri.getHost()).key(path).build();
+ PutObjectRequest.Builder putReqBuilder = PutObjectRequest.builder().bucket(uri.getHost()).key(path);
+
+ if (!disableAcl) {
+ putReqBuilder.acl(ObjectCannedACL.BUCKET_OWNER_FULL_CONTROL);
+ }
+ PutObjectRequest putObjectRequest = putReqBuilder.build();
PutObjectResponse putObjectResponse = _s3Client.putObject(putObjectRequest, RequestBody.fromBytes(new byte[0]));
return putObjectResponse.sdkHttpResponse().isSuccessful();
@@ -336,7 +368,7 @@ public class S3PinotFS extends PinotFS {
boolean copySucceeded = true;
for (String filePath : listFiles(srcUri, true)) {
URI srcFileURI = URI.create(filePath);
- String directoryEntryPrefix = srcFileURI.getPath();
+ String directoryEntryPrefix = srcFileURI.getPath();
URI src = new URI(srcUri.getScheme(), srcUri.getHost(), directoryEntryPrefix, null);
String relativeSrcPath = srcPath.relativize(Paths.get(directoryEntryPrefix)).toString();
String dstPath = dstUri.resolve(relativeSrcPath).getPath();
@@ -389,7 +421,7 @@ public class S3PinotFS extends PinotFS {
String continuationToken = null;
boolean isDone = false;
String prefix = normalizeToDirectoryPrefix(fileUri);
- while(!isDone) {
+ while (!isDone) {
ListObjectsV2Request.Builder listObjectsV2RequestBuilder =
ListObjectsV2Request.builder().bucket(fileUri.getHost());
if (!prefix.equals(DELIMITER)) {
@@ -445,8 +477,13 @@ public class S3PinotFS extends PinotFS {
LOGGER.info("Copy {} from local to {}", srcFile.getAbsolutePath(), dstUri);
URI base = getBase(dstUri);
String prefix = sanitizePath(base.relativize(dstUri).getPath());
- PutObjectRequest putObjectRequest = PutObjectRequest.builder().bucket(dstUri.getHost()).key(prefix).build();
+ PutObjectRequest.Builder putReqBuilder = PutObjectRequest.builder().bucket(dstUri.getHost()).key(prefix);
+ if (!disableAcl) {
+ putReqBuilder.acl(ObjectCannedACL.BUCKET_OWNER_FULL_CONTROL);
+ }
+
+ PutObjectRequest putObjectRequest = putReqBuilder.build();
_s3Client.putObject(putObjectRequest, srcFile.toPath());
}
@@ -460,8 +497,8 @@ public class S3PinotFS extends PinotFS {
}
ListObjectsV2Request listObjectsV2Request = ListObjectsV2Request
- .builder().bucket(uri.getHost())
- .prefix(prefix).maxKeys(2).build();
+ .builder().bucket(uri.getHost())
+ .prefix(prefix).maxKeys(2).build();
ListObjectsV2Response listObjectsV2Response = _s3Client.listObjectsV2(listObjectsV2Request);
return listObjectsV2Response.hasContents();
} catch (NoSuchKeyException e) {
@@ -491,17 +528,28 @@ public class S3PinotFS extends PinotFS {
String path = sanitizePath(uri.getPath());
Map<String, String> mp = new HashMap<>();
mp.put("lastModified", String.valueOf(System.currentTimeMillis()));
- CopyObjectRequest request =
- CopyObjectRequest.builder().copySource(encodedUrl).destinationBucket(uri.getHost()).destinationKey(path)
- .metadata(mp).metadataDirective(MetadataDirective.REPLACE).build();
+ CopyObjectRequest.Builder copyReqBuilder = CopyObjectRequest.builder().copySource(encodedUrl)
+ .destinationBucket(uri.getHost()).destinationKey(path)
+ .metadata(mp).metadataDirective(MetadataDirective.REPLACE);
+ if (!disableAcl) {
+ copyReqBuilder.acl(ObjectCannedACL.BUCKET_OWNER_FULL_CONTROL);
+ }
+
+ CopyObjectRequest request = copyReqBuilder.build();
_s3Client.copyObject(request);
long newUpdateTime = getS3ObjectMetadata(uri).lastModified().toEpochMilli();
return newUpdateTime > s3ObjectMetadata.lastModified().toEpochMilli();
} catch (NoSuchKeyException e) {
String path = sanitizePath(uri.getPath());
- _s3Client.putObject(PutObjectRequest.builder().bucket(uri.getHost()).key(path).build(),
- RequestBody.fromBytes(new byte[0]));
+ PutObjectRequest.Builder putReqBuilder = PutObjectRequest.builder().bucket(uri.getHost()).key(path);
+
+ if (!disableAcl) {
+ putReqBuilder.acl(ObjectCannedACL.BUCKET_OWNER_FULL_CONTROL);
+ }
+
+ PutObjectRequest putObjectRequest = putReqBuilder.build();
+ _s3Client.putObject(putObjectRequest, RequestBody.fromBytes(new byte[0]));
return true;
} catch (S3Exception e) {
throw new IOException(e);
@@ -526,5 +574,4 @@ public class S3PinotFS extends PinotFS {
throws IOException {
super.close();
}
-
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org