You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2020/07/28 21:46:04 UTC
[incubator-pinot] 01/01: Add retry logic to download segment tar
file in pinot server
This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch add-retry-to-download-segment
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit f6ce373d90fa5b62550e53b512d78017dc16247a
Author: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
AuthorDate: Tue Jul 28 14:45:39 2020 -0700
Add retry logic to download segment tar file in pinot server
---
.../starter/helix/SegmentFetcherAndLoader.java | 67 ++++++++++++----------
1 file changed, 38 insertions(+), 29 deletions(-)
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentFetcherAndLoader.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentFetcherAndLoader.java
index 85256af..2444355 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentFetcherAndLoader.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentFetcherAndLoader.java
@@ -40,6 +40,7 @@ import org.apache.pinot.spi.crypt.PinotCrypter;
import org.apache.pinot.spi.crypt.PinotCrypterFactory;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.utils.retry.RetryPolicies;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -184,39 +185,47 @@ public class SegmentFetcherAndLoader {
private String downloadSegmentToLocal(String uri, PinotCrypter crypter, String tableName, String segmentName)
throws Exception {
- File tempDir = new File(new File(_instanceDataManager.getSegmentFileDirectory(), tableName),
- "tmp-" + segmentName + "-" + UUID.randomUUID());
- FileUtils.forceMkdir(tempDir);
- File tempDownloadFile = new File(tempDir, segmentName + ENCODED_SUFFIX);
- File tempTarFile = new File(tempDir, segmentName + TAR_GZ_SUFFIX);
- File tempSegmentDir = new File(tempDir, segmentName);
- try {
- SegmentFetcherFactory.fetchSegmentToLocal(uri, tempDownloadFile);
- if (crypter != null) {
- crypter.decrypt(tempDownloadFile, tempTarFile);
- } else {
- tempTarFile = tempDownloadFile;
- }
+ // Even if the tar file has been downloaded successfully, the file itself could be corrupted during transmission.
+ // Thus, we should download it again.
+ RetryPolicies.fixedDelayRetryPolicy(5, 5_000L).attempt(() -> {
+ File tempDir = new File(new File(_instanceDataManager.getSegmentFileDirectory(), tableName),
+ "tmp-" + segmentName + "-" + UUID.randomUUID());
+ FileUtils.forceMkdir(tempDir);
+ File tempDownloadFile = new File(tempDir, segmentName + ENCODED_SUFFIX);
+ File tempTarFile = new File(tempDir, segmentName + TAR_GZ_SUFFIX);
+ File tempSegmentDir = new File(tempDir, segmentName);
+ try {
+ SegmentFetcherFactory.fetchSegmentToLocal(uri, tempDownloadFile);
+ if (crypter != null) {
+ crypter.decrypt(tempDownloadFile, tempTarFile);
+ } else {
+ tempTarFile = tempDownloadFile;
+ }
- LOGGER
- .info("Downloaded tarred segment: {} for table: {} from: {} to: {}, file length: {}", segmentName, tableName,
- uri, tempTarFile, tempTarFile.length());
+ LOGGER.info("Downloaded tarred segment: {} for table: {} from: {} to: {}, file length: {}", segmentName,
+ tableName, uri, tempTarFile, tempTarFile.length());
- // If an exception is thrown when untarring, it means the tar file is broken OR not found after the retry.
- // Thus, there's no need to retry again.
- File tempIndexDir = TarGzCompressionUtils.untar(tempTarFile, tempSegmentDir).get(0);
+ // If an exception is thrown when untarring, it means the tar file is broken OR not found.
+ // The exception is caught below and FALSE is returned, so the whole download is attempted again.
+ File tempIndexDir = TarGzCompressionUtils.untar(tempTarFile, tempSegmentDir).get(0);
- File indexDir = new File(new File(_instanceDataManager.getSegmentDataDirectory(), tableName), segmentName);
- if (indexDir.exists()) {
- LOGGER.info("Deleting existing index directory for segment: {} for table: {}", segmentName, tableName);
- FileUtils.deleteDirectory(indexDir);
+ File indexDir = new File(new File(_instanceDataManager.getSegmentDataDirectory(), tableName), segmentName);
+ if (indexDir.exists()) {
+ LOGGER.info("Deleting existing index directory for segment: {} for table: {}", segmentName, tableName);
+ FileUtils.deleteDirectory(indexDir);
+ }
+ FileUtils.moveDirectory(tempIndexDir, indexDir);
+ LOGGER.info("Successfully downloaded segment: {} for table: {} to: {}", segmentName, tableName, indexDir);
+ return Boolean.TRUE;
+ } catch (Exception e) {
+ LOGGER.error("Caught exception when downloading segment to local", e);
+ return Boolean.FALSE;
+ } finally {
+ FileUtils.deleteQuietly(tempDir);
}
- FileUtils.moveDirectory(tempIndexDir, indexDir);
- LOGGER.info("Successfully downloaded segment: {} for table: {} to: {}", segmentName, tableName, indexDir);
- return indexDir.getAbsolutePath();
- } finally {
- FileUtils.deleteQuietly(tempDir);
- }
+ });
+
+ return new File(new File(_instanceDataManager.getSegmentDataDirectory(), tableName), segmentName).getAbsolutePath();
}
public String getSegmentLocalDirectory(String tableName, String segmentId) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org