Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/06/09 20:56:33 UTC

[GitHub] [pinot] jackjlli commented on a diff in pull request #8753: Streamed segment download & untar with rate limiter to control disk usage `feature`

jackjlli commented on code in PR #8753:
URL: https://github.com/apache/pinot/pull/8753#discussion_r893930779


##########
pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java:
##########
@@ -1004,6 +1004,26 @@ public int downloadFile(URI uri, File dest, AuthProvider authProvider, List<Head
     return _httpClient.downloadFile(uri, HttpClient.DEFAULT_SOCKET_TIMEOUT_MS, dest, authProvider, httpHeaders);
   }
 
+  /**
+   * Download and untar a file in a streamed way with rate limit
+   *
+   * @param uri URI
+   * @param dest File destination
+   * @param authProvider auth token
+   * @param httpHeaders http headers
+   * @param rateLimit limit the rate to write download-untar stream to disk, in bytes
+   *                  -1 for no disk write limit, 0 for limit the writing to min(untar, download) rate
+   * @return Response status code
+   * @throws IOException
+   * @throws HttpErrorStatusException
+   */
+  public File downloadUntarFileStreamed(URI uri, File dest, AuthProvider authProvider, List<Header> httpHeaders,
+      long rateLimit)

Review Comment:
   Can we rename it to something more semantic, like `maxDownloadRateInByte`?



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/TarGzCompressionUtils.java:
##########
@@ -198,4 +238,30 @@ public static void untarOneFile(File inputFile, String fileName, File outputFile
       throw new IOException(String.format("Failed to find file: %s in: %s", fileName, inputFile));
     }
   }
+
+  public static long copyWithRateLimiter(InputStream inputStream, FileOutputStream outputStream, long rateLimit)
+      throws IOException {
+    Objects.requireNonNull(inputStream, "inputStream is null");

Review Comment:
   How about reusing the `Preconditions` imported on line 21, so that you don't need an extra import?



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/TarGzCompressionUtils.java:
##########
@@ -198,4 +238,30 @@ public static void untarOneFile(File inputFile, String fileName, File outputFile
       throw new IOException(String.format("Failed to find file: %s in: %s", fileName, inputFile));
     }
   }
+
+  public static long copyWithRateLimiter(InputStream inputStream, FileOutputStream outputStream, long rateLimit)
+      throws IOException {
+    Objects.requireNonNull(inputStream, "inputStream is null");
+    Objects.requireNonNull(outputStream, "outputStream is null");
+    FileDescriptor fd = outputStream.getFD();
+    LOGGER.info("Using rate limiter for stream copy, target limit {} bytes/s", rateLimit);
+    byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
+    RateLimiter rateLimiter = RateLimiter.create(rateLimit);
+    long count;
+    int n;
+
+    if (rateLimit == SYNC_DISK_WRITE_WITH_UPSTREAM_RATE) {
+      for (count = 0L; -1 != (n = inputStream.read(buffer)); count += (long) n) {
+        outputStream.write(buffer, 0, n);
+        fd.sync(); // flush the buffer timely to the disk so that the disk bandwidth wouldn't get saturated
+      }
+    } else {
+      for (count = 0L; -1 != (n = inputStream.read(buffer)); count += (long) n) {

Review Comment:
   Maybe instantiate the `RateLimiter` right before this for loop, since it's only used inside the else block?
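A minimal sketch of the suggested restructuring, which also moves the "Using rate limiter" log into the rate-limited branch, as a later comment on this method requests. Creating the limiter only where it is used is more than style: if `RateLimiter` here is Guava's, `RateLimiter.create` throws `IllegalArgumentException` for a zero or negative rate, so creating it before the `if` would fail whenever `rateLimit` equals `SYNC_DISK_WRITE_WITH_UPSTREAM_RATE`, assuming that constant is 0 as the Javadoc suggests (its value is not shown in this diff). To stay dependency-free, the sketch swaps Guava's `RateLimiter` for a hypothetical `PacingLimiter` and `LOGGER.info` for `System.out.println`; the `DEFAULT_BUFFER_SIZE` value is also an assumption.

```java
import java.io.FileDescriptor;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.util.Objects;

public class RateLimitedCopy {
  // Assumed value: the Javadoc says 0 means "limit the writing to min(untar, download) rate".
  static final long SYNC_DISK_WRITE_WITH_UPSTREAM_RATE = 0L;
  // Assumed buffer size; the real DEFAULT_BUFFER_SIZE is not shown in the diff.
  static final int DEFAULT_BUFFER_SIZE = 4096;

  /** Hypothetical stand-in for Guava's RateLimiter: paces writes to at most bytesPerSecond. */
  static final class PacingLimiter {
    private final double _nanosPerByte;
    private long _nextFreeNanos = System.nanoTime();

    PacingLimiter(long bytesPerSecond) {
      if (bytesPerSecond <= 0) {
        throw new IllegalArgumentException("bytesPerSecond must be positive: " + bytesPerSecond);
      }
      _nanosPerByte = 1e9 / bytesPerSecond;
    }

    /** Sleeps until time reserved by earlier calls has elapsed, then reserves time for n more bytes. */
    void acquire(int n) throws IOException {
      long now = System.nanoTime();
      long waitNanos = _nextFreeNanos - now;
      _nextFreeNanos = Math.max(now, _nextFreeNanos) + (long) (n * _nanosPerByte);
      if (waitNanos > 0) {
        try {
          Thread.sleep(waitNanos / 1_000_000, (int) (waitNanos % 1_000_000));
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new InterruptedIOException("Interrupted while rate limiting");
        }
      }
    }
  }

  public static long copyWithRateLimiter(InputStream inputStream, FileOutputStream outputStream, long rateLimit)
      throws IOException {
    Objects.requireNonNull(inputStream, "inputStream is null");
    Objects.requireNonNull(outputStream, "outputStream is null");
    FileDescriptor fd = outputStream.getFD();
    byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
    long count = 0L;
    int n;
    if (rateLimit == SYNC_DISK_WRITE_WITH_UPSTREAM_RATE) {
      while ((n = inputStream.read(buffer)) != -1) {
        outputStream.write(buffer, 0, n);
        fd.sync(); // flush each chunk so disk writes track the upstream rate
        count += n;
      }
    } else {
      // Log and create the limiter only here, so a non-positive sentinel never reaches the limiter.
      System.out.println("Using rate limiter for stream copy, target limit " + rateLimit + " bytes/s");
      PacingLimiter rateLimiter = new PacingLimiter(rateLimit);
      while ((n = inputStream.read(buffer)) != -1) {
        rateLimiter.acquire(n);
        outputStream.write(buffer, 0, n);
        fd.sync();
        count += n;
      }
    }
    return count;
  }
}
```

As in the diff, -1 is not special-cased inside this method, so the sketch's limiter rejects it; callers presumably handle the "no limit" case before calling.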



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManagerParams.java:
##########
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.data.manager;
+
+import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
+
+
+public class TableDataManagerParams {
+  private boolean _isStreamSegmentDownloadUntar;

Review Comment:
   Could you add comments describing the meaning of each of these params?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManagerParams.java:
##########
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.data.manager;
+
+import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
+
+
+public class TableDataManagerParams {
+  private boolean _isStreamSegmentDownloadUntar;
+  private long _streamSegmentDownloadUntarRateLimit;

Review Comment:
   Especially this one: is its unit bytes or ms?
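A minimal sketch of how the two fields could be documented, assuming the rate is in bytes per second (the unit `copyWithRateLimiter` logs) and that the -1 and 0 sentinels from the `downloadUntarFileStreamed` Javadoc apply here too. The class name, the `BytesPerSec` suffix, the constant names, and the constructor are all hypothetical; the real class is populated from `InstanceDataManagerConfig`, which this sketch leaves out.

```java
/**
 * Hypothetical documented params holder. Names with the "BytesPerSec" suffix and the
 * constructor are illustrative only; they are not part of Pinot.
 */
public class TableDataManagerParamsSketch {
  /** Assumed sentinel (per the downloadUntarFileStreamed Javadoc): no disk write limit. */
  public static final long NO_DISK_WRITE_LIMIT = -1L;
  /** Assumed sentinel: write to disk at min(download rate, untar rate), syncing each chunk. */
  public static final long SYNC_DISK_WRITE_WITH_UPSTREAM_RATE = 0L;

  /** Whether segments are downloaded and untarred as a single stream. */
  private final boolean _isStreamSegmentDownloadUntar;

  /**
   * Maximum rate, in bytes per second, at which the streamed download-untar writes to disk.
   * Only meaningful when {@code _isStreamSegmentDownloadUntar} is true; may also be
   * {@link #NO_DISK_WRITE_LIMIT} or {@link #SYNC_DISK_WRITE_WITH_UPSTREAM_RATE}.
   */
  private final long _streamSegmentDownloadUntarRateLimitBytesPerSec;

  public TableDataManagerParamsSketch(boolean isStreamSegmentDownloadUntar,
      long streamSegmentDownloadUntarRateLimitBytesPerSec) {
    // Reject anything below the smallest sentinel so a misconfigured value fails fast.
    if (streamSegmentDownloadUntarRateLimitBytesPerSec < NO_DISK_WRITE_LIMIT) {
      throw new IllegalArgumentException("Rate limit must be -1, 0, or a positive bytes/s value: "
          + streamSegmentDownloadUntarRateLimitBytesPerSec);
    }
    _isStreamSegmentDownloadUntar = isStreamSegmentDownloadUntar;
    _streamSegmentDownloadUntarRateLimitBytesPerSec = streamSegmentDownloadUntarRateLimitBytesPerSec;
  }

  public boolean isStreamSegmentDownloadUntar() {
    return _isStreamSegmentDownloadUntar;
  }

  public long getStreamSegmentDownloadUntarRateLimitBytesPerSec() {
    return _streamSegmentDownloadUntarRateLimitBytesPerSec;
  }
}
```

Putting the unit in the field name answers the bytes-vs-ms question at every call site, not just in the Javadoc.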



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java:
##########
@@ -447,6 +483,36 @@ File downloadAndDecrypt(String segmentName, SegmentZKMetadata zkMetadata, File t
     }
   }
 
+  private File downloadAndStreamUntarRateLimit(String segmentName, SegmentZKMetadata zkMetadata, File tempRootDir,

Review Comment:
   sed 's/downloadAndStreamUntarRateLimit/downloadAndStreamUntarWithRateLimit/'



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/TarGzCompressionUtils.java:
##########
@@ -198,4 +238,30 @@ public static void untarOneFile(File inputFile, String fileName, File outputFile
       throw new IOException(String.format("Failed to find file: %s in: %s", fileName, inputFile));
     }
   }
+
+  public static long copyWithRateLimiter(InputStream inputStream, FileOutputStream outputStream, long rateLimit)
+      throws IOException {
+    Objects.requireNonNull(inputStream, "inputStream is null");
+    Objects.requireNonNull(outputStream, "outputStream is null");
+    FileDescriptor fd = outputStream.getFD();
+    LOGGER.info("Using rate limiter for stream copy, target limit {} bytes/s", rateLimit);
+    byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
+    RateLimiter rateLimiter = RateLimiter.create(rateLimit);
+    long count;
+    int n;
+
+    if (rateLimit == SYNC_DISK_WRITE_WITH_UPSTREAM_RATE) {
+      for (count = 0L; -1 != (n = inputStream.read(buffer)); count += (long) n) {
+        outputStream.write(buffer, 0, n);
+        fd.sync(); // flush the buffer timely to the disk so that the disk bandwidth wouldn't get saturated
+      }
+    } else {
+      for (count = 0L; -1 != (n = inputStream.read(buffer)); count += (long) n) {
+        rateLimiter.acquire(n);
+        outputStream.write(buffer, 0, n);
+        fd.sync();
+      }
+    }
+    return count;

Review Comment:
   Log one more message to show that the copy is done.
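A small sketch of a completion message that also reports the achieved write rate, so it can be compared with the configured limit. `CopyDoneMessage.formatCopyDone` is a hypothetical helper; in `copyWithRateLimiter` its result would go to `LOGGER.info` just before `return count;`, with the elapsed time measured via `System.nanoTime()` around the copy loop.

```java
import java.util.Locale;

public class CopyDoneMessage {
  /** Builds a completion message with the byte count, elapsed time, and effective rate in bytes/s. */
  static String formatCopyDone(long bytesCopied, long elapsedNanos) {
    // Guard against a zero elapsed time so the rate computation never divides by zero.
    double seconds = Math.max(elapsedNanos, 1L) / 1e9;
    double bytesPerSec = bytesCopied / seconds;
    return String.format(Locale.ROOT, "Finished stream copy: %d bytes in %d ms (%.0f bytes/s)",
        bytesCopied, elapsedNanos / 1_000_000, bytesPerSec);
  }

  public static void main(String[] args) {
    // prints "Finished stream copy: 8000000 bytes in 2000 ms (4000000 bytes/s)"
    System.out.println(formatCopyDone(8_000_000L, 2_000_000_000L));
  }
}
```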



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java:
##########
@@ -130,6 +134,14 @@ public void init(TableDataManagerConfig tableDataManagerConfig, String instanceI
           _resourceTmpDir);
     }
     _errorCache = errorCache;
+    _streamSegmentDownloadUntarRateLimit = tableDataManagerParams.getStreamSegmentDownloadUntarRateLimit();
+    _isStreamSegmentDownloadUntar = tableDataManagerParams.isStreamSegmentDownloadUntar();
+    if (_isStreamSegmentDownloadUntar) {
+      LOGGER.info("Using streamed download-untar for segment download!");
+      LOGGER.info("The rate limit interval for streamed download-untar is {} ms",

Review Comment:
   Combine these two log lines into one message?
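One way to merge the two calls, sketched as a hypothetical helper that builds a single message and spells out the sentinel values from the `downloadUntarFileStreamed` Javadoc (-1 and 0, assumed to apply to this setting as well). The current second message calls the value an "interval" in "ms", while `copyWithRateLimiter` logs it as bytes/s; the sketch assumes bytes/s, which is the same open question raised on `TableDataManagerParams`.

```java
public class StreamedDownloadLogMessage {
  // Assumed sentinels, taken from the downloadUntarFileStreamed Javadoc.
  static final long NO_DISK_WRITE_LIMIT = -1L;
  static final long SYNC_DISK_WRITE_WITH_UPSTREAM_RATE = 0L;

  /** Renders the streamed download-untar setting as one message, spelling out sentinel values. */
  static String streamedDownloadUntarMessage(long rateLimit) {
    String limit;
    if (rateLimit == NO_DISK_WRITE_LIMIT) {
      limit = "no disk write limit";
    } else if (rateLimit == SYNC_DISK_WRITE_WITH_UPSTREAM_RATE) {
      limit = "disk writes synced with the upstream rate";
    } else {
      limit = "disk write limit " + rateLimit + " bytes/s";
    }
    return "Using streamed download-untar for segment download, " + limit;
  }

  public static void main(String[] args) {
    // In BaseTableDataManager.init() this would be a single LOGGER.info(...) call.
    // prints "Using streamed download-untar for segment download, disk writes synced with the upstream rate"
    System.out.println(streamedDownloadUntarMessage(SYNC_DISK_WRITE_WITH_UPSTREAM_RATE));
  }
}
```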



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/TarGzCompressionUtils.java:
##########
@@ -198,4 +238,30 @@ public static void untarOneFile(File inputFile, String fileName, File outputFile
       throw new IOException(String.format("Failed to find file: %s in: %s", fileName, inputFile));
     }
   }
+
+  public static long copyWithRateLimiter(InputStream inputStream, FileOutputStream outputStream, long rateLimit)
+      throws IOException {
+    Objects.requireNonNull(inputStream, "inputStream is null");
+    Objects.requireNonNull(outputStream, "outputStream is null");
+    FileDescriptor fd = outputStream.getFD();
+    LOGGER.info("Using rate limiter for stream copy, target limit {} bytes/s", rateLimit);
+    byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
+    RateLimiter rateLimiter = RateLimiter.create(rateLimit);
+    long count;
+    int n;
+
+    if (rateLimit == SYNC_DISK_WRITE_WITH_UPSTREAM_RATE) {
+      for (count = 0L; -1 != (n = inputStream.read(buffer)); count += (long) n) {
+        outputStream.write(buffer, 0, n);
+        fd.sync(); // flush the buffer timely to the disk so that the disk bandwidth wouldn't get saturated
+      }
+    } else {
+      for (count = 0L; -1 != (n = inputStream.read(buffer)); count += (long) n) {

Review Comment:
   Also, move the logging message here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

For additional commands, e-mail: commits-help@pinot.apache.org