You are viewing a plain-text version of this content. The link to its canonical (original) version was not preserved in this plain-text copy.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/11/05 04:51:07 UTC
[pinot] branch master updated: Use ideal state as source of truth for segment existence (#9735)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d530695c57 Use ideal state as source of truth for segment existence (#9735)
d530695c57 is described below
commit d530695c5759ae042896d40c989bed1b4ea872ec
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Fri Nov 4 21:51:01 2022 -0700
Use ideal state as source of truth for segment existence (#9735)
---
.../api/resources/PinotSegmentRestletResource.java | 4 +-
.../helix/core/PinotHelixResourceManager.java | 56 ++++++++++------------
.../PinotHelixResourceManagerStatelessTest.java | 3 +-
3 files changed, 27 insertions(+), 36 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
index ea77b9caec..c467c3f816 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
@@ -829,8 +829,8 @@ public class PinotSegmentRestletResource {
}
String tableNameWithType =
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0);
- deleteSegmentsInternal(tableNameWithType, _pinotHelixResourceManager.getSegmentsFor(tableNameWithType, false),
- retentionPeriod);
+ deleteSegmentsInternal(tableNameWithType,
+ _pinotHelixResourceManager.getSegmentsFromPropertyStore(tableNameWithType), retentionPeriod);
return new SuccessResponse("All segments of table " + tableNameWithType + " deleted");
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 3e7ed24d5d..01b778a5f4 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -730,22 +730,29 @@ public class PinotHelixResourceManager {
*/
/**
- * Returns the segments for the given table.
+ * Returns the segments for the given table from the ideal state.
*
* @param tableNameWithType Table name with type suffix
* @param shouldExcludeReplacedSegments whether to return the list of segments that doesn't contain replaced segments.
* @return List of segment names
*/
public List<String> getSegmentsFor(String tableNameWithType, boolean shouldExcludeReplacedSegments) {
- List<String> segmentsFromPropertiesStore = ZKMetadataProvider.getSegments(_propertyStore, tableNameWithType);
- if (shouldExcludeReplacedSegments) {
- return excludeReplacedSegments(tableNameWithType, segmentsFromPropertiesStore);
- }
- return segmentsFromPropertiesStore;
+ IdealState idealState = getTableIdealState(tableNameWithType);
+ Preconditions.checkState(idealState != null, "Failed to find ideal state for table: %s", tableNameWithType);
+ List<String> segments = new ArrayList<>(idealState.getPartitionSet());
+ return shouldExcludeReplacedSegments ? excludeReplacedSegments(tableNameWithType, segments) : segments;
}
/**
- * Returns the segments for the given table based on the start and end timestamp.
+ * Returns the segments for the given table from the property store. This API is useful to track the orphan segments
+ * that are removed from the ideal state but not the property store.
+ */
+ public List<String> getSegmentsFromPropertyStore(String tableNameWithType) {
+ return ZKMetadataProvider.getSegments(_propertyStore, tableNameWithType);
+ }
+
+ /**
+ * Returns the segments for the given table based on the start and end timestamp from the ideal state.
*
* @param tableNameWithType Table name with type suffix
* @param startTimestamp start timestamp in milliseconds (inclusive)
@@ -754,21 +761,24 @@ public class PinotHelixResourceManager {
*/
public List<String> getSegmentsForTableWithTimestamps(String tableNameWithType, long startTimestamp,
long endTimestamp, boolean excludeOverlapping) {
- List<String> selectedSegments;
+ IdealState idealState = getTableIdealState(tableNameWithType);
+ Preconditions.checkState(idealState != null, "Failed to find ideal state for table: %s", tableNameWithType);
+ Set<String> segments = idealState.getPartitionSet();
// If no start and end timestamp specified, just select all the segments.
if (startTimestamp == Long.MIN_VALUE && endTimestamp == Long.MAX_VALUE) {
- selectedSegments = getSegmentsFor(tableNameWithType, false);
+ return excludeReplacedSegments(tableNameWithType, new ArrayList<>(segments));
} else {
- selectedSegments = new ArrayList<>();
+ List<String> selectedSegments = new ArrayList<>();
List<SegmentZKMetadata> segmentZKMetadataList = getSegmentsZKMetadata(tableNameWithType);
for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
String segmentName = segmentZKMetadata.getSegmentName();
- if (isSegmentWithinTimeStamps(segmentZKMetadata, startTimestamp, endTimestamp, excludeOverlapping)) {
+ if (segments.contains(segmentName) && isSegmentWithinTimeStamps(segmentZKMetadata, startTimestamp, endTimestamp,
+ excludeOverlapping)) {
selectedSegments.add(segmentName);
}
}
+ return excludeReplacedSegments(tableNameWithType, selectedSegments);
}
- return excludeReplacedSegments(tableNameWithType, selectedSegments);
}
/**
@@ -1890,7 +1900,7 @@ public class PinotHelixResourceManager {
// Remove all stored segments for the table
Long retentionPeriodMs = retentionPeriod != null ? TimeUtils.convertPeriodToMillis(retentionPeriod) : null;
- _segmentDeletionManager.removeSegmentsFromStore(offlineTableName, getSegmentsFor(offlineTableName, false),
+ _segmentDeletionManager.removeSegmentsFromStore(offlineTableName, getSegmentsFromPropertyStore(offlineTableName),
retentionPeriodMs);
LOGGER.info("Deleting table {}: Removed stored segments", offlineTableName);
@@ -1947,7 +1957,7 @@ public class PinotHelixResourceManager {
// Remove all stored segments for the table
Long retentionPeriodMs = retentionPeriod != null ? TimeUtils.convertPeriodToMillis(retentionPeriod) : null;
- _segmentDeletionManager.removeSegmentsFromStore(realtimeTableName, getSegmentsFor(realtimeTableName, false),
+ _segmentDeletionManager.removeSegmentsFromStore(realtimeTableName, getSegmentsFromPropertyStore(realtimeTableName),
retentionPeriodMs);
LOGGER.info("Deleting table {}: Removed stored segments", realtimeTableName);
@@ -3334,7 +3344,6 @@ public class PinotHelixResourceManager {
if (!segmentsToCleanUp.isEmpty()) {
LOGGER.info("Cleaning up the segments while startReplaceSegments: {}", segmentsToCleanUp);
deleteSegments(tableNameWithType, segmentsToCleanUp);
- waitForSegmentsToDelete(tableNameWithType, segmentsToCleanUp, SEGMENT_CLEANUP_TIMEOUT_MS);
}
return true;
} else {
@@ -3355,23 +3364,6 @@ public class PinotHelixResourceManager {
return segmentLineageEntryId;
}
- private void waitForSegmentsToDelete(String tableNameWithType, List<String> segments, long timeOutInMillis)
- throws InterruptedException {
- LOGGER.info("Waiting for {} segments to delete for table: {}. timeout = {}ms, segments = {}", segments.size(),
- tableNameWithType, timeOutInMillis, segments);
- long endTimeMs = System.currentTimeMillis() + timeOutInMillis;
- do {
- if (Collections.disjoint(getSegmentsFor(tableNameWithType, false), segments)) {
- return;
- } else {
- Thread.sleep(SEGMENT_CLEANUP_CHECK_INTERVAL_MS);
- }
- } while (System.currentTimeMillis() < endTimeMs);
- throw new RuntimeException(
- "Timeout while waiting for segments to be deleted for table: " + tableNameWithType + ", timeout: "
- + timeOutInMillis + "ms");
- }
-
/**
* Computes the end segment replace phase
*
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
index 5e9def8a56..6569a6a4be 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
@@ -1053,8 +1053,7 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
// Call revert segment replacements (s3, s4, s5) <- (s9, s10, s11) to check if the revertReplaceSegments correctly
// deleted (s9, s10, s11).
_helixResourceManager.revertReplaceSegments(OFFLINE_TABLE_NAME, lineageEntryId3, false);
- TestUtils.waitForCondition(aVoid -> _helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, false).size() == 3,
- 60_000L, "Failed to delete the segments");
+ assertEquals(_helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, false).size(), 3);
assertSetEquals(_helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, true), "s3", "s4", "s5");
// Re-upload (s9, s10, s11) to test the segment clean up from startReplaceSegments
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org