You are viewing a plain-text version of this content. (The hyperlink to its canonical version was not preserved in this plain-text copy.)
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/11/05 04:51:07 UTC

[pinot] branch master updated: Use ideal state as source of truth for segment existence (#9735)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d530695c57 Use ideal state as source of truth for segment existence (#9735)
d530695c57 is described below

commit d530695c5759ae042896d40c989bed1b4ea872ec
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Fri Nov 4 21:51:01 2022 -0700

    Use ideal state as source of truth for segment existence (#9735)
---
 .../api/resources/PinotSegmentRestletResource.java |  4 +-
 .../helix/core/PinotHelixResourceManager.java      | 56 ++++++++++------------
 .../PinotHelixResourceManagerStatelessTest.java    |  3 +-
 3 files changed, 27 insertions(+), 36 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
index ea77b9caec..c467c3f816 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
@@ -829,8 +829,8 @@ public class PinotSegmentRestletResource {
     }
     String tableNameWithType =
         ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0);
-    deleteSegmentsInternal(tableNameWithType, _pinotHelixResourceManager.getSegmentsFor(tableNameWithType, false),
-        retentionPeriod);
+    deleteSegmentsInternal(tableNameWithType,
+        _pinotHelixResourceManager.getSegmentsFromPropertyStore(tableNameWithType), retentionPeriod);
     return new SuccessResponse("All segments of table " + tableNameWithType + " deleted");
   }
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 3e7ed24d5d..01b778a5f4 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -730,22 +730,29 @@ public class PinotHelixResourceManager {
    */
 
   /**
-   * Returns the segments for the given table.
+   * Returns the segments for the given table from the ideal state.
    *
    * @param tableNameWithType Table name with type suffix
    * @param shouldExcludeReplacedSegments whether to return the list of segments that doesn't contain replaced segments.
    * @return List of segment names
    */
   public List<String> getSegmentsFor(String tableNameWithType, boolean shouldExcludeReplacedSegments) {
-    List<String> segmentsFromPropertiesStore = ZKMetadataProvider.getSegments(_propertyStore, tableNameWithType);
-    if (shouldExcludeReplacedSegments) {
-      return excludeReplacedSegments(tableNameWithType, segmentsFromPropertiesStore);
-    }
-    return segmentsFromPropertiesStore;
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Failed to find ideal state for table: %s", tableNameWithType);
+    List<String> segments = new ArrayList<>(idealState.getPartitionSet());
+    return shouldExcludeReplacedSegments ? excludeReplacedSegments(tableNameWithType, segments) : segments;
   }
 
   /**
-   * Returns the segments for the given table based on the start and end timestamp.
+   * Returns the segments for the given table from the property store. This API is useful to track the orphan segments
+   * that are removed from the ideal state but not the property store.
+   */
+  public List<String> getSegmentsFromPropertyStore(String tableNameWithType) {
+    return ZKMetadataProvider.getSegments(_propertyStore, tableNameWithType);
+  }
+
+  /**
+   * Returns the segments for the given table based on the start and end timestamp from the ideal state.
    *
    * @param tableNameWithType  Table name with type suffix
    * @param startTimestamp  start timestamp in milliseconds (inclusive)
@@ -754,21 +761,24 @@ public class PinotHelixResourceManager {
    */
   public List<String> getSegmentsForTableWithTimestamps(String tableNameWithType, long startTimestamp,
       long endTimestamp, boolean excludeOverlapping) {
-    List<String> selectedSegments;
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Failed to find ideal state for table: %s", tableNameWithType);
+    Set<String> segments = idealState.getPartitionSet();
     // If no start and end timestamp specified, just select all the segments.
     if (startTimestamp == Long.MIN_VALUE && endTimestamp == Long.MAX_VALUE) {
-      selectedSegments = getSegmentsFor(tableNameWithType, false);
+      return excludeReplacedSegments(tableNameWithType, new ArrayList<>(segments));
     } else {
-      selectedSegments = new ArrayList<>();
+      List<String> selectedSegments = new ArrayList<>();
       List<SegmentZKMetadata> segmentZKMetadataList = getSegmentsZKMetadata(tableNameWithType);
       for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
         String segmentName = segmentZKMetadata.getSegmentName();
-        if (isSegmentWithinTimeStamps(segmentZKMetadata, startTimestamp, endTimestamp, excludeOverlapping)) {
+        if (segments.contains(segmentName) && isSegmentWithinTimeStamps(segmentZKMetadata, startTimestamp, endTimestamp,
+            excludeOverlapping)) {
           selectedSegments.add(segmentName);
         }
       }
+      return excludeReplacedSegments(tableNameWithType, selectedSegments);
     }
-    return excludeReplacedSegments(tableNameWithType, selectedSegments);
   }
 
   /**
@@ -1890,7 +1900,7 @@ public class PinotHelixResourceManager {
 
     // Remove all stored segments for the table
     Long retentionPeriodMs = retentionPeriod != null ? TimeUtils.convertPeriodToMillis(retentionPeriod) : null;
-    _segmentDeletionManager.removeSegmentsFromStore(offlineTableName, getSegmentsFor(offlineTableName, false),
+    _segmentDeletionManager.removeSegmentsFromStore(offlineTableName, getSegmentsFromPropertyStore(offlineTableName),
         retentionPeriodMs);
     LOGGER.info("Deleting table {}: Removed stored segments", offlineTableName);
 
@@ -1947,7 +1957,7 @@ public class PinotHelixResourceManager {
 
     // Remove all stored segments for the table
     Long retentionPeriodMs = retentionPeriod != null ? TimeUtils.convertPeriodToMillis(retentionPeriod) : null;
-    _segmentDeletionManager.removeSegmentsFromStore(realtimeTableName, getSegmentsFor(realtimeTableName, false),
+    _segmentDeletionManager.removeSegmentsFromStore(realtimeTableName, getSegmentsFromPropertyStore(realtimeTableName),
         retentionPeriodMs);
     LOGGER.info("Deleting table {}: Removed stored segments", realtimeTableName);
 
@@ -3334,7 +3344,6 @@ public class PinotHelixResourceManager {
           if (!segmentsToCleanUp.isEmpty()) {
             LOGGER.info("Cleaning up the segments while startReplaceSegments: {}", segmentsToCleanUp);
             deleteSegments(tableNameWithType, segmentsToCleanUp);
-            waitForSegmentsToDelete(tableNameWithType, segmentsToCleanUp, SEGMENT_CLEANUP_TIMEOUT_MS);
           }
           return true;
         } else {
@@ -3355,23 +3364,6 @@ public class PinotHelixResourceManager {
     return segmentLineageEntryId;
   }
 
-  private void waitForSegmentsToDelete(String tableNameWithType, List<String> segments, long timeOutInMillis)
-      throws InterruptedException {
-    LOGGER.info("Waiting for {} segments to delete for table: {}. timeout = {}ms, segments = {}", segments.size(),
-        tableNameWithType, timeOutInMillis, segments);
-    long endTimeMs = System.currentTimeMillis() + timeOutInMillis;
-    do {
-      if (Collections.disjoint(getSegmentsFor(tableNameWithType, false), segments)) {
-        return;
-      } else {
-        Thread.sleep(SEGMENT_CLEANUP_CHECK_INTERVAL_MS);
-      }
-    } while (System.currentTimeMillis() < endTimeMs);
-    throw new RuntimeException(
-        "Timeout while waiting for segments to be deleted for table: " + tableNameWithType + ", timeout: "
-            + timeOutInMillis + "ms");
-  }
-
   /**
    * Computes the end segment replace phase
    *
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
index 5e9def8a56..6569a6a4be 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
@@ -1053,8 +1053,7 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
     // Call revert segment replacements (s3, s4, s5) <- (s9, s10, s11) to check if the revertReplaceSegments correctly
     // deleted (s9, s10, s11).
     _helixResourceManager.revertReplaceSegments(OFFLINE_TABLE_NAME, lineageEntryId3, false);
-    TestUtils.waitForCondition(aVoid -> _helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, false).size() == 3,
-        60_000L, "Failed to delete the segments");
+    assertEquals(_helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, false).size(), 3);
     assertSetEquals(_helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, true), "s3", "s4", "s5");
 
     // Re-upload (s9, s10, s11) to test the segment clean up from startReplaceSegments


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org