You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2021/09/13 22:32:27 UTC
[pinot] branch master updated: Fix retention for cleaning up segment lineage (#7424)
This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new fe03999 Fix retention for cleaning up segment lineage (#7424)
fe03999 is described below
commit fe03999a9c59aa1a97eeb5cf070858c6e10f23b6
Author: Jialiang Li <jl...@linkedin.com>
AuthorDate: Mon Sep 13 15:32:10 2021 -0700
Fix retention for cleaning up segment lineage (#7424)
Co-authored-by: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
---
.../helix/core/retention/RetentionManager.java | 19 ++++++++++++++-----
1 file changed, 14 insertions(+), 5 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
index b9b0d09..f605e4c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
@@ -79,8 +79,13 @@ public class RetentionManager extends ControllerPeriodicTask<Void> {
@Override
protected void processTable(String tableNameWithType) {
- LOGGER.info("Start managing retention for table: {}", tableNameWithType);
+ // Manage normal table retention except segment lineage cleanup.
+ // The reason of separating the logic is that REFRESH only table will be skipped in the first part,
+ // whereas the segment lineage cleanup needs to be handled.
manageRetentionForTable(tableNameWithType);
+
+ // Delete segments based on segment lineage and clean up segment lineage metadata.
+ manageSegmentLineageCleanupForTable(tableNameWithType);
}
@Override
@@ -90,6 +95,7 @@ public class RetentionManager extends ControllerPeriodicTask<Void> {
}
private void manageRetentionForTable(String tableNameWithType) {
+ LOGGER.info("Start managing retention for table: {}", tableNameWithType);
// Build retention strategy from table config
TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
@@ -102,7 +108,7 @@ public class RetentionManager extends ControllerPeriodicTask<Void> {
SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
String segmentPushType = IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig);
if (tableConfig.getTableType() == TableType.OFFLINE && !"APPEND".equalsIgnoreCase(segmentPushType)) {
- LOGGER.info("Segment push type is not APPEND for table: {}, skip", tableNameWithType);
+ LOGGER.info("Segment push type is not APPEND for table: {}, skip managing retention", tableNameWithType);
return;
}
String retentionTimeUnit = validationConfig.getRetentionTimeUnit();
@@ -123,9 +129,6 @@ public class RetentionManager extends ControllerPeriodicTask<Void> {
} else {
manageRetentionForRealtimeTable(tableNameWithType, retentionStrategy);
}
-
- // Delete segments based on segment lineage and clean up segment lineage metadata
- manageSegmentLineageCleanupForTable(tableNameWithType);
}
private void manageRetentionForOfflineTable(String offlineTableName, RetentionStrategy retentionStrategy) {
@@ -202,6 +205,8 @@ public class RetentionManager extends ControllerPeriodicTask<Void> {
if (segmentLineageZNRecord == null) {
return true;
}
+ LOGGER.info("Start cleaning up segment lineage for table: {}", tableNameWithType);
+ long cleanupStartTime = System.currentTimeMillis();
SegmentLineage segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
int expectedVersion = segmentLineageZNRecord.getVersion();
@@ -246,8 +251,12 @@ public class RetentionManager extends ControllerPeriodicTask<Void> {
.writeSegmentLineage(_pinotHelixResourceManager.getPropertyStore(), segmentLineage, expectedVersion)) {
// Delete segments based on the segment lineage
_pinotHelixResourceManager.deleteSegments(tableNameWithType, segmentsToDelete);
+ LOGGER.info("Finished cleaning up segment lineage for table: {} in {}ms", tableNameWithType,
+ (System.currentTimeMillis() - cleanupStartTime));
return true;
} else {
+ LOGGER.warn("Failed to write segment lineage back when cleaning up segment lineage for table: {}",
+ tableNameWithType);
return false;
}
});
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org