Posted to commits@pinot.apache.org by "Jackie-Jiang (via GitHub)" <gi...@apache.org> on 2023/02/10 22:47:20 UTC

[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #10263: upload large s3 object by multi parts to overcome the 5GB limit of PutObject

Jackie-Jiang commented on code in PR #10263:
URL: https://github.com/apache/pinot/pull/10263#discussion_r1103324593


##########
pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3Config.java:
##########
@@ -91,7 +102,13 @@ public S3Config(PinotConfiguration pinotConfig) {
         Integer.parseInt(pinotConfig.getProperty(SESSION_DURATION_SECONDS, DEFAULT_SESSION_DURATION_SECONDS));
     _asyncSessionUpdateEnabled = Boolean.parseBoolean(
         pinotConfig.getProperty(ASYNC_SESSION_UPDATED_ENABLED, DEFAULT_ASYNC_SESSION_UPDATED_ENABLED));
-
+    // Non-positive values disable multipart upload.
+    _minObjectSizeForMultiPartUpload =
+        DataSizeUtils.toBytes(pinotConfig.getProperty(MIN_OBJECT_SIZE_FOR_MULTI_PART_UPLOAD, "-1"));
+    _multiPartUploadPartSize = DataSizeUtils.toBytes(

Review Comment:
   Should we validate that this size is at least 5MB, the S3 minimum part size?
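   A minimal sketch of such a check follows. It assumes S3's documented per-part limits: at least 5 MiB for every part except the last, and at most 5 GiB. The class and method names are hypothetical and not part of the PR. In Pinot the check would presumably run in the `S3Config` constructor, and only when multipart upload is enabled.

```java
import java.util.Locale;

// Hypothetical helper; not part of the PR under review.
final class PartSizeValidator {
  // S3 minimum size for every part except the last one (5 MiB).
  static final long MIN_PART_SIZE_BYTES = 5L * 1024 * 1024;
  // S3 maximum size for a single part (5 GiB).
  static final long MAX_PART_SIZE_BYTES = 5L * 1024 * 1024 * 1024;

  private PartSizeValidator() {
  }

  /** Returns the part size unchanged, or throws if it falls outside S3's per-part limits. */
  static long validatePartSize(long partSizeBytes) {
    if (partSizeBytes < MIN_PART_SIZE_BYTES || partSizeBytes > MAX_PART_SIZE_BYTES) {
      throw new IllegalArgumentException(String.format(Locale.ROOT,
          "multiPartUploadPartSize must be between %d and %d bytes, but was %d",
          MIN_PART_SIZE_BYTES, MAX_PART_SIZE_BYTES, partSizeBytes));
    }
    return partSizeBytes;
  }
}
```

   If the check fails at config-parsing time, a bad value shows up when the server starts. Otherwise it would only surface on the first large upload.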



##########
pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3Config.java:
##########
@@ -49,6 +50,13 @@ public class S3Config {
   public static final String EXTERNAL_ID = "externalId";
   public static final String SESSION_DURATION_SECONDS = "sessionDurationSeconds";
   public static final String ASYNC_SESSION_UPDATED_ENABLED = "asyncSessionUpdateEnabled";
+  public static final String MIN_OBJECT_SIZE_FOR_MULTI_PART_UPLOAD = "minObjectSizeForMultiPartUpload";
+  public static final String MULTI_PART_UPLOAD_PART_SIZE = "multiPartUploadPartSize";
+  public static final String MULTI_PART_UPLOAD_MAX_PART_NUM_ALLOWED = "multiPartUploadMaxPartNumAllowed";

Review Comment:
   This seems like a limitation imposed by S3 itself (at most 10,000 parts per upload), so I don't see the value of making it configurable.
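   One way to act on this is to treat the 10,000-part cap as a fixed S3 constant instead of a `multiPartUploadMaxPartNumAllowed` config key. The sketch below assumes that approach. The class and method names are hypothetical.

```java
// Hypothetical holder for S3's fixed multipart limits, used in place of a config key.
final class S3MultipartLimits {
  // S3 accepts part numbers from 1 to 10,000 (inclusive) within one multipart upload.
  static final int MIN_PART_NUMBER = 1;
  static final int MAX_PART_NUMBER = 10_000;

  private S3MultipartLimits() {
  }

  /** True if S3 would accept this part number. */
  static boolean isValidPartNumber(int partNumber) {
    return partNumber >= MIN_PART_NUMBER && partNumber <= MAX_PART_NUMBER;
  }

  /**
   * Largest object size reachable within the part-count limit when every part has the given size.
   * Ignores S3's separate limit on total object size.
   */
  static long maxObjectSizeForPartSize(long partSizeBytes) {
    return partSizeBytes * MAX_PART_NUMBER;
  }
}
```

   Keeping the value in code prevents operators from setting it above what S3 accepts, and it leaves one fewer config key to document.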



##########
pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java:
##########
@@ -551,11 +569,74 @@ public void copyToLocalFile(URI srcUri, File dstFile)
   @Override
   public void copyFromLocalFile(File srcFile, URI dstUri)
       throws Exception {
-    LOGGER.info("Copy {} from local to {}", srcFile.getAbsolutePath(), dstUri);
-    URI base = getBase(dstUri);
-    String prefix = sanitizePath(base.relativize(dstUri).getPath());
-    PutObjectRequest putObjectRequest = generatePutObjectRequest(dstUri, prefix);
-    _s3Client.putObject(putObjectRequest, srcFile.toPath());
+    if (_minObjectSizeToUploadInParts > 0 && srcFile.length() > _minObjectSizeToUploadInParts) {
+      LOGGER.info("Copy {} from local to {} in parts", srcFile.getAbsolutePath(), dstUri);
+      uploadFileInParts(srcFile, dstUri);
+    } else {
+      LOGGER.info("Copy {} from local to {}", srcFile.getAbsolutePath(), dstUri);
+      String prefix = sanitizePath(getBase(dstUri).relativize(dstUri).getPath());
+      PutObjectRequest putObjectRequest = generatePutObjectRequest(dstUri, prefix);
+      _s3Client.putObject(putObjectRequest, srcFile.toPath());
+    }
+  }
+
+  private void uploadFileInParts(File srcFile, URI dstUri)
+      throws Exception {
+    String bucket = dstUri.getHost();
+    String prefix = sanitizePath(getBase(dstUri).relativize(dstUri).getPath());
+    CreateMultipartUploadResponse multipartUpload =
+        _s3Client.createMultipartUpload(CreateMultipartUploadRequest.builder().bucket(bucket).key(prefix).build());
+    String uploadId = multipartUpload.uploadId();
+    // Upload parts sequentially to overcome the 5GB limit of a single PutObject call.
+    // TODO: parts can be uploaded in parallel for higher throughput, given a thread pool.
+    try (FileInputStream inputStream = FileUtils.openInputStream(srcFile)) {
+      long totalUploaded = 0;
+      long fileSize = srcFile.length();
+      // Part numbers must start from 1 and must not exceed the max part num allowed (10000 by default).
+      // The default configs can upload a single file of up to 1TB, so the if-branch below should rarely be taken.
+      int partNum = 1;
+      long partSizeToUse = _multiPartUploadPartSize;
+      if (partSizeToUse * _multiPartUploadMaxPartNumAllowed < fileSize) {
+        partSizeToUse = (fileSize + _multiPartUploadPartSize) / _multiPartUploadMaxPartNumAllowed;

Review Comment:
   This doesn't look correct. Should this be `(fileSize + 9999) / 10000`, i.e. the ceiling of `fileSize / 10000`?
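   The suggested expression is integer ceiling division. It gives the smallest part size that splits the file into at most 10,000 parts. Below is a minimal sketch of the whole part-size choice. It assumes the 10,000-part cap is a constant. The names are hypothetical, and this is not the code that was eventually merged.

```java
// Hypothetical planner for multipart part sizes; not the merged Pinot code.
final class PartSizePlanner {
  // S3's cap on the number of parts in one multipart upload.
  static final long MAX_PARTS = 10_000L;

  private PartSizePlanner() {
  }

  /** Ceiling of a / b for a >= 0 and b > 0; same as the (a + b - 1) / b idiom. */
  static long ceilDiv(long a, long b) {
    return (a + b - 1) / b;
  }

  /** Smallest part size >= configuredPartSize that keeps the part count within MAX_PARTS. */
  static long choosePartSize(long fileSize, long configuredPartSize) {
    if (configuredPartSize * MAX_PARTS >= fileSize) {
      return configuredPartSize;
    }
    // Equivalent to the reviewer's (fileSize + 9999) / 10000.
    return ceilDiv(fileSize, MAX_PARTS);
  }

  /** Number of parts needed; the last part may be smaller than partSize. */
  static long partCount(long fileSize, long partSize) {
    return ceilDiv(fileSize, partSize);
  }
}
```

   Consider a 2,000,000,000,000-byte file with a configured 100 MiB part size. That combination would need more than 10,000 parts, so the part size is raised to exactly 200,000,000 bytes. The PR's `(fileSize + _multiPartUploadPartSize) / _multiPartUploadMaxPartNumAllowed` is not this ceiling. Whenever the part size exceeds 9,999 bytes, it rounds up further than needed.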



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

