Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2020/08/07 10:51:29 UTC

[GitHub] [incubator-pinot] snleee opened a new pull request #5828: Improving retention manager to handle segment lineage clean-up

snleee opened a new pull request #5828:
URL: https://github.com/apache/incubator-pinot/pull/5828


   1. Added the logic to handle segment lineage clean-up in the
      retention manager.
   2. Added a unit test.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] Jackie-Jiang commented on a change in pull request #5828: Improving retention manager to handle segment lineage clean-up

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on a change in pull request #5828:
URL: https://github.com/apache/incubator-pinot/pull/5828#discussion_r468077683



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
##########
@@ -177,4 +193,63 @@ private boolean shouldDeleteInProgressLLCSegment(String segmentName, IdealState
       return states.size() == 1 && states.contains(CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE);
     }
   }
+
+  private void manageSegmentLineageCleanupForTable(String tableNameWithType) {
+    try {
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch segment lineage
+        ZNRecord segmentLineageZNRecord = SegmentLineageAccessHelper
+            .getSegmentLineageZNRecord(_pinotHelixResourceManager.getPropertyStore(), tableNameWithType);
+        if (segmentLineageZNRecord == null) {
+          return true;
+        }
+        SegmentLineage segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+        int expectedVersion = segmentLineageZNRecord.getVersion();
+
+        // 1. The original segments can be deleted once the merged segments are successfully uploaded
+        // 2. The zombie lineage entry & merged segments should be deleted if the segment replacement failed in
+        //    the middle
+        Set<String> segmentsForTable = new HashSet<>(_pinotHelixResourceManager.getSegmentsFor(tableNameWithType));
+        List<String> segmentsToDelete = new ArrayList<>();
+        for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+          LineageEntry lineageEntry = segmentLineage.getLineageEntry(lineageEntryId);
+          if (lineageEntry.getState() == LineageEntryState.COMPLETED) {
+            // If the lineage state is 'COMPLETED', it is safe to delete all segments from 'segmentsFrom'
+            segmentsToDelete.addAll(lineageEntry.getSegmentsFrom());
+
+            // The lineage entry with 'COMPLETED' state can only be safely removed when both segmentFrom & segmentTo
+            // are all removed from the table.
+            if (Collections.disjoint(segmentsForTable, lineageEntry.getSegmentsFrom()) && Collections
+                .disjoint(segmentsForTable, lineageEntry.getSegmentsTo())) {
+              segmentLineage.deleteLineageEntry(lineageEntryId);
+            }
+          } else if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS) {
+            // If the lineage state is 'IN_PROGRESS', we need to clean up the zombie lineage entry and its segments
+            if (lineageEntry.getTimestamp() < System.currentTimeMillis() - LINEAGE_ENTRY_CLEANUP_RETENTION_IN_MILLIS) {
+              segmentsToDelete.addAll(lineageEntry.getSegmentsTo());
+
+              // The lineage entry with 'IN_PROGRESS' state can only be safely removed when segmentTo are all removed
+              // from the table. Deleting lineage will allow the task scheduler to re-schedule the source segments to
+              // be merged again.
+              if (Collections.disjoint(segmentsForTable, lineageEntry.getSegmentsTo())) {
+                segmentLineage.deleteLineageEntry(lineageEntryId);
+              }
+            }
+          }
+        }
+
+        // Delete segments based on the segment lineage
+        _pinotHelixResourceManager.deleteSegments(tableNameWithType, segmentsToDelete);

Review comment:
       We might want to delete the segments only after successfully writing back the lineage; otherwise the segments might be deleted without the lineage being updated.
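
The ordering suggested in this comment can be sketched as follows. This is a minimal, self-contained illustration rather than the PR's code: `LineageStore`, `cleanUp`, and the in-memory segment set are hypothetical stand-ins for the property store, the retention-manager method, and the table's segment list. The point it tries to show is that segments are deleted only once the version-checked lineage write has succeeded.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory stand-ins; none of these names come from Pinot.
class WriteThenDeleteSketch {
  /** Versioned record, mimicking a compare-and-set write to a metadata store. */
  static class LineageStore {
    private int version = 0;
    private Set<String> entries = new HashSet<>();

    int getVersion() {
      return version;
    }

    /** Writes only if the caller saw the latest version; returns false otherwise. */
    boolean write(Set<String> updated, int expectedVersion) {
      if (expectedVersion != version) {
        return false;
      }
      entries = new HashSet<>(updated);
      version++;
      return true;
    }
  }

  /** Returns the segments actually deleted (empty if the lineage write failed). */
  static List<String> cleanUp(LineageStore lineage, Set<String> tableSegments,
      Set<String> updatedLineage, List<String> segmentsToDelete, int expectedVersion) {
    // Step 1: persist the lineage first.
    if (!lineage.write(updatedLineage, expectedVersion)) {
      return new ArrayList<>(); // nothing deleted; the caller may retry with a fresh read
    }
    // Step 2: delete segments only after the lineage has been written.
    List<String> deleted = new ArrayList<>();
    for (String segment : segmentsToDelete) {
      if (tableSegments.remove(segment)) {
        deleted.add(segment);
      }
    }
    return deleted;
  }

  public static void main(String[] args) {
    LineageStore lineage = new LineageStore();
    Set<String> table = new HashSet<>(Arrays.asList("s1", "s2", "merged"));
    Set<String> updated = Collections.singleton("entry1");
    // Stale version: the write is rejected and no segment is touched.
    System.out.println(cleanUp(lineage, table, updated, Arrays.asList("s1", "s2"), 42));
    // Matching version: the lineage is written, then the segments are deleted.
    System.out.println(cleanUp(lineage, table, updated, Arrays.asList("s1", "s2"), 0));
  }
}
```

With this ordering, a failed or conflicting lineage write leaves the table's segments untouched, so a retry starts from a consistent state.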

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
##########
@@ -177,4 +193,63 @@ private boolean shouldDeleteInProgressLLCSegment(String segmentName, IdealState
       return states.size() == 1 && states.contains(CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE);
     }
   }
+
+  private void manageSegmentLineageCleanupForTable(String tableNameWithType) {
+    try {
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch segment lineage
+        ZNRecord segmentLineageZNRecord = SegmentLineageAccessHelper
+            .getSegmentLineageZNRecord(_pinotHelixResourceManager.getPropertyStore(), tableNameWithType);
+        if (segmentLineageZNRecord == null) {
+          return true;
+        }
+        SegmentLineage segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+        int expectedVersion = segmentLineageZNRecord.getVersion();
+
+        // 1. The original segments can be deleted once the merged segments are successfully uploaded
+        // 2. The zombie lineage entry & merged segments should be deleted if the segment replacement failed in
+        //    the middle
+        Set<String> segmentsForTable = new HashSet<>(_pinotHelixResourceManager.getSegmentsFor(tableNameWithType));
+        List<String> segmentsToDelete = new ArrayList<>();
+        for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+          LineageEntry lineageEntry = segmentLineage.getLineageEntry(lineageEntryId);
+          if (lineageEntry.getState() == LineageEntryState.COMPLETED) {
+            // If the lineage state is 'COMPLETED', it is safe to delete all segments from 'segmentsFrom'
+            segmentsToDelete.addAll(lineageEntry.getSegmentsFrom());
+
+            // The lineage entry with 'COMPLETED' state can only be safely removed when both segmentFrom & segmentTo
+            // are all removed from the table.
+            if (Collections.disjoint(segmentsForTable, lineageEntry.getSegmentsFrom()) && Collections
+                .disjoint(segmentsForTable, lineageEntry.getSegmentsTo())) {
+              segmentLineage.deleteLineageEntry(lineageEntryId);
+            }

Review comment:
       We can simplify the logic as follows; the same applies to `IN_PROGRESS`:
   ```suggestion
               Set<String> sourceSegments = new HashSet<>(lineageEntry.getSegmentsFrom());
               sourceSegments.retainAll(segmentsForTable);
               if (sourceSegments.isEmpty()) {
                 segmentLineage.deleteLineageEntry(lineageEntryId);
               } else {
                 segmentsToDelete.addAll(sourceSegments);
               }
   ```
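
The suggestion above can be tried out in isolation with the sketch below. It assumes plain Java collections in place of Pinot's `SegmentLineage` and `LineageEntry`; the class and method names (`RetainAllSketch`, `processCompletedEntry`) are hypothetical and exist only for illustration. The return value stands in for the call to `segmentLineage.deleteLineageEntry(lineageEntryId)`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper mirroring the suggested retainAll-based logic.
class RetainAllSketch {
  /**
   * Returns true if the lineage entry can be dropped because none of its source
   * segments is left in the table; otherwise queues the remaining source segments
   * for deletion and returns false.
   */
  static boolean processCompletedEntry(List<String> segmentsFrom, Set<String> segmentsForTable,
      List<String> segmentsToDelete) {
    Set<String> sourceSegments = new HashSet<>(segmentsFrom);
    sourceSegments.retainAll(segmentsForTable); // keep only segments still in the table
    if (sourceSegments.isEmpty()) {
      return true;
    }
    segmentsToDelete.addAll(sourceSegments);
    return false;
  }

  public static void main(String[] args) {
    List<String> segmentsToDelete = new ArrayList<>();
    List<String> segmentsFrom = Arrays.asList("s1", "s2");
    // s1 is still in the table, so the entry is kept and s1 is queued for deletion.
    Set<String> table = new HashSet<>(Arrays.asList("s1", "merged"));
    System.out.println(processCompletedEntry(segmentsFrom, table, segmentsToDelete));
    System.out.println(segmentsToDelete);
    // Once no source segment remains, the entry can be dropped.
    table.remove("s1");
    System.out.println(processCompletedEntry(segmentsFrom, table, new ArrayList<>()));
  }
}
```

Compared with the two `Collections.disjoint` checks, this computes the intersection once and reuses it both for the removal decision and for the deletion list.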






[GitHub] [incubator-pinot] snleee commented on a change in pull request #5828: Improving retention manager to handle segment lineage clean-up

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #5828:
URL: https://github.com/apache/incubator-pinot/pull/5828#discussion_r467352941



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
##########
@@ -177,4 +193,87 @@ private boolean shouldDeleteInProgressLLCSegment(String segmentName, IdealState
       return states.size() == 1 && states.contains(CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE);
     }
   }
+
+  private void manageSegmentLineageCleanupForTable(String tableNameWithType) {
+    try {
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch segment lineage
+        ZNRecord segmentLineageZNRecord = SegmentLineageAccessHelper
+            .getSegmentLineageZNRecord(_pinotHelixResourceManager.getPropertyStore(), tableNameWithType);
+        if (segmentLineageZNRecord == null) {
+          LOGGER.info("Segment lineage does not exist for table: {}", tableNameWithType);
+          return true;
+        }
+        SegmentLineage segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+        int expectedVersion = segmentLineageZNRecord.getVersion();
+
+        // Delete segments based on the segment lineage
+        PinotResourceManagerResponse response = _pinotHelixResourceManager
+            .deleteSegments(tableNameWithType, computeSegmentsToDeleteFromSegmentLineage(segmentLineage));
+
+        Assert.assertTrue(response.isSuccessful());

Review comment:
       Good catch. I was writing test code right before I wrote this part :)
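
The remark being answered here is not part of this message. Assuming it concerned the test-framework `Assert.assertTrue` call in the quoted production code, one possible replacement is an explicit check that throws, sketched below. The `Response` class is a hypothetical stand-in, not Pinot's `PinotResourceManagerResponse`, and `checkDeleteSucceeded` is an illustrative name.

```java
// Hypothetical sketch: an explicit success check instead of a test-framework assertion.
class ResponseCheckSketch {
  /** Stand-in for a delete response; not Pinot's PinotResourceManagerResponse. */
  static class Response {
    private final boolean successful;
    private final String message;

    Response(boolean successful, String message) {
      this.successful = successful;
      this.message = message;
    }

    boolean isSuccessful() {
      return successful;
    }

    String getMessage() {
      return message;
    }
  }

  /** Throws if the deletion did not succeed, so the enclosing operation can fail or retry. */
  static void checkDeleteSucceeded(Response response, String tableNameWithType) {
    if (!response.isSuccessful()) {
      throw new IllegalStateException(String.format(
          "Failed to delete segments for table: %s (%s)", tableNameWithType, response.getMessage()));
    }
  }

  public static void main(String[] args) {
    checkDeleteSucceeded(new Response(true, "ok"), "myTable_OFFLINE");
    try {
      checkDeleteSucceeded(new Response(false, "segment not found"), "myTable_OFFLINE");
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```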






[GitHub] [incubator-pinot] snleee commented on a change in pull request #5828: Improving retention manager to handle segment lineage clean-up

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #5828:
URL: https://github.com/apache/incubator-pinot/pull/5828#discussion_r468200530



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
##########
@@ -177,4 +193,63 @@ private boolean shouldDeleteInProgressLLCSegment(String segmentName, IdealState
       return states.size() == 1 && states.contains(CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE);
     }
   }
+
+  private void manageSegmentLineageCleanupForTable(String tableNameWithType) {
+    try {
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch segment lineage
+        ZNRecord segmentLineageZNRecord = SegmentLineageAccessHelper
+            .getSegmentLineageZNRecord(_pinotHelixResourceManager.getPropertyStore(), tableNameWithType);
+        if (segmentLineageZNRecord == null) {
+          return true;
+        }
+        SegmentLineage segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+        int expectedVersion = segmentLineageZNRecord.getVersion();
+
+        // 1. The original segments can be deleted once the merged segments are successfully uploaded
+        // 2. The zombie lineage entry & merged segments should be deleted if the segment replacement failed in
+        //    the middle
+        Set<String> segmentsForTable = new HashSet<>(_pinotHelixResourceManager.getSegmentsFor(tableNameWithType));
+        List<String> segmentsToDelete = new ArrayList<>();
+        for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+          LineageEntry lineageEntry = segmentLineage.getLineageEntry(lineageEntryId);
+          if (lineageEntry.getState() == LineageEntryState.COMPLETED) {
+            // If the lineage state is 'COMPLETED', it is safe to delete all segments from 'segmentsFrom'
+            segmentsToDelete.addAll(lineageEntry.getSegmentsFrom());
+
+            // The lineage entry with 'COMPLETED' state can only be safely removed when both segmentFrom & segmentTo
+            // are all removed from the table.
+            if (Collections.disjoint(segmentsForTable, lineageEntry.getSegmentsFrom()) && Collections
+                .disjoint(segmentsForTable, lineageEntry.getSegmentsTo())) {
+              segmentLineage.deleteLineageEntry(lineageEntryId);
+            }

Review comment:
       Changed. Please double check the logic for `IN_PROGRESS` as well.






[GitHub] [incubator-pinot] snleee commented on a change in pull request #5828: Improving retention manager to handle segment lineage clean-up

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #5828:
URL: https://github.com/apache/incubator-pinot/pull/5828#discussion_r467732477



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
##########
@@ -177,4 +193,87 @@ private boolean shouldDeleteInProgressLLCSegment(String segmentName, IdealState
       return states.size() == 1 && states.contains(CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE);
     }
   }
+
+  private void manageSegmentLineageCleanupForTable(String tableNameWithType) {
+    try {
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch segment lineage
+        ZNRecord segmentLineageZNRecord = SegmentLineageAccessHelper
+            .getSegmentLineageZNRecord(_pinotHelixResourceManager.getPropertyStore(), tableNameWithType);
+        if (segmentLineageZNRecord == null) {
+          LOGGER.info("Segment lineage does not exist for table: {}", tableNameWithType);
+          return true;
+        }
+        SegmentLineage segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+        int expectedVersion = segmentLineageZNRecord.getVersion();
+
+        // Delete segments based on the segment lineage
+        PinotResourceManagerResponse response = _pinotHelixResourceManager
+            .deleteSegments(tableNameWithType, computeSegmentsToDeleteFromSegmentLineage(segmentLineage));
+
+        Assert.assertTrue(response.isSuccessful());
+
+        // Fetch available segments for the table
+        Set<String> segmentsForTable = new HashSet<>(_pinotHelixResourceManager.getSegmentsFor(tableNameWithType));
+
+        // Clean up the segment lineage
+        for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+          LineageEntry lineageEntry = segmentLineage.getLineageEntry(lineageEntryId);
+          if (lineageEntry.getState() == LineageEntryState.COMPLETED) {
+            // The lineage entry for 'COMPLETED' state can only be safely removed when both segmentFrom & segmentTo
+            // are all removed from the table.
+            if (Collections.disjoint(segmentsForTable, lineageEntry.getSegmentsFrom()) && Collections
+                .disjoint(segmentsForTable, lineageEntry.getSegmentsTo())) {
+              segmentLineage.deleteLineageEntry(lineageEntryId);
+            }
+          } else if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS) {
+            // Zombie lineage entry is safe to remove. This will allow the task scheduler to re-schedule the
+            // source segments to be merged again.
+            segmentLineage.deleteLineageEntry(lineageEntryId);
+          }
+        }
+
+        // Write back to the lineage entry
+        return SegmentLineageAccessHelper
+            .writeSegmentLineage(_pinotHelixResourceManager.getPropertyStore(), segmentLineage, expectedVersion);
+      });
+    } catch (Exception e) {
+      String errorMsg = String.format("Failed to clean up the segment lineage. (tableName = %s)", tableNameWithType);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(errorMsg, e);
+    }
+    LOGGER.info("Segment lineage metadata clean-up is successfully processed for table: {}", tableNameWithType);
+  }
+
+  /**
+   * Compute the segments that can be safely deleted based on the segment lineage.
+   *
+   * 1. The original segments can be deleted once the merged segments are successfully uploaded.
+   * 2. If the segmentReplacement operation fails in the middle, there can be a case where partial segments are
+   *    uploaded to the table. We should periodically clean up those zombie segments.
+   */
+  private List<String> computeSegmentsToDeleteFromSegmentLineage(SegmentLineage segmentLineage) {
+    if (segmentLineage != null) {
+      List<String> segmentsToDelete = new ArrayList<>();
+      for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+        LineageEntry lineageEntry = segmentLineage.getLineageEntry(lineageEntryId);
+        if (lineageEntry.getState() == LineageEntryState.COMPLETED) {
+          // If the lineage state is 'COMPLETED', it is safe to delete all segments from 'segmentsFrom'
+          segmentsToDelete.addAll(lineageEntry.getSegmentsFrom());

Review comment:
       @Jackie-Jiang To check whether a segment has been deleted, do you think checking `getSegmentsFor(tableName)` is good enough, or is checking the ideal state preferred?






[GitHub] [incubator-pinot] snleee commented on a change in pull request #5828: Improving retention manager to handle segment lineage clean-up

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #5828:
URL: https://github.com/apache/incubator-pinot/pull/5828#discussion_r468200339



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
##########
@@ -177,4 +193,63 @@ private boolean shouldDeleteInProgressLLCSegment(String segmentName, IdealState
       return states.size() == 1 && states.contains(CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE);
     }
   }
+
+  private void manageSegmentLineageCleanupForTable(String tableNameWithType) {
+    try {
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch segment lineage
+        ZNRecord segmentLineageZNRecord = SegmentLineageAccessHelper
+            .getSegmentLineageZNRecord(_pinotHelixResourceManager.getPropertyStore(), tableNameWithType);
+        if (segmentLineageZNRecord == null) {
+          return true;
+        }
+        SegmentLineage segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+        int expectedVersion = segmentLineageZNRecord.getVersion();
+
+        // 1. The original segments can be deleted once the merged segments are successfully uploaded
+        // 2. The zombie lineage entry & merged segments should be deleted if the segment replacement failed in
+        //    the middle
+        Set<String> segmentsForTable = new HashSet<>(_pinotHelixResourceManager.getSegmentsFor(tableNameWithType));
+        List<String> segmentsToDelete = new ArrayList<>();
+        for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+          LineageEntry lineageEntry = segmentLineage.getLineageEntry(lineageEntryId);
+          if (lineageEntry.getState() == LineageEntryState.COMPLETED) {
+            // If the lineage state is 'COMPLETED', it is safe to delete all segments from 'segmentsFrom'
+            segmentsToDelete.addAll(lineageEntry.getSegmentsFrom());
+
+            // The lineage entry with 'COMPLETED' state can only be safely removed when both segmentFrom & segmentTo
+            // are all removed from the table.
+            if (Collections.disjoint(segmentsForTable, lineageEntry.getSegmentsFrom()) && Collections
+                .disjoint(segmentsForTable, lineageEntry.getSegmentsTo())) {
+              segmentLineage.deleteLineageEntry(lineageEntryId);
+            }
+          } else if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS) {
+            // If the lineage state is 'IN_PROGRESS', we need to clean up the zombie lineage entry and its segments
+            if (lineageEntry.getTimestamp() < System.currentTimeMillis() - LINEAGE_ENTRY_CLEANUP_RETENTION_IN_MILLIS) {
+              segmentsToDelete.addAll(lineageEntry.getSegmentsTo());
+
+              // The lineage entry with 'IN_PROGRESS' state can only be safely removed when segmentTo are all removed
+              // from the table. Deleting lineage will allow the task scheduler to re-schedule the source segments to
+              // be merged again.
+              if (Collections.disjoint(segmentsForTable, lineageEntry.getSegmentsTo())) {
+                segmentLineage.deleteLineageEntry(lineageEntryId);
+              }
+            }
+          }
+        }
+
+        // Delete segments based on the segment lineage
+        _pinotHelixResourceManager.deleteSegments(tableNameWithType, segmentsToDelete);

Review comment:
       Fixed.






[GitHub] [incubator-pinot] snleee commented on a change in pull request #5828: Improving retention manager to handle segment lineage clean-up

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #5828:
URL: https://github.com/apache/incubator-pinot/pull/5828#discussion_r467732477



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
##########
@@ -177,4 +193,87 @@ private boolean shouldDeleteInProgressLLCSegment(String segmentName, IdealState
       return states.size() == 1 && states.contains(CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE);
     }
   }
+
+  private void manageSegmentLineageCleanupForTable(String tableNameWithType) {
+    try {
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch segment lineage
+        ZNRecord segmentLineageZNRecord = SegmentLineageAccessHelper
+            .getSegmentLineageZNRecord(_pinotHelixResourceManager.getPropertyStore(), tableNameWithType);
+        if (segmentLineageZNRecord == null) {
+          LOGGER.info("Segment lineage does not exist for table: {}", tableNameWithType);
+          return true;
+        }
+        SegmentLineage segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+        int expectedVersion = segmentLineageZNRecord.getVersion();
+
+        // Delete segments based on the segment lineage
+        PinotResourceManagerResponse response = _pinotHelixResourceManager
+            .deleteSegments(tableNameWithType, computeSegmentsToDeleteFromSegmentLineage(segmentLineage));
+
+        Assert.assertTrue(response.isSuccessful());
+
+        // Fetch available segments for the table
+        Set<String> segmentsForTable = new HashSet<>(_pinotHelixResourceManager.getSegmentsFor(tableNameWithType));
+
+        // Clean up the segment lineage
+        for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+          LineageEntry lineageEntry = segmentLineage.getLineageEntry(lineageEntryId);
+          if (lineageEntry.getState() == LineageEntryState.COMPLETED) {
+            // The lineage entry for 'COMPLETED' state can only be safely removed when both segmentFrom & segmentTo
+            // are all removed from the table.
+            if (Collections.disjoint(segmentsForTable, lineageEntry.getSegmentsFrom()) && Collections
+                .disjoint(segmentsForTable, lineageEntry.getSegmentsTo())) {
+              segmentLineage.deleteLineageEntry(lineageEntryId);
+            }
+          } else if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS) {
+            // Zombie lineage entry is safe to remove. This will allow the task scheduler to re-schedule the
+            // source segments to be merged again.
+            segmentLineage.deleteLineageEntry(lineageEntryId);
+          }
+        }
+
+        // Write back to the lineage entry
+        return SegmentLineageAccessHelper
+            .writeSegmentLineage(_pinotHelixResourceManager.getPropertyStore(), segmentLineage, expectedVersion);
+      });
+    } catch (Exception e) {
+      String errorMsg = String.format("Failed to clean up the segment lineage. (tableName = %s)", tableNameWithType);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(errorMsg, e);
+    }
+    LOGGER.info("Segment lineage metadata clean-up is successfully processed for table: {}", tableNameWithType);
+  }
+
+  /**
+   * Compute the segments that can be safely deleted based on the segment lineage.
+   *
+   * 1. The original segments can be deleted once the merged segments are successfully uploaded.
+   * 2. If the segmentReplacement operation fails in the middle, there can be a case where partial segments are
+   *    uploaded to the table. We should periodically clean up those zombie segments.
+   */
+  private List<String> computeSegmentsToDeleteFromSegmentLineage(SegmentLineage segmentLineage) {
+    if (segmentLineage != null) {
+      List<String> segmentsToDelete = new ArrayList<>();
+      for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+        LineageEntry lineageEntry = segmentLineage.getLineageEntry(lineageEntryId);
+        if (lineageEntry.getState() == LineageEntryState.COMPLETED) {
+          // If the lineage state is 'COMPLETED', it is safe to delete all segments from 'segmentsFrom'
+          segmentsToDelete.addAll(lineageEntry.getSegmentsFrom());

Review comment:
       @Jackie-Jiang To check whether a segment has been deleted, do you think checking `getSegmentsFor(tableName)` (which looks at the segment ZK metadata) is good enough, or is checking the ideal state preferred?






[GitHub] [incubator-pinot] snleee commented on a change in pull request #5828: Improving retention manager to handle segment lineage clean-up

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #5828:
URL: https://github.com/apache/incubator-pinot/pull/5828#discussion_r467353665



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
##########
@@ -177,4 +193,87 @@ private boolean shouldDeleteInProgressLLCSegment(String segmentName, IdealState
       return states.size() == 1 && states.contains(CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE);
     }
   }
+
+  private void manageSegmentLineageCleanupForTable(String tableNameWithType) {
+    try {
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch segment lineage
+        ZNRecord segmentLineageZNRecord = SegmentLineageAccessHelper
+            .getSegmentLineageZNRecord(_pinotHelixResourceManager.getPropertyStore(), tableNameWithType);
+        if (segmentLineageZNRecord == null) {
+          LOGGER.info("Segment lineage does not exist for table: {}", tableNameWithType);
+          return true;
+        }
+        SegmentLineage segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+        int expectedVersion = segmentLineageZNRecord.getVersion();
+
+        // Delete segments based on the segment lineage
+        PinotResourceManagerResponse response = _pinotHelixResourceManager
+            .deleteSegments(tableNameWithType, computeSegmentsToDeleteFromSegmentLineage(segmentLineage));
+
+        Assert.assertTrue(response.isSuccessful());
+
+        // Fetch available segments for the table
+        Set<String> segmentsForTable = new HashSet<>(_pinotHelixResourceManager.getSegmentsFor(tableNameWithType));
+
+        // Clean up the segment lineage
+        for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+          LineageEntry lineageEntry = segmentLineage.getLineageEntry(lineageEntryId);
+          if (lineageEntry.getState() == LineageEntryState.COMPLETED) {
+            // The lineage entry for 'COMPLETED' state can only be safely removed when both segmentFrom & segmentTo
+            // are all removed from the table.
+            if (Collections.disjoint(segmentsForTable, lineageEntry.getSegmentsFrom()) && Collections
+                .disjoint(segmentsForTable, lineageEntry.getSegmentsTo())) {
+              segmentLineage.deleteLineageEntry(lineageEntryId);
+            }
+          } else if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS) {
+            // Zombie lineage entry is safe to remove. This will allow the task scheduler to re-schedule the
+            // source segments to be merged again.
+            segmentLineage.deleteLineageEntry(lineageEntryId);
+          }
+        }
+
+        // Write back to the lineage entry
+        return SegmentLineageAccessHelper
+            .writeSegmentLineage(_pinotHelixResourceManager.getPropertyStore(), segmentLineage, expectedVersion);
+      });
+    } catch (Exception e) {
+      String errorMsg = String.format("Failed to clean up the segment lineage. (tableName = %s)", tableNameWithType);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(errorMsg, e);
+    }
+    LOGGER.info("Segment lineage metadata clean-up is successfully processed for table: {}", tableNameWithType);
+  }
+
+  /**
+   * Compute the segments that can be safely deleted based on the segment lineage.
+   *
+   * 1. The original segments can be deleted once the merged segments are successfully uploaded.
+   * 2. If the segmentReplacement operation fails in the middle, there can be a case where partial segments are
+   *    uploaded to the table. We should periodically clean up those zombie segments.
+   */
+  private List<String> computeSegmentsToDeleteFromSegmentLineage(SegmentLineage segmentLineage) {
+    if (segmentLineage != null) {
+      List<String> segmentsToDelete = new ArrayList<>();
+      for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+        LineageEntry lineageEntry = segmentLineage.getLineageEntry(lineageEntryId);
+        if (lineageEntry.getState() == LineageEntryState.COMPLETED) {
+          // If the lineage state is 'COMPLETED', it is safe to delete all segments from 'segmentsFrom'
+          segmentsToDelete.addAll(lineageEntry.getSegmentsFrom());

Review comment:
       I initially thought we could clean up the segments and the lineage metadata within a single cycle by waiting for the segments to be deleted. However, I think it is fine for the segments to be deleted first and the lineage entry to be cleaned up in the next round.






[GitHub] [incubator-pinot] snleee commented on a change in pull request #5828: Improving retention manager to handle segment lineage clean-up

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #5828:
URL: https://github.com/apache/incubator-pinot/pull/5828#discussion_r467356447



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
##########
@@ -177,4 +193,87 @@ private boolean shouldDeleteInProgressLLCSegment(String segmentName, IdealState
       return states.size() == 1 && states.contains(CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE);
     }
   }
+
+  private void manageSegmentLineageCleanupForTable(String tableNameWithType) {
+    try {
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch segment lineage
+        ZNRecord segmentLineageZNRecord = SegmentLineageAccessHelper
+            .getSegmentLineageZNRecord(_pinotHelixResourceManager.getPropertyStore(), tableNameWithType);
+        if (segmentLineageZNRecord == null) {
+          LOGGER.info("Segment lineage does not exist for table: {}", tableNameWithType);
+          return true;
+        }
+        SegmentLineage segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+        int expectedVersion = segmentLineageZNRecord.getVersion();
+
+        // Delete segments based on the segment lineage
+        PinotResourceManagerResponse response = _pinotHelixResourceManager
+            .deleteSegments(tableNameWithType, computeSegmentsToDeleteFromSegmentLineage(segmentLineage));
+
+        Assert.assertTrue(response.isSuccessful());
+
+        // Fetch available segments for the table
+        Set<String> segmentsForTable = new HashSet<>(_pinotHelixResourceManager.getSegmentsFor(tableNameWithType));
+
+        // Clean up the segment lineage
+        for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+          LineageEntry lineageEntry = segmentLineage.getLineageEntry(lineageEntryId);
+          if (lineageEntry.getState() == LineageEntryState.COMPLETED) {
+            // The lineage entry for 'COMPLETED' state can only be safely removed when both segmentFrom & segmentTo
+            // are all removed from the table.
+            if (Collections.disjoint(segmentsForTable, lineageEntry.getSegmentsFrom()) && Collections
+                .disjoint(segmentsForTable, lineageEntry.getSegmentsTo())) {
+              segmentLineage.deleteLineageEntry(lineageEntryId);
+            }
+          } else if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS) {
+            // Zombie lineage entry is safe to remove. This will allow the task scheduler to re-schedule the
+            // source segments to be merged again.
+            segmentLineage.deleteLineageEntry(lineageEntryId);

Review comment:
       Fixed.

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
##########
@@ -177,4 +193,87 @@ private boolean shouldDeleteInProgressLLCSegment(String segmentName, IdealState
       return states.size() == 1 && states.contains(CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE);
     }
   }
+
+  private void manageSegmentLineageCleanupForTable(String tableNameWithType) {
+    try {
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch segment lineage
+        ZNRecord segmentLineageZNRecord = SegmentLineageAccessHelper
+            .getSegmentLineageZNRecord(_pinotHelixResourceManager.getPropertyStore(), tableNameWithType);
+        if (segmentLineageZNRecord == null) {
+          LOGGER.info("Segment lineage does not exist for table: {}", tableNameWithType);
+          return true;
+        }
+        SegmentLineage segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+        int expectedVersion = segmentLineageZNRecord.getVersion();
+
+        // Delete segments based on the segment lineage
+        PinotResourceManagerResponse response = _pinotHelixResourceManager
+            .deleteSegments(tableNameWithType, computeSegmentsToDeleteFromSegmentLineage(segmentLineage));
+
+        Assert.assertTrue(response.isSuccessful());
+
+        // Fetch available segments for the table
+        Set<String> segmentsForTable = new HashSet<>(_pinotHelixResourceManager.getSegmentsFor(tableNameWithType));

Review comment:
       Changed the logic to handle them together.






[GitHub] [incubator-pinot] snleee commented on a change in pull request #5828: Improving retention manager to handle segment lineage clean-up

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #5828:
URL: https://github.com/apache/incubator-pinot/pull/5828#discussion_r467353665



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
##########
@@ -177,4 +193,87 @@ private boolean shouldDeleteInProgressLLCSegment(String segmentName, IdealState
       return states.size() == 1 && states.contains(CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE);
     }
   }
+
+  private void manageSegmentLineageCleanupForTable(String tableNameWithType) {
+    try {
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch segment lineage
+        ZNRecord segmentLineageZNRecord = SegmentLineageAccessHelper
+            .getSegmentLineageZNRecord(_pinotHelixResourceManager.getPropertyStore(), tableNameWithType);
+        if (segmentLineageZNRecord == null) {
+          LOGGER.info("Segment lineage does not exist for table: {}", tableNameWithType);
+          return true;
+        }
+        SegmentLineage segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+        int expectedVersion = segmentLineageZNRecord.getVersion();
+
+        // Delete segments based on the segment lineage
+        PinotResourceManagerResponse response = _pinotHelixResourceManager
+            .deleteSegments(tableNameWithType, computeSegmentsToDeleteFromSegmentLineage(segmentLineage));
+
+        Assert.assertTrue(response.isSuccessful());
+
+        // Fetch available segments for the table
+        Set<String> segmentsForTable = new HashSet<>(_pinotHelixResourceManager.getSegmentsFor(tableNameWithType));
+
+        // Clean up the segment lineage
+        for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+          LineageEntry lineageEntry = segmentLineage.getLineageEntry(lineageEntryId);
+          if (lineageEntry.getState() == LineageEntryState.COMPLETED) {
+            // The lineage entry for 'COMPLETED' state can only be safely removed when both segmentFrom & segmentTo
+            // are all removed from the table.
+            if (Collections.disjoint(segmentsForTable, lineageEntry.getSegmentsFrom()) && Collections
+                .disjoint(segmentsForTable, lineageEntry.getSegmentsTo())) {
+              segmentLineage.deleteLineageEntry(lineageEntryId);
+            }
+          } else if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS) {
+            // Zombie lineage entry is safe to remove. This will allow the task scheduler to re-schedule the
+            // source segments to be merged again.
+            segmentLineage.deleteLineageEntry(lineageEntryId);
+          }
+        }
+
+        // Write back to the lineage entry
+        return SegmentLineageAccessHelper
+            .writeSegmentLineage(_pinotHelixResourceManager.getPropertyStore(), segmentLineage, expectedVersion);
+      });
+    } catch (Exception e) {
+      String errorMsg = String.format("Failed to clean up the segment lineage. (tableName = %s)", tableNameWithType);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(errorMsg, e);
+    }
+    LOGGER.info("Segment lineage metadata clean-up is successfully processed for table: {}", tableNameWithType);
+  }
+
+  /**
+   * Compute the segments that can be safely deleted based on the segment lineage.
+   *
+   * 1. The original segments can be deleted once the merged segments are successfully uploaded.
+   * 2. If the segmentReplacement operation fails in the middle, there can be a case where partial segments are
+   *    uploaded to the table. We should periodically clean up those zombie segments.
+   */
+  private List<String> computeSegmentsToDeleteFromSegmentLineage(SegmentLineage segmentLineage) {
+    if (segmentLineage != null) {
+      List<String> segmentsToDelete = new ArrayList<>();
+      for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+        LineageEntry lineageEntry = segmentLineage.getLineageEntry(lineageEntryId);
+        if (lineageEntry.getState() == LineageEntryState.COMPLETED) {
+          // If the lineage state is 'COMPLETED', it is safe to delete all segments from 'segmentsFrom'
+          segmentsToDelete.addAll(lineageEntry.getSegmentsFrom());

Review comment:
       I initially thought we could clean up the segments and the lineage metadata within a single cycle by waiting for the segments to be deleted. However, I think it is fine for the segments to be deleted first and the lineage entry to be cleaned up in the next round. I changed the logic accordingly.
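
   The two-round behaviour described above can be sketched as follows. This is only an illustration with hypothetical stand-in types (`TwoRoundCleanupSketch`, `Entry`, `runRound` are not Pinot classes), and it assumes an entry is dropped once none of its source segments remain in the table:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in types for illustration; these are not the Pinot classes.
final class TwoRoundCleanupSketch {
  enum State { IN_PROGRESS, COMPLETED }

  static final class Entry {
    final List<String> segmentsFrom;
    final List<String> segmentsTo;
    final State state;

    Entry(List<String> segmentsFrom, List<String> segmentsTo, State state) {
      this.segmentsFrom = segmentsFrom;
      this.segmentsTo = segmentsTo;
      this.state = state;
    }
  }

  /**
   * Runs one cleanup round. COMPLETED entries whose source segments are all gone are
   * removed from the lineage; otherwise the remaining source segments are returned
   * so that the caller can delete them.
   */
  static List<String> runRound(Map<String, Entry> lineage, Set<String> segmentsForTable) {
    List<String> segmentsToDelete = new ArrayList<>();
    // Iterate over a copy of the keys so that entries can be removed during the loop.
    for (String entryId : new ArrayList<>(lineage.keySet())) {
      Entry entry = lineage.get(entryId);
      if (entry.state != State.COMPLETED) {
        continue;
      }
      if (Collections.disjoint(segmentsForTable, entry.segmentsFrom)) {
        // Source segments were deleted in an earlier round: drop the entry now.
        lineage.remove(entryId);
      } else {
        for (String segment : entry.segmentsFrom) {
          if (segmentsForTable.contains(segment)) {
            segmentsToDelete.add(segment);
          }
        }
      }
    }
    return segmentsToDelete;
  }

  public static void main(String[] args) {
    Map<String, Entry> lineage = new LinkedHashMap<>();
    lineage.put("entry1",
        new Entry(Arrays.asList("s1", "s2"), Arrays.asList("merged1"), State.COMPLETED));
    Set<String> segmentsForTable = new HashSet<>(Arrays.asList("s1", "s2", "merged1"));

    // Round 1 schedules the source segments for deletion; the entry stays.
    List<String> firstRound = runRound(lineage, segmentsForTable);
    segmentsForTable.removeAll(firstRound);
    // Round 2 finds no source segments left and removes the entry.
    List<String> secondRound = runRound(lineage, segmentsForTable);
    System.out.println(firstRound + " " + secondRound + " " + lineage.keySet());
  }
}
```

   Deferring the entry removal to the next round means no round has to wait for the asynchronous segment deletion to finish.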

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
##########
@@ -177,4 +193,87 @@ private boolean shouldDeleteInProgressLLCSegment(String segmentName, IdealState
       return states.size() == 1 && states.contains(CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE);
     }
   }
+
+  private void manageSegmentLineageCleanupForTable(String tableNameWithType) {
+    try {
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch segment lineage
+        ZNRecord segmentLineageZNRecord = SegmentLineageAccessHelper
+            .getSegmentLineageZNRecord(_pinotHelixResourceManager.getPropertyStore(), tableNameWithType);
+        if (segmentLineageZNRecord == null) {
+          LOGGER.info("Segment lineage does not exist for table: {}", tableNameWithType);
+          return true;
+        }
+        SegmentLineage segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+        int expectedVersion = segmentLineageZNRecord.getVersion();
+
+        // Delete segments based on the segment lineage
+        PinotResourceManagerResponse response = _pinotHelixResourceManager
+            .deleteSegments(tableNameWithType, computeSegmentsToDeleteFromSegmentLineage(segmentLineage));
+
+        Assert.assertTrue(response.isSuccessful());
+
+        // Fetch available segments for the table
+        Set<String> segmentsForTable = new HashSet<>(_pinotHelixResourceManager.getSegmentsFor(tableNameWithType));

Review comment:
       Changed the logic to handle them together.






[GitHub] [incubator-pinot] snleee commented on a change in pull request #5828: Improving retention manager to handle segment lineage clean-up

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #5828:
URL: https://github.com/apache/incubator-pinot/pull/5828#discussion_r467353665



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
##########
@@ -177,4 +193,87 @@ private boolean shouldDeleteInProgressLLCSegment(String segmentName, IdealState
       return states.size() == 1 && states.contains(CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE);
     }
   }
+
+  private void manageSegmentLineageCleanupForTable(String tableNameWithType) {
+    try {
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch segment lineage
+        ZNRecord segmentLineageZNRecord = SegmentLineageAccessHelper
+            .getSegmentLineageZNRecord(_pinotHelixResourceManager.getPropertyStore(), tableNameWithType);
+        if (segmentLineageZNRecord == null) {
+          LOGGER.info("Segment lineage does not exist for table: {}", tableNameWithType);
+          return true;
+        }
+        SegmentLineage segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+        int expectedVersion = segmentLineageZNRecord.getVersion();
+
+        // Delete segments based on the segment lineage
+        PinotResourceManagerResponse response = _pinotHelixResourceManager
+            .deleteSegments(tableNameWithType, computeSegmentsToDeleteFromSegmentLineage(segmentLineage));
+
+        Assert.assertTrue(response.isSuccessful());
+
+        // Fetch available segments for the table
+        Set<String> segmentsForTable = new HashSet<>(_pinotHelixResourceManager.getSegmentsFor(tableNameWithType));
+
+        // Clean up the segment lineage
+        for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+          LineageEntry lineageEntry = segmentLineage.getLineageEntry(lineageEntryId);
+          if (lineageEntry.getState() == LineageEntryState.COMPLETED) {
+            // The lineage entry for 'COMPLETED' state can only be safely removed when both segmentFrom & segmentTo
+            // are all removed from the table.
+            if (Collections.disjoint(segmentsForTable, lineageEntry.getSegmentsFrom()) && Collections
+                .disjoint(segmentsForTable, lineageEntry.getSegmentsTo())) {
+              segmentLineage.deleteLineageEntry(lineageEntryId);
+            }
+          } else if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS) {
+            // Zombie lineage entry is safe to remove. This will allow the task scheduler to re-schedule the
+            // source segments to be merged again.
+            segmentLineage.deleteLineageEntry(lineageEntryId);
+          }
+        }
+
+        // Write back to the lineage entry
+        return SegmentLineageAccessHelper
+            .writeSegmentLineage(_pinotHelixResourceManager.getPropertyStore(), segmentLineage, expectedVersion);
+      });
+    } catch (Exception e) {
+      String errorMsg = String.format("Failed to clean up the segment lineage. (tableName = %s)", tableNameWithType);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(errorMsg, e);
+    }
+    LOGGER.info("Segment lineage metadata clean-up is successfully processed for table: {}", tableNameWithType);
+  }
+
+  /**
+   * Compute the segments that can be safely deleted based on the segment lineage.
+   *
+   * 1. The original segments can be deleted once the merged segments are successfully uploaded.
+   * 2. If the segmentReplacement operation fails in the middle, there can be a case where partial segments are
+   *    uploaded to the table. We should periodically clean up those zombie segments.
+   */
+  private List<String> computeSegmentsToDeleteFromSegmentLineage(SegmentLineage segmentLineage) {
+    if (segmentLineage != null) {
+      List<String> segmentsToDelete = new ArrayList<>();
+      for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+        LineageEntry lineageEntry = segmentLineage.getLineageEntry(lineageEntryId);
+        if (lineageEntry.getState() == LineageEntryState.COMPLETED) {
+          // If the lineage state is 'COMPLETED', it is safe to delete all segments from 'segmentsFrom'
+          segmentsToDelete.addAll(lineageEntry.getSegmentsFrom());

Review comment:
       I was initially thinking of the following race condition:
   
   1. The retention manager invokes segment deletion (which happens in the background with some delay).
   2. The retention manager writes the new segment lineage to ZK (this update happens first).
   3. The broker updates the routing table before the segments are deleted.
   
   In this case, queries may be routed to the original segments for a short time. What do you think about this?






[GitHub] [incubator-pinot] Jackie-Jiang commented on a change in pull request #5828: Improving retention manager to handle segment lineage clean-up

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on a change in pull request #5828:
URL: https://github.com/apache/incubator-pinot/pull/5828#discussion_r467174357



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
##########
@@ -177,4 +193,87 @@ private boolean shouldDeleteInProgressLLCSegment(String segmentName, IdealState
       return states.size() == 1 && states.contains(CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE);
     }
   }
+
+  private void manageSegmentLineageCleanupForTable(String tableNameWithType) {
+    try {
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch segment lineage
+        ZNRecord segmentLineageZNRecord = SegmentLineageAccessHelper
+            .getSegmentLineageZNRecord(_pinotHelixResourceManager.getPropertyStore(), tableNameWithType);
+        if (segmentLineageZNRecord == null) {
+          LOGGER.info("Segment lineage does not exist for table: {}", tableNameWithType);
+          return true;
+        }
+        SegmentLineage segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+        int expectedVersion = segmentLineageZNRecord.getVersion();
+
+        // Delete segments based on the segment lineage
+        PinotResourceManagerResponse response = _pinotHelixResourceManager
+            .deleteSegments(tableNameWithType, computeSegmentsToDeleteFromSegmentLineage(segmentLineage));
+
+        Assert.assertTrue(response.isSuccessful());
+
+        // Fetch available segments for the table
+        Set<String> segmentsForTable = new HashSet<>(_pinotHelixResourceManager.getSegmentsFor(tableNameWithType));
+
+        // Clean up the segment lineage
+        for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+          LineageEntry lineageEntry = segmentLineage.getLineageEntry(lineageEntryId);
+          if (lineageEntry.getState() == LineageEntryState.COMPLETED) {
+            // The lineage entry for 'COMPLETED' state can only be safely removed when both segmentFrom & segmentTo
+            // are all removed from the table.
+            if (Collections.disjoint(segmentsForTable, lineageEntry.getSegmentsFrom()) && Collections
+                .disjoint(segmentsForTable, lineageEntry.getSegmentsTo())) {
+              segmentLineage.deleteLineageEntry(lineageEntryId);
+            }
+          } else if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS) {
+            // Zombie lineage entry is safe to remove. This will allow the task scheduler to re-schedule the
+            // source segments to be merged again.
+            segmentLineage.deleteLineageEntry(lineageEntryId);
+          }
+        }
+
+        // Write back to the lineage entry
+        return SegmentLineageAccessHelper
+            .writeSegmentLineage(_pinotHelixResourceManager.getPropertyStore(), segmentLineage, expectedVersion);
+      });
+    } catch (Exception e) {
+      String errorMsg = String.format("Failed to clean up the segment lineage. (tableName = %s)", tableNameWithType);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(errorMsg, e);
+    }
+    LOGGER.info("Segment lineage metadata clean-up is successfully processed for table: {}", tableNameWithType);
+  }
+
+  /**
+   * Compute the segments that can be safely deleted based on the segment lineage.
+   *
+   * 1. The original segments can be deleted once the merged segments are successfully uploaded.
+   * 2. If the segmentReplacement operation fails in the middle, there can be a case where partial segments are
+   *    uploaded to the table. We should periodically clean up those zombie segments.
+   */
+  private List<String> computeSegmentsToDeleteFromSegmentLineage(SegmentLineage segmentLineage) {
+    if (segmentLineage != null) {

Review comment:
       Remove the null check; it is redundant.

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
##########
@@ -177,4 +193,87 @@ private boolean shouldDeleteInProgressLLCSegment(String segmentName, IdealState
       return states.size() == 1 && states.contains(CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE);
     }
   }
+
+  private void manageSegmentLineageCleanupForTable(String tableNameWithType) {
+    try {
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch segment lineage
+        ZNRecord segmentLineageZNRecord = SegmentLineageAccessHelper
+            .getSegmentLineageZNRecord(_pinotHelixResourceManager.getPropertyStore(), tableNameWithType);
+        if (segmentLineageZNRecord == null) {
+          LOGGER.info("Segment lineage does not exist for table: {}", tableNameWithType);

Review comment:
       Don't log this; it can flood the log on a large cluster.

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
##########
@@ -177,4 +193,87 @@ private boolean shouldDeleteInProgressLLCSegment(String segmentName, IdealState
       return states.size() == 1 && states.contains(CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE);
     }
   }
+
+  private void manageSegmentLineageCleanupForTable(String tableNameWithType) {
+    try {
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch segment lineage
+        ZNRecord segmentLineageZNRecord = SegmentLineageAccessHelper
+            .getSegmentLineageZNRecord(_pinotHelixResourceManager.getPropertyStore(), tableNameWithType);
+        if (segmentLineageZNRecord == null) {
+          LOGGER.info("Segment lineage does not exist for table: {}", tableNameWithType);
+          return true;
+        }
+        SegmentLineage segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+        int expectedVersion = segmentLineageZNRecord.getVersion();
+
+        // Delete segments based on the segment lineage
+        PinotResourceManagerResponse response = _pinotHelixResourceManager
+            .deleteSegments(tableNameWithType, computeSegmentsToDeleteFromSegmentLineage(segmentLineage));
+
+        Assert.assertTrue(response.isSuccessful());
+
+        // Fetch available segments for the table
+        Set<String> segmentsForTable = new HashSet<>(_pinotHelixResourceManager.getSegmentsFor(tableNameWithType));

Review comment:
       This won't give you the up-to-date segment list, because we delay the deletion of the segment ZK metadata.
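
   One possible workaround, sketched here with hypothetical names (`RemainingSegmentsSketch` and `remainingSegments` are not part of Pinot), is to subtract the segments that were just scheduled for deletion from the fetched list instead of trusting the metadata alone:

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper for illustration; not part of the Pinot code base.
final class RemainingSegmentsSketch {
  /**
   * Returns the segments that should be treated as still present: everything the
   * metadata reports, minus the segments already scheduled for deletion.
   */
  static Set<String> remainingSegments(Collection<String> segmentsFromMetadata,
      Collection<String> segmentsScheduledForDeletion) {
    Set<String> remaining = new HashSet<>(segmentsFromMetadata);
    remaining.removeAll(segmentsScheduledForDeletion);
    return remaining;
  }

  public static void main(String[] args) {
    // The metadata still lists s1 and s2 because their deletion is delayed.
    Set<String> remaining = remainingSegments(
        Arrays.asList("s1", "s2", "merged1"), Arrays.asList("s1", "s2"));
    System.out.println(remaining);
  }
}
```

   This only hides the staleness within a single run; a deletion that later fails would still need to be picked up by a subsequent run.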

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
##########
@@ -177,4 +193,87 @@ private boolean shouldDeleteInProgressLLCSegment(String segmentName, IdealState
       return states.size() == 1 && states.contains(CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE);
     }
   }
+
+  private void manageSegmentLineageCleanupForTable(String tableNameWithType) {
+    try {
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch segment lineage
+        ZNRecord segmentLineageZNRecord = SegmentLineageAccessHelper
+            .getSegmentLineageZNRecord(_pinotHelixResourceManager.getPropertyStore(), tableNameWithType);
+        if (segmentLineageZNRecord == null) {
+          LOGGER.info("Segment lineage does not exist for table: {}", tableNameWithType);
+          return true;
+        }
+        SegmentLineage segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+        int expectedVersion = segmentLineageZNRecord.getVersion();
+
+        // Delete segments based on the segment lineage
+        PinotResourceManagerResponse response = _pinotHelixResourceManager
+            .deleteSegments(tableNameWithType, computeSegmentsToDeleteFromSegmentLineage(segmentLineage));
+
+        Assert.assertTrue(response.isSuccessful());
+
+        // Fetch available segments for the table
+        Set<String> segmentsForTable = new HashSet<>(_pinotHelixResourceManager.getSegmentsFor(tableNameWithType));
+
+        // Clean up the segment lineage
+        for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+          LineageEntry lineageEntry = segmentLineage.getLineageEntry(lineageEntryId);
+          if (lineageEntry.getState() == LineageEntryState.COMPLETED) {
+            // The lineage entry for 'COMPLETED' state can only be safely removed when both segmentFrom & segmentTo
+            // are all removed from the table.
+            if (Collections.disjoint(segmentsForTable, lineageEntry.getSegmentsFrom()) && Collections
+                .disjoint(segmentsForTable, lineageEntry.getSegmentsTo())) {
+              segmentLineage.deleteLineageEntry(lineageEntryId);
+            }
+          } else if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS) {
+            // Zombie lineage entry is safe to remove. This will allow the task scheduler to re-schedule the
+            // source segments to be merged again.
+            segmentLineage.deleteLineageEntry(lineageEntryId);

Review comment:
       This part is wrong: it will also remove lineage entries that are genuinely in progress.
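
   One way to tell a zombie entry from a live one is to look at its age. The sketch below assumes each lineage entry records a creation timestamp, which this thread does not confirm; the class name, the `isZombie` helper, and the one-day threshold are all hypothetical:

```java
// Hypothetical sketch; the timestamp field and the threshold are assumptions.
final class ZombieLineageSketch {
  enum State { IN_PROGRESS, COMPLETED }

  // Assumed cut-off: an IN_PROGRESS entry older than one day is treated as abandoned.
  static final long ZOMBIE_AGE_THRESHOLD_MS = 24L * 60 * 60 * 1000;

  /** Returns true only for IN_PROGRESS entries that are older than the threshold. */
  static boolean isZombie(State state, long entryTimestampMs, long nowMs) {
    return state == State.IN_PROGRESS && nowMs - entryTimestampMs > ZOMBIE_AGE_THRESHOLD_MS;
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    // A replacement that started a minute ago is still considered live.
    System.out.println(isZombie(State.IN_PROGRESS, now - 60_000L, now));
    // An entry that has been IN_PROGRESS for two days is considered a zombie.
    System.out.println(isZombie(State.IN_PROGRESS, now - 2 * ZOMBIE_AGE_THRESHOLD_MS, now));
  }
}
```

   With a guard like this, a replacement that is still running keeps its entry, and only entries that have been stuck for longer than the threshold are removed.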

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
##########
@@ -177,4 +193,87 @@ private boolean shouldDeleteInProgressLLCSegment(String segmentName, IdealState
       return states.size() == 1 && states.contains(CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE);
     }
   }
+
+  private void manageSegmentLineageCleanupForTable(String tableNameWithType) {
+    try {
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch segment lineage
+        ZNRecord segmentLineageZNRecord = SegmentLineageAccessHelper
+            .getSegmentLineageZNRecord(_pinotHelixResourceManager.getPropertyStore(), tableNameWithType);
+        if (segmentLineageZNRecord == null) {
+          LOGGER.info("Segment lineage does not exist for table: {}", tableNameWithType);
+          return true;
+        }
+        SegmentLineage segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+        int expectedVersion = segmentLineageZNRecord.getVersion();
+
+        // Delete segments based on the segment lineage
+        PinotResourceManagerResponse response = _pinotHelixResourceManager
+            .deleteSegments(tableNameWithType, computeSegmentsToDeleteFromSegmentLineage(segmentLineage));
+
+        Assert.assertTrue(response.isSuccessful());

Review comment:
       Don't use `Assert`; use `Preconditions` instead.
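
   For illustration, the check could look roughly like the sketch below. Guava's `Preconditions.checkState(boolean, Object)` throws an `IllegalStateException` when the condition is false; to keep the snippet compilable without Guava, a local `checkState` with the same contract stands in for it. The class name, the `verifyDeletion` helper, and the message text are hypothetical:

```java
// Hypothetical sketch; in the real code the call would be Guava's Preconditions.checkState.
final class PreconditionsSketch {
  /** Local stand-in with the same contract as Guava's Preconditions.checkState(boolean, Object). */
  static void checkState(boolean expression, Object errorMessage) {
    if (!expression) {
      throw new IllegalStateException(String.valueOf(errorMessage));
    }
  }

  /** Fails with an IllegalStateException if the delete response was not successful. */
  static void verifyDeletion(boolean responseSuccessful, String tableNameWithType) {
    checkState(responseSuccessful, "Failed to delete segments for table: " + tableNameWithType);
  }

  public static void main(String[] args) {
    verifyDeletion(true, "myTable_OFFLINE"); // passes silently
    try {
      verifyDeletion(false, "myTable_OFFLINE");
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

   Because the check sits inside the retry block, a failed precondition surfaces as an ordinary runtime exception instead of pulling a test-style assertion into production code.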

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
##########
@@ -177,4 +193,87 @@ private boolean shouldDeleteInProgressLLCSegment(String segmentName, IdealState
       return states.size() == 1 && states.contains(CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE);
     }
   }
+
+  private void manageSegmentLineageCleanupForTable(String tableNameWithType) {
+    try {
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch segment lineage
+        ZNRecord segmentLineageZNRecord = SegmentLineageAccessHelper
+            .getSegmentLineageZNRecord(_pinotHelixResourceManager.getPropertyStore(), tableNameWithType);
+        if (segmentLineageZNRecord == null) {
+          LOGGER.info("Segment lineage does not exist for table: {}", tableNameWithType);
+          return true;
+        }
+        SegmentLineage segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+        int expectedVersion = segmentLineageZNRecord.getVersion();
+
+        // Delete segments based on the segment lineage
+        PinotResourceManagerResponse response = _pinotHelixResourceManager
+            .deleteSegments(tableNameWithType, computeSegmentsToDeleteFromSegmentLineage(segmentLineage));
+
+        Assert.assertTrue(response.isSuccessful());
+
+        // Fetch available segments for the table
+        Set<String> segmentsForTable = new HashSet<>(_pinotHelixResourceManager.getSegmentsFor(tableNameWithType));
+
+        // Clean up the segment lineage
+        for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+          LineageEntry lineageEntry = segmentLineage.getLineageEntry(lineageEntryId);
+          if (lineageEntry.getState() == LineageEntryState.COMPLETED) {
+            // The lineage entry for 'COMPLETED' state can only be safely removed when both segmentFrom & segmentTo
+            // are all removed from the table.
+            if (Collections.disjoint(segmentsForTable, lineageEntry.getSegmentsFrom()) && Collections
+                .disjoint(segmentsForTable, lineageEntry.getSegmentsTo())) {
+              segmentLineage.deleteLineageEntry(lineageEntryId);
+            }
+          } else if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS) {
+            // Zombie lineage entry is safe to remove. This will allow the task scheduler to re-schedule the
+            // source segments to be merged again.
+            segmentLineage.deleteLineageEntry(lineageEntryId);
+          }
+        }
+
+        // Write back to the lineage entry
+        return SegmentLineageAccessHelper
+            .writeSegmentLineage(_pinotHelixResourceManager.getPropertyStore(), segmentLineage, expectedVersion);
+      });
+    } catch (Exception e) {
+      String errorMsg = String.format("Failed to clean up the segment lineage. (tableName = %s)", tableNameWithType);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(errorMsg, e);
+    }
+    LOGGER.info("Segment lineage metadata clean-up is successfully processed for table: {}", tableNameWithType);
+  }
+
+  /**
+   * Compute the segments that can be safely deleted based on the segment lineage.
+   *
+   * 1. The original segments can be deleted once the merged segments are successfully uploaded.
+   * 2. If the segmentReplacement operation fails in the middle, there can be a case where partial segments are
+   *    uploaded to the table. We should periodically clean up those zombie segments.
+   */
+  private List<String> computeSegmentsToDeleteFromSegmentLineage(SegmentLineage segmentLineage) {
+    if (segmentLineage != null) {
+      List<String> segmentsToDelete = new ArrayList<>();
+      for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+        LineageEntry lineageEntry = segmentLineage.getLineageEntry(lineageEntryId);
+        if (lineageEntry.getState() == LineageEntryState.COMPLETED) {
+          // If the lineage state is 'COMPLETED', it is safe to delete all segments from 'segmentsFrom'
+          segmentsToDelete.addAll(lineageEntry.getSegmentsFrom());

Review comment:
       Can we remove the lineage entry directly here? Why do we need to wait until all the merged segments are removed as well?

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
##########
@@ -177,4 +193,87 @@ private boolean shouldDeleteInProgressLLCSegment(String segmentName, IdealState
       return states.size() == 1 && states.contains(CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE);
     }
   }
+
+  private void manageSegmentLineageCleanupForTable(String tableNameWithType) {
+    try {
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch segment lineage
+        ZNRecord segmentLineageZNRecord = SegmentLineageAccessHelper
+            .getSegmentLineageZNRecord(_pinotHelixResourceManager.getPropertyStore(), tableNameWithType);
+        if (segmentLineageZNRecord == null) {
+          LOGGER.info("Segment lineage does not exist for table: {}", tableNameWithType);
+          return true;
+        }
+        SegmentLineage segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+        int expectedVersion = segmentLineageZNRecord.getVersion();
+
+        // Delete segments based on the segment lineage
+        PinotResourceManagerResponse response = _pinotHelixResourceManager
+            .deleteSegments(tableNameWithType, computeSegmentsToDeleteFromSegmentLineage(segmentLineage));
+
+        Assert.assertTrue(response.isSuccessful());
+
+        // Fetch available segments for the table
+        Set<String> segmentsForTable = new HashSet<>(_pinotHelixResourceManager.getSegmentsFor(tableNameWithType));

Review comment:
       The lineage deletion can be handled together with the segment deletion.






[GitHub] [incubator-pinot] snleee commented on a change in pull request #5828: Improving retention manager to handle segment lineage clean-up

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #5828:
URL: https://github.com/apache/incubator-pinot/pull/5828#discussion_r468200651



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
##########
@@ -177,4 +193,87 @@ private boolean shouldDeleteInProgressLLCSegment(String segmentName, IdealState
       return states.size() == 1 && states.contains(CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE);
     }
   }
+
+  private void manageSegmentLineageCleanupForTable(String tableNameWithType) {
+    try {
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch segment lineage
+        ZNRecord segmentLineageZNRecord = SegmentLineageAccessHelper
+            .getSegmentLineageZNRecord(_pinotHelixResourceManager.getPropertyStore(), tableNameWithType);
+        if (segmentLineageZNRecord == null) {
+          LOGGER.info("Segment lineage does not exist for table: {}", tableNameWithType);
+          return true;
+        }
+        SegmentLineage segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+        int expectedVersion = segmentLineageZNRecord.getVersion();
+
+        // Delete segments based on the segment lineage
+        PinotResourceManagerResponse response = _pinotHelixResourceManager
+            .deleteSegments(tableNameWithType, computeSegmentsToDeleteFromSegmentLineage(segmentLineage));
+
+        Assert.assertTrue(response.isSuccessful());
+
+        // Fetch available segments for the table
+        Set<String> segmentsForTable = new HashSet<>(_pinotHelixResourceManager.getSegmentsFor(tableNameWithType));
+
+        // Clean up the segment lineage
+        for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+          LineageEntry lineageEntry = segmentLineage.getLineageEntry(lineageEntryId);
+          if (lineageEntry.getState() == LineageEntryState.COMPLETED) {
+            // The lineage entry for 'COMPLETED' state can only be safely removed when both segmentFrom & segmentTo
+            // are all removed from the table.
+            if (Collections.disjoint(segmentsForTable, lineageEntry.getSegmentsFrom()) && Collections
+                .disjoint(segmentsForTable, lineageEntry.getSegmentsTo())) {
+              segmentLineage.deleteLineageEntry(lineageEntryId);
+            }
+          } else if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS) {
+            // Zombie lineage entry is safe to remove. This will allow the task scheduler to re-schedule the
+            // source segments to be merged again.
+            segmentLineage.deleteLineageEntry(lineageEntryId);
+          }
+        }
+
+        // Write back to the lineage entry
+        return SegmentLineageAccessHelper
+            .writeSegmentLineage(_pinotHelixResourceManager.getPropertyStore(), segmentLineage, expectedVersion);
+      });
+    } catch (Exception e) {
+      String errorMsg = String.format("Failed to clean up the segment lineage. (tableName = %s)", tableNameWithType);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(errorMsg, e);
+    }
+    LOGGER.info("Segment lineage metadata clean-up is successfully processed for table: {}", tableNameWithType);
+  }
+
+  /**
+   * Compute the segments that can be safely deleted based on the segment lineage.
+   *
+   * 1. The original segments can be deleted once the merged segments are successfully uploaded.
+   * 2. If the segmentReplacement operation fails in the middle, there can be a case where partial segments are
+   *    uploaded to the table. We should periodically clean up those zombie segments.
+   */
+  private List<String> computeSegmentsToDeleteFromSegmentLineage(SegmentLineage segmentLineage) {
+    if (segmentLineage != null) {
+      List<String> segmentsToDelete = new ArrayList<>();
+      for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+        LineageEntry lineageEntry = segmentLineage.getLineageEntry(lineageEntryId);
+        if (lineageEntry.getState() == LineageEntryState.COMPLETED) {
+          // If the lineage state is 'COMPLETED', it is safe to delete all segments from 'segmentsFrom'
+          segmentsToDelete.addAll(lineageEntry.getSegmentsFrom());

Review comment:
       Sounds good.






[GitHub] [incubator-pinot] snleee commented on a change in pull request #5828: Improving retention manager to handle segment lineage clean-up

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #5828:
URL: https://github.com/apache/incubator-pinot/pull/5828#discussion_r467352877



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
##########
@@ -177,4 +193,87 @@ private boolean shouldDeleteInProgressLLCSegment(String segmentName, IdealState
       return states.size() == 1 && states.contains(CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE);
     }
   }
+
+  private void manageSegmentLineageCleanupForTable(String tableNameWithType) {
+    try {
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch segment lineage
+        ZNRecord segmentLineageZNRecord = SegmentLineageAccessHelper
+            .getSegmentLineageZNRecord(_pinotHelixResourceManager.getPropertyStore(), tableNameWithType);
+        if (segmentLineageZNRecord == null) {
+          LOGGER.info("Segment lineage does not exist for table: {}", tableNameWithType);

Review comment:
       Removed.






[GitHub] [incubator-pinot] snleee commented on a change in pull request #5828: Improving retention manager to handle segment lineage clean-up

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #5828:
URL: https://github.com/apache/incubator-pinot/pull/5828#discussion_r467356382



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
##########
@@ -177,4 +193,87 @@ private boolean shouldDeleteInProgressLLCSegment(String segmentName, IdealState
       return states.size() == 1 && states.contains(CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE);
     }
   }
+
+  private void manageSegmentLineageCleanupForTable(String tableNameWithType) {
+    try {
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch segment lineage
+        ZNRecord segmentLineageZNRecord = SegmentLineageAccessHelper
+            .getSegmentLineageZNRecord(_pinotHelixResourceManager.getPropertyStore(), tableNameWithType);
+        if (segmentLineageZNRecord == null) {
+          LOGGER.info("Segment lineage does not exist for table: {}", tableNameWithType);
+          return true;
+        }
+        SegmentLineage segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+        int expectedVersion = segmentLineageZNRecord.getVersion();
+
+        // Delete segments based on the segment lineage
+        PinotResourceManagerResponse response = _pinotHelixResourceManager
+            .deleteSegments(tableNameWithType, computeSegmentsToDeleteFromSegmentLineage(segmentLineage));
+
+        Assert.assertTrue(response.isSuccessful());
+
+        // Fetch available segments for the table
+        Set<String> segmentsForTable = new HashSet<>(_pinotHelixResourceManager.getSegmentsFor(tableNameWithType));
+
+        // Clean up the segment lineage
+        for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+          LineageEntry lineageEntry = segmentLineage.getLineageEntry(lineageEntryId);
+          if (lineageEntry.getState() == LineageEntryState.COMPLETED) {
+            // The lineage entry for 'COMPLETED' state can only be safely removed when both segmentFrom & segmentTo
+            // are all removed from the table.
+            if (Collections.disjoint(segmentsForTable, lineageEntry.getSegmentsFrom()) && Collections
+                .disjoint(segmentsForTable, lineageEntry.getSegmentsTo())) {
+              segmentLineage.deleteLineageEntry(lineageEntryId);
+            }
+          } else if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS) {
+            // Zombie lineage entry is safe to remove. This will allow the task scheduler to re-schedule the
+            // source segments to be merged again.
+            segmentLineage.deleteLineageEntry(lineageEntryId);
+          }
+        }
+
+        // Write back to the lineage entry
+        return SegmentLineageAccessHelper
+            .writeSegmentLineage(_pinotHelixResourceManager.getPropertyStore(), segmentLineage, expectedVersion);
+      });
+    } catch (Exception e) {
+      String errorMsg = String.format("Failed to clean up the segment lineage. (tableName = %s)", tableNameWithType);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(errorMsg, e);
+    }
+    LOGGER.info("Segment lineage metadata clean-up is successfully processed for table: {}", tableNameWithType);
+  }
+
+  /**
+   * Compute the segments that can be safely deleted based on the segment lineage.
+   *
+   * 1. The original segments can be deleted once the merged segments are successfully uploaded.
+   * 2. If the segmentReplacement operation fails in the middle, there can be a case where partial segments are
+   *    uploaded to the table. We should periodically clean up those zombie segments.
+   */
+  private List<String> computeSegmentsToDeleteFromSegmentLineage(SegmentLineage segmentLineage) {
+    if (segmentLineage != null) {
+      List<String> segmentsToDelete = new ArrayList<>();
+      for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+        LineageEntry lineageEntry = segmentLineage.getLineageEntry(lineageEntryId);
+        if (lineageEntry.getState() == LineageEntryState.COMPLETED) {
+          // If the lineage state is 'COMPLETED', it is safe to delete all segments from 'segmentsFrom'
+          segmentsToDelete.addAll(lineageEntry.getSegmentsFrom());

Review comment:
       Actually, if we don't explicitly send out the routing table refresh message, the routing table rebuild is only triggered after the segment is deleted. So the sequence will always be the following:
   
   1. The retention manager deletes the segments and updates the segment lineage metadata (either of the two operations can happen first).
   2. The broker-side routing table update happens only after the segment is deleted (triggered by the ExternalView (EV) change).
   
   As long as we have the guarantee that the routing table gets updated after the segment is deleted, we are good.






[GitHub] [incubator-pinot] Jackie-Jiang commented on a change in pull request #5828: Improving retention manager to handle segment lineage clean-up

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on a change in pull request #5828:
URL: https://github.com/apache/incubator-pinot/pull/5828#discussion_r467503210



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
##########
@@ -177,4 +193,87 @@ private boolean shouldDeleteInProgressLLCSegment(String segmentName, IdealState
       return states.size() == 1 && states.contains(CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE);
     }
   }
+
+  private void manageSegmentLineageCleanupForTable(String tableNameWithType) {
+    try {
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch segment lineage
+        ZNRecord segmentLineageZNRecord = SegmentLineageAccessHelper
+            .getSegmentLineageZNRecord(_pinotHelixResourceManager.getPropertyStore(), tableNameWithType);
+        if (segmentLineageZNRecord == null) {
+          LOGGER.info("Segment lineage does not exist for table: {}", tableNameWithType);
+          return true;
+        }
+        SegmentLineage segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+        int expectedVersion = segmentLineageZNRecord.getVersion();
+
+        // Delete segments based on the segment lineage
+        PinotResourceManagerResponse response = _pinotHelixResourceManager
+            .deleteSegments(tableNameWithType, computeSegmentsToDeleteFromSegmentLineage(segmentLineage));
+
+        Assert.assertTrue(response.isSuccessful());
+
+        // Fetch available segments for the table
+        Set<String> segmentsForTable = new HashSet<>(_pinotHelixResourceManager.getSegmentsFor(tableNameWithType));
+
+        // Clean up the segment lineage
+        for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+          LineageEntry lineageEntry = segmentLineage.getLineageEntry(lineageEntryId);
+          if (lineageEntry.getState() == LineageEntryState.COMPLETED) {
+            // The lineage entry for 'COMPLETED' state can only be safely removed when both segmentFrom & segmentTo
+            // are all removed from the table.
+            if (Collections.disjoint(segmentsForTable, lineageEntry.getSegmentsFrom()) && Collections
+                .disjoint(segmentsForTable, lineageEntry.getSegmentsTo())) {
+              segmentLineage.deleteLineageEntry(lineageEntryId);
+            }
+          } else if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS) {
+            // Zombie lineage entry is safe to remove. This will allow the task scheduler to re-schedule the
+            // source segments to be merged again.
+            segmentLineage.deleteLineageEntry(lineageEntryId);
+          }
+        }
+
+        // Write back to the lineage entry
+        return SegmentLineageAccessHelper
+            .writeSegmentLineage(_pinotHelixResourceManager.getPropertyStore(), segmentLineage, expectedVersion);
+      });
+    } catch (Exception e) {
+      String errorMsg = String.format("Failed to clean up the segment lineage. (tableName = %s)", tableNameWithType);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(errorMsg, e);
+    }
+    LOGGER.info("Segment lineage metadata clean-up is successfully processed for table: {}", tableNameWithType);
+  }
+
+  /**
+   * Compute the segments that can be safely deleted based on the segment lineage.
+   *
+   * 1. The original segments can be deleted once the merged segments are successfully uploaded.
+   * 2. If the segmentReplacement operation fails in the middle, there can be a case where partial segments are
+   *    uploaded to the table. We should periodically clean up those zombie segments.
+   */
+  private List<String> computeSegmentsToDeleteFromSegmentLineage(SegmentLineage segmentLineage) {
+    if (segmentLineage != null) {
+      List<String> segmentsToDelete = new ArrayList<>();
+      for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+        LineageEntry lineageEntry = segmentLineage.getLineageEntry(lineageEntryId);
+        if (lineageEntry.getState() == LineageEntryState.COMPLETED) {
+          // If the lineage state is 'COMPLETED', it is safe to delete all segments from 'segmentsFrom'
+          segmentsToDelete.addAll(lineageEntry.getSegmentsFrom());

Review comment:
       Good point. In that case, we should delete the lineage entry only once the segments are already deleted (for both `COMPLETED` and `IN_PROGRESS`).
   The logic should be:
   If any of the segments (`segmentsFrom` for `COMPLETED`, `segmentsTo` for `IN_PROGRESS`) still exist, delete those segments; if they are all already deleted, delete the lineage entry.
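
   The logic described above could be sketched roughly as follows. This is a minimal, self-contained illustration and not the code in this PR: `LineageCleanupSketch`, `Entry`, `State`, and `cleanUp` are hypothetical stand-ins for `SegmentLineage`, `LineageEntry`, `LineageEntryState`, and the retention manager, and a plain `Set<String>` stands in for the segments currently present in the table.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class LineageCleanupSketch {
  // Hypothetical stand-in for LineageEntryState.
  enum State { IN_PROGRESS, COMPLETED }

  // Hypothetical stand-in for LineageEntry.
  static final class Entry {
    final List<String> segmentsFrom;
    final List<String> segmentsTo;
    final State state;

    Entry(List<String> segmentsFrom, List<String> segmentsTo, State state) {
      this.segmentsFrom = segmentsFrom;
      this.segmentsTo = segmentsTo;
      this.state = state;
    }
  }

  /**
   * For each lineage entry, looks at the segments that are safe to delete
   * (segmentsFrom for COMPLETED, segmentsTo for IN_PROGRESS). If any of them
   * still exist in the table, they are returned for deletion and the entry is
   * kept. If none exist any more, the entry itself is removed from the map.
   */
  static List<String> cleanUp(Map<String, Entry> lineage, Set<String> segmentsForTable) {
    List<String> segmentsToDelete = new ArrayList<>();
    Iterator<Map.Entry<String, Entry>> iterator = lineage.entrySet().iterator();
    while (iterator.hasNext()) {
      Entry entry = iterator.next().getValue();
      List<String> candidates = entry.state == State.COMPLETED ? entry.segmentsFrom : entry.segmentsTo;
      List<String> stillPresent = new ArrayList<>();
      for (String segment : candidates) {
        if (segmentsForTable.contains(segment)) {
          stillPresent.add(segment);
        }
      }
      if (stillPresent.isEmpty()) {
        // All deletable segments are already gone, so the entry can go too.
        iterator.remove();
      } else {
        // Delete the segments first; the entry is removed on a later run.
        segmentsToDelete.addAll(stillPresent);
      }
    }
    return segmentsToDelete;
  }

  public static void main(String[] args) {
    Map<String, Entry> lineage = new LinkedHashMap<>();
    lineage.put("e1", new Entry(Arrays.asList("s1", "s2"), Arrays.asList("m1"), State.COMPLETED));
    lineage.put("e2", new Entry(Arrays.asList("s3"), Arrays.asList("m2"), State.IN_PROGRESS));
    lineage.put("e3", new Entry(Arrays.asList("s4"), Arrays.asList("m3"), State.COMPLETED));
    Set<String> segmentsForTable = new HashSet<>(Arrays.asList("s1", "m1", "s3", "m3"));

    System.out.println("Segments to delete: " + cleanUp(lineage, segmentsForTable));
    System.out.println("Remaining lineage entries: " + lineage.keySet());
  }
}
```

   With the sample data in `main`, only `s1` is scheduled for deletion and only entry `e1` is kept; `e1` would be dropped on a later run once `s1` is gone. Deleting the segments before the entry means a failure between the two steps leaves the lineage entry in place, so the next run can retry.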






[GitHub] [incubator-pinot] snleee merged pull request #5828: Improving retention manager to handle segment lineage clean-up

Posted by GitBox <gi...@apache.org>.
snleee merged pull request #5828:
URL: https://github.com/apache/incubator-pinot/pull/5828


   




[GitHub] [incubator-pinot] Jackie-Jiang commented on a change in pull request #5828: Improving retention manager to handle segment lineage clean-up

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on a change in pull request #5828:
URL: https://github.com/apache/incubator-pinot/pull/5828#discussion_r468066337



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
##########
@@ -177,4 +193,87 @@ private boolean shouldDeleteInProgressLLCSegment(String segmentName, IdealState
       return states.size() == 1 && states.contains(CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE);
     }
   }
+
+  private void manageSegmentLineageCleanupForTable(String tableNameWithType) {
+    try {
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch segment lineage
+        ZNRecord segmentLineageZNRecord = SegmentLineageAccessHelper
+            .getSegmentLineageZNRecord(_pinotHelixResourceManager.getPropertyStore(), tableNameWithType);
+        if (segmentLineageZNRecord == null) {
+          LOGGER.info("Segment lineage does not exist for table: {}", tableNameWithType);
+          return true;
+        }
+        SegmentLineage segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+        int expectedVersion = segmentLineageZNRecord.getVersion();
+
+        // Delete segments based on the segment lineage
+        PinotResourceManagerResponse response = _pinotHelixResourceManager
+            .deleteSegments(tableNameWithType, computeSegmentsToDeleteFromSegmentLineage(segmentLineage));
+
+        Assert.assertTrue(response.isSuccessful());
+
+        // Fetch available segments for the table
+        Set<String> segmentsForTable = new HashSet<>(_pinotHelixResourceManager.getSegmentsFor(tableNameWithType));
+
+        // Clean up the segment lineage
+        for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+          LineageEntry lineageEntry = segmentLineage.getLineageEntry(lineageEntryId);
+          if (lineageEntry.getState() == LineageEntryState.COMPLETED) {
+            // The lineage entry for 'COMPLETED' state can only be safely removed when both segmentFrom & segmentTo
+            // are all removed from the table.
+            if (Collections.disjoint(segmentsForTable, lineageEntry.getSegmentsFrom()) && Collections
+                .disjoint(segmentsForTable, lineageEntry.getSegmentsTo())) {
+              segmentLineage.deleteLineageEntry(lineageEntryId);
+            }
+          } else if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS) {
+            // Zombie lineage entry is safe to remove. This will allow the task scheduler to re-schedule the
+            // source segments to be merged again.
+            segmentLineage.deleteLineageEntry(lineageEntryId);
+          }
+        }
+
+        // Write back to the lineage entry
+        return SegmentLineageAccessHelper
+            .writeSegmentLineage(_pinotHelixResourceManager.getPropertyStore(), segmentLineage, expectedVersion);
+      });
+    } catch (Exception e) {
+      String errorMsg = String.format("Failed to clean up the segment lineage. (tableName = %s)", tableNameWithType);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(errorMsg, e);
+    }
+    LOGGER.info("Segment lineage metadata clean-up is successfully processed for table: {}", tableNameWithType);
+  }
+
+  /**
+   * Compute the segments that can be safely deleted based on the segment lineage.
+   *
+   * 1. The original segments can be deleted once the merged segments are successfully uploaded.
+   * 2. If the segmentReplacement operation fails in the middle, there can be a case where partial segments are
+   *    uploaded to the table. We should periodically clean up those zombie segments.
+   */
+  private List<String> computeSegmentsToDeleteFromSegmentLineage(SegmentLineage segmentLineage) {
+    if (segmentLineage != null) {
+      List<String> segmentsToDelete = new ArrayList<>();
+      for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+        LineageEntry lineageEntry = segmentLineage.getLineageEntry(lineageEntryId);
+        if (lineageEntry.getState() == LineageEntryState.COMPLETED) {
+          // If the lineage state is 'COMPLETED', it is safe to delete all segments from 'segmentsFrom'
+          segmentsToDelete.addAll(lineageEntry.getSegmentsFrom());

Review comment:
       @snleee Checking the ZK metadata is fine. That prevents leaving orphaned segment ZK metadata behind.



