You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2021/10/02 04:51:02 UTC
[pinot] branch master updated: 1. Catch timeout exception to wait
segments become online for endReplaceSegments(),
so it can be retried. (#7509)
This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 30f5490 1. Catch timeout exception to wait segments become online for endReplaceSegments(), so it can be retried. (#7509)
30f5490 is described below
commit 30f5490968f1c34f00f431f55a7c10a9cfe5cd9c
Author: Jiapeng Tao <ji...@linkedin.com>
AuthorDate: Fri Oct 1 21:50:41 2021 -0700
1. Catch timeout exception to wait segments become online for endReplaceSegments(), so it can be retried. (#7509)
2. Make minion socket timeout for endReplaceSegments() configurable per task.
---
.../org/apache/pinot/common/utils/FileUploadDownloadClient.java | 5 +++--
.../pinot/controller/helix/core/PinotHelixResourceManager.java | 9 ++++++++-
.../pinot/controller/helix/core/retention/RetentionManager.java | 4 +++-
.../main/java/org/apache/pinot/core/common/MinionConstants.java | 1 +
.../minion/tasks/BaseMultipleSegmentsConversionExecutor.java | 7 ++++++-
.../apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java | 5 +++--
.../minion/tasks/mergerollup/MergeRollupTaskGenerator.java | 3 +++
7 files changed, 27 insertions(+), 7 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
index bcd71ae..87ad529 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
@@ -925,13 +925,14 @@ public class FileUploadDownloadClient implements Closeable {
* End replace segments with default settings.
*
* @param uri URI
+ * @oaram socketTimeoutMs Socket timeout in milliseconds
* @return Response
* @throws IOException
* @throws HttpErrorStatusException
*/
- public SimpleHttpResponse endReplaceSegments(URI uri)
+ public SimpleHttpResponse endReplaceSegments(URI uri, int socketTimeoutMs)
throws IOException, HttpErrorStatusException {
- return sendRequest(getEndReplaceSegmentsRequest(uri, DEFAULT_SOCKET_TIMEOUT_MS));
+ return sendRequest(getEndReplaceSegmentsRequest(uri, socketTimeoutMs));
}
/**
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 25797c3..8677798 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -2784,7 +2784,14 @@ public class PinotHelixResourceManager {
+ "segmentsFromTable = '%s')", tableNameWithType, lineageEntry.getSegmentsTo(), segmentsForTable));
// Check that all the segments from 'segmentsTo' become ONLINE in the external view
- waitForSegmentsBecomeOnline(tableNameWithType, new HashSet<>(lineageEntry.getSegmentsTo()));
+ try {
+ waitForSegmentsBecomeOnline(tableNameWithType, new HashSet<>(lineageEntry.getSegmentsTo()));
+ } catch (TimeoutException e) {
+ LOGGER.warn(String
+ .format("Time out while waiting segments become ONLINE. (tableNameWithType = %s, segmentsToCheck = %s)",
+ tableNameWithType, lineageEntry.getSegmentsTo()), e);
+ return false;
+ }
// Update lineage entry
LineageEntry newLineageEntry =
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
index f605e4c..67d2d47 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang.StringUtils;
import org.apache.helix.ZNRecord;
import org.apache.helix.model.IdealState;
import org.apache.pinot.common.lineage.LineageEntry;
@@ -251,7 +252,8 @@ public class RetentionManager extends ControllerPeriodicTask<Void> {
.writeSegmentLineage(_pinotHelixResourceManager.getPropertyStore(), segmentLineage, expectedVersion)) {
// Delete segments based on the segment lineage
_pinotHelixResourceManager.deleteSegments(tableNameWithType, segmentsToDelete);
- LOGGER.info("Finished cleaning up segment lineage for table: {} in {}ms", tableNameWithType,
+ LOGGER.info("Finished cleaning up segment lineage for table: {}, deleted segments: {} in {}ms",
+ tableNameWithType, StringUtils.join(segmentsToDelete, ","),
(System.currentTimeMillis() - cleanupStartTime));
return true;
} else {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index 06ff9d5..36341b8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -49,6 +49,7 @@ public class MinionConstants {
public static final String TABLE_MAX_NUM_TASKS_KEY = "tableMaxNumTasks";
public static final String ENABLE_REPLACE_SEGMENTS_KEY = "enableReplaceSegments";
+ public static final String END_REPLACE_SEGMENTS_SOCKET_TIMEOUT_MS_KEY = "endReplaceSegmentsSocketTimeoutMs";
public static class ConvertToRawIndexTask {
public static final String TASK_TYPE = "ConvertToRawIndexTask";
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
index 0ef9de4..fd65337 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
@@ -206,7 +206,12 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
// Update the segment lineage to indicate that the segment replacement is done.
if (replaceSegmentsEnabled) {
- SegmentConversionUtils.endSegmentReplace(tableNameWithType, uploadURL, lineageEntryId);
+ int endReplaceSegmentsSocketTimeoutMs =
+ configs.get(MinionConstants.END_REPLACE_SEGMENTS_SOCKET_TIMEOUT_MS_KEY) != null
+ ? Integer.parseInt(configs.get(MinionConstants.END_REPLACE_SEGMENTS_SOCKET_TIMEOUT_MS_KEY))
+ : FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS;
+ SegmentConversionUtils
+ .endSegmentReplace(tableNameWithType, uploadURL, lineageEntryId, endReplaceSegmentsSocketTimeoutMs);
}
String outputSegmentNames = segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName)
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java
index 22f6386..53efe14 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java
@@ -116,7 +116,8 @@ public class SegmentConversionUtils {
}
}
- public static void endSegmentReplace(String tableNameWithType, String uploadURL, String segmentLineageEntryId)
+ public static void endSegmentReplace(String tableNameWithType, String uploadURL, String segmentLineageEntryId,
+ int socketTimeoutMs)
throws Exception {
String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
@@ -124,7 +125,7 @@ public class SegmentConversionUtils {
try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient(sslContext)) {
URI uri = FileUploadDownloadClient
.getEndReplaceSegmentsURI(new URI(uploadURL), rawTableName, tableType.name(), segmentLineageEntryId);
- SimpleHttpResponse response = fileUploadDownloadClient.endReplaceSegments(uri);
+ SimpleHttpResponse response = fileUploadDownloadClient.endReplaceSegments(uri, socketTimeoutMs);
LOGGER.info("Got response {}: {} while uploading table: {}, uploadURL: {}", response.getStatusCode(),
response.getResponse(), tableNameWithType, uploadURL);
}
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
index 252505b..031585f 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
@@ -90,6 +90,7 @@ import org.slf4j.LoggerFactory;
*/
@TaskGenerator
public class MergeRollupTaskGenerator implements PinotTaskGenerator {
+ public static final int END_REPLACE_SEGMENTS_SOCKET_TIMEOUT_MS = 30 * 60 * 1000; // 30 mins
private static final int DEFAULT_MAX_NUM_RECORDS_PER_TASK = 50_000_000;
private static final String REFRESH = "REFRESH";
private static final Logger LOGGER = LoggerFactory.getLogger(MergeRollupTaskGenerator.class);
@@ -458,6 +459,8 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator {
configs.put(MergeRollupTask.SEGMENT_NAME_PREFIX_KEY,
MergeRollupTask.MERGED_SEGMENT_NAME_PREFIX + mergeLevel + "_" + System.currentTimeMillis() + "_" + i + "_"
+ TableNameBuilder.extractRawTableName(offlineTableName));
+ configs.put(MinionConstants.END_REPLACE_SEGMENTS_SOCKET_TIMEOUT_MS_KEY,
+ String.valueOf(END_REPLACE_SEGMENTS_SOCKET_TIMEOUT_MS));
pinotTaskConfigs.add(new PinotTaskConfig(MergeRollupTask.TASK_TYPE, configs));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org