You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sa...@apache.org on 2023/05/19 01:13:51 UTC
[pinot] branch master updated: Keep last completed segment for retention (#10754)
This is an automated email from the ASF dual-hosted git repository.
sajjad pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 8293facfa0 Keep last completed segment for retention (#10754)
8293facfa0 is described below
commit 8293facfa065ae7b317403a1bbeb969881a84ec1
Author: Jiapeng Tao <ji...@linkedin.com>
AuthorDate: Thu May 18 18:13:43 2023 -0700
Keep last completed segment for retention (#10754)
---
.../helix/core/PinotHelixResourceManager.java | 21 ++++++
.../helix/core/retention/RetentionManager.java | 8 +++
.../helix/core/retention/RetentionManagerTest.java | 77 ++++++++++++++++++++++
3 files changed, 106 insertions(+)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 9142b93792..111ea6a24b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -34,6 +34,7 @@ import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -118,6 +119,8 @@ import org.apache.pinot.common.tier.TierFactory;
import org.apache.pinot.common.tier.TierSegmentSelector;
import org.apache.pinot.common.utils.BcryptUtils;
import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.SegmentName;
import org.apache.pinot.common.utils.config.AccessControlUserConfigUtils;
import org.apache.pinot.common.utils.config.InstanceUtils;
import org.apache.pinot.common.utils.config.TableConfigUtils;
@@ -872,6 +875,24 @@ public class PinotHelixResourceManager {
return ZKMetadataProvider.getSegmentsZKMetadata(_propertyStore, tableNameWithType);
}
+  /**
+   * Returns, for each partition group of the given table, the name of the LLC segment with the highest sequence
+   * number among the segments whose ZK metadata status is {@code DONE}. Segments whose names are not LLC segment
+   * names are ignored.
+   *
+   * <p>The returned collection is a {@link HashSet}: callers such as the retention manager pass it to
+   * {@code List.removeAll(...)}, which performs one {@code contains()} lookup per candidate segment, so a hash-based
+   * collection keeps that constant-time instead of scanning all partitions each time.
+   *
+   * @param tableNameWithType table name including its type suffix
+   * @return names of the last completed LLC segment of every partition group (empty if there are none)
+   */
+  public Collection<String> getLastLLCCompletedSegments(String tableNameWithType) {
+    Map<Integer, LLCSegmentName> partitionGroupIdToLastCompletedSegment = new HashMap<>();
+    for (SegmentZKMetadata segmentZKMetadata : getSegmentsZKMetadata(tableNameWithType)) {
+      String segmentName = segmentZKMetadata.getSegmentName();
+      if (!SegmentName.isLowLevelConsumerSegmentName(segmentName)
+          || segmentZKMetadata.getStatus() != CommonConstants.Segment.Realtime.Status.DONE) {
+        continue;
+      }
+      LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
+      // Store the parsed name so later candidates of the same partition are compared without re-parsing; on equal
+      // sequence numbers the segment seen first is kept.
+      partitionGroupIdToLastCompletedSegment.merge(llcSegmentName.getPartitionGroupId(), llcSegmentName,
+          (current, candidate) -> candidate.getSequenceNumber() > current.getSequenceNumber() ? candidate : current);
+    }
+    Collection<String> lastCompletedSegmentNames = new HashSet<>();
+    for (LLCSegmentName lastCompletedSegment : partitionGroupIdToLastCompletedSegment.values()) {
+      lastCompletedSegmentNames.add(lastCompletedSegment.getSegmentName());
+    }
+    return lastCompletedSegmentNames;
+  }
+
public synchronized PinotResourceManagerResponse deleteSegments(String tableNameWithType, List<String> segmentNames) {
return deleteSegments(tableNameWithType, segmentNames, null);
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
index 36443146e0..93c7d8573b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
@@ -159,6 +159,10 @@ public class RetentionManager extends ControllerPeriodicTask<Void> {
}
}
}
+
+ // Remove last sealed segments such that the table can still create new consuming segments if it's paused
+ segmentsToDelete.removeAll(_pinotHelixResourceManager.getLastLLCCompletedSegments(realtimeTableName));
+
if (!segmentsToDelete.isEmpty()) {
LOGGER.info("Deleting {} segments from table: {}", segmentsToDelete.size(), realtimeTableName);
_pinotHelixResourceManager.deleteSegments(realtimeTableName, segmentsToDelete);
@@ -214,6 +218,10 @@ public class RetentionManager extends ControllerPeriodicTask<Void> {
// Write back to the lineage entry
if (SegmentLineageAccessHelper.writeSegmentLineage(_pinotHelixResourceManager.getPropertyStore(),
segmentLineage, expectedVersion)) {
+ // Remove last sealed segments such that the table can still create new consuming segments if it's paused
+ if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
+ segmentsToDelete.removeAll(_pinotHelixResourceManager.getLastLLCCompletedSegments(tableNameWithType));
+ }
// Delete segments based on the segment lineage
if (!segmentsToDelete.isEmpty()) {
_pinotHelixResourceManager.deleteSegments(tableNameWithType, segmentsToDelete);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
index 6da50f5c5b..2f16f830b9 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
@@ -233,6 +233,39 @@ public class RetentionManagerTest {
verify(pinotHelixResourceManager, times(1)).deleteSegments(anyString(), anyList());
}
+ // This test makes sure that we do not clean up last llc completed segments
+ @Test
+ public void testRealtimeLastLLCCleanup()
+ throws Exception {
+ final long now = System.currentTimeMillis();
+ final int replicaCount = 1;
+
+ TableConfig tableConfig = createRealtimeTableConfig1(replicaCount);
+ List<String> removedSegments = new ArrayList<>();
+ LeadControllerManager leadControllerManager = mock(LeadControllerManager.class);
+ when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+ PinotHelixResourceManager pinotHelixResourceManager =
+ setupSegmentMetadataForPausedTable(tableConfig, now, removedSegments);
+ setupPinotHelixResourceManager(tableConfig, removedSegments, pinotHelixResourceManager, leadControllerManager);
+
+ ControllerConf conf = new ControllerConf();
+ ControllerMetrics controllerMetrics = new ControllerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
+ conf.setRetentionControllerFrequencyInSeconds(0);
+ conf.setDeletedSegmentsRetentionInDays(0);
+ RetentionManager retentionManager =
+ new RetentionManager(pinotHelixResourceManager, leadControllerManager, conf, controllerMetrics);
+ retentionManager.start();
+ retentionManager.run();
+
+ SegmentDeletionManager deletionManager = pinotHelixResourceManager.getSegmentDeletionManager();
+
+ // Verify that the removeAgedDeletedSegments() method in deletion manager is actually called.
+ verify(deletionManager, times(1)).removeAgedDeletedSegments(leadControllerManager);
+
+ // Verify that the deleteSegments method is actually called.
+ verify(pinotHelixResourceManager, times(1)).deleteSegments(anyString(), anyList());
+ }
+
private PinotHelixResourceManager setupSegmentMetadata(TableConfig tableConfig, final long now, final int nSegments,
List<String> segmentsToBeDeleted) {
final int replicaCount = tableConfig.getReplication();
@@ -297,6 +330,50 @@ public class RetentionManagerTest {
return pinotHelixResourceManager;
}
+  /**
+   * Builds a mocked resource manager for a realtime table with two DONE LLC segments on one partition.
+   * Sequence 0 covers [now - 30d, now - 20d] and sequence 1 covers [now - 20d, now - 10d]. Only sequence 0 is added
+   * to {@code segmentsToBeDeleted}; sequence 1 is the partition's last completed segment.
+   */
+  private PinotHelixResourceManager setupSegmentMetadataForPausedTable(TableConfig tableConfig, final long now,
+      List<String> segmentsToBeDeleted) {
+    int replication = tableConfig.getReplication();
+    List<SegmentZKMetadata> zkMetadataList = new ArrayList<>();
+    IdealState idealState =
+        PinotTableIdealStateBuilder.buildEmptyRealtimeIdealStateFor(REALTIME_TABLE_NAME, replication, true);
+
+    int partitionId = 5;
+    long oneDayMs = TimeUnit.DAYS.toMillis(1);
+    String instanceName = "Server_localhost_0";
+    for (int sequence = 0; sequence < 2; sequence++) {
+      String segmentName = new LLCSegmentName(TEST_TABLE_NAME, partitionId, sequence, now).getSegmentName();
+      SegmentZKMetadata zkMetadata = createSegmentZKMetadata(segmentName, replication, now);
+      zkMetadata.setTimeUnit(TimeUnit.MILLISECONDS);
+      // Each successive segment is shifted 10 days closer to "now".
+      zkMetadata.setStartTime(now - (30 - 10 * sequence) * oneDayMs);
+      zkMetadata.setEndTime(now - (20 - 10 * sequence) * oneDayMs);
+      zkMetadata.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
+      zkMetadataList.add(zkMetadata);
+      idealState.setPartitionState(segmentName, instanceName, "ONLINE");
+      if (sequence == 0) {
+        segmentsToBeDeleted.add(segmentName);
+      }
+    }
+
+    PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class);
+    when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig);
+    when(resourceManager.getSegmentsZKMetadata(REALTIME_TABLE_NAME)).thenReturn(zkMetadataList);
+    when(resourceManager.getHelixClusterName()).thenReturn(HELIX_CLUSTER_NAME);
+    // Run the real implementation, which reads the stubbed getSegmentsZKMetadata() result above.
+    when(resourceManager.getLastLLCCompletedSegments(REALTIME_TABLE_NAME)).thenCallRealMethod();
+
+    HelixAdmin helixAdmin = mock(HelixAdmin.class);
+    when(helixAdmin.getResourceIdealState(HELIX_CLUSTER_NAME, REALTIME_TABLE_NAME)).thenReturn(idealState);
+    when(resourceManager.getHelixAdmin()).thenReturn(helixAdmin);
+
+    return resourceManager;
+  }
+
private SegmentZKMetadata createSegmentZKMetadata(String segmentName, int replicaCount, long segmentCreationTime) {
SegmentZKMetadata segmentMetadata = new SegmentZKMetadata(segmentName);
segmentMetadata.setCreationTime(segmentCreationTime);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org