You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2020/07/28 21:46:04 UTC

[incubator-pinot] 01/01: Add retry logic to download segment tar file in pinot server

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch add-retry-to-download-segment
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit f6ce373d90fa5b62550e53b512d78017dc16247a
Author: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
AuthorDate: Tue Jul 28 14:45:39 2020 -0700

    Add retry logic to download segment tar file in pinot server
---
 .../starter/helix/SegmentFetcherAndLoader.java     | 67 ++++++++++++----------
 1 file changed, 38 insertions(+), 29 deletions(-)

diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentFetcherAndLoader.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentFetcherAndLoader.java
index 85256af..2444355 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentFetcherAndLoader.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentFetcherAndLoader.java
@@ -40,6 +40,7 @@ import org.apache.pinot.spi.crypt.PinotCrypter;
 import org.apache.pinot.spi.crypt.PinotCrypterFactory;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.utils.retry.RetryPolicies;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -184,39 +185,47 @@ public class SegmentFetcherAndLoader {
 
   private String downloadSegmentToLocal(String uri, PinotCrypter crypter, String tableName, String segmentName)
       throws Exception {
-    File tempDir = new File(new File(_instanceDataManager.getSegmentFileDirectory(), tableName),
-        "tmp-" + segmentName + "-" + UUID.randomUUID());
-    FileUtils.forceMkdir(tempDir);
-    File tempDownloadFile = new File(tempDir, segmentName + ENCODED_SUFFIX);
-    File tempTarFile = new File(tempDir, segmentName + TAR_GZ_SUFFIX);
-    File tempSegmentDir = new File(tempDir, segmentName);
-    try {
-      SegmentFetcherFactory.fetchSegmentToLocal(uri, tempDownloadFile);
-      if (crypter != null) {
-        crypter.decrypt(tempDownloadFile, tempTarFile);
-      } else {
-        tempTarFile = tempDownloadFile;
-      }
+    // Even if the tar file has been downloaded successfully, the file itself could be corrupted during the transmission.
+    // Thus, we should retry the download.
+    RetryPolicies.fixedDelayRetryPolicy(5, 5_000L).attempt(() -> {
+      File tempDir = new File(new File(_instanceDataManager.getSegmentFileDirectory(), tableName),
+          "tmp-" + segmentName + "-" + UUID.randomUUID());
+      FileUtils.forceMkdir(tempDir);
+      File tempDownloadFile = new File(tempDir, segmentName + ENCODED_SUFFIX);
+      File tempTarFile = new File(tempDir, segmentName + TAR_GZ_SUFFIX);
+      File tempSegmentDir = new File(tempDir, segmentName);
+      try {
+        SegmentFetcherFactory.fetchSegmentToLocal(uri, tempDownloadFile);
+        if (crypter != null) {
+          crypter.decrypt(tempDownloadFile, tempTarFile);
+        } else {
+          tempTarFile = tempDownloadFile;
+        }
 
-      LOGGER
-          .info("Downloaded tarred segment: {} for table: {} from: {} to: {}, file length: {}", segmentName, tableName,
-              uri, tempTarFile, tempTarFile.length());
+        LOGGER.info("Downloaded tarred segment: {} for table: {} from: {} to: {}, file length: {}", segmentName,
+            tableName, uri, tempTarFile, tempTarFile.length());
 
-      // If an exception is thrown when untarring, it means the tar file is broken OR not found after the retry.
-      // Thus, there's no need to retry again.
-      File tempIndexDir = TarGzCompressionUtils.untar(tempTarFile, tempSegmentDir).get(0);
+        // If an exception is thrown when untarring, it means the tar file is broken OR not found. The catch block
+        // below reports the failure to the retry policy so that the segment is downloaded again.
+        File tempIndexDir = TarGzCompressionUtils.untar(tempTarFile, tempSegmentDir).get(0);
 
-      File indexDir = new File(new File(_instanceDataManager.getSegmentDataDirectory(), tableName), segmentName);
-      if (indexDir.exists()) {
-        LOGGER.info("Deleting existing index directory for segment: {} for table: {}", segmentName, tableName);
-        FileUtils.deleteDirectory(indexDir);
+        File indexDir = new File(new File(_instanceDataManager.getSegmentDataDirectory(), tableName), segmentName);
+        if (indexDir.exists()) {
+          LOGGER.info("Deleting existing index directory for segment: {} for table: {}", segmentName, tableName);
+          FileUtils.deleteDirectory(indexDir);
+        }
+        FileUtils.moveDirectory(tempIndexDir, indexDir);
+        LOGGER.info("Successfully downloaded segment: {} for table: {} to: {}", segmentName, tableName, indexDir);
+        return Boolean.TRUE;
+      } catch (Exception e) {
+        LOGGER.error("Caught exception when downloading segment to local", e);
+        return Boolean.FALSE;
+      } finally {
+        FileUtils.deleteQuietly(tempDir);
       }
-      FileUtils.moveDirectory(tempIndexDir, indexDir);
-      LOGGER.info("Successfully downloaded segment: {} for table: {} to: {}", segmentName, tableName, indexDir);
-      return indexDir.getAbsolutePath();
-    } finally {
-      FileUtils.deleteQuietly(tempDir);
-    }
+    });
+
+    return new File(new File(_instanceDataManager.getSegmentDataDirectory(), tableName), segmentName).getAbsolutePath();
   }
 
   public String getSegmentLocalDirectory(String tableName, String segmentId) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org