You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2021/10/02 04:51:02 UTC

[pinot] branch master updated: 1. Catch timeout exception to wait segments become online for endReplaceSegments(), so it can be retried. (#7509)

This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 30f5490  1. Catch timeout exception to wait segments become online for endReplaceSegments(), so it can be retried. (#7509)
30f5490 is described below

commit 30f5490968f1c34f00f431f55a7c10a9cfe5cd9c
Author: Jiapeng Tao <ji...@linkedin.com>
AuthorDate: Fri Oct 1 21:50:41 2021 -0700

    1. Catch timeout exception to wait segments become online for endReplaceSegments(), so it can be retried. (#7509)
    
    2. Make minion socket timeout for endReplaceSegments() configurable per task.
---
 .../org/apache/pinot/common/utils/FileUploadDownloadClient.java  | 5 +++--
 .../pinot/controller/helix/core/PinotHelixResourceManager.java   | 9 ++++++++-
 .../pinot/controller/helix/core/retention/RetentionManager.java  | 4 +++-
 .../main/java/org/apache/pinot/core/common/MinionConstants.java  | 1 +
 .../minion/tasks/BaseMultipleSegmentsConversionExecutor.java     | 7 ++++++-
 .../apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java | 5 +++--
 .../minion/tasks/mergerollup/MergeRollupTaskGenerator.java       | 3 +++
 7 files changed, 27 insertions(+), 7 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
index bcd71ae..87ad529 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
@@ -925,13 +925,14 @@ public class FileUploadDownloadClient implements Closeable {
    * End replace segments with default settings.
    *
    * @param uri URI
+   * @param socketTimeoutMs Socket timeout in milliseconds
    * @return Response
    * @throws IOException
    * @throws HttpErrorStatusException
    */
-  public SimpleHttpResponse endReplaceSegments(URI uri)
+  public SimpleHttpResponse endReplaceSegments(URI uri, int socketTimeoutMs)
       throws IOException, HttpErrorStatusException {
-    return sendRequest(getEndReplaceSegmentsRequest(uri, DEFAULT_SOCKET_TIMEOUT_MS));
+    return sendRequest(getEndReplaceSegmentsRequest(uri, socketTimeoutMs));
   }
 
   /**
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 25797c3..8677798 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -2784,7 +2784,14 @@ public class PinotHelixResourceManager {
                 + "segmentsFromTable = '%s')", tableNameWithType, lineageEntry.getSegmentsTo(), segmentsForTable));
 
         // Check that all the segments from 'segmentsTo' become ONLINE in the external view
-        waitForSegmentsBecomeOnline(tableNameWithType, new HashSet<>(lineageEntry.getSegmentsTo()));
+        try {
+          waitForSegmentsBecomeOnline(tableNameWithType, new HashSet<>(lineageEntry.getSegmentsTo()));
+        } catch (TimeoutException e) {
+          LOGGER.warn(String
+              .format("Time out while waiting segments become ONLINE. (tableNameWithType = %s, segmentsToCheck = %s)",
+                  tableNameWithType, lineageEntry.getSegmentsTo()), e);
+          return false;
+        }
 
         // Update lineage entry
         LineageEntry newLineageEntry =
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
index f605e4c..67d2d47 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang.StringUtils;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.model.IdealState;
 import org.apache.pinot.common.lineage.LineageEntry;
@@ -251,7 +252,8 @@ public class RetentionManager extends ControllerPeriodicTask<Void> {
             .writeSegmentLineage(_pinotHelixResourceManager.getPropertyStore(), segmentLineage, expectedVersion)) {
           // Delete segments based on the segment lineage
           _pinotHelixResourceManager.deleteSegments(tableNameWithType, segmentsToDelete);
-          LOGGER.info("Finished cleaning up segment lineage for table: {} in {}ms", tableNameWithType,
+          LOGGER.info("Finished cleaning up segment lineage for table: {}, deleted segments: {} in {}ms",
+              tableNameWithType, StringUtils.join(segmentsToDelete, ","),
               (System.currentTimeMillis() - cleanupStartTime));
           return true;
         } else {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index 06ff9d5..36341b8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -49,6 +49,7 @@ public class MinionConstants {
 
   public static final String TABLE_MAX_NUM_TASKS_KEY = "tableMaxNumTasks";
   public static final String ENABLE_REPLACE_SEGMENTS_KEY = "enableReplaceSegments";
+  public static final String END_REPLACE_SEGMENTS_SOCKET_TIMEOUT_MS_KEY = "endReplaceSegmentsSocketTimeoutMs";
 
   public static class ConvertToRawIndexTask {
     public static final String TASK_TYPE = "ConvertToRawIndexTask";
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
index 0ef9de4..fd65337 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
@@ -206,7 +206,12 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
 
       // Update the segment lineage to indicate that the segment replacement is done.
       if (replaceSegmentsEnabled) {
-        SegmentConversionUtils.endSegmentReplace(tableNameWithType, uploadURL, lineageEntryId);
+        int endReplaceSegmentsSocketTimeoutMs =
+            configs.get(MinionConstants.END_REPLACE_SEGMENTS_SOCKET_TIMEOUT_MS_KEY) != null
+                ? Integer.parseInt(configs.get(MinionConstants.END_REPLACE_SEGMENTS_SOCKET_TIMEOUT_MS_KEY))
+                : FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS;
+        SegmentConversionUtils
+            .endSegmentReplace(tableNameWithType, uploadURL, lineageEntryId, endReplaceSegmentsSocketTimeoutMs);
       }
 
       String outputSegmentNames = segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName)
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java
index 22f6386..53efe14 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java
@@ -116,7 +116,8 @@ public class SegmentConversionUtils {
     }
   }
 
-  public static void endSegmentReplace(String tableNameWithType, String uploadURL, String segmentLineageEntryId)
+  public static void endSegmentReplace(String tableNameWithType, String uploadURL, String segmentLineageEntryId,
+      int socketTimeoutMs)
       throws Exception {
     String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
     TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
@@ -124,7 +125,7 @@ public class SegmentConversionUtils {
     try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient(sslContext)) {
       URI uri = FileUploadDownloadClient
           .getEndReplaceSegmentsURI(new URI(uploadURL), rawTableName, tableType.name(), segmentLineageEntryId);
-      SimpleHttpResponse response = fileUploadDownloadClient.endReplaceSegments(uri);
+      SimpleHttpResponse response = fileUploadDownloadClient.endReplaceSegments(uri, socketTimeoutMs);
       LOGGER.info("Got response {}: {} while uploading table: {}, uploadURL: {}", response.getStatusCode(),
           response.getResponse(), tableNameWithType, uploadURL);
     }
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
index 252505b..031585f 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
@@ -90,6 +90,7 @@ import org.slf4j.LoggerFactory;
  */
 @TaskGenerator
 public class MergeRollupTaskGenerator implements PinotTaskGenerator {
+  public static final int END_REPLACE_SEGMENTS_SOCKET_TIMEOUT_MS = 30 * 60 * 1000; // 30 mins
   private static final int DEFAULT_MAX_NUM_RECORDS_PER_TASK = 50_000_000;
   private static final String REFRESH = "REFRESH";
   private static final Logger LOGGER = LoggerFactory.getLogger(MergeRollupTaskGenerator.class);
@@ -458,6 +459,8 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator {
       configs.put(MergeRollupTask.SEGMENT_NAME_PREFIX_KEY,
           MergeRollupTask.MERGED_SEGMENT_NAME_PREFIX + mergeLevel + "_" + System.currentTimeMillis() + "_" + i + "_"
               + TableNameBuilder.extractRawTableName(offlineTableName));
+      configs.put(MinionConstants.END_REPLACE_SEGMENTS_SOCKET_TIMEOUT_MS_KEY,
+          String.valueOf(END_REPLACE_SEGMENTS_SOCKET_TIMEOUT_MS));
       pinotTaskConfigs.add(new PinotTaskConfig(MergeRollupTask.TASK_TYPE, configs));
     }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org