Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/06/10 00:13:11 UTC

[GitHub] [pinot] jackjlli commented on a diff in pull request #8753: Streamed segment download & untar with rate limiter to control disk usage

jackjlli commented on code in PR #8753:
URL: https://github.com/apache/pinot/pull/8753#discussion_r894043108


##########
pinot-common/src/main/java/org/apache/pinot/common/utils/TarGzCompressionUtils.java:
##########
@@ -198,4 +234,30 @@ public static void untarOneFile(File inputFile, String fileName, File outputFile
       throw new IOException(String.format("Failed to find file: %s in: %s", fileName, inputFile));
     }
   }
+
+  public static long copyWithRateLimiter(InputStream inputStream, FileOutputStream outputStream,
+      long maxStreamRateInByte)
+      throws IOException {
+    Preconditions.checkState(inputStream != null, "inputStream is null");

Review Comment:
   `Preconditions.checkNotNull()` would be a better fit, since this checks an argument for null rather than checking state. Same for the following line.
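
   For illustration only: Guava's `checkState` throws `IllegalStateException`, while `checkNotNull` throws `NullPointerException` and returns its argument. The sketch below uses the JDK's `Objects.requireNonNull` as a dependency-free stand-in with the same null-check behavior; the class and method names are hypothetical and not part of the PR.

```java
import java.io.InputStream;
import java.util.Objects;

// Hypothetical helper, not taken from the PR.
final class NullCheckSketch {
  private NullCheckSketch() {
  }

  // A null argument is a caller error, so it fails with NullPointerException
  // (the same exception type that Guava's Preconditions.checkNotNull throws).
  static InputStream requireStream(InputStream inputStream) {
    return Objects.requireNonNull(inputStream, "inputStream is null");
  }
}
```

   Because the check returns its argument, it can also be used inline in an assignment.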



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java:
##########
@@ -409,11 +422,35 @@ private File downloadSegmentFromDeepStore(String segmentName, SegmentZKMetadata
       throws Exception {
     File tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "-" + UUID.randomUUID());
     FileUtils.forceMkdir(tempRootDir);
+    if (_isStreamSegmentDownloadUntar && zkMetadata.getCrypterName() == null) {

Review Comment:
   Does this mean we cannot stream and untar when the segment is encrypted? If so, please state that in a comment in this method.



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java:
##########
@@ -409,11 +422,35 @@ private File downloadSegmentFromDeepStore(String segmentName, SegmentZKMetadata
       throws Exception {
     File tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "-" + UUID.randomUUID());
     FileUtils.forceMkdir(tempRootDir);
+    if (_isStreamSegmentDownloadUntar && zkMetadata.getCrypterName() == null) {

Review Comment:
   Also, can this `if` statement be moved inside the try block, so that the code for deleting `tempRootDir` doesn't have to be duplicated?
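
   A rough sketch of that shape, with the branch inside a single try/finally. The class name, method signature, and returned labels are hypothetical placeholders; the real method would call the download helpers from the diff and return the segment directory.

```java
import java.io.File;

// Hypothetical stand-in for the download method, not taken from the PR.
final class SingleCleanupSketch {
  private SingleCleanupSketch() {
  }

  // Returns a label for the branch taken; a real implementation would return the moved segment directory.
  static String downloadSegment(File tempRootDir, boolean streamDownloadUntar, String crypterName) {
    try {
      // Same condition as in the diff: the streamed path is only taken when no crypter is configured.
      if (streamDownloadUntar && crypterName == null) {
        return "streamed-untar";
      }
      return "download-then-untar";
    } finally {
      // Single cleanup shared by both branches.
      tempRootDir.delete();
    }
  }
}
```

   The finally block runs on both return paths and on exceptions, so the cleanup only needs to be written once.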



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManagerParams.java:
##########
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.data.manager;
+
+import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
+
+
+public class TableDataManagerParams {
+  private boolean _isStreamSegmentDownloadUntar; // whether to turn on stream segment download-untar
+  private long _streamSegmentDownloadUntarRateLimitBytesPerSec; // the per segment rate limit for stream download-untar
+  private int _maxParallelSegmentDownloads; // max number of segment download in parallel per table
+
+  public TableDataManagerParams(int maxParallelSegmentDownloads, boolean isStreamSegmentDownloadUntar,
+      long streamSegmentDownloadUntarRateLimitBytesPerSec) {
+    _maxParallelSegmentDownloads = maxParallelSegmentDownloads;
+    _isStreamSegmentDownloadUntar = isStreamSegmentDownloadUntar;
+    _streamSegmentDownloadUntarRateLimitBytesPerSec = streamSegmentDownloadUntarRateLimitBytesPerSec;
+  }
+
+  public TableDataManagerParams(InstanceDataManagerConfig instanceDataManagerConfig) {
+    _maxParallelSegmentDownloads = instanceDataManagerConfig.getMaxParallelSegmentDownloads();
+    _isStreamSegmentDownloadUntar = instanceDataManagerConfig.isStreamSegmentDownloadUntar();
+    _streamSegmentDownloadUntarRateLimitBytesPerSec =
+        instanceDataManagerConfig.getStreamSegmentDownloadUntarRateLimit();
+  }
+
+  public boolean isStreamSegmentDownloadUntar() {
+    return _isStreamSegmentDownloadUntar;
+  }
+
+  public long getStreamSegmentDownloadUntarRateLimitBytesPerSec() {
+    return _streamSegmentDownloadUntarRateLimitBytesPerSec;
+  }
+
+  public void setStreamSegmentDownloadUntar(boolean streamSegmentDownloadUntar) {

Review Comment:
   You might not need all the setter methods here.
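
   A minimal sketch of the class without setters, assuming nothing mutates the params after construction (that assumption would need checking against the callers and tests). The class name is a hypothetical stand-in for `TableDataManagerParams`.

```java
// Hypothetical immutable variant; field and getter names follow the diff.
final class ImmutableParamsSketch {
  private final int _maxParallelSegmentDownloads;
  private final boolean _isStreamSegmentDownloadUntar;
  private final long _streamSegmentDownloadUntarRateLimitBytesPerSec;

  ImmutableParamsSketch(int maxParallelSegmentDownloads, boolean isStreamSegmentDownloadUntar,
      long streamSegmentDownloadUntarRateLimitBytesPerSec) {
    _maxParallelSegmentDownloads = maxParallelSegmentDownloads;
    _isStreamSegmentDownloadUntar = isStreamSegmentDownloadUntar;
    _streamSegmentDownloadUntarRateLimitBytesPerSec = streamSegmentDownloadUntarRateLimitBytesPerSec;
  }

  int getMaxParallelSegmentDownloads() {
    return _maxParallelSegmentDownloads;
  }

  boolean isStreamSegmentDownloadUntar() {
    return _isStreamSegmentDownloadUntar;
  }

  long getStreamSegmentDownloadUntarRateLimitBytesPerSec() {
    return _streamSegmentDownloadUntarRateLimitBytesPerSec;
  }
}
```

   With the fields marked `final`, the compiler enforces that every constructor initializes them.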



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManagerParams.java:
##########
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.data.manager;
+
+import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
+
+
+public class TableDataManagerParams {
+  private boolean _isStreamSegmentDownloadUntar; // whether to turn on stream segment download-untar
+  private long _streamSegmentDownloadUntarRateLimitBytesPerSec; // the per segment rate limit for stream download-untar
+  private int _maxParallelSegmentDownloads; // max number of segment download in parallel per table
+
+  public TableDataManagerParams(int maxParallelSegmentDownloads, boolean isStreamSegmentDownloadUntar,
+      long streamSegmentDownloadUntarRateLimitBytesPerSec) {
+    _maxParallelSegmentDownloads = maxParallelSegmentDownloads;
+    _isStreamSegmentDownloadUntar = isStreamSegmentDownloadUntar;
+    _streamSegmentDownloadUntarRateLimitBytesPerSec = streamSegmentDownloadUntarRateLimitBytesPerSec;
+  }
+
+  public TableDataManagerParams(InstanceDataManagerConfig instanceDataManagerConfig) {
+    _maxParallelSegmentDownloads = instanceDataManagerConfig.getMaxParallelSegmentDownloads();

Review Comment:
   Change it to something like:
   `this(instanceDataManagerConfig.getMaxParallelSegmentDownloads(), instanceDataManagerConfig.isStreamSegmentDownloadUntar(), instanceDataManagerConfig.getStreamSegmentDownloadUntarRateLimit());`
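
   In context, the chained constructor could look like the sketch below. `ConfigSketch` is a hypothetical stand-in that declares only the three `InstanceDataManagerConfig` getters used in the diff, and `ChainedParamsSketch` stands in for `TableDataManagerParams`.

```java
// Hypothetical stand-in for InstanceDataManagerConfig, limited to the getters used in the diff.
interface ConfigSketch {
  int getMaxParallelSegmentDownloads();

  boolean isStreamSegmentDownloadUntar();

  long getStreamSegmentDownloadUntarRateLimit();
}

final class ChainedParamsSketch {
  final int _maxParallelSegmentDownloads;
  final boolean _isStreamSegmentDownloadUntar;
  final long _streamSegmentDownloadUntarRateLimitBytesPerSec;

  ChainedParamsSketch(int maxParallelSegmentDownloads, boolean isStreamSegmentDownloadUntar,
      long streamSegmentDownloadUntarRateLimitBytesPerSec) {
    _maxParallelSegmentDownloads = maxParallelSegmentDownloads;
    _isStreamSegmentDownloadUntar = isStreamSegmentDownloadUntar;
    _streamSegmentDownloadUntarRateLimitBytesPerSec = streamSegmentDownloadUntarRateLimitBytesPerSec;
  }

  // Delegates to the three-argument constructor so the field assignments live in one place.
  ChainedParamsSketch(ConfigSketch config) {
    this(config.getMaxParallelSegmentDownloads(), config.isStreamSegmentDownloadUntar(),
        config.getStreamSegmentDownloadUntarRateLimit());
  }
}
```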



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java:
##########
@@ -90,6 +92,58 @@ public void fetchSegmentToLocal(URI downloadURI, File dest)
     });
   }
 
+  @Override
+  public File fetchUntarSegmentToLocalStreamed(URI downloadURI, File dest, long maxStreamRateInByte)
+      throws Exception {
+    // Create a RoundRobinURIProvider to round robin IP addresses when retry uploading. Otherwise may always try to

Review Comment:
   I think most of the logic here is the same as in the `fetchSegmentToLocal()` method. It'd be good to see if we can extract the shared logic to avoid duplicating the code.
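
   One possible shape for that extraction, as a sketch only: the shared wrapper takes the per-attempt action as a parameter, so both fetch methods could pass in just the part that differs. The plain retry loop is a simplified stand-in, not Pinot's retry policy, and all names here are hypothetical.

```java
import java.net.URI;

// Hypothetical helper, not taken from the PR.
final class FetchRetrySketch {
  private FetchRetrySketch() {
  }

  // The part that differs between the two fetch methods (plain download vs. streamed untar).
  interface Attempt<T> {
    T run(URI uri) throws Exception;
  }

  // Shared wrapper: retries the given attempt up to maxAttempts times and rethrows the last failure.
  static <T> T withRetries(URI uri, int maxAttempts, Attempt<T> attempt)
      throws Exception {
    Exception last = null;
    for (int i = 0; i < maxAttempts; i++) {
      try {
        return attempt.run(uri);
      } catch (Exception e) {
        last = e;
      }
    }
    throw new Exception("Failed to fetch " + uri + " after " + maxAttempts + " attempts", last);
  }
}
```

   Each fetch method would then reduce to one call to the wrapper with its own lambda.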



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java:
##########
@@ -102,6 +102,11 @@ public void fetchSegmentToLocal(List<URI> uris, File dest)
     });
   }
 
+  public File fetchUntarSegmentToLocalStreamed(URI uri, File dest, long rateLimit)

Review Comment:
   Rename the last param here. Same for the other method.
   Also, add an `@Override` annotation here.
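
   A small sketch of both points, assuming the rename is meant to match the `maxStreamRateInByte` name used in `HttpSegmentFetcher` and that the method is declared on the fetcher interface. The interface, class, and default behavior shown are hypothetical.

```java
import java.io.File;
import java.net.URI;

// Hypothetical stand-in for the fetcher interface.
interface StreamedFetcherSketch {
  File fetchUntarSegmentToLocalStreamed(URI uri, File dest, long maxStreamRateInByte)
      throws Exception;
}

// Hypothetical stand-in for the base class; the default behavior here is an assumption.
abstract class BaseFetcherSketch implements StreamedFetcherSketch {
  // @Override makes the compiler verify that the signature still matches the interface.
  @Override
  public File fetchUntarSegmentToLocalStreamed(URI uri, File dest, long maxStreamRateInByte)
      throws Exception {
    throw new UnsupportedOperationException("Streamed download-untar is not supported by this fetcher");
  }
}
```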



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java:
##########
@@ -409,11 +422,35 @@ private File downloadSegmentFromDeepStore(String segmentName, SegmentZKMetadata
       throws Exception {
     File tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "-" + UUID.randomUUID());
     FileUtils.forceMkdir(tempRootDir);
+    if (_isStreamSegmentDownloadUntar && zkMetadata.getCrypterName() == null) {
+      try {
+        File untaredSegDir = downloadAndStreamUntarWithRateLimit(segmentName, zkMetadata, tempRootDir,
+            _streamSegmentDownloadUntarRateLimitBytesPerSec);
+        return moveSegment(segmentName, untaredSegDir);
+      } finally {
+        FileUtils.deleteQuietly(tempRootDir);
+      }
+    } else {
+      try {
+        File tarFile = downloadAndDecrypt(segmentName, zkMetadata, tempRootDir);
+        return untarAndMoveSegment(segmentName, tarFile, tempRootDir);
+      } finally {
+        FileUtils.deleteQuietly(tempRootDir);
+      }
+    }
+  }
+
+  private File moveSegment(String segmentName, File untaredSegDir)
+      throws IOException {
     try {
-      File tarFile = downloadAndDecrypt(segmentName, zkMetadata, tempRootDir);
-      return untarAndMoveSegment(segmentName, tarFile, tempRootDir);
-    } finally {
-      FileUtils.deleteQuietly(tempRootDir);
+      File indexDir = getSegmentDataDir(segmentName);
+      FileUtils.deleteDirectory(indexDir);
+      FileUtils.moveDirectory(untaredSegDir, indexDir);
+      return indexDir;
+    } catch (Exception e) {
+      LOGGER.error("Failed to move segment: {} of table: {}", segmentName, _tableNameWithType);

Review Comment:
   It'd be good to include the exact exception in this error message, unless it already gets logged by a caller further up.
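
   With SLF4J, passing the exception as the last argument after the placeholder values logs its stack trace. The sketch below shows the same idea with `java.util.logging` so it stays dependency-free; the class and method names are hypothetical.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper, not taken from the PR.
final class MoveSegmentLoggingSketch {
  static final Logger LOGGER = Logger.getLogger(MoveSegmentLoggingSketch.class.getName());

  private MoveSegmentLoggingSketch() {
  }

  // Attaches the caught exception to the log record so the cause and stack trace are not lost.
  static void logMoveFailure(String segmentName, String tableNameWithType, Exception e) {
    LOGGER.log(Level.SEVERE,
        String.format("Failed to move segment: %s of table: %s", segmentName, tableNameWithType), e);
  }
}
```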



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManagerParams.java:
##########
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.data.manager;
+
+import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
+
+
+public class TableDataManagerParams {
+  private boolean _isStreamSegmentDownloadUntar; // whether to turn on stream segment download-untar
+  private long _streamSegmentDownloadUntarRateLimitBytesPerSec; // the per segment rate limit for stream download-untar
+  private int _maxParallelSegmentDownloads; // max number of segment download in parallel per table
+
+  public TableDataManagerParams(int maxParallelSegmentDownloads, boolean isStreamSegmentDownloadUntar,
+      long streamSegmentDownloadUntarRateLimitBytesPerSec) {
+    _maxParallelSegmentDownloads = maxParallelSegmentDownloads;
+    _isStreamSegmentDownloadUntar = isStreamSegmentDownloadUntar;
+    _streamSegmentDownloadUntarRateLimitBytesPerSec = streamSegmentDownloadUntarRateLimitBytesPerSec;
+  }
+
+  public TableDataManagerParams(InstanceDataManagerConfig instanceDataManagerConfig) {
+    _maxParallelSegmentDownloads = instanceDataManagerConfig.getMaxParallelSegmentDownloads();

Review Comment:
   Also, rename the variable `streamSegmentDownloadUntarRateLimit`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

