You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2021/09/13 22:32:27 UTC

[pinot] branch master updated: Fix retention for cleaning up segment lineage (#7424)

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new fe03999  Fix retention for cleaning up segment lineage (#7424)
fe03999 is described below

commit fe03999a9c59aa1a97eeb5cf070858c6e10f23b6
Author: Jialiang Li <jl...@linkedin.com>
AuthorDate: Mon Sep 13 15:32:10 2021 -0700

    Fix retention for cleaning up segment lineage (#7424)
    
    Co-authored-by: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
---
 .../helix/core/retention/RetentionManager.java        | 19 ++++++++++++++-----
 1 file changed, 14 insertions(+), 5 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
index b9b0d09..f605e4c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
@@ -79,8 +79,13 @@ public class RetentionManager extends ControllerPeriodicTask<Void> {
 
   @Override
   protected void processTable(String tableNameWithType) {
-    LOGGER.info("Start managing retention for table: {}", tableNameWithType);
+    // Manage normal table retention except segment lineage cleanup.
+    // The reason of separating the logic is that REFRESH only table will be skipped in the first part,
+    // whereas the segment lineage cleanup needs to be handled.
     manageRetentionForTable(tableNameWithType);
+
+    // Delete segments based on segment lineage and clean up segment lineage metadata.
+    manageSegmentLineageCleanupForTable(tableNameWithType);
   }
 
   @Override
@@ -90,6 +95,7 @@ public class RetentionManager extends ControllerPeriodicTask<Void> {
   }
 
   private void manageRetentionForTable(String tableNameWithType) {
+    LOGGER.info("Start managing retention for table: {}", tableNameWithType);
 
     // Build retention strategy from table config
     TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
@@ -102,7 +108,7 @@ public class RetentionManager extends ControllerPeriodicTask<Void> {
     SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
     String segmentPushType = IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig);
     if (tableConfig.getTableType() == TableType.OFFLINE && !"APPEND".equalsIgnoreCase(segmentPushType)) {
-      LOGGER.info("Segment push type is not APPEND for table: {}, skip", tableNameWithType);
+      LOGGER.info("Segment push type is not APPEND for table: {}, skip managing retention", tableNameWithType);
       return;
     }
     String retentionTimeUnit = validationConfig.getRetentionTimeUnit();
@@ -123,9 +129,6 @@ public class RetentionManager extends ControllerPeriodicTask<Void> {
     } else {
       manageRetentionForRealtimeTable(tableNameWithType, retentionStrategy);
     }
-
-    // Delete segments based on segment lineage and clean up segment lineage metadata
-    manageSegmentLineageCleanupForTable(tableNameWithType);
   }
 
   private void manageRetentionForOfflineTable(String offlineTableName, RetentionStrategy retentionStrategy) {
@@ -202,6 +205,8 @@ public class RetentionManager extends ControllerPeriodicTask<Void> {
         if (segmentLineageZNRecord == null) {
           return true;
         }
+        LOGGER.info("Start cleaning up segment lineage for table: {}", tableNameWithType);
+        long cleanupStartTime = System.currentTimeMillis();
         SegmentLineage segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
         int expectedVersion = segmentLineageZNRecord.getVersion();
 
@@ -246,8 +251,12 @@ public class RetentionManager extends ControllerPeriodicTask<Void> {
             .writeSegmentLineage(_pinotHelixResourceManager.getPropertyStore(), segmentLineage, expectedVersion)) {
           // Delete segments based on the segment lineage
           _pinotHelixResourceManager.deleteSegments(tableNameWithType, segmentsToDelete);
+          LOGGER.info("Finished cleaning up segment lineage for table: {} in {}ms", tableNameWithType,
+              (System.currentTimeMillis() - cleanupStartTime));
           return true;
         } else {
+          LOGGER.warn("Failed to write segment lineage back when cleaning up segment lineage for table: {}",
+              tableNameWithType);
           return false;
         }
       });

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org