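You are viewing a plain text version of this content. The canonical version is the original message in the commits@pinot.apache.org mailing-list archive (the hyperlink was not preserved in this plain-text copy).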
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/12/05 23:33:45 UTC
(pinot) branch master updated: Add support for retention on deleted keys of upsert tables (#12037)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 49804a4f56 Add support for retention on deleted keys of upsert tables (#12037)
49804a4f56 is described below
commit 49804a4f56a1ae87b5ef882dbf6373ac8190ee84
Author: Pratik Tibrewal <ti...@uber.com>
AuthorDate: Wed Dec 6 05:03:35 2023 +0530
Add support for retention on deleted keys of upsert tables (#12037)
---
.../apache/pinot/common/metrics/ServerMeter.java | 2 +
.../apache/pinot/common/metrics/ServerTimer.java | 5 +-
...adataAndDictionaryAggregationPlanMakerTest.java | 2 +-
.../upsert/BasePartitionUpsertMetadataManager.java | 23 +++-
.../upsert/BaseTableUpsertMetadataManager.java | 10 +-
...oncurrentMapPartitionUpsertMetadataManager.java | 54 ++++++++-
.../ConcurrentMapTableUpsertMetadataManager.java | 2 +-
.../segment/local/utils/TableConfigUtils.java | 17 ++-
...rrentMapPartitionUpsertMetadataManagerTest.java | 124 ++++++++++++++++++---
.../segment/local/utils/TableConfigUtilsTest.java | 55 +++++++++
.../pinot/spi/config/table/UpsertConfig.java | 11 ++
11 files changed, 270 insertions(+), 35 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index 5b995d6e6a..cd7973f78d 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -48,6 +48,8 @@ public enum ServerMeter implements AbstractMetrics.Meter {
PARTIAL_UPSERT_OUT_OF_ORDER("rows", false),
PARTIAL_UPSERT_KEYS_NOT_REPLACED("rows", false),
UPSERT_OUT_OF_ORDER("rows", false),
+ DELETED_KEYS_TTL_PRIMARY_KEYS_REMOVED("rows", false),
+ METADATA_TTL_PRIMARY_KEYS_REMOVED("rows", false),
ROWS_WITH_ERRORS("rows", false),
LLC_CONTROLLER_RESPONSE_NOT_SENT("messages", true),
LLC_CONTROLLER_RESPONSE_COMMIT("messages", true),
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
index aa0952730b..a4c16221bc 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
@@ -47,7 +47,10 @@ public enum ServerTimer implements AbstractMetrics.Timer {
SEGMENT_UPLOAD_TIME_MS("milliseconds", false),
TOTAL_CPU_TIME_NS("nanoseconds", false, "Total query cost (thread cpu time + system "
- + "activities cpu time + response serialization cpu time) for query processing on server.");
+ + "activities cpu time + response serialization cpu time) for query processing on server."),
+
+ UPSERT_REMOVE_EXPIRED_PRIMARY_KEYS_TIME_MS("milliseconds", false,
+ "Total time taken to delete expired primary keys based on metadataTTL or deletedKeysTTL");
private final String _timerName;
private final boolean _global;
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
index 807ef5f9a0..a73f9c471f 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
@@ -132,7 +132,7 @@ public class MetadataAndDictionaryAggregationPlanMakerTest {
((ImmutableSegmentImpl) _upsertIndexSegment).enableUpsert(
new ConcurrentMapPartitionUpsertMetadataManager("testTable_REALTIME", 0, Collections.singletonList("column6"),
Collections.singletonList("daysSinceEpoch"), null, HashFunction.NONE, null,
- false, 0, INDEX_DIR, serverMetrics), new ThreadSafeMutableRoaringBitmap(), null);
+ false, 0, 0, INDEX_DIR, serverMetrics), new ThreadSafeMutableRoaringBitmap(), null);
}
@AfterClass
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index 3e1fd4e178..4496041a59 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -40,6 +40,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.metrics.ServerTimer;
import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
@@ -71,6 +72,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
protected final PartialUpsertHandler _partialUpsertHandler;
protected final boolean _enableSnapshot;
protected final double _metadataTTL;
+ protected final double _deletedKeysTTL;
protected final File _tableIndexDir;
protected final ServerMetrics _serverMetrics;
protected final Logger _logger;
@@ -98,7 +100,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
protected BasePartitionUpsertMetadataManager(String tableNameWithType, int partitionId,
List<String> primaryKeyColumns, List<String> comparisonColumns, @Nullable String deleteRecordColumn,
HashFunction hashFunction, @Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot,
- double metadataTTL, File tableIndexDir, ServerMetrics serverMetrics) {
+ double metadataTTL, double deletedKeysTTL, File tableIndexDir, ServerMetrics serverMetrics) {
_tableNameWithType = tableNameWithType;
_partitionId = partitionId;
_primaryKeyColumns = primaryKeyColumns;
@@ -108,6 +110,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
_partialUpsertHandler = partialUpsertHandler;
_enableSnapshot = enableSnapshot;
_metadataTTL = metadataTTL;
+ _deletedKeysTTL = deletedKeysTTL;
_tableIndexDir = tableIndexDir;
_snapshotLock = enableSnapshot ? new ReentrantReadWriteLock() : null;
_serverMetrics = serverMetrics;
@@ -115,6 +118,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
if (metadataTTL > 0) {
_largestSeenComparisonValue = loadWatermark();
} else {
+ _largestSeenComparisonValue = Double.MIN_VALUE;
deleteWatermark();
}
}
@@ -158,8 +162,15 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
_tableNameWithType);
ImmutableSegmentImpl immutableSegment = (ImmutableSegmentImpl) segment;
+ if (_deletedKeysTTL > 0) {
+ double maxComparisonValue =
+ ((Number) segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0))
+ .getMaxValue()).doubleValue();
+ _largestSeenComparisonValue = Math.max(_largestSeenComparisonValue, maxComparisonValue);
+ }
+
// Skip adding segment that has max comparison value smaller than (largestSeenComparisonValue - TTL)
- if (_largestSeenComparisonValue > 0) {
+ if (_metadataTTL > 0 && _largestSeenComparisonValue > 0) {
Preconditions.checkState(_enableSnapshot, "Upsert TTL must have snapshot enabled");
Preconditions.checkState(_comparisonColumns.size() == 1,
"Upsert TTL does not work with multiple comparison columns");
@@ -489,7 +500,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
return;
}
// Skip removing segment that has max comparison value smaller than (largestSeenComparisonValue - TTL)
- if (_largestSeenComparisonValue > 0) {
+ if (_metadataTTL > 0 && _largestSeenComparisonValue > 0) {
Number maxComparisonValue =
(Number) segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0)).getMaxValue();
if (maxComparisonValue.doubleValue() < _largestSeenComparisonValue - _metadataTTL) {
@@ -686,7 +697,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
@Override
public void removeExpiredPrimaryKeys() {
- if (_metadataTTL <= 0) {
+ if (_metadataTTL <= 0 && _deletedKeysTTL <= 0) {
return;
}
if (!startOperation()) {
@@ -694,7 +705,11 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
return;
}
try {
+ long startTime = System.currentTimeMillis();
doRemoveExpiredPrimaryKeys();
+ long duration = System.currentTimeMillis() - startTime;
+ _serverMetrics.addTimedTableValue(_tableNameWithType, ServerTimer.UPSERT_REMOVE_EXPIRED_PRIMARY_KEYS_TIME_MS,
+ duration, TimeUnit.MILLISECONDS);
} finally {
finishOperation();
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
index 6352bc7fcb..6c16b4e7f5 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
@@ -67,6 +67,7 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
protected PartialUpsertHandler _partialUpsertHandler;
protected boolean _enableSnapshot;
protected double _metadataTTL;
+ protected double _deletedKeysTTL;
protected File _tableIndexDir;
protected ServerMetrics _serverMetrics;
protected HelixManager _helixManager;
@@ -109,6 +110,7 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
_enableSnapshot = upsertConfig.isEnableSnapshot();
_metadataTTL = upsertConfig.getMetadataTTL();
+ _deletedKeysTTL = upsertConfig.getDeletedKeysTTL();
_tableIndexDir = tableDataManager.getTableDataDir();
_serverMetrics = serverMetrics;
_helixManager = helixManager;
@@ -118,10 +120,10 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
LOGGER.info(
"Initialized {} for table: {} with primary key columns: {}, comparison columns: {}, delete record column: {},"
- + " hash function: {}, upsert mode: {}, enable snapshot: {}, enable preload: {}, metadata TTL: {}, table "
- + "index dir: {}", getClass().getSimpleName(), _tableNameWithType, _primaryKeyColumns, _comparisonColumns,
- _deleteRecordColumn, _hashFunction, upsertConfig.getMode(), _enableSnapshot, upsertConfig.isEnablePreload(),
- _metadataTTL, _tableIndexDir);
+ + " hash function: {}, upsert mode: {}, enable snapshot: {}, enable preload: {}, metadata TTL: {},"
+ + " deleted Keys TTL: {}, table index dir: {}", getClass().getSimpleName(), _tableNameWithType,
+ _primaryKeyColumns, _comparisonColumns, _deleteRecordColumn, _hashFunction, upsertConfig.getMode(),
+ _enableSnapshot, upsertConfig.isEnablePreload(), _metadataTTL, _deletedKeysTTL, _tableIndexDir);
if (_enableSnapshot && segmentPreloadExecutor != null && upsertConfig.isEnablePreload()) {
// Preloading the segments with snapshots for fast upsert metadata recovery.
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index 435bab20d1..fe36054f18 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -61,9 +61,9 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
public ConcurrentMapPartitionUpsertMetadataManager(String tableNameWithType, int partitionId,
List<String> primaryKeyColumns, List<String> comparisonColumns, @Nullable String deleteRecordColumn,
HashFunction hashFunction, @Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot,
- double metadataTTL, File tableIndexDir, ServerMetrics serverMetrics) {
+ double metadataTTL, double deletedKeysTTL, File tableIndexDir, ServerMetrics serverMetrics) {
super(tableNameWithType, partitionId, primaryKeyColumns, comparisonColumns, deleteRecordColumn,
- hashFunction, partialUpsertHandler, enableSnapshot, metadataTTL, tableIndexDir, serverMetrics);
+ hashFunction, partialUpsertHandler, enableSnapshot, metadataTTL, deletedKeysTTL, tableIndexDir, serverMetrics);
}
@Override
@@ -231,13 +231,55 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
@Override
public void doRemoveExpiredPrimaryKeys() {
- double threshold = _largestSeenComparisonValue - _metadataTTL;
+ AtomicInteger numDeletedTTLKeysRemoved = new AtomicInteger();
+ AtomicInteger numMetadataTTLKeysRemoved = new AtomicInteger();
+ double metadataTTLKeysThreshold;
+ if (_metadataTTL > 0) {
+ metadataTTLKeysThreshold = _largestSeenComparisonValue - _metadataTTL;
+ } else {
+ metadataTTLKeysThreshold = Double.MIN_VALUE;
+ }
+
+ double deletedKeysThreshold;
+
+ if (_deletedKeysTTL > 0) {
+ deletedKeysThreshold = _largestSeenComparisonValue - _deletedKeysTTL;
+ } else {
+ deletedKeysThreshold = Double.MIN_VALUE;
+ }
_primaryKeyToRecordLocationMap.forEach((primaryKey, recordLocation) -> {
- if (((Number) recordLocation.getComparisonValue()).doubleValue() < threshold) {
+ double comparisonValue = ((Number) recordLocation.getComparisonValue()).doubleValue();
+ if (_metadataTTL > 0 && comparisonValue < metadataTTLKeysThreshold) {
_primaryKeyToRecordLocationMap.remove(primaryKey, recordLocation);
+ numMetadataTTLKeysRemoved.getAndIncrement();
+ } else if (_deletedKeysTTL > 0 && comparisonValue < deletedKeysThreshold) {
+ ThreadSafeMutableRoaringBitmap currentQueryableDocIds = recordLocation.getSegment().getQueryableDocIds();
+ // if key not part of queryable doc id, it means it is deleted
+ if (currentQueryableDocIds != null && !currentQueryableDocIds.contains(recordLocation.getDocId())) {
+ _primaryKeyToRecordLocationMap.remove(primaryKey, recordLocation);
+ removeDocId(recordLocation.getSegment(), recordLocation.getDocId());
+ numDeletedTTLKeysRemoved.getAndIncrement();
+ }
}
});
- persistWatermark(_largestSeenComparisonValue);
+ if (_metadataTTL > 0) {
+ persistWatermark(_largestSeenComparisonValue);
+ }
+
+ int numDeletedTTLKeys = numDeletedTTLKeysRemoved.get();
+ if (numDeletedTTLKeys > 0) {
+ _logger.info("Deleted {} primary keys based on deletedKeysTTL in the table {}",
+ numDeletedTTLKeys, _tableNameWithType);
+ _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.DELETED_KEYS_TTL_PRIMARY_KEYS_REMOVED,
+ numDeletedTTLKeys);
+ }
+ int numMetadataTTLKeys = numMetadataTTLKeysRemoved.get();
+ if (numMetadataTTLKeys > 0) {
+ _logger.info("Deleted {} primary keys based on metadataTTL in the table {}",
+ numMetadataTTLKeys, _tableNameWithType);
+ _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.METADATA_TTL_PRIMARY_KEYS_REMOVED,
+ numMetadataTTLKeys);
+ }
}
/**
@@ -253,7 +295,7 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
Comparable newComparisonValue = recordInfo.getComparisonValue();
// When TTL is enabled, update largestSeenComparisonValue when adding new record
- if (_metadataTTL > 0) {
+ if (_metadataTTL > 0 || _deletedKeysTTL > 0) {
double comparisonValue = ((Number) newComparisonValue).doubleValue();
_largestSeenComparisonValue = Math.max(_largestSeenComparisonValue, comparisonValue);
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
index 3380203656..2aeee10749 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
@@ -37,7 +37,7 @@ public class ConcurrentMapTableUpsertMetadataManager extends BaseTableUpsertMeta
return _partitionMetadataManagerMap.computeIfAbsent(partitionId,
k -> new ConcurrentMapPartitionUpsertMetadataManager(_tableNameWithType, k, _primaryKeyColumns,
_comparisonColumns, _deleteRecordColumn, _hashFunction, _partialUpsertHandler,
- _enableSnapshot, _metadataTTL, _tableIndexDir, _serverMetrics));
+ _enableSnapshot, _metadataTTL, _deletedKeysTTL, _tableIndexDir, _serverMetrics));
}
@Override
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 8900b6f36f..188f17e733 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -760,22 +760,29 @@ public final class TableConfigUtils {
@VisibleForTesting
static void validateTTLForUpsertConfig(TableConfig tableConfig, Schema schema) {
UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
- if (upsertConfig == null || upsertConfig.getMetadataTTL() == 0) {
+ if (upsertConfig == null || (upsertConfig.getMetadataTTL() == 0 && upsertConfig.getDeletedKeysTTL() == 0)) {
return;
}
List<String> comparisonColumns = upsertConfig.getComparisonColumns();
if (CollectionUtils.isNotEmpty(comparisonColumns)) {
Preconditions.checkState(comparisonColumns.size() == 1,
- "Upsert TTL does not work with multiple comparison columns");
+ "MetadataTTL / DeletedKeysTTL does not work with multiple comparison columns");
String comparisonColumn = comparisonColumns.get(0);
DataType comparisonColumnDataType = schema.getFieldSpecFor(comparisonColumn).getDataType();
Preconditions.checkState(comparisonColumnDataType.isNumeric(),
- "Upsert TTL must have comparison column: %s in numeric type, found: %s", comparisonColumn,
- comparisonColumnDataType);
+ "MetadataTTL / DeletedKeysTTL must have comparison column: %s in numeric type, found: %s",
+ comparisonColumn, comparisonColumnDataType);
}
- Preconditions.checkState(upsertConfig.isEnableSnapshot(), "Upsert TTL must have snapshot enabled");
+ if (upsertConfig.getMetadataTTL() > 0) {
+ Preconditions.checkState(upsertConfig.isEnableSnapshot(), "Upsert TTL must have snapshot enabled");
+ }
+
+ if (upsertConfig.getDeletedKeysTTL() > 0) {
+ Preconditions.checkState(upsertConfig.getDeleteRecordColumn() != null,
+ "Deleted Keys TTL can only be enabled with deleteRecordColumn set.");
+ }
}
/**
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index deb783c3e3..8a3949a9eb 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -91,7 +91,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
public void testStartFinishOperation() {
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- Collections.singletonList("timeCol"), null, HashFunction.NONE, null, false, 0, INDEX_DIR,
+ Collections.singletonList("timeCol"), null, HashFunction.NONE, null, false, 0, 0, INDEX_DIR,
mock(ServerMetrics.class));
// Start 2 operations
@@ -205,7 +205,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
String comparisonColumn = "timeCol";
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- Collections.singletonList(comparisonColumn), null, hashFunction, null, false, 0, INDEX_DIR,
+ Collections.singletonList(comparisonColumn), null, hashFunction, null, false, 0, 0, INDEX_DIR,
mock(ServerMetrics.class));
Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;
@@ -369,7 +369,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
Collections.singletonList(comparisonColumn), deleteRecordColumn, hashFunction, null, false, 0,
- INDEX_DIR, mock(ServerMetrics.class));
+ 0, INDEX_DIR, mock(ServerMetrics.class));
Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;
@@ -547,6 +547,16 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
return recordInfoList;
}
+ private List<RecordInfo> getRecordInfoListForTTL(int numRecords, int[] primaryKeys, int[] timestamps,
+ @Nullable boolean[] deleteRecordFlags) {
+ List<RecordInfo> recordInfoList = new ArrayList<>();
+ for (int i = 0; i < numRecords; i++) {
+ recordInfoList.add(new RecordInfo(makePrimaryKey(primaryKeys[i]), i, new Integer(timestamps[i]),
+ deleteRecordFlags != null && deleteRecordFlags[i]));
+ }
+ return recordInfoList;
+ }
+
/**
* Get recordInfo from validDocIdsSnapshot (enabledSnapshot = True).
*/
@@ -660,7 +670,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
String comparisonColumn = "timeCol";
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- Collections.singletonList(comparisonColumn), null, hashFunction, null, false, 0, INDEX_DIR,
+ Collections.singletonList(comparisonColumn), null, hashFunction, null, false, 0, 0, INDEX_DIR,
mock(ServerMetrics.class));
Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
@@ -753,7 +763,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
String comparisonColumn = "timeCol";
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- Collections.singletonList(comparisonColumn), null, hashFunction, null, false, 0, INDEX_DIR,
+ Collections.singletonList(comparisonColumn), null, hashFunction, null, false, 0, 0, INDEX_DIR,
mock(ServerMetrics.class));
Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
@@ -821,7 +831,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
String comparisonColumn = "timeCol";
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- Collections.singletonList(comparisonColumn), null, hashFunction, null, false, 0, INDEX_DIR,
+ Collections.singletonList(comparisonColumn), null, hashFunction, null, false, 0, 0, INDEX_DIR,
mock(ServerMetrics.class));
Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
@@ -878,7 +888,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
Collections.singletonList(comparisonColumn), deleteColumn, hashFunction, null,
- false, 0, INDEX_DIR, mock(ServerMetrics.class));
+ false, 0, 0, INDEX_DIR, mock(ServerMetrics.class));
Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
// queryableDocIds is same as validDocIds in the absence of delete markers
@@ -973,13 +983,101 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
upsertMetadataManager.close();
}
+ @Test
+ public void testRemoveExpiredDeletedKeys()
+ throws IOException {
+ verifyRemoveExpiredDeletedKeys(HashFunction.NONE);
+ verifyRemoveExpiredDeletedKeys(HashFunction.MD5);
+ verifyRemoveExpiredDeletedKeys(HashFunction.MURMUR3);
+ }
+
+ private void verifyRemoveExpiredDeletedKeys(HashFunction hashFunction)
+ throws IOException {
+
+ String comparisonColumn = "timeCol";
+ String deleteColumn = "deleteCol";
+ ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
+ Collections.singletonList(comparisonColumn), deleteColumn, hashFunction, null,
+ false, 0, 20, INDEX_DIR, mock(ServerMetrics.class));
+ Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+
+ // Add the first segment
+ // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
+ int numRecords = 3;
+ int[] primaryKeys = new int[]{0, 1, 2};
+ int[] timestamps = new int[]{100, 120, 100};
+ ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
+ ThreadSafeMutableRoaringBitmap queryableDocIds1 = new ThreadSafeMutableRoaringBitmap();
+ ImmutableSegmentImpl segment1 =
+ mockImmutableSegment(1, validDocIds1, queryableDocIds1, getPrimaryKeyList(numRecords, primaryKeys));
+ upsertMetadataManager.addSegment(segment1, validDocIds1, queryableDocIds1,
+ getRecordInfoListForTTL(numRecords, primaryKeys, timestamps, null).iterator());
+
+ // Update records from the second segment
+ ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
+ ThreadSafeMutableRoaringBitmap queryableDocIds2 = new ThreadSafeMutableRoaringBitmap();
+ MutableSegment segment2 = mockMutableSegment(1, validDocIds2, queryableDocIds2);
+ upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(3), 0, new Integer(100), false));
+
+ // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
+ // segment2: 3 -> {0, 100}
+ checkRecordLocationForTTL(recordLocationMap, 0, segment1, 0, 100, hashFunction);
+ checkRecordLocationForTTL(recordLocationMap, 1, segment1, 1, 120, hashFunction);
+ checkRecordLocationForTTL(recordLocationMap, 2, segment1, 2, 100, hashFunction);
+ checkRecordLocationForTTL(recordLocationMap, 3, segment2, 0, 100, hashFunction);
+ assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 2});
+ assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0});
+ assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 2});
+ assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0});
+
+ // Mark a record with latest value in segment1 as deleted (outside TTL-window)
+ upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(2), 1, new Integer(120), true));
+
+ // segment1: 0 -> {0, 100}, 1 -> {1, 120}
+ // segment2: 2 -> {1, 120}, 3 -> {0, 100}
+ checkRecordLocationForTTL(recordLocationMap, 0, segment1, 0, 100, hashFunction);
+ checkRecordLocationForTTL(recordLocationMap, 1, segment1, 1, 120, hashFunction);
+ checkRecordLocationForTTL(recordLocationMap, 2, segment2, 1, 120, hashFunction);
+ checkRecordLocationForTTL(recordLocationMap, 3, segment2, 0, 100, hashFunction);
+ assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
+ assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
+ assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
+ assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0});
+
+ // Mark a record with latest value in segment2 as deleted (within TTL window)
+ upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(3), 2, new Integer(150), true));
+
+ // segment1: 0 -> {0, 100}, 1 -> {1, 120}
+ // segment2: 2 -> {1, 120}, 3 -> {2, 150}
+ checkRecordLocationForTTL(recordLocationMap, 0, segment1, 0, 100, hashFunction);
+ checkRecordLocationForTTL(recordLocationMap, 1, segment1, 1, 120, hashFunction);
+ checkRecordLocationForTTL(recordLocationMap, 2, segment2, 1, 120, hashFunction);
+ checkRecordLocationForTTL(recordLocationMap, 3, segment2, 2, 150, hashFunction);
+ assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
+ assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{1, 2});
+ assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
+ assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{});
+
+ // delete-key segment2: 2 -> {1, 120}
+ upsertMetadataManager.removeExpiredPrimaryKeys();
+ // segment1: 0 -> {0, 100}, 1 -> {1, 120}
+ // segment2: 3 -> {2, 150}
+ checkRecordLocationForTTL(recordLocationMap, 0, segment1, 0, 100, hashFunction);
+ checkRecordLocationForTTL(recordLocationMap, 1, segment1, 1, 120, hashFunction);
+ assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
+ assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{2});
+ assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
+ assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{});
+ }
+
private void verifyRemoveExpiredPrimaryKeys(Comparable earlierComparisonValue, Comparable largerComparisonValue)
throws IOException {
File tableDir = new File(INDEX_DIR, REALTIME_TABLE_NAME);
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- Collections.singletonList("timeCol"), null, HashFunction.NONE, null, false, 30, tableDir,
+ Collections.singletonList("timeCol"), null, HashFunction.NONE, null, false, 30, 0, tableDir,
mock(ServerMetrics.class));
Map<Object, ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> recordLocationMap =
upsertMetadataManager._primaryKeyToRecordLocationMap;
@@ -1047,7 +1145,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, 30, tableDir,
+ Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, 30, 0, tableDir,
mock(ServerMetrics.class));
Map<Object, ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> recordLocationMap =
upsertMetadataManager._primaryKeyToRecordLocationMap;
@@ -1121,7 +1219,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
Collections.singletonList(comparisonColumn), deleteRecordColumn, HashFunction.NONE, null, true, 30,
- INDEX_DIR, mock(ServerMetrics.class));
+ 0, INDEX_DIR, mock(ServerMetrics.class));
Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;
@@ -1206,7 +1304,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
Collections.singletonList(comparisonColumn), deleteRecordColumn, HashFunction.NONE, null, true, 30,
- INDEX_DIR, mock(ServerMetrics.class));
+ 0, INDEX_DIR, mock(ServerMetrics.class));
try (MockedConstruction<PinotSegmentColumnReader> deleteColReader = mockConstruction(PinotSegmentColumnReader.class,
(mockReader, context) -> {
@@ -1237,7 +1335,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, 30, tableDir,
+ Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, 30, 0, tableDir,
mock(ServerMetrics.class));
Map<Object, ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> recordLocationMap =
upsertMetadataManager._primaryKeyToRecordLocationMap;
@@ -1298,7 +1396,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
throws IOException {
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, 10, INDEX_DIR,
+ Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, 10, 0, INDEX_DIR,
mock(ServerMetrics.class));
double currentTimeMs = System.currentTimeMillis();
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index 9c9b2cdf43..dfdd8ccade 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -1766,6 +1766,61 @@ public class TableConfigUtilsTest {
Assert.fail("Shouldn't fail table creation when delete column type is boolean.");
}
+ // upsert deleted-keys-ttl configs with no deleted column
+ schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).setPrimaryKeyColumns(Lists.newArrayList("myPkCol"))
+ .addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
+ .addSingleValueDimension(delCol, FieldSpec.DataType.BOOLEAN).build();
+ upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+ upsertConfig.setDeletedKeysTTL(3600);
+ tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setStreamConfigs(streamConfigs)
+ .setUpsertConfig(upsertConfig)
+ .setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+ .build();
+ try {
+ TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
+ } catch (IllegalStateException e) {
+ Assert.assertEquals(e.getMessage(), "Deleted Keys TTL can only be enabled with deleteRecordColumn set.");
+ }
+
+ upsertConfig.setDeleteRecordColumn(delCol);
+ // multiple comparison columns set for deleted-keys-ttl
+ schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).setPrimaryKeyColumns(Lists.newArrayList("myPkCol"))
+ .addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
+ .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
+ .addSingleValueDimension(delCol, FieldSpec.DataType.BOOLEAN).build();
+ upsertConfig.setComparisonColumns(Lists.newArrayList(TIME_COLUMN, "myCol"));
+ tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setStreamConfigs(streamConfigs)
+ .setUpsertConfig(upsertConfig)
+ .setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+ .build();
+ try {
+ TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
+ } catch (IllegalStateException e) {
+ Assert.assertEquals(e.getMessage(),
+ "MetadataTTL / DeletedKeysTTL does not work with multiple comparison columns");
+ }
+
+ // comparison column with non-numeric type
+ upsertConfig.setComparisonColumns(Lists.newArrayList("myCol"));
+ tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setStreamConfigs(streamConfigs)
+ .setUpsertConfig(upsertConfig)
+ .setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+ .build();
+ try {
+ TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
+ } catch (IllegalStateException e) {
+ Assert.assertEquals(e.getMessage(),
+ "MetadataTTL / DeletedKeysTTL must have comparison column: myCol in numeric type, found: STRING");
+ }
+
+ // time column as comparison column
+ upsertConfig.setComparisonColumns(Lists.newArrayList(TIME_COLUMN));
+ tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setStreamConfigs(streamConfigs)
+ .setUpsertConfig(upsertConfig)
+ .setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+ .build();
+ TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
+
// upsert out-of-order configs
String outOfOrderRecordColumn = "outOfOrderRecordColumn";
boolean dropOutOfOrderRecord = true;
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
index 3f9b67c59c..906c6ceedf 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
@@ -66,6 +66,9 @@ public class UpsertConfig extends BaseJsonConfig {
@JsonPropertyDescription("Whether to use TTL for upsert metadata cleanup, it uses the same unit as comparison col")
private double _metadataTTL;
+ @JsonPropertyDescription("TTL for upsert metadata cleanup for deleted keys, it uses the same unit as comparison col")
+ private double _deletedKeysTTL;
+
@JsonPropertyDescription("Whether to preload segments for fast upsert metadata recovery")
private boolean _enablePreload;
@@ -129,6 +132,10 @@ public class UpsertConfig extends BaseJsonConfig {
return _metadataTTL;
}
+ public double getDeletedKeysTTL() {
+ return _deletedKeysTTL;
+ }
+
public boolean isEnablePreload() {
return _enablePreload;
}
@@ -207,6 +214,10 @@ public class UpsertConfig extends BaseJsonConfig {
_metadataTTL = metadataTTL;
}
+ public void setDeletedKeysTTL(double deletedKeysTTL) {
+ _deletedKeysTTL = deletedKeysTTL;
+ }
+
public void setEnablePreload(boolean enablePreload) {
_enablePreload = enablePreload;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org