You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text rendering.
Posted to commits@pinot.apache.org by sa...@apache.org on 2023/05/19 01:13:51 UTC

[pinot] branch master updated: Keep last completed segment for retention (#10754)

This is an automated email from the ASF dual-hosted git repository.

sajjad pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 8293facfa0 Keep last completed segment for retention (#10754)
8293facfa0 is described below

commit 8293facfa065ae7b317403a1bbeb969881a84ec1
Author: Jiapeng Tao <ji...@linkedin.com>
AuthorDate: Thu May 18 18:13:43 2023 -0700

    Keep last completed segment for retention (#10754)
---
 .../helix/core/PinotHelixResourceManager.java      | 21 ++++++
 .../helix/core/retention/RetentionManager.java     |  8 +++
 .../helix/core/retention/RetentionManagerTest.java | 77 ++++++++++++++++++++++
 3 files changed, 106 insertions(+)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 9142b93792..111ea6a24b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -34,6 +34,7 @@ import java.time.ZoneOffset;
 import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -118,6 +119,8 @@ import org.apache.pinot.common.tier.TierFactory;
 import org.apache.pinot.common.tier.TierSegmentSelector;
 import org.apache.pinot.common.utils.BcryptUtils;
 import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.SegmentName;
 import org.apache.pinot.common.utils.config.AccessControlUserConfigUtils;
 import org.apache.pinot.common.utils.config.InstanceUtils;
 import org.apache.pinot.common.utils.config.TableConfigUtils;
@@ -872,6 +875,24 @@ public class PinotHelixResourceManager {
     return ZKMetadataProvider.getSegmentsZKMetadata(_propertyStore, tableNameWithType);
   }
 
+  public Collection<String> getLastLLCCompletedSegments(String tableNameWithType) {
+    Map<Integer, String> partitionIdToLastLLCCompletedSegmentMap = new HashMap<>();
+    for (SegmentZKMetadata segMetadata : getSegmentsZKMetadata(tableNameWithType)) {
+      if (SegmentName.isLowLevelConsumerSegmentName(segMetadata.getSegmentName())
+          && segMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.DONE) {
+        LLCSegmentName llcName = LLCSegmentName.of(segMetadata.getSegmentName());
+        int partitionGroupId = llcName.getPartitionGroupId();
+        int sequenceNumber = llcName.getSequenceNumber();
+        String lastCompletedSegName = partitionIdToLastLLCCompletedSegmentMap.get(partitionGroupId);
+        if (lastCompletedSegName == null
+            || LLCSegmentName.of(lastCompletedSegName).getSequenceNumber() < sequenceNumber) {
+          partitionIdToLastLLCCompletedSegmentMap.put(partitionGroupId, segMetadata.getSegmentName());
+        }
+      }
+    }
+    return partitionIdToLastLLCCompletedSegmentMap.values();
+  }
+
   public synchronized PinotResourceManagerResponse deleteSegments(String tableNameWithType, List<String> segmentNames) {
     return deleteSegments(tableNameWithType, segmentNames, null);
   }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
index 36443146e0..93c7d8573b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
@@ -159,6 +159,10 @@ public class RetentionManager extends ControllerPeriodicTask<Void> {
         }
       }
     }
+
+    // Remove last sealed segments such that the table can still create new consuming segments if it's paused
+    segmentsToDelete.removeAll(_pinotHelixResourceManager.getLastLLCCompletedSegments(realtimeTableName));
+
     if (!segmentsToDelete.isEmpty()) {
       LOGGER.info("Deleting {} segments from table: {}", segmentsToDelete.size(), realtimeTableName);
       _pinotHelixResourceManager.deleteSegments(realtimeTableName, segmentsToDelete);
@@ -214,6 +218,10 @@ public class RetentionManager extends ControllerPeriodicTask<Void> {
         // Write back to the lineage entry
         if (SegmentLineageAccessHelper.writeSegmentLineage(_pinotHelixResourceManager.getPropertyStore(),
             segmentLineage, expectedVersion)) {
+          // Remove last sealed segments such that the table can still create new consuming segments if it's paused
+          if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
+            segmentsToDelete.removeAll(_pinotHelixResourceManager.getLastLLCCompletedSegments(tableNameWithType));
+          }
           // Delete segments based on the segment lineage
           if (!segmentsToDelete.isEmpty()) {
             _pinotHelixResourceManager.deleteSegments(tableNameWithType, segmentsToDelete);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
index 6da50f5c5b..2f16f830b9 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
@@ -233,6 +233,39 @@ public class RetentionManagerTest {
     verify(pinotHelixResourceManager, times(1)).deleteSegments(anyString(), anyList());
   }
 
+  // This test makes sure that we do not clean up last llc completed segments
+  @Test
+  public void testRealtimeLastLLCCleanup()
+      throws Exception {
+    final long now = System.currentTimeMillis();
+    final int replicaCount = 1;
+
+    TableConfig tableConfig = createRealtimeTableConfig1(replicaCount);
+    List<String> removedSegments = new ArrayList<>();
+    LeadControllerManager leadControllerManager = mock(LeadControllerManager.class);
+    when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+    PinotHelixResourceManager pinotHelixResourceManager =
+        setupSegmentMetadataForPausedTable(tableConfig, now, removedSegments);
+    setupPinotHelixResourceManager(tableConfig, removedSegments, pinotHelixResourceManager, leadControllerManager);
+
+    ControllerConf conf = new ControllerConf();
+    ControllerMetrics controllerMetrics = new ControllerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
+    conf.setRetentionControllerFrequencyInSeconds(0);
+    conf.setDeletedSegmentsRetentionInDays(0);
+    RetentionManager retentionManager =
+        new RetentionManager(pinotHelixResourceManager, leadControllerManager, conf, controllerMetrics);
+    retentionManager.start();
+    retentionManager.run();
+
+    SegmentDeletionManager deletionManager = pinotHelixResourceManager.getSegmentDeletionManager();
+
+    // Verify that the removeAgedDeletedSegments() method in deletion manager is actually called.
+    verify(deletionManager, times(1)).removeAgedDeletedSegments(leadControllerManager);
+
+    // Verify that the deleteSegments method is actually called.
+    verify(pinotHelixResourceManager, times(1)).deleteSegments(anyString(), anyList());
+  }
+
   private PinotHelixResourceManager setupSegmentMetadata(TableConfig tableConfig, final long now, final int nSegments,
       List<String> segmentsToBeDeleted) {
     final int replicaCount = tableConfig.getReplication();
@@ -297,6 +330,50 @@ public class RetentionManagerTest {
     return pinotHelixResourceManager;
   }
 
+  private PinotHelixResourceManager setupSegmentMetadataForPausedTable(TableConfig tableConfig, final long now,
+      List<String> segmentsToBeDeleted) {
+    final int replicaCount = tableConfig.getReplication();
+
+    List<SegmentZKMetadata> segmentsZKMetadata = new ArrayList<>();
+
+    IdealState idealState =
+        PinotTableIdealStateBuilder.buildEmptyRealtimeIdealStateFor(REALTIME_TABLE_NAME, replicaCount, true);
+
+    final int kafkaPartition = 5;
+    final long millisInDays = TimeUnit.DAYS.toMillis(1);
+    final String serverName = "Server_localhost_0";
+    LLCSegmentName llcSegmentName0 = new LLCSegmentName(TEST_TABLE_NAME, kafkaPartition, 0, now);
+    SegmentZKMetadata segmentZKMetadata0 = createSegmentZKMetadata(llcSegmentName0.getSegmentName(), replicaCount, now);
+    segmentZKMetadata0.setTimeUnit(TimeUnit.MILLISECONDS);
+    segmentZKMetadata0.setStartTime(now - 30 * millisInDays);
+    segmentZKMetadata0.setEndTime(now - 20 * millisInDays);
+    segmentZKMetadata0.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
+    segmentsZKMetadata.add(segmentZKMetadata0);
+    idealState.setPartitionState(llcSegmentName0.getSegmentName(), serverName, "ONLINE");
+    segmentsToBeDeleted.add(llcSegmentName0.getSegmentName());
+
+    LLCSegmentName llcSegmentName1 = new LLCSegmentName(TEST_TABLE_NAME, kafkaPartition, 1, now);
+    SegmentZKMetadata segmentZKMetadata1 = createSegmentZKMetadata(llcSegmentName1.getSegmentName(), replicaCount, now);
+    segmentZKMetadata1.setTimeUnit(TimeUnit.MILLISECONDS);
+    segmentZKMetadata1.setStartTime(now - 20 * millisInDays);
+    segmentZKMetadata1.setEndTime(now - 10 * millisInDays);
+    segmentZKMetadata1.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
+    segmentsZKMetadata.add(segmentZKMetadata1);
+    idealState.setPartitionState(llcSegmentName1.getSegmentName(), serverName, "ONLINE");
+
+    PinotHelixResourceManager pinotHelixResourceManager = mock(PinotHelixResourceManager.class);
+    when(pinotHelixResourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig);
+    when(pinotHelixResourceManager.getSegmentsZKMetadata(REALTIME_TABLE_NAME)).thenReturn(segmentsZKMetadata);
+    when(pinotHelixResourceManager.getHelixClusterName()).thenReturn(HELIX_CLUSTER_NAME);
+    when(pinotHelixResourceManager.getLastLLCCompletedSegments(REALTIME_TABLE_NAME)).thenCallRealMethod();
+
+    HelixAdmin helixAdmin = mock(HelixAdmin.class);
+    when(helixAdmin.getResourceIdealState(HELIX_CLUSTER_NAME, REALTIME_TABLE_NAME)).thenReturn(idealState);
+    when(pinotHelixResourceManager.getHelixAdmin()).thenReturn(helixAdmin);
+
+    return pinotHelixResourceManager;
+  }
+
   private SegmentZKMetadata createSegmentZKMetadata(String segmentName, int replicaCount, long segmentCreationTime) {
     SegmentZKMetadata segmentMetadata = new SegmentZKMetadata(segmentName);
     segmentMetadata.setCreationTime(segmentCreationTime);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org