You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2020/11/18 07:11:54 UTC

[incubator-pinot] branch master updated: Set S3 Bucket ACL policy from config (#6272)

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f40c2ac  Set S3 Bucket ACL policy from config (#6272)
f40c2ac is described below

commit f40c2aca7ecbf3b9112a7bf2fff7c0c5b75247be
Author: Tanmay Krishna <ta...@razorpay.com>
AuthorDate: Wed Nov 18 12:34:00 2020 +0530

    Set S3 Bucket ACL policy from config (#6272)
    
    * [pinot-s3] Set acl based on config
    
    * review changes
---
 .../apache/pinot/plugin/filesystem/S3PinotFS.java  | 89 +++++++++++++++++-----
 1 file changed, 68 insertions(+), 21 deletions(-)

diff --git a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
index b07b85d..fff5a2c 100644
--- a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
+++ b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pinot.plugin.filesystem;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
@@ -31,16 +33,11 @@ import java.nio.file.Paths;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.filesystem.PinotFS;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
@@ -61,28 +58,35 @@ import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
 import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
 import software.amazon.awssdk.services.s3.model.MetadataDirective;
 import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
+import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
 import software.amazon.awssdk.services.s3.model.PutObjectRequest;
 import software.amazon.awssdk.services.s3.model.PutObjectResponse;
 import software.amazon.awssdk.services.s3.model.S3Exception;
 import software.amazon.awssdk.services.s3.model.S3Object;
 
 
+/**
+ * Implementation of PinotFS for AWS S3 file system
+ */
 public class S3PinotFS extends PinotFS {
   public static final String ACCESS_KEY = "accessKey";
   public static final String SECRET_KEY = "secretKey";
   public static final String REGION = "region";
   public static final String ENDPOINT = "endpoint";
+  public static final String DISABLE_ACL_CONFIG_KEY = "disableAcl";
 
   private static final Logger LOGGER = LoggerFactory.getLogger(S3PinotFS.class);
   private static final String DELIMITER = "/";
   public static final String S3_SCHEME = "s3://";
+  private static boolean DEFAULT_DISABLE_ACL = true;
   private S3Client _s3Client;
+  private boolean disableAcl = DEFAULT_DISABLE_ACL;
 
   @Override
   public void init(PinotConfiguration config) {
     Preconditions.checkArgument(!isNullOrEmpty(config.getProperty(REGION)));
     String region = config.getProperty(REGION);
-
+    disableAcl = config.getProperty(DISABLE_ACL_CONFIG_KEY, DEFAULT_DISABLE_ACL);
     AwsCredentialsProvider awsCredentialsProvider;
     try {
 
@@ -170,6 +174,12 @@ public class S3PinotFS extends PinotFS {
     }
   }
 
+  /**
+   * Determines if the file exists at the given path
+   * @param uri file path
+   * @return {@code true} if the file exists in the path
+   *         {@code false} otherwise
+   */
   private boolean existsFile(URI uri)
       throws IOException {
     try {
@@ -186,6 +196,12 @@ public class S3PinotFS extends PinotFS {
     }
   }
 
+  /**
+   * Determines if a path is a directory that is empty
+   * @param uri The path under the S3 bucket
+   * @return {@code true} if the path is an empty directory,
+   *         {@code false} otherwise
+   */
   private boolean isEmptyDirectory(URI uri)
       throws IOException {
     if (!isDirectory(uri)) {
@@ -214,6 +230,13 @@ public class S3PinotFS extends PinotFS {
     return isEmpty;
   }
 
+  /**
+   * Method to copy file from source to destination.
+   * @param srcUri source path
+   * @param dstUri destination path
+   * @return {@code true} if the copy operation succeeds, i.e., the HTTP response status is successful (2xx)
+   *         {@code false} otherwise
+   */
   private boolean copyFile(URI srcUri, URI dstUri)
       throws IOException {
     try {
@@ -225,10 +248,14 @@ public class S3PinotFS extends PinotFS {
       }
 
       String dstPath = sanitizePath(dstUri.getPath());
-      CopyObjectRequest copyReq =
-          CopyObjectRequest.builder().copySource(encodedUrl).destinationBucket(dstUri.getHost()).destinationKey(dstPath)
-              .build();
+      CopyObjectRequest.Builder copyReqBuilder = CopyObjectRequest.builder().copySource(encodedUrl)
+          .destinationBucket(dstUri.getHost()).destinationKey(dstPath);
+
+      if (!disableAcl) {
+        copyReqBuilder.acl(ObjectCannedACL.BUCKET_OWNER_FULL_CONTROL);
+      }
 
+      CopyObjectRequest copyReq = copyReqBuilder.build();
       CopyObjectResponse copyObjectResponse = _s3Client.copyObject(copyReq);
       return copyObjectResponse.sdkHttpResponse().isSuccessful();
     } catch (S3Exception e) {
@@ -248,8 +275,13 @@ public class S3PinotFS extends PinotFS {
         return true;
       }
 
-      PutObjectRequest putObjectRequest = PutObjectRequest.builder().bucket(uri.getHost()).key(path).build();
+      PutObjectRequest.Builder putReqBuilder = PutObjectRequest.builder().bucket(uri.getHost()).key(path);
+
+      if (!disableAcl) {
+        putReqBuilder.acl(ObjectCannedACL.BUCKET_OWNER_FULL_CONTROL);
+      }
 
+      PutObjectRequest putObjectRequest = putReqBuilder.build();
       PutObjectResponse putObjectResponse = _s3Client.putObject(putObjectRequest, RequestBody.fromBytes(new byte[0]));
 
       return putObjectResponse.sdkHttpResponse().isSuccessful();
@@ -336,7 +368,7 @@ public class S3PinotFS extends PinotFS {
       boolean copySucceeded = true;
       for (String filePath : listFiles(srcUri, true)) {
         URI srcFileURI = URI.create(filePath);
-        String directoryEntryPrefix =  srcFileURI.getPath();
+        String directoryEntryPrefix = srcFileURI.getPath();
         URI src = new URI(srcUri.getScheme(), srcUri.getHost(), directoryEntryPrefix, null);
         String relativeSrcPath = srcPath.relativize(Paths.get(directoryEntryPrefix)).toString();
         String dstPath = dstUri.resolve(relativeSrcPath).getPath();
@@ -389,7 +421,7 @@ public class S3PinotFS extends PinotFS {
       String continuationToken = null;
       boolean isDone = false;
       String prefix = normalizeToDirectoryPrefix(fileUri);
-      while(!isDone) {
+      while (!isDone) {
         ListObjectsV2Request.Builder listObjectsV2RequestBuilder =
             ListObjectsV2Request.builder().bucket(fileUri.getHost());
         if (!prefix.equals(DELIMITER)) {
@@ -445,8 +477,13 @@ public class S3PinotFS extends PinotFS {
     LOGGER.info("Copy {} from local to {}", srcFile.getAbsolutePath(), dstUri);
     URI base = getBase(dstUri);
     String prefix = sanitizePath(base.relativize(dstUri).getPath());
-    PutObjectRequest putObjectRequest = PutObjectRequest.builder().bucket(dstUri.getHost()).key(prefix).build();
+    PutObjectRequest.Builder putReqBuilder = PutObjectRequest.builder().bucket(dstUri.getHost()).key(prefix);
 
+    if (!disableAcl) {
+      putReqBuilder.acl(ObjectCannedACL.BUCKET_OWNER_FULL_CONTROL);
+    }
+
+    PutObjectRequest putObjectRequest = putReqBuilder.build();
     _s3Client.putObject(putObjectRequest, srcFile.toPath());
   }
 
@@ -460,8 +497,8 @@ public class S3PinotFS extends PinotFS {
       }
 
       ListObjectsV2Request listObjectsV2Request = ListObjectsV2Request
-              .builder().bucket(uri.getHost())
-              .prefix(prefix).maxKeys(2).build();
+          .builder().bucket(uri.getHost())
+          .prefix(prefix).maxKeys(2).build();
       ListObjectsV2Response listObjectsV2Response = _s3Client.listObjectsV2(listObjectsV2Request);
       return listObjectsV2Response.hasContents();
     } catch (NoSuchKeyException e) {
@@ -491,17 +528,28 @@ public class S3PinotFS extends PinotFS {
       String path = sanitizePath(uri.getPath());
       Map<String, String> mp = new HashMap<>();
       mp.put("lastModified", String.valueOf(System.currentTimeMillis()));
-      CopyObjectRequest request =
-          CopyObjectRequest.builder().copySource(encodedUrl).destinationBucket(uri.getHost()).destinationKey(path)
-              .metadata(mp).metadataDirective(MetadataDirective.REPLACE).build();
+      CopyObjectRequest.Builder copyReqBuilder = CopyObjectRequest.builder().copySource(encodedUrl)
+          .destinationBucket(uri.getHost()).destinationKey(path)
+          .metadata(mp).metadataDirective(MetadataDirective.REPLACE);
 
+      if (!disableAcl) {
+        copyReqBuilder.acl(ObjectCannedACL.BUCKET_OWNER_FULL_CONTROL);
+      }
+
+      CopyObjectRequest request = copyReqBuilder.build();
       _s3Client.copyObject(request);
       long newUpdateTime = getS3ObjectMetadata(uri).lastModified().toEpochMilli();
       return newUpdateTime > s3ObjectMetadata.lastModified().toEpochMilli();
     } catch (NoSuchKeyException e) {
       String path = sanitizePath(uri.getPath());
-      _s3Client.putObject(PutObjectRequest.builder().bucket(uri.getHost()).key(path).build(),
-          RequestBody.fromBytes(new byte[0]));
+      PutObjectRequest.Builder putReqBuilder = PutObjectRequest.builder().bucket(uri.getHost()).key(path);
+
+      if (!disableAcl) {
+        putReqBuilder.acl(ObjectCannedACL.BUCKET_OWNER_FULL_CONTROL);
+      }
+
+      PutObjectRequest putObjectRequest = putReqBuilder.build();
+      _s3Client.putObject(putObjectRequest, RequestBody.fromBytes(new byte[0]));
       return true;
     } catch (S3Exception e) {
       throw new IOException(e);
@@ -526,5 +574,4 @@ public class S3PinotFS extends PinotFS {
       throws IOException {
     super.close();
   }
-
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org