Posted to commits@pinot.apache.org by ja...@apache.org on 2023/12/05 23:33:45 UTC

(pinot) branch master updated: Add support for retention on deleted keys of upsert tables (#12037)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 49804a4f56 Add support for retention on deleted keys of upsert tables (#12037)
49804a4f56 is described below

commit 49804a4f56a1ae87b5ef882dbf6373ac8190ee84
Author: Pratik Tibrewal <ti...@uber.com>
AuthorDate: Wed Dec 6 05:03:35 2023 +0530

    Add support for retention on deleted keys of upsert tables (#12037)
---
 .../apache/pinot/common/metrics/ServerMeter.java   |   2 +
 .../apache/pinot/common/metrics/ServerTimer.java   |   5 +-
 ...adataAndDictionaryAggregationPlanMakerTest.java |   2 +-
 .../upsert/BasePartitionUpsertMetadataManager.java |  23 +++-
 .../upsert/BaseTableUpsertMetadataManager.java     |  10 +-
 ...oncurrentMapPartitionUpsertMetadataManager.java |  54 ++++++++-
 .../ConcurrentMapTableUpsertMetadataManager.java   |   2 +-
 .../segment/local/utils/TableConfigUtils.java      |  17 ++-
 ...rrentMapPartitionUpsertMetadataManagerTest.java | 124 ++++++++++++++++++---
 .../segment/local/utils/TableConfigUtilsTest.java  |  55 +++++++++
 .../pinot/spi/config/table/UpsertConfig.java       |  11 ++
 11 files changed, 270 insertions(+), 35 deletions(-)
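
For readers who want to try the new retention knob, here is a minimal sketch of enabling it through the UpsertConfig setters exercised in this patch. The column names, the TTL value, and the wrapper class are illustrative only and not part of the commit; the TTL uses the same unit as the comparison column.

    import java.util.Collections;
    import org.apache.pinot.spi.config.table.UpsertConfig;

    public class DeletedKeysTtlExample {
      // Builds an UpsertConfig with retention on deleted keys enabled.
      public static UpsertConfig buildConfig() {
        UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
        upsertConfig.setComparisonColumns(Collections.singletonList("timeCol"));
        // deleteRecordColumn must be set for deletedKeysTTL to pass validation.
        upsertConfig.setDeleteRecordColumn("deleteCol");
        // Deleted keys older than this (relative to the largest seen comparison
        // value) become eligible for removal from the upsert metadata.
        upsertConfig.setDeletedKeysTTL(3600);
        return upsertConfig;
      }
    }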

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index 5b995d6e6a..cd7973f78d 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -48,6 +48,8 @@ public enum ServerMeter implements AbstractMetrics.Meter {
   PARTIAL_UPSERT_OUT_OF_ORDER("rows", false),
   PARTIAL_UPSERT_KEYS_NOT_REPLACED("rows", false),
   UPSERT_OUT_OF_ORDER("rows", false),
+  DELETED_KEYS_TTL_PRIMARY_KEYS_REMOVED("rows", false),
+  METADATA_TTL_PRIMARY_KEYS_REMOVED("rows", false),
   ROWS_WITH_ERRORS("rows", false),
   LLC_CONTROLLER_RESPONSE_NOT_SENT("messages", true),
   LLC_CONTROLLER_RESPONSE_COMMIT("messages", true),
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
index aa0952730b..a4c16221bc 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
@@ -47,7 +47,10 @@ public enum ServerTimer implements AbstractMetrics.Timer {
   SEGMENT_UPLOAD_TIME_MS("milliseconds", false),
 
   TOTAL_CPU_TIME_NS("nanoseconds", false, "Total query cost (thread cpu time + system "
-      + "activities cpu time + response serialization cpu time) for query processing on server.");
+      + "activities cpu time + response serialization cpu time) for query processing on server."),
+
+  UPSERT_REMOVE_EXPIRED_PRIMARY_KEYS_TIME_MS("milliseconds", false,
+      "Total time taken to delete expired primary keys based on metadataTTL or deletedKeysTTL");
 
   private final String _timerName;
   private final boolean _global;
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
index 807ef5f9a0..a73f9c471f 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
@@ -132,7 +132,7 @@ public class MetadataAndDictionaryAggregationPlanMakerTest {
     ((ImmutableSegmentImpl) _upsertIndexSegment).enableUpsert(
         new ConcurrentMapPartitionUpsertMetadataManager("testTable_REALTIME", 0, Collections.singletonList("column6"),
             Collections.singletonList("daysSinceEpoch"), null, HashFunction.NONE, null,
-            false, 0, INDEX_DIR, serverMetrics), new ThreadSafeMutableRoaringBitmap(), null);
+            false, 0, 0, INDEX_DIR, serverMetrics), new ThreadSafeMutableRoaringBitmap(), null);
   }
 
   @AfterClass
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index 3e1fd4e178..4496041a59 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -40,6 +40,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.metrics.ServerGauge;
 import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.metrics.ServerTimer;
 import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
 import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
 import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
@@ -71,6 +72,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
   protected final PartialUpsertHandler _partialUpsertHandler;
   protected final boolean _enableSnapshot;
   protected final double _metadataTTL;
+  protected final double _deletedKeysTTL;
   protected final File _tableIndexDir;
   protected final ServerMetrics _serverMetrics;
   protected final Logger _logger;
@@ -98,7 +100,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
   protected BasePartitionUpsertMetadataManager(String tableNameWithType, int partitionId,
       List<String> primaryKeyColumns, List<String> comparisonColumns, @Nullable String deleteRecordColumn,
       HashFunction hashFunction, @Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot,
-      double metadataTTL, File tableIndexDir, ServerMetrics serverMetrics) {
+      double metadataTTL, double deletedKeysTTL, File tableIndexDir, ServerMetrics serverMetrics) {
     _tableNameWithType = tableNameWithType;
     _partitionId = partitionId;
     _primaryKeyColumns = primaryKeyColumns;
@@ -108,6 +110,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
     _partialUpsertHandler = partialUpsertHandler;
     _enableSnapshot = enableSnapshot;
     _metadataTTL = metadataTTL;
+    _deletedKeysTTL = deletedKeysTTL;
     _tableIndexDir = tableIndexDir;
     _snapshotLock = enableSnapshot ? new ReentrantReadWriteLock() : null;
     _serverMetrics = serverMetrics;
@@ -115,6 +118,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
     if (metadataTTL > 0) {
       _largestSeenComparisonValue = loadWatermark();
     } else {
+      _largestSeenComparisonValue = Double.MIN_VALUE;
       deleteWatermark();
     }
   }
@@ -158,8 +162,15 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
         _tableNameWithType);
     ImmutableSegmentImpl immutableSegment = (ImmutableSegmentImpl) segment;
 
+    if (_deletedKeysTTL > 0) {
+      double maxComparisonValue =
+          ((Number) segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0))
+              .getMaxValue()).doubleValue();
+      _largestSeenComparisonValue = Math.max(_largestSeenComparisonValue, maxComparisonValue);
+    }
+
     // Skip adding segment that has max comparison value smaller than (largestSeenComparisonValue - TTL)
-    if (_largestSeenComparisonValue > 0) {
+    if (_metadataTTL > 0 && _largestSeenComparisonValue > 0) {
       Preconditions.checkState(_enableSnapshot, "Upsert TTL must have snapshot enabled");
       Preconditions.checkState(_comparisonColumns.size() == 1,
           "Upsert TTL does not work with multiple comparison columns");
@@ -489,7 +500,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
       return;
     }
     // Skip removing segment that has max comparison value smaller than (largestSeenComparisonValue - TTL)
-    if (_largestSeenComparisonValue > 0) {
+    if (_metadataTTL > 0 && _largestSeenComparisonValue > 0) {
       Number maxComparisonValue =
           (Number) segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0)).getMaxValue();
       if (maxComparisonValue.doubleValue() < _largestSeenComparisonValue - _metadataTTL) {
@@ -686,7 +697,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
 
   @Override
   public void removeExpiredPrimaryKeys() {
-    if (_metadataTTL <= 0) {
+    if (_metadataTTL <= 0 && _deletedKeysTTL <= 0) {
       return;
     }
     if (!startOperation()) {
@@ -694,7 +705,11 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
       return;
     }
     try {
+      long startTime = System.currentTimeMillis();
       doRemoveExpiredPrimaryKeys();
+      long duration = System.currentTimeMillis() - startTime;
+      _serverMetrics.addTimedTableValue(_tableNameWithType, ServerTimer.UPSERT_REMOVE_EXPIRED_PRIMARY_KEYS_TIME_MS,
+          duration, TimeUnit.MILLISECONDS);
     } finally {
       finishOperation();
     }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
index 6352bc7fcb..6c16b4e7f5 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
@@ -67,6 +67,7 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
   protected PartialUpsertHandler _partialUpsertHandler;
   protected boolean _enableSnapshot;
   protected double _metadataTTL;
+  protected double _deletedKeysTTL;
   protected File _tableIndexDir;
   protected ServerMetrics _serverMetrics;
   protected HelixManager _helixManager;
@@ -109,6 +110,7 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
 
     _enableSnapshot = upsertConfig.isEnableSnapshot();
     _metadataTTL = upsertConfig.getMetadataTTL();
+    _deletedKeysTTL = upsertConfig.getDeletedKeysTTL();
     _tableIndexDir = tableDataManager.getTableDataDir();
     _serverMetrics = serverMetrics;
     _helixManager = helixManager;
@@ -118,10 +120,10 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
 
     LOGGER.info(
         "Initialized {} for table: {} with primary key columns: {}, comparison columns: {}, delete record column: {},"
-            + " hash function: {}, upsert mode: {}, enable snapshot: {}, enable preload: {}, metadata TTL: {}, table "
-            + "index dir: {}", getClass().getSimpleName(), _tableNameWithType, _primaryKeyColumns, _comparisonColumns,
-        _deleteRecordColumn, _hashFunction, upsertConfig.getMode(), _enableSnapshot, upsertConfig.isEnablePreload(),
-        _metadataTTL, _tableIndexDir);
+            + " hash function: {}, upsert mode: {}, enable snapshot: {}, enable preload: {}, metadata TTL: {},"
+            + " deleted Keys TTL: {}, table index dir: {}", getClass().getSimpleName(), _tableNameWithType,
+        _primaryKeyColumns, _comparisonColumns, _deleteRecordColumn, _hashFunction, upsertConfig.getMode(),
+        _enableSnapshot, upsertConfig.isEnablePreload(), _metadataTTL, _deletedKeysTTL, _tableIndexDir);
 
     if (_enableSnapshot && segmentPreloadExecutor != null && upsertConfig.isEnablePreload()) {
       // Preloading the segments with snapshots for fast upsert metadata recovery.
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index 435bab20d1..fe36054f18 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -61,9 +61,9 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
   public ConcurrentMapPartitionUpsertMetadataManager(String tableNameWithType, int partitionId,
       List<String> primaryKeyColumns, List<String> comparisonColumns, @Nullable String deleteRecordColumn,
       HashFunction hashFunction, @Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot,
-      double metadataTTL, File tableIndexDir, ServerMetrics serverMetrics) {
+      double metadataTTL, double deletedKeysTTL, File tableIndexDir, ServerMetrics serverMetrics) {
     super(tableNameWithType, partitionId, primaryKeyColumns, comparisonColumns, deleteRecordColumn,
-        hashFunction, partialUpsertHandler, enableSnapshot, metadataTTL, tableIndexDir, serverMetrics);
+        hashFunction, partialUpsertHandler, enableSnapshot, metadataTTL, deletedKeysTTL, tableIndexDir, serverMetrics);
   }
 
   @Override
@@ -231,13 +231,55 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
 
   @Override
   public void doRemoveExpiredPrimaryKeys() {
-    double threshold = _largestSeenComparisonValue - _metadataTTL;
+    AtomicInteger numDeletedTTLKeysRemoved = new AtomicInteger();
+    AtomicInteger numMetadataTTLKeysRemoved = new AtomicInteger();
+    double metadataTTLKeysThreshold;
+    if (_metadataTTL > 0) {
+      metadataTTLKeysThreshold = _largestSeenComparisonValue - _metadataTTL;
+    } else {
+      metadataTTLKeysThreshold = Double.MIN_VALUE;
+    }
+
+    double deletedKeysThreshold;
+
+    if (_deletedKeysTTL > 0) {
+      deletedKeysThreshold = _largestSeenComparisonValue - _deletedKeysTTL;
+    } else {
+      deletedKeysThreshold = Double.MIN_VALUE;
+    }
     _primaryKeyToRecordLocationMap.forEach((primaryKey, recordLocation) -> {
-      if (((Number) recordLocation.getComparisonValue()).doubleValue() < threshold) {
+      double comparisonValue = ((Number) recordLocation.getComparisonValue()).doubleValue();
+      if (_metadataTTL > 0 && comparisonValue < metadataTTLKeysThreshold) {
         _primaryKeyToRecordLocationMap.remove(primaryKey, recordLocation);
+        numMetadataTTLKeysRemoved.getAndIncrement();
+      } else if (_deletedKeysTTL > 0 && comparisonValue < deletedKeysThreshold) {
+        ThreadSafeMutableRoaringBitmap currentQueryableDocIds = recordLocation.getSegment().getQueryableDocIds();
+        // if key not part of queryable doc id, it means it is deleted
+        if (currentQueryableDocIds != null && !currentQueryableDocIds.contains(recordLocation.getDocId())) {
+          _primaryKeyToRecordLocationMap.remove(primaryKey, recordLocation);
+          removeDocId(recordLocation.getSegment(), recordLocation.getDocId());
+          numDeletedTTLKeysRemoved.getAndIncrement();
+        }
       }
     });
-    persistWatermark(_largestSeenComparisonValue);
+    if (_metadataTTL > 0) {
+      persistWatermark(_largestSeenComparisonValue);
+    }
+
+    int numDeletedTTLKeys = numDeletedTTLKeysRemoved.get();
+    if (numDeletedTTLKeys > 0) {
+      _logger.info("Deleted {} primary keys based on deletedKeysTTL in the table {}",
+          numDeletedTTLKeys, _tableNameWithType);
+      _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.DELETED_KEYS_TTL_PRIMARY_KEYS_REMOVED,
+          numDeletedTTLKeys);
+    }
+    int numMetadataTTLKeys = numMetadataTTLKeysRemoved.get();
+    if (numMetadataTTLKeys > 0) {
+      _logger.info("Deleted {} primary keys based on metadataTTL in the table {}",
+          numMetadataTTLKeys, _tableNameWithType);
+      _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.METADATA_TTL_PRIMARY_KEYS_REMOVED,
+          numMetadataTTLKeys);
+    }
   }
 
   /**
@@ -253,7 +295,7 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
     Comparable newComparisonValue = recordInfo.getComparisonValue();
 
     // When TTL is enabled, update largestSeenComparisonValue when adding new record
-    if (_metadataTTL > 0) {
+    if (_metadataTTL > 0 || _deletedKeysTTL > 0) {
       double comparisonValue = ((Number) newComparisonValue).doubleValue();
       _largestSeenComparisonValue = Math.max(_largestSeenComparisonValue, comparisonValue);
     }
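
To make the new removal path easier to follow, here is a simplified stand-in for the per-key decision made in doRemoveExpiredPrimaryKeys above. The helper and its parameters are hypothetical condensations; the real code iterates _primaryKeyToRecordLocationMap, reads queryableDocIds from the record's segment, and also removes the doc id from the segment when a deleted key is purged.

    public class TtlRemovalSketch {
      // isQueryable stands for "the record's doc id is present in the segment's
      // queryableDocIds bitmap"; a deleted key is one that is no longer queryable.
      static boolean shouldRemove(double comparisonValue, boolean isQueryable,
          double largestSeenComparisonValue, double metadataTTL, double deletedKeysTTL) {
        if (metadataTTL > 0 && comparisonValue < largestSeenComparisonValue - metadataTTL) {
          return true;  // key aged out of the metadata TTL window
        }
        if (deletedKeysTTL > 0 && comparisonValue < largestSeenComparisonValue - deletedKeysTTL) {
          return !isQueryable;  // only purge keys whose latest record is a delete
        }
        return false;
      }
    }
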
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
index 3380203656..2aeee10749 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
@@ -37,7 +37,7 @@ public class ConcurrentMapTableUpsertMetadataManager extends BaseTableUpsertMeta
     return _partitionMetadataManagerMap.computeIfAbsent(partitionId,
         k -> new ConcurrentMapPartitionUpsertMetadataManager(_tableNameWithType, k, _primaryKeyColumns,
             _comparisonColumns, _deleteRecordColumn, _hashFunction, _partialUpsertHandler,
-            _enableSnapshot, _metadataTTL, _tableIndexDir, _serverMetrics));
+            _enableSnapshot, _metadataTTL, _deletedKeysTTL, _tableIndexDir, _serverMetrics));
   }
 
   @Override
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 8900b6f36f..188f17e733 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -760,22 +760,29 @@ public final class TableConfigUtils {
   @VisibleForTesting
   static void validateTTLForUpsertConfig(TableConfig tableConfig, Schema schema) {
     UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
-    if (upsertConfig == null || upsertConfig.getMetadataTTL() == 0) {
+    if (upsertConfig == null || (upsertConfig.getMetadataTTL() == 0 && upsertConfig.getDeletedKeysTTL() == 0)) {
       return;
     }
 
     List<String> comparisonColumns = upsertConfig.getComparisonColumns();
     if (CollectionUtils.isNotEmpty(comparisonColumns)) {
       Preconditions.checkState(comparisonColumns.size() == 1,
-          "Upsert TTL does not work with multiple comparison columns");
+          "MetadataTTL / DeletedKeysTTL does not work with multiple comparison columns");
       String comparisonColumn = comparisonColumns.get(0);
       DataType comparisonColumnDataType = schema.getFieldSpecFor(comparisonColumn).getDataType();
       Preconditions.checkState(comparisonColumnDataType.isNumeric(),
-          "Upsert TTL must have comparison column: %s in numeric type, found: %s", comparisonColumn,
-          comparisonColumnDataType);
+          "MetadataTTL / DeletedKeysTTL must have comparison column: %s in numeric type, found: %s",
+          comparisonColumn, comparisonColumnDataType);
     }
 
-    Preconditions.checkState(upsertConfig.isEnableSnapshot(), "Upsert TTL must have snapshot enabled");
+    if (upsertConfig.getMetadataTTL() > 0) {
+      Preconditions.checkState(upsertConfig.isEnableSnapshot(), "Upsert TTL must have snapshot enabled");
+    }
+
+    if (upsertConfig.getDeletedKeysTTL() > 0) {
+      Preconditions.checkState(upsertConfig.getDeleteRecordColumn() != null,
+          "Deleted Keys TTL can only be enabled with deleteRecordColumn set.");
+    }
   }
 
   /**
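
Condensing the updated validation into one hypothetical helper (not part of the patch; the real checks are the Preconditions calls in the hunk above): both TTLs require at most a single, numeric comparison column; metadataTTL still requires enableSnapshot; deletedKeysTTL instead requires deleteRecordColumn.

    import java.util.List;
    import com.google.common.base.Preconditions;

    class TtlValidationSketch {
      // comparisonColumnIsNumeric is a simplification of the schema lookup done
      // by the real validateTTLForUpsertConfig.
      static void check(double metadataTTL, double deletedKeysTTL, boolean enableSnapshot,
          String deleteRecordColumn, List<String> comparisonColumns, boolean comparisonColumnIsNumeric) {
        if (metadataTTL == 0 && deletedKeysTTL == 0) {
          return;  // TTL not enabled, nothing to validate
        }
        if (comparisonColumns != null && !comparisonColumns.isEmpty()) {
          Preconditions.checkState(comparisonColumns.size() == 1,
              "MetadataTTL / DeletedKeysTTL does not work with multiple comparison columns");
          Preconditions.checkState(comparisonColumnIsNumeric,
              "MetadataTTL / DeletedKeysTTL requires a numeric comparison column");
        }
        if (metadataTTL > 0) {
          Preconditions.checkState(enableSnapshot, "Upsert TTL must have snapshot enabled");
        }
        if (deletedKeysTTL > 0) {
          Preconditions.checkState(deleteRecordColumn != null,
              "Deleted Keys TTL can only be enabled with deleteRecordColumn set.");
        }
      }
    }
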
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index deb783c3e3..8a3949a9eb 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -91,7 +91,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
   public void testStartFinishOperation() {
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
-            Collections.singletonList("timeCol"), null, HashFunction.NONE, null, false, 0, INDEX_DIR,
+            Collections.singletonList("timeCol"), null, HashFunction.NONE, null, false, 0, 0, INDEX_DIR,
             mock(ServerMetrics.class));
 
     // Start 2 operations
@@ -205,7 +205,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     String comparisonColumn = "timeCol";
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
-            Collections.singletonList(comparisonColumn), null, hashFunction, null, false, 0, INDEX_DIR,
+            Collections.singletonList(comparisonColumn), null, hashFunction, null, false, 0, 0, INDEX_DIR,
             mock(ServerMetrics.class));
     Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
     Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;
@@ -369,7 +369,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
             Collections.singletonList(comparisonColumn), deleteRecordColumn, hashFunction, null, false, 0,
-            INDEX_DIR, mock(ServerMetrics.class));
+            0, INDEX_DIR, mock(ServerMetrics.class));
     Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
     Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;
 
@@ -547,6 +547,16 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     return recordInfoList;
   }
 
+  private List<RecordInfo> getRecordInfoListForTTL(int numRecords, int[] primaryKeys, int[] timestamps,
+                                             @Nullable boolean[] deleteRecordFlags) {
+    List<RecordInfo> recordInfoList = new ArrayList<>();
+    for (int i = 0; i < numRecords; i++) {
+      recordInfoList.add(new RecordInfo(makePrimaryKey(primaryKeys[i]), i, new Integer(timestamps[i]),
+              deleteRecordFlags != null && deleteRecordFlags[i]));
+    }
+    return recordInfoList;
+  }
+
   /**
    * Get recordInfo from validDocIdsSnapshot (enabledSnapshot = True).
    */
@@ -660,7 +670,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     String comparisonColumn = "timeCol";
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
-            Collections.singletonList(comparisonColumn), null, hashFunction, null, false, 0, INDEX_DIR,
+            Collections.singletonList(comparisonColumn), null, hashFunction, null, false, 0, 0, INDEX_DIR,
             mock(ServerMetrics.class));
     Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
 
@@ -753,7 +763,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     String comparisonColumn = "timeCol";
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
-            Collections.singletonList(comparisonColumn), null, hashFunction, null, false, 0, INDEX_DIR,
+            Collections.singletonList(comparisonColumn), null, hashFunction, null, false, 0, 0, INDEX_DIR,
             mock(ServerMetrics.class));
     Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
 
@@ -821,7 +831,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     String comparisonColumn = "timeCol";
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
-            Collections.singletonList(comparisonColumn), null, hashFunction, null, false, 0, INDEX_DIR,
+            Collections.singletonList(comparisonColumn), null, hashFunction, null, false, 0, 0, INDEX_DIR,
             mock(ServerMetrics.class));
     Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
 
@@ -878,7 +888,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
             Collections.singletonList(comparisonColumn), deleteColumn, hashFunction, null,
-            false, 0, INDEX_DIR, mock(ServerMetrics.class));
+            false, 0, 0, INDEX_DIR, mock(ServerMetrics.class));
     Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
 
     // queryableDocIds is same as validDocIds in the absence of delete markers
@@ -973,13 +983,101 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     upsertMetadataManager.close();
   }
 
+  @Test
+  public void testRemoveExpiredDeletedKeys()
+          throws IOException {
+    verifyRemoveExpiredDeletedKeys(HashFunction.NONE);
+    verifyRemoveExpiredDeletedKeys(HashFunction.MD5);
+    verifyRemoveExpiredDeletedKeys(HashFunction.MURMUR3);
+  }
+
+  private void verifyRemoveExpiredDeletedKeys(HashFunction hashFunction)
+          throws IOException {
+
+    String comparisonColumn = "timeCol";
+    String deleteColumn = "deleteCol";
+    ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+            new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
+                    Collections.singletonList(comparisonColumn), deleteColumn, hashFunction, null,
+                    false, 0, 20, INDEX_DIR, mock(ServerMetrics.class));
+    Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+
+    // Add the first segment
+    // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
+    int numRecords = 3;
+    int[] primaryKeys = new int[]{0, 1, 2};
+    int[] timestamps = new int[]{100, 120, 100};
+    ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
+    ThreadSafeMutableRoaringBitmap queryableDocIds1 = new ThreadSafeMutableRoaringBitmap();
+    ImmutableSegmentImpl segment1 =
+            mockImmutableSegment(1, validDocIds1, queryableDocIds1, getPrimaryKeyList(numRecords, primaryKeys));
+    upsertMetadataManager.addSegment(segment1, validDocIds1, queryableDocIds1,
+            getRecordInfoListForTTL(numRecords, primaryKeys, timestamps, null).iterator());
+
+    // Update records from the second segment
+    ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
+    ThreadSafeMutableRoaringBitmap queryableDocIds2 = new ThreadSafeMutableRoaringBitmap();
+    MutableSegment segment2 = mockMutableSegment(1, validDocIds2, queryableDocIds2);
+    upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(3), 0, new Integer(100), false));
+
+    // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
+    // segment2: 3 -> {0, 100}
+    checkRecordLocationForTTL(recordLocationMap, 0, segment1, 0, 100, hashFunction);
+    checkRecordLocationForTTL(recordLocationMap, 1, segment1, 1, 120, hashFunction);
+    checkRecordLocationForTTL(recordLocationMap, 2, segment1, 2, 100, hashFunction);
+    checkRecordLocationForTTL(recordLocationMap, 3, segment2, 0, 100, hashFunction);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 2});
+    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0});
+    assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 2});
+    assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0});
+
+    // Mark a record with latest value in segment1 as deleted (outside TTL-window)
+    upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(2), 1, new Integer(120), true));
+
+    // segment1: 0 -> {0, 100}, 1 -> {1, 120}
+    // segment2: 2 -> {1, 120}, 3 -> {0, 100}
+    checkRecordLocationForTTL(recordLocationMap, 0, segment1, 0, 100, hashFunction);
+    checkRecordLocationForTTL(recordLocationMap, 1, segment1, 1, 120, hashFunction);
+    checkRecordLocationForTTL(recordLocationMap, 2, segment2, 1, 120, hashFunction);
+    checkRecordLocationForTTL(recordLocationMap, 3, segment2, 0, 100, hashFunction);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
+    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
+    assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
+    assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0});
+
+    // Mark a record with latest value in segment2 as deleted (within TTL window)
+    upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(3), 2, new Integer(150), true));
+
+    // segment1: 0 -> {0, 100}, 1 -> {1, 120}
+    // segment2: 2 -> {1, 120}, 3 -> {2, 150}
+    checkRecordLocationForTTL(recordLocationMap, 0, segment1, 0, 100, hashFunction);
+    checkRecordLocationForTTL(recordLocationMap, 1, segment1, 1, 120, hashFunction);
+    checkRecordLocationForTTL(recordLocationMap, 2, segment2, 1, 120, hashFunction);
+    checkRecordLocationForTTL(recordLocationMap, 3, segment2, 2, 150, hashFunction);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
+    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{1, 2});
+    assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
+    assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{});
+
+    // delete-key segment2: 2 -> {1, 120}
+    upsertMetadataManager.removeExpiredPrimaryKeys();
+    // segment1: 0 -> {0, 100}, 1 -> {1, 120}
+    // segment2: 3 -> {2, 150}
+    checkRecordLocationForTTL(recordLocationMap, 0, segment1, 0, 100, hashFunction);
+    checkRecordLocationForTTL(recordLocationMap, 1, segment1, 1, 120, hashFunction);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
+    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{2});
+    assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
+    assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{});
+  }
+
   private void verifyRemoveExpiredPrimaryKeys(Comparable earlierComparisonValue, Comparable largerComparisonValue)
       throws IOException {
     File tableDir = new File(INDEX_DIR, REALTIME_TABLE_NAME);
 
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
-            Collections.singletonList("timeCol"), null, HashFunction.NONE, null, false, 30, tableDir,
+            Collections.singletonList("timeCol"), null, HashFunction.NONE, null, false, 30, 0, tableDir,
             mock(ServerMetrics.class));
     Map<Object, ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> recordLocationMap =
         upsertMetadataManager._primaryKeyToRecordLocationMap;
@@ -1047,7 +1145,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
 
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
-            Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, 30, tableDir,
+            Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, 30, 0, tableDir,
             mock(ServerMetrics.class));
     Map<Object, ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> recordLocationMap =
         upsertMetadataManager._primaryKeyToRecordLocationMap;
@@ -1121,7 +1219,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
             Collections.singletonList(comparisonColumn), deleteRecordColumn, HashFunction.NONE, null, true, 30,
-            INDEX_DIR, mock(ServerMetrics.class));
+            0, INDEX_DIR, mock(ServerMetrics.class));
     Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
     Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;
 
@@ -1206,7 +1304,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
             Collections.singletonList(comparisonColumn), deleteRecordColumn, HashFunction.NONE, null, true, 30,
-            INDEX_DIR, mock(ServerMetrics.class));
+            0, INDEX_DIR, mock(ServerMetrics.class));
 
     try (MockedConstruction<PinotSegmentColumnReader> deleteColReader = mockConstruction(PinotSegmentColumnReader.class,
         (mockReader, context) -> {
@@ -1237,7 +1335,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
 
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
-            Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, 30, tableDir,
+            Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, 30, 0, tableDir,
             mock(ServerMetrics.class));
     Map<Object, ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> recordLocationMap =
         upsertMetadataManager._primaryKeyToRecordLocationMap;
@@ -1298,7 +1396,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
       throws IOException {
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
-            Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, 10, INDEX_DIR,
+            Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, 10, 0, INDEX_DIR,
             mock(ServerMetrics.class));
 
     double currentTimeMs = System.currentTimeMillis();
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index 9c9b2cdf43..dfdd8ccade 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -1766,6 +1766,61 @@ public class TableConfigUtilsTest {
       Assert.fail("Shouldn't fail table creation when delete column type is boolean.");
     }
 
+    // upsert deleted-keys-ttl configs with no deleted column
+    schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).setPrimaryKeyColumns(Lists.newArrayList("myPkCol"))
+            .addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
+            .addSingleValueDimension(delCol, FieldSpec.DataType.BOOLEAN).build();
+    upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+    upsertConfig.setDeletedKeysTTL(3600);
+    tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setStreamConfigs(streamConfigs)
+            .setUpsertConfig(upsertConfig)
+            .setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+            .build();
+    try {
+      TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
+    } catch (IllegalStateException e) {
+      Assert.assertEquals(e.getMessage(), "Deleted Keys TTL can only be enabled with deleteRecordColumn set.");
+    }
+
+    upsertConfig.setDeleteRecordColumn(delCol);
+    // multiple comparison columns set for deleted-keys-ttl
+    schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).setPrimaryKeyColumns(Lists.newArrayList("myPkCol"))
+            .addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
+            .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
+            .addSingleValueDimension(delCol, FieldSpec.DataType.BOOLEAN).build();
+    upsertConfig.setComparisonColumns(Lists.newArrayList(TIME_COLUMN, "myCol"));
+    tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setStreamConfigs(streamConfigs)
+            .setUpsertConfig(upsertConfig)
+            .setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+            .build();
+    try {
+      TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
+    } catch (IllegalStateException e) {
+      Assert.assertEquals(e.getMessage(),
+          "MetadataTTL / DeletedKeysTTL does not work with multiple comparison columns");
+    }
+
+    // comparison column with non-numeric type
+    upsertConfig.setComparisonColumns(Lists.newArrayList("myCol"));
+    tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setStreamConfigs(streamConfigs)
+            .setUpsertConfig(upsertConfig)
+            .setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+            .build();
+    try {
+      TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
+    } catch (IllegalStateException e) {
+      Assert.assertEquals(e.getMessage(),
+          "MetadataTTL / DeletedKeysTTL must have comparison column: myCol in numeric type, found: STRING");
+    }
+
+    // time column as comparison column
+    upsertConfig.setComparisonColumns(Lists.newArrayList(TIME_COLUMN));
+    tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setStreamConfigs(streamConfigs)
+            .setUpsertConfig(upsertConfig)
+            .setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+            .build();
+    TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
+
     // upsert out-of-order configs
     String outOfOrderRecordColumn = "outOfOrderRecordColumn";
     boolean dropOutOfOrderRecord = true;
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
index 3f9b67c59c..906c6ceedf 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
@@ -66,6 +66,9 @@ public class UpsertConfig extends BaseJsonConfig {
   @JsonPropertyDescription("Whether to use TTL for upsert metadata cleanup, it uses the same unit as comparison col")
   private double _metadataTTL;
 
+  @JsonPropertyDescription("TTL for upsert metadata cleanup for deleted keys, it uses the same unit as comparison col")
+  private double _deletedKeysTTL;
+
   @JsonPropertyDescription("Whether to preload segments for fast upsert metadata recovery")
   private boolean _enablePreload;
 
@@ -129,6 +132,10 @@ public class UpsertConfig extends BaseJsonConfig {
     return _metadataTTL;
   }
 
+  public double getDeletedKeysTTL() {
+    return _deletedKeysTTL;
+  }
+
   public boolean isEnablePreload() {
     return _enablePreload;
   }
@@ -207,6 +214,10 @@ public class UpsertConfig extends BaseJsonConfig {
     _metadataTTL = metadataTTL;
   }
 
+  public void setDeletedKeysTTL(double deletedKeysTTL) {
+    _deletedKeysTTL = deletedKeysTTL;
+  }
+
   public void setEnablePreload(boolean enablePreload) {
     _enablePreload = enablePreload;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org