Posted to commits@pinot.apache.org by ja...@apache.org on 2023/07/24 04:34:28 UTC

[pinot] branch master updated: [Upsert TTL] Add Watermark for each partition for Primary key cleanup (#10915)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 5eafe76ee8 [Upsert TTL] Add Watermark for each partition for Primary key cleanup (#10915)
5eafe76ee8 is described below

commit 5eafe76ee89adfd12007db17de7a50c5dd7a43ac
Author: deemoliu <qi...@uber.com>
AuthorDate: Sun Jul 23 21:34:21 2023 -0700

    [Upsert TTL] Add Watermark for each partition for Primary key cleanup (#10915)
---
 .../realtime/LLRealtimeSegmentDataManager.java     |   2 +
 ...adataAndDictionaryAggregationPlanMakerTest.java |   4 +-
 .../upsert/BasePartitionUpsertMetadataManager.java | 143 ++++++++++-
 .../upsert/BaseTableUpsertMetadataManager.java     |   4 +
 ...oncurrentMapPartitionUpsertMetadataManager.java |  22 +-
 .../ConcurrentMapTableUpsertMetadataManager.java   |   4 +-
 .../upsert/PartitionUpsertMetadataManager.java     |   5 +
 .../segment/local/utils/TableConfigUtils.java      |  29 +++
 ...rrentMapPartitionUpsertMetadataManagerTest.java | 279 +++++++++++++++++++--
 .../segment/local/utils/TableConfigUtilsTest.java  |  85 +++++++
 .../org/apache/pinot/segment/spi/V1Constants.java  |   1 +
 .../mutable/ThreadSafeMutableRoaringBitmap.java    |   4 +
 .../pinot/spi/config/table/UpsertConfig.java       |  11 +
 13 files changed, 570 insertions(+), 23 deletions(-)
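
For context, the new "metadataTTL" knob is enabled alongside the existing snapshot flag. Below is a minimal sketch of the programmatic setup, modeled on the tests added further down in this diff; the class name, table and column names, and the 3600 value are illustrative, and the TTL uses the same unit as the comparison column.

    import org.apache.pinot.spi.config.table.TableConfig;
    import org.apache.pinot.spi.config.table.TableType;
    import org.apache.pinot.spi.config.table.UpsertConfig;
    import org.apache.pinot.spi.utils.builder.TableConfigBuilder;

    public class UpsertTtlConfigSketch {
      public static TableConfig build() {
        // Upsert TTL requires enableSnapshot=true, a single numeric comparison column
        // (the time column by default), and no deleteRecordColumn.
        UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
        upsertConfig.setMetadataTTL(3600);  // same unit as the comparison column
        upsertConfig.setEnableSnapshot(true);
        return new TableConfigBuilder(TableType.REALTIME).setTableName("myTable")
            .setTimeColumnName("timeCol").setUpsertConfig(upsertConfig).build();
      }
    }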

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 3f655728bc..5a2d075cae 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -675,6 +675,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
         // Take upsert snapshot before starting consuming events
         if (_partitionUpsertMetadataManager != null) {
           _partitionUpsertMetadataManager.takeSnapshot();
+          // If upsertTTL is enabled, we will remove expired primary keys from upsertMetadata after taking snapshot.
+          _partitionUpsertMetadataManager.removeExpiredPrimaryKeys();
         }
 
         while (!_state.isFinal()) {
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
index 47459fb691..874c8a6bed 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
@@ -131,8 +131,8 @@ public class MetadataAndDictionaryAggregationPlanMakerTest {
     _upsertIndexSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.heap);
     ((ImmutableSegmentImpl) _upsertIndexSegment).enableUpsert(
         new ConcurrentMapPartitionUpsertMetadataManager("testTable_REALTIME", 0, Collections.singletonList("column6"),
-            Collections.singletonList("daysSinceEpoch"), null, HashFunction.NONE, null, false, serverMetrics),
-        new ThreadSafeMutableRoaringBitmap(), null);
+            Collections.singletonList("daysSinceEpoch"), null, HashFunction.NONE, null, false, 0, INDEX_DIR,
+            serverMetrics), new ThreadSafeMutableRoaringBitmap(), null);
   }
 
   @AfterClass
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index 4632047989..6dc9d2bb7f 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -20,7 +20,12 @@ package org.apache.pinot.segment.local.upsert;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
@@ -32,6 +37,7 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
+import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.metrics.ServerGauge;
 import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
@@ -41,6 +47,7 @@ import org.apache.pinot.segment.local.utils.SegmentLocks;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.MutableSegment;
+import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
 import org.apache.pinot.spi.config.table.HashFunction;
 import org.apache.pinot.spi.data.readers.GenericRow;
@@ -61,6 +68,8 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
   protected final HashFunction _hashFunction;
   protected final PartialUpsertHandler _partialUpsertHandler;
   protected final boolean _enableSnapshot;
+  protected final double _metadataTTL;
+  protected final File _tableIndexDir;
   protected final ServerMetrics _serverMetrics;
   protected final Logger _logger;
 
@@ -78,10 +87,14 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
   protected long _lastOutOfOrderEventReportTimeNs = Long.MIN_VALUE;
   protected int _numOutOfOrderEvents = 0;
 
+  // Used to maintain the largestSeenComparisonValue to avoid handling out-of-TTL segments/records.
+  // If upsertTTL is enabled, we keep track of largestSeenComparisonValue to compute expired segments.
+  protected volatile double _largestSeenComparisonValue;
+
   protected BasePartitionUpsertMetadataManager(String tableNameWithType, int partitionId,
       List<String> primaryKeyColumns, List<String> comparisonColumns, @Nullable String deleteRecordColumn,
       HashFunction hashFunction, @Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot,
-      ServerMetrics serverMetrics) {
+      double metadataTTL, File tableIndexDir, ServerMetrics serverMetrics) {
     _tableNameWithType = tableNameWithType;
     _partitionId = partitionId;
     _primaryKeyColumns = primaryKeyColumns;
@@ -90,9 +103,16 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
     _hashFunction = hashFunction;
     _partialUpsertHandler = partialUpsertHandler;
     _enableSnapshot = enableSnapshot;
+    _metadataTTL = metadataTTL;
+    _tableIndexDir = tableIndexDir;
     _snapshotLock = enableSnapshot ? new ReentrantReadWriteLock() : null;
     _serverMetrics = serverMetrics;
     _logger = LoggerFactory.getLogger(tableNameWithType + "-" + partitionId + "-" + getClass().getSimpleName());
+    if (metadataTTL > 0) {
+      _largestSeenComparisonValue = loadWatermark();
+    } else {
+      deleteWatermark();
+    }
   }
 
   @Override
@@ -114,13 +134,36 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
     Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl,
         "Got unsupported segment implementation: {} for segment: {}, table: {}", segment.getClass(), segmentName,
         _tableNameWithType);
+    ImmutableSegmentImpl immutableSegment = (ImmutableSegmentImpl) segment;
+
+    // Skip adding segment that has max comparison value smaller than (largestSeenComparisonValue - TTL)
+    if (_largestSeenComparisonValue > 0) {
+      Preconditions.checkState(_enableSnapshot, "Upsert TTL must have snapshot enabled");
+      Preconditions.checkState(_comparisonColumns.size() == 1,
+          "Upsert TTL does not work with multiple comparison columns");
+      // TODO: Support deletion for TTL. Need to construct queryableDocIds when adding segments out of TTL.
+      Preconditions.checkState(_deleteRecordColumn == null, "Upsert TTL doesn't work with record deletion");
+      Number maxComparisonValue =
+          (Number) segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0)).getMaxValue();
+      if (maxComparisonValue.doubleValue() < _largestSeenComparisonValue - _metadataTTL) {
+        _logger.info("Skip adding segment: {} because it's out of TTL", segmentName);
+        MutableRoaringBitmap validDocIdsSnapshot = immutableSegment.loadValidDocIdsFromSnapshot();
+        if (validDocIdsSnapshot != null) {
+          immutableSegment.enableUpsert(this, new ThreadSafeMutableRoaringBitmap(validDocIdsSnapshot), null);
+        } else {
+          _logger.warn("Failed to find snapshot from segment: {} which is out of TTL, treating all documents as valid",
+              segmentName);
+        }
+        return;
+      }
+    }
 
     if (_enableSnapshot) {
       _snapshotLock.readLock().lock();
     }
     startOperation();
     try {
-      doAddSegment((ImmutableSegmentImpl) segment);
+      doAddSegment(immutableSegment);
       _trackedSegments.add(segment);
     } finally {
       finishOperation();
@@ -365,6 +408,12 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
       IndexSegment oldSegment) {
     String segmentName = segment.getSegmentName();
     Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
+
+    // Currently when TTL is enabled, we don't support skipping the loading of out-of-TTL segments with snapshots,
+    // since we don't know which docs are valid in the new segment.
+    // TODO: when ttl is enabled, we can allow
+    //       (1) skip loading segments without any invalid docs.
+    //       (2) assign the invalid docs from the replaced segment to the new segment.
     segmentLock.lock();
     try {
       MutableRoaringBitmap validDocIdsForOldSegment =
@@ -419,6 +468,16 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
       return;
     }
 
+    // Skip removing segment that has max comparison value smaller than (largestSeenComparisonValue - TTL)
+    if (_largestSeenComparisonValue > 0) {
+      Number maxComparisonValue =
+          (Number) segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0)).getMaxValue();
+      if (maxComparisonValue.doubleValue() < _largestSeenComparisonValue - _metadataTTL) {
+        _logger.info("Skip removing segment: {} because it's out of TTL", segmentName);
+        return;
+      }
+    }
+
     if (_enableSnapshot) {
       _snapshotLock.readLock().lock();
     }
@@ -544,6 +603,63 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
         numImmutableSegments, numTrackedSegments, System.currentTimeMillis() - startTimeMs);
   }
 
+  /**
+   * Loads the watermark from the file if it exists.
+   */
+  protected double loadWatermark() {
+    File watermarkFile = getWatermarkFile();
+    if (watermarkFile.exists()) {
+      try {
+        byte[] bytes = FileUtils.readFileToByteArray(watermarkFile);
+        double watermark = ByteBuffer.wrap(bytes).getDouble();
+        _logger.info("Loaded watermark: {} from file for table: {} partition_id: {}", watermark, _tableNameWithType,
+            _partitionId);
+        return watermark;
+      } catch (Exception e) {
+        _logger.warn("Caught exception while loading watermark file: {}, skipping", watermarkFile);
+      }
+    }
+    return Double.MIN_VALUE;
+  }
+
+  /**
+   * Persists watermark to the file.
+   */
+  protected void persistWatermark(double watermark) {
+    File watermarkFile = getWatermarkFile();
+    try {
+      if (watermarkFile.exists()) {
+        if (!FileUtils.deleteQuietly(watermarkFile)) {
+          _logger.warn("Cannot delete watermark file: {}, skipping", watermarkFile);
+          return;
+        }
+      }
+      try (OutputStream outputStream = new FileOutputStream(watermarkFile, false);
+          DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
+        dataOutputStream.writeDouble(watermark);
+      }
+      _logger.info("Persisted watermark: {} to file: {}", watermark, watermarkFile);
+    } catch (Exception e) {
+      _logger.warn("Caught exception while persisting watermark file: {}, skipping", watermarkFile);
+    }
+  }
+
+  /**
+   * Deletes the watermark file.
+   */
+  protected void deleteWatermark() {
+    File watermarkFile = getWatermarkFile();
+    if (watermarkFile.exists()) {
+      if (!FileUtils.deleteQuietly(watermarkFile)) {
+        _logger.warn("Cannot delete watermark file: {}, skipping", watermarkFile);
+      }
+    }
+  }
+
+  protected File getWatermarkFile() {
+    return new File(_tableIndexDir, V1Constants.TTL_WATERMARK_TABLE_PARTITION + _partitionId);
+  }
+
   protected void startOperation() {
     _numPendingOperations.getAndIncrement();
   }
@@ -556,6 +672,29 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
     }
   }
 
+  @Override
+  public void removeExpiredPrimaryKeys() {
+    if (_metadataTTL <= 0) {
+      return;
+    }
+    if (_stopped) {
+      _logger.info("Skip removing expired primary keys because metadata manager is already stopped");
+      return;
+    }
+
+    startOperation();
+    try {
+      doRemoveExpiredPrimaryKeys();
+    } finally {
+      finishOperation();
+    }
+  }
+
+  /**
+   * Removes all primary keys that have comparison value smaller than (largestSeenComparisonValue - TTL).
+   */
+  protected abstract void doRemoveExpiredPrimaryKeys();
+
   @Override
   public void stop() {
     _stopped = true;
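
For reference, the watermark persisted above is a single big-endian double stored as "ttl.watermark.partition.<partitionId>" under the table index directory. The standalone sketch below mirrors that read/write format; the class name, file path, and watermark value are illustrative, not the production code path.

    import java.io.DataOutputStream;
    import java.io.File;
    import java.io.FileOutputStream;
    import java.nio.ByteBuffer;
    import org.apache.commons.io.FileUtils;

    public class WatermarkFormatDemo {
      public static void main(String[] args) throws Exception {
        File watermarkFile = new File("/tmp/ttl.watermark.partition.0");  // hypothetical location
        // persistWatermark() writes one double with DataOutputStream.writeDouble()
        try (DataOutputStream out = new DataOutputStream(new FileOutputStream(watermarkFile, false))) {
          out.writeDouble(1690000000000.0);  // example comparison-column watermark
        }
        // loadWatermark() reads the same 8 bytes back via ByteBuffer.getDouble()
        double watermark = ByteBuffer.wrap(FileUtils.readFileToByteArray(watermarkFile)).getDouble();
        System.out.println("watermark = " + watermark);
      }
    }
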
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
index c6434ec487..e23689ad09 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
@@ -67,6 +67,8 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
   protected HashFunction _hashFunction;
   protected PartialUpsertHandler _partialUpsertHandler;
   protected boolean _enableSnapshot;
+  protected double _metadataTTL;
+  protected File _tableIndexDir;
   protected ServerMetrics _serverMetrics;
   private volatile boolean _isPreloading = false;
 
@@ -104,6 +106,8 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
     }
 
     _enableSnapshot = upsertConfig.isEnableSnapshot();
+    _metadataTTL = upsertConfig.getMetadataTTL();
+    _tableIndexDir = tableDataManager.getTableDataDir();
     _serverMetrics = serverMetrics;
     if (_enableSnapshot && segmentPreloadExecutor != null && upsertConfig.isEnablePreload()) {
       // Preloading the segments with snapshots for fast upsert metadata recovery.
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index cae8092b2f..3ce09412e3 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.segment.local.upsert;
 
 import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
@@ -59,9 +60,9 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
   public ConcurrentMapPartitionUpsertMetadataManager(String tableNameWithType, int partitionId,
       List<String> primaryKeyColumns, List<String> comparisonColumns, @Nullable String deleteRecordColumn,
       HashFunction hashFunction, @Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot,
-      ServerMetrics serverMetrics) {
+      double metadataTTL, File tableIndexDir, ServerMetrics serverMetrics) {
     super(tableNameWithType, partitionId, primaryKeyColumns, comparisonColumns, deleteRecordColumn, hashFunction,
-        partialUpsertHandler, enableSnapshot, serverMetrics);
+        partialUpsertHandler, enableSnapshot, metadataTTL, tableIndexDir, serverMetrics);
   }
 
   @Override
@@ -205,6 +206,7 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
   @Override
   protected void removeSegment(IndexSegment segment, MutableRoaringBitmap validDocIds) {
     assert !validDocIds.isEmpty();
+
     PrimaryKey primaryKey = new PrimaryKey(new Object[_primaryKeyColumns.size()]);
     PeekableIntIterator iterator = validDocIds.getIntIterator();
     try (
@@ -226,6 +228,17 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
     }
   }
 
+  @Override
+  public void doRemoveExpiredPrimaryKeys() {
+    double threshold = _largestSeenComparisonValue - _metadataTTL;
+    _primaryKeyToRecordLocationMap.forEach((primaryKey, recordLocation) -> {
+      if (((Number) recordLocation.getComparisonValue()).doubleValue() < threshold) {
+        _primaryKeyToRecordLocationMap.remove(primaryKey, recordLocation);
+      }
+    });
+    persistWatermark(_largestSeenComparisonValue);
+  }
+
   @Override
   protected void doAddRecord(MutableSegment segment, RecordInfo recordInfo) {
     ThreadSafeMutableRoaringBitmap validDocIds = Objects.requireNonNull(segment.getValidDocIds());
@@ -234,6 +247,11 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
     Comparable newComparisonValue = recordInfo.getComparisonValue();
     _primaryKeyToRecordLocationMap.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction),
         (primaryKey, currentRecordLocation) -> {
+          // Update largestSeenComparisonValue when adding new record
+          if (_metadataTTL > 0) {
+            double comparisonValue = ((Number) recordInfo.getComparisonValue()).doubleValue();
+            _largestSeenComparisonValue = Math.max(_largestSeenComparisonValue, comparisonValue);
+          }
           if (currentRecordLocation != null) {
             // Existing primary key
 
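The expiry rule applied by doRemoveExpiredPrimaryKeys() above is easy to demonstrate in isolation. The self-contained sketch below uses hypothetical keys and plain Double values; the real map stores RecordLocation entries keyed by hashed primary keys.

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    public class TtlExpiryDemo {
      public static void main(String[] args) {
        // Drop keys whose comparison value is older than
        // (largestSeenComparisonValue - metadataTTL).
        double largestSeenComparisonValue = 120;
        double metadataTTL = 30;
        double threshold = largestSeenComparisonValue - metadataTTL;  // 90

        ConcurrentMap<String, Double> keyToComparisonValue = new ConcurrentHashMap<>();
        keyToComparisonValue.put("pk0", 100.0);  // kept (>= 90)
        keyToComparisonValue.put("pk3", 80.0);   // removed (< 90)

        keyToComparisonValue.forEach((pk, comparisonValue) -> {
          if (comparisonValue < threshold) {
            keyToComparisonValue.remove(pk, comparisonValue);
          }
        });
        System.out.println(keyToComparisonValue.keySet());  // prints [pk0]
      }
    }
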
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
index b08ea591d1..3380203656 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
@@ -36,8 +36,8 @@ public class ConcurrentMapTableUpsertMetadataManager extends BaseTableUpsertMeta
   public ConcurrentMapPartitionUpsertMetadataManager getOrCreatePartitionManager(int partitionId) {
     return _partitionMetadataManagerMap.computeIfAbsent(partitionId,
         k -> new ConcurrentMapPartitionUpsertMetadataManager(_tableNameWithType, k, _primaryKeyColumns,
-            _comparisonColumns, _deleteRecordColumn, _hashFunction, _partialUpsertHandler, _enableSnapshot,
-            _serverMetrics));
+            _comparisonColumns, _deleteRecordColumn, _hashFunction, _partialUpsertHandler,
+            _enableSnapshot, _metadataTTL, _tableIndexDir, _serverMetrics));
   }
 
   @Override
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
index fbf864bd58..6ebd08c74c 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
@@ -97,6 +97,11 @@ public interface PartitionUpsertMetadataManager extends Closeable {
    */
   void takeSnapshot();
 
+  /**
+   * Removes the expired primary keys from the metadata when TTL is enabled.
+   */
+  void removeExpiredPrimaryKeys();
+
   /**
    * Stops the metadata manager. After invoking this method, no access to the metadata will be accepted.
    */
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 76cba334e5..d5741fa0b7 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -611,6 +611,35 @@ public final class TableConfigUtils {
       }
     }
     validateAggregateMetricsForUpsertConfig(tableConfig);
+    validateTTLForUpsertConfig(tableConfig, schema);
+  }
+
+  /**
+   * Validates the upsert config related to TTL.
+   */
+  @VisibleForTesting
+  static void validateTTLForUpsertConfig(TableConfig tableConfig, Schema schema) {
+    UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
+    if (upsertConfig == null || upsertConfig.getMetadataTTL() == 0) {
+      return;
+    }
+
+    List<String> comparisonColumns = upsertConfig.getComparisonColumns();
+    if (CollectionUtils.isNotEmpty(comparisonColumns)) {
+      Preconditions.checkState(comparisonColumns.size() == 1,
+          "Upsert TTL does not work with multiple comparison columns");
+      String comparisonColumn = comparisonColumns.get(0);
+      DataType comparisonColumnDataType = schema.getFieldSpecFor(comparisonColumn).getDataType();
+      Preconditions.checkState(comparisonColumnDataType.isNumeric(),
+          "Upsert TTL must have comparison column: %s in numeric type, found: %s", comparisonColumn,
+          comparisonColumnDataType);
+    }
+
+    Preconditions.checkState(upsertConfig.isEnableSnapshot(), "Upsert TTL must have snapshot enabled");
+
+    // TODO: Support deletion for TTL. Need to construct queryableDocIds when adding segments out of TTL.
+    Preconditions.checkState(upsertConfig.getDeleteRecordColumn() == null,
+        "Upsert TTL doesn't work with record deletion");
   }
 
   /**
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index 466f1885d5..fe3907ada3 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.segment.local.upsert;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -25,15 +26,19 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
 import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
 import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
 import org.apache.pinot.segment.local.upsert.ConcurrentMapPartitionUpsertMetadataManager.RecordLocation;
 import org.apache.pinot.segment.local.utils.HashUtils;
+import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.MutableSegment;
+import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.datasource.DataSource;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
@@ -45,7 +50,7 @@ import org.apache.pinot.spi.utils.ByteArray;
 import org.apache.pinot.spi.utils.BytesUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
-import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import static org.mockito.ArgumentMatchers.any;
@@ -56,11 +61,21 @@ import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertSame;
+import static org.testng.Assert.assertTrue;
 
 
 public class ConcurrentMapPartitionUpsertMetadataManagerTest {
   private static final String RAW_TABLE_NAME = "testTable";
   private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
+  private static final File INDEX_DIR =
+      new File(FileUtils.getTempDirectory(), "ConcurrentMapPartitionUpsertMetadataManagerTest");
+
+  @BeforeClass
+  public void setup()
+      throws Exception {
+    FileUtils.deleteQuietly(INDEX_DIR);
+    FileUtils.forceMkdir(INDEX_DIR);
+  }
 
   @Test
   public void testAddReplaceRemoveSegment()
@@ -73,12 +88,27 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     verifyAddReplaceRemoveSegment(HashFunction.MURMUR3, true);
   }
 
+  @Test
+  public void testUpsertMetadataCleanupWithTTLConfig() {
+    verifyRemoveExpiredPrimaryKeys(new Integer(80), new Integer(120));
+    verifyRemoveExpiredPrimaryKeys(new Float(80), new Float(120));
+    verifyRemoveExpiredPrimaryKeys(new Double(80), new Double(120));
+    verifyRemoveExpiredPrimaryKeys(new Long(80), new Long(120));
+    verifyPersistAndLoadWatermark();
+    verifyAddSegmentForTTL(new Integer(80));
+    verifyAddSegmentForTTL(new Float(80));
+    verifyAddSegmentForTTL(new Double(80));
+    verifyAddSegmentForTTL(new Long(80));
+    verifyAddOutOfTTLSegment();
+  }
+
   private void verifyAddReplaceRemoveSegment(HashFunction hashFunction, boolean enableSnapshot)
       throws IOException {
     String comparisonColumn = "timeCol";
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
-            Collections.singletonList(comparisonColumn), null, hashFunction, null, false, mock(ServerMetrics.class));
+            Collections.singletonList(comparisonColumn), null, hashFunction, null, false, 0, INDEX_DIR,
+            mock(ServerMetrics.class));
     Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
     Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;
 
@@ -240,7 +270,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     String deleteRecordColumn = "deleteCol";
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
-            Collections.singletonList(comparisonColumn), deleteRecordColumn, hashFunction, null, false,
+            Collections.singletonList(comparisonColumn), deleteRecordColumn, hashFunction, null, false, 0, INDEX_DIR,
             mock(ServerMetrics.class));
     Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
     Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;
@@ -309,7 +339,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
-    Assert.assertTrue(queryableDocIds1.getMutableRoaringBitmap().isEmpty());
+    assertTrue(queryableDocIds1.getMutableRoaringBitmap().isEmpty());
     assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 3});
 
     // Add an empty segment
@@ -324,7 +354,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
-    Assert.assertTrue(queryableDocIds1.getMutableRoaringBitmap().isEmpty());
+    assertTrue(queryableDocIds1.getMutableRoaringBitmap().isEmpty());
     assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 3});
 
     // Replace (reload) the first segment
@@ -347,9 +377,9 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
     assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
-    Assert.assertTrue(queryableDocIds1.getMutableRoaringBitmap().isEmpty());
+    assertTrue(queryableDocIds1.getMutableRoaringBitmap().isEmpty());
     assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 3});
-    Assert.assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());
+    assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());
 
     // Remove the original segment1
     upsertMetadataManager.removeSegment(segment1);
@@ -363,9 +393,9 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
     assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
-    Assert.assertTrue(queryableDocIds1.getMutableRoaringBitmap().isEmpty());
+    assertTrue(queryableDocIds1.getMutableRoaringBitmap().isEmpty());
     assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 3});
-    Assert.assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());
+    assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());
 
     // Remove the empty segment
     upsertMetadataManager.removeSegment(emptySegment);
@@ -379,7 +409,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
     assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
     assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 3});
-    Assert.assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());
+    assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());
 
     // Remove segment2
     upsertMetadataManager.removeSegment(segment2);
@@ -391,7 +421,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
     assertEquals(trackedSegments, Collections.singleton(newSegment1));
     assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 3});
-    Assert.assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());
+    assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());
 
     // Stop the metadata manager
     upsertMetadataManager.stop();
@@ -403,7 +433,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
     assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
     assertEquals(trackedSegments, Collections.singleton(newSegment1));
-    Assert.assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());
+    assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());
 
     // Close the metadata manager
     upsertMetadataManager.close();
@@ -458,6 +488,25 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     return segment;
   }
 
+  private static ImmutableSegmentImpl mockImmutableSegmentWithEndTime(int sequenceNumber,
+      ThreadSafeMutableRoaringBitmap validDocIds, @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds,
+      List<PrimaryKey> primaryKeys, List<String> comparisonColumns, Comparable endTime, MutableRoaringBitmap snapshot) {
+    ImmutableSegmentImpl segment = mockImmutableSegment(sequenceNumber, validDocIds, queryableDocIds, primaryKeys);
+    SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
+    when(segment.getSegmentMetadata()).thenReturn(segmentMetadata);
+    ColumnMetadata columnMetadata = mock(ColumnMetadata.class);
+    when(segmentMetadata.getColumnMetadataMap()).thenReturn(new TreeMap() {{
+      this.put(comparisonColumns.get(0), columnMetadata);
+    }});
+    when(columnMetadata.getMaxValue()).thenReturn(endTime);
+    if (snapshot != null) {
+      when(segment.loadValidDocIdsFromSnapshot()).thenReturn(snapshot);
+    } else {
+      when(segment.loadValidDocIdsFromSnapshot()).thenReturn(validDocIds.getMutableRoaringBitmap());
+    }
+    return segment;
+  }
+
   private static EmptyIndexSegment mockEmptySegment(int sequenceNumber) {
     SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
     when(segmentMetadata.getName()).thenReturn(getSegmentName(sequenceNumber));
@@ -504,7 +553,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     String comparisonColumn = "timeCol";
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
-            Collections.singletonList(comparisonColumn), null, hashFunction, null, false, mock(ServerMetrics.class));
+            Collections.singletonList(comparisonColumn), null, hashFunction, null, false, 0, INDEX_DIR,
+            mock(ServerMetrics.class));
     Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
 
     // Add the first segment
@@ -594,7 +644,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     String comparisonColumn = "timeCol";
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
-            Collections.singletonList(comparisonColumn), null, hashFunction, null, false, mock(ServerMetrics.class));
+            Collections.singletonList(comparisonColumn), null, hashFunction, null, false, 0, INDEX_DIR,
+            mock(ServerMetrics.class));
     Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
 
     // Add the first segment
@@ -642,13 +693,14 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     verifyAddRecordWithDeleteColumn(HashFunction.MD5);
     verifyAddRecordWithDeleteColumn(HashFunction.MURMUR3);
   }
+
   private void verifyAddRecordWithDeleteColumn(HashFunction hashFunction)
       throws IOException {
     String comparisonColumn = "timeCol";
     String deleteColumn = "deleteCol";
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
-            Collections.singletonList(comparisonColumn), deleteColumn, hashFunction, null, false,
+            Collections.singletonList(comparisonColumn), deleteColumn, hashFunction, null, false, 0, INDEX_DIR,
             mock(ServerMetrics.class));
     Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
 
@@ -744,6 +796,203 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     upsertMetadataManager.close();
   }
 
+  private void verifyRemoveExpiredPrimaryKeys(Comparable earlierComparisonValue, Comparable largerComparisonValue) {
+    File tableDir = new File(INDEX_DIR, REALTIME_TABLE_NAME);
+
+    ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
+            Collections.singletonList("timeCol"), null, HashFunction.NONE, null, false, 30, tableDir,
+            mock(ServerMetrics.class));
+    Map<Object, ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> recordLocationMap =
+        upsertMetadataManager._primaryKeyToRecordLocationMap;
+
+    // Add record to update largestSeenTimestamp, largest seen timestamp: earlierComparisonValue
+    ThreadSafeMutableRoaringBitmap validDocIds0 = new ThreadSafeMutableRoaringBitmap();
+    MutableSegment segment0 = mockMutableSegment(1, validDocIds0, null);
+    upsertMetadataManager.addRecord(segment0, new RecordInfo(makePrimaryKey(10), 1, earlierComparisonValue, false));
+    checkRecordLocationForTTL(recordLocationMap, 10, segment0, 1, 80, HashFunction.NONE);
+
+    // Add a segment with segmentEndTime = earlierComparisonValue, so it will not be skipped
+    int numRecords = 4;
+    int[] primaryKeys = new int[]{0, 1, 2, 3};
+    Number[] timestamps = new Number[]{100, 100, 120, 80};
+    ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
+    List<PrimaryKey> primaryKeys1 = getPrimaryKeyList(numRecords, primaryKeys);
+    ImmutableSegmentImpl segment1 =
+        mockImmutableSegmentWithEndTime(1, validDocIds1, null, primaryKeys1, Collections.singletonList("timeCol"),
+            earlierComparisonValue, null);
+
+    int[] docIds1 = new int[]{0, 1, 2, 3};
+    MutableRoaringBitmap validDocIdsSnapshot1 = new MutableRoaringBitmap();
+    validDocIdsSnapshot1.add(docIds1);
+
+    // load segment1.
+    upsertMetadataManager.addSegment(segment1, validDocIds1, null,
+        getRecordInfoListForTTL(numRecords, primaryKeys, timestamps).iterator());
+    assertEquals(recordLocationMap.size(), 5);
+    checkRecordLocationForTTL(recordLocationMap, 0, segment1, 0, 100, HashFunction.NONE);
+    checkRecordLocationForTTL(recordLocationMap, 1, segment1, 1, 100, HashFunction.NONE);
+    checkRecordLocationForTTL(recordLocationMap, 2, segment1, 2, 120, HashFunction.NONE);
+    checkRecordLocationForTTL(recordLocationMap, 3, segment1, 3, 80, HashFunction.NONE);
+    checkRecordLocationForTTL(recordLocationMap, 10, segment0, 1, 80, HashFunction.NONE);
+
+    // Add record to update largestSeenTimestamp, largest seen timestamp: largerComparisonValue
+    upsertMetadataManager.addRecord(segment0, new RecordInfo(makePrimaryKey(10), 0, largerComparisonValue, false));
+    assertEquals(recordLocationMap.size(), 5);
+    checkRecordLocationForTTL(recordLocationMap, 0, segment1, 0, 100, HashFunction.NONE);
+    checkRecordLocationForTTL(recordLocationMap, 1, segment1, 1, 100, HashFunction.NONE);
+    checkRecordLocationForTTL(recordLocationMap, 2, segment1, 2, 120, HashFunction.NONE);
+    checkRecordLocationForTTL(recordLocationMap, 3, segment1, 3, 80, HashFunction.NONE);
+    checkRecordLocationForTTL(recordLocationMap, 10, segment0, 0, 120, HashFunction.NONE);
+
+    // records before (largest seen timestamp - TTL) are expired and removed from upsertMetadata.
+    upsertMetadataManager.removeExpiredPrimaryKeys();
+    assertEquals(recordLocationMap.size(), 4);
+    checkRecordLocationForTTL(recordLocationMap, 0, segment1, 0, 100, HashFunction.NONE);
+    checkRecordLocationForTTL(recordLocationMap, 1, segment1, 1, 100, HashFunction.NONE);
+    checkRecordLocationForTTL(recordLocationMap, 2, segment1, 2, 120, HashFunction.NONE);
+    checkRecordLocationForTTL(recordLocationMap, 10, segment0, 0, 120, HashFunction.NONE);
+
+    // ValidDocIds for out-of-ttl records should not be removed.
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 2, 3});
+  }
+
+  private void verifyAddOutOfTTLSegment() {
+    File tableDir = new File(INDEX_DIR, REALTIME_TABLE_NAME);
+
+    ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
+            Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, 30, tableDir,
+            mock(ServerMetrics.class));
+    Map<Object, ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> recordLocationMap =
+        upsertMetadataManager._primaryKeyToRecordLocationMap;
+
+    // Add record to update largestSeenTimestamp, largest seen timestamp: 80
+    ThreadSafeMutableRoaringBitmap validDocIds0 = new ThreadSafeMutableRoaringBitmap();
+    MutableSegment segment0 = mockMutableSegment(1, validDocIds0, null);
+    upsertMetadataManager.addRecord(segment0, new RecordInfo(makePrimaryKey(10), 1, new Double(80), false));
+    checkRecordLocationForTTL(recordLocationMap, 10, segment0, 1, 80, HashFunction.NONE);
+
+    // Add a segment with segmentEndTime = 80, so it will not be skipped
+    int numRecords = 4;
+    int[] primaryKeys = new int[]{0, 1, 2, 3};
+    Number[] timestamps = new Number[]{100, 100, 120, 80};
+    ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
+    List<PrimaryKey> primaryKeys1 = getPrimaryKeyList(numRecords, primaryKeys);
+    ImmutableSegmentImpl segment1 =
+        mockImmutableSegmentWithEndTime(1, validDocIds1, null, primaryKeys1, Collections.singletonList("timeCol"),
+            new Double(80), null);
+
+    int[] docIds1 = new int[]{0, 1, 2, 3};
+    MutableRoaringBitmap validDocIdsSnapshot1 = new MutableRoaringBitmap();
+    validDocIdsSnapshot1.add(docIds1);
+
+    // load segment1 with segmentEndTime: 80, largest seen timestamp: 80. the segment will be loaded.
+    upsertMetadataManager.addSegment(segment1, validDocIds1, null,
+        getRecordInfoListForTTL(numRecords, primaryKeys, timestamps).iterator());
+    assertEquals(recordLocationMap.size(), 5);
+    checkRecordLocationForTTL(recordLocationMap, 0, segment1, 0, 100, HashFunction.NONE);
+    checkRecordLocationForTTL(recordLocationMap, 1, segment1, 1, 100, HashFunction.NONE);
+    checkRecordLocationForTTL(recordLocationMap, 2, segment1, 2, 120, HashFunction.NONE);
+    checkRecordLocationForTTL(recordLocationMap, 3, segment1, 3, 80, HashFunction.NONE);
+    checkRecordLocationForTTL(recordLocationMap, 10, segment0, 1, 80, HashFunction.NONE);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 2, 3});
+
+    // Add record to update largestSeenTimestamp, largest seen timestamp: 120
+    upsertMetadataManager.addRecord(segment0, new RecordInfo(makePrimaryKey(0), 0, new Double(120), false));
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{1, 2, 3});
+    assertEquals(recordLocationMap.size(), 5);
+    checkRecordLocationForTTL(recordLocationMap, 0, segment0, 0, 120, HashFunction.NONE);
+    checkRecordLocationForTTL(recordLocationMap, 1, segment1, 1, 100, HashFunction.NONE);
+    checkRecordLocationForTTL(recordLocationMap, 2, segment1, 2, 120, HashFunction.NONE);
+    checkRecordLocationForTTL(recordLocationMap, 3, segment1, 3, 80, HashFunction.NONE);
+    checkRecordLocationForTTL(recordLocationMap, 10, segment0, 1, 80, HashFunction.NONE);
+
+    // Add an out-of-TTL segment and verify that the invalid docs do not show up again.
+    // Add a segment with segmentEndTime: 80, largest seen timestamp: 120. the segment will be skipped.
+    List<PrimaryKey> primaryKeys2 = getPrimaryKeyList(numRecords, new int[]{100, 101, 102, 103});
+    int[] docIds2 = new int[]{0, 1};
+    MutableRoaringBitmap validDocIdsSnapshot2 = new MutableRoaringBitmap();
+    validDocIdsSnapshot2.add(docIds2);
+    ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
+    ImmutableSegmentImpl segment2 =
+        mockImmutableSegmentWithEndTime(1, validDocIds2, null, primaryKeys2, Collections.singletonList("timeCol"),
+            new Double(80), validDocIdsSnapshot2);
+    upsertMetadataManager.addSegment(segment2);
+    // Out-of-TTL segment should not be added to recordLocationMap
+    assertEquals(recordLocationMap.size(), 5);
+  }
+
+  private void verifyAddSegmentForTTL(Comparable comparisonValue) {
+    File tableDir = new File(INDEX_DIR, REALTIME_TABLE_NAME);
+
+    ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
+            Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, 30, tableDir,
+            mock(ServerMetrics.class));
+    Map<Object, ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> recordLocationMap =
+        upsertMetadataManager._primaryKeyToRecordLocationMap;
+
+    // Add record to update largestSeenTimestamp, largest seen timestamp: comparisonValue
+    ThreadSafeMutableRoaringBitmap validDocIds0 = new ThreadSafeMutableRoaringBitmap();
+    MutableSegment segment0 = mockMutableSegment(1, validDocIds0, null);
+    upsertMetadataManager.addRecord(segment0, new RecordInfo(makePrimaryKey(10), 1, comparisonValue, false));
+    checkRecordLocationForTTL(recordLocationMap, 10, segment0, 1, 80, HashFunction.NONE);
+
+    // Add a segment with segmentEndTime = -1 so it will be skipped since it is out of TTL
+    int numRecords = 4;
+    int[] primaryKeys = new int[]{0, 1, 2, 3};
+    ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
+    List<PrimaryKey> primaryKeys1 = getPrimaryKeyList(numRecords, primaryKeys);
+    ImmutableSegmentImpl segment1 =
+        mockImmutableSegmentWithEndTime(1, validDocIds1, null, primaryKeys1, Collections.singletonList("timeCol"), -1,
+            null);
+
+    int[] docIds1 = new int[]{0, 1, 2, 3};
+    MutableRoaringBitmap validDocIdsSnapshot1 = new MutableRoaringBitmap();
+    validDocIdsSnapshot1.add(docIds1);
+
+    // load segment1.
+    upsertMetadataManager.addSegment(segment1);
+    assertEquals(recordLocationMap.size(), 1);
+    checkRecordLocationForTTL(recordLocationMap, 10, segment0, 1, 80, HashFunction.NONE);
+  }
+
+  // Add the following util function since the comparison column is a numeric value for a TTL-enabled upsert table.
+  private List<RecordInfo> getRecordInfoListForTTL(int numRecords, int[] primaryKeys, Number[] timestamps) {
+    List<RecordInfo> recordInfoList = new ArrayList<>();
+    for (int i = 0; i < numRecords; i++) {
+      recordInfoList.add(
+          new RecordInfo(makePrimaryKey(primaryKeys[i]), i, new Double(timestamps[i].doubleValue()), false));
+    }
+    return recordInfoList;
+  }
+
+  // Add the following util function since the comparison column is a numeric value for a TTL-enabled upsert table.
+  private static void checkRecordLocationForTTL(Map<Object, RecordLocation> recordLocationMap, int keyValue,
+      IndexSegment segment, int docId, Number comparisonValue, HashFunction hashFunction) {
+    RecordLocation recordLocation =
+        recordLocationMap.get(HashUtils.hashPrimaryKey(makePrimaryKey(keyValue), hashFunction));
+    assertNotNull(recordLocation);
+    assertSame(recordLocation.getSegment(), segment);
+    assertEquals(recordLocation.getDocId(), docId);
+    assertEquals(((Number) recordLocation.getComparisonValue()).doubleValue(), comparisonValue.doubleValue());
+  }
+
+  private void verifyPersistAndLoadWatermark() {
+    ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
+            Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, 10, INDEX_DIR,
+            mock(ServerMetrics.class));
+
+    double currentTimeMs = System.currentTimeMillis();
+    upsertMetadataManager.persistWatermark(currentTimeMs);
+    assertTrue(new File(INDEX_DIR, V1Constants.TTL_WATERMARK_TABLE_PARTITION + 0).exists());
+
+    double watermark = upsertMetadataManager.loadWatermark();
+    assertEquals(watermark, currentTimeMs);
+  }
+
   @Test
   public void testHashPrimaryKey() {
     PrimaryKey pk = new PrimaryKey(new Object[]{"uuid-1", "uuid-2", "uuid-3"});
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index 89f380d049..1f6834f62c 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -1810,6 +1810,91 @@ public class TableConfigUtilsTest {
     }
   }
 
+  @Test
+  public void testValidateTTLConfigForUpsertConfig() {
+    // Default comparison column (timestamp)
+    Schema schema =
+        new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
+            .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
+            .setPrimaryKeyColumns(Lists.newArrayList("myCol")).build();
+    UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+    upsertConfig.setMetadataTTL(3600);
+    upsertConfig.setEnableSnapshot(true);
+    TableConfig tableConfigWithoutComparisonColumn =
+        new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN)
+            .setUpsertConfig(upsertConfig).build();
+    TableConfigUtils.validateTTLForUpsertConfig(tableConfigWithoutComparisonColumn, schema);
+
+    // Invalid comparison columns: "myCol"
+    upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+    upsertConfig.setComparisonColumns(Collections.singletonList("myCol"));
+    upsertConfig.setEnableSnapshot(true);
+    upsertConfig.setMetadataTTL(3600);
+    TableConfig tableConfigWithInvalidComparisonColumn =
+        new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN)
+            .setUpsertConfig(upsertConfig).build();
+    try {
+      TableConfigUtils.validateTTLForUpsertConfig(tableConfigWithInvalidComparisonColumn, schema);
+      Assert.fail();
+    } catch (IllegalStateException e) {
+      // Expected
+    }
+
+    // Invalid comparison columns: multiple comparison columns are not supported for TTL-enabled upsert table.
+    upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+    upsertConfig.setComparisonColumns(Lists.newArrayList(TIME_COLUMN, "myCol"));
+    upsertConfig.setEnableSnapshot(true);
+    upsertConfig.setMetadataTTL(3600);
+    TableConfig tableConfigWithInvalidComparisonColumn2 =
+        new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN)
+            .setUpsertConfig(upsertConfig).build();
+    try {
+      TableConfigUtils.validateTTLForUpsertConfig(tableConfigWithInvalidComparisonColumn2, schema);
+      Assert.fail();
+    } catch (IllegalStateException e) {
+      // Expected
+    }
+
+    // Invalid config with TTLConfig but Snapshot is not enabled
+    schema =
+        new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
+            .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
+            .setPrimaryKeyColumns(Lists.newArrayList("myCol")).build();
+    upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+    upsertConfig.setMetadataTTL(3600);
+    upsertConfig.setEnableSnapshot(false);
+    TableConfig tableConfigWithInvalidTTLConfig =
+        new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN)
+            .setUpsertConfig(upsertConfig).build();
+    try {
+      TableConfigUtils.validateTTLForUpsertConfig(tableConfigWithInvalidTTLConfig, schema);
+      Assert.fail();
+    } catch (IllegalStateException e) {
+      // Expected
+    }
+
+    // Invalid config with both delete and TTL enabled
+    String delCol = "myDelCol";
+    schema =
+        new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
+            .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
+            .addSingleValueDimension(delCol, FieldSpec.DataType.STRING)
+            .setPrimaryKeyColumns(Lists.newArrayList("myCol")).build();
+    upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+    upsertConfig.setMetadataTTL(3600);
+    upsertConfig.setEnableSnapshot(true);
+    upsertConfig.setDeleteRecordColumn(delCol);
+    TableConfig tableConfigWithBothDeleteAndTTL =
+        new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN)
+            .setUpsertConfig(upsertConfig).build();
+    try {
+      TableConfigUtils.validateTTLForUpsertConfig(tableConfigWithBothDeleteAndTTL, schema);
+      Assert.fail();
+    } catch (IllegalStateException e) {
+      // Expected
+    }
+  }
+
   @Test
   public void testValidatePartitionedReplicaGroupInstance() {
     String partitionColumn = "testPartitionCol";
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
index 38d2e21264..9f2c02fddd 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
@@ -26,6 +26,7 @@ public class V1Constants {
   public static final String INDEX_MAP_FILE_NAME = "index_map";
   public static final String INDEX_FILE_NAME = "columns.psf";
   public static final String VALID_DOC_IDS_SNAPSHOT_FILE_NAME = "validdocids.bitmap.snapshot";
+  public static final String TTL_WATERMARK_TABLE_PARTITION = "ttl.watermark.partition.";
 
   public static class Str {
     public static final char DEFAULT_STRING_PAD_CHAR = '\0';
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/ThreadSafeMutableRoaringBitmap.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/ThreadSafeMutableRoaringBitmap.java
index 58631531f3..05ab86628d 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/ThreadSafeMutableRoaringBitmap.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/ThreadSafeMutableRoaringBitmap.java
@@ -36,6 +36,10 @@ public class ThreadSafeMutableRoaringBitmap {
     _mutableRoaringBitmap.add(firstDocId);
   }
 
+  public ThreadSafeMutableRoaringBitmap(MutableRoaringBitmap mutableRoaringBitmap) {
+    _mutableRoaringBitmap = mutableRoaringBitmap;
+  }
+
   public synchronized void add(int docId) {
     _mutableRoaringBitmap.add(docId);
   }
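
The new constructor above is what lets addSegment() in BasePartitionUpsertMetadataManager wrap a validDocIds snapshot loaded from disk when a segment is out of TTL. A short illustration follows; the class name and doc ids are hypothetical.

    import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
    import org.roaringbitmap.buffer.MutableRoaringBitmap;

    public class SnapshotWrapDemo {
      public static void main(String[] args) {
        // Wrap an existing bitmap (e.g. a validDocIds snapshot read from disk) instead
        // of starting from an empty one; subsequent add/remove calls stay synchronized.
        MutableRoaringBitmap snapshot = MutableRoaringBitmap.bitmapOf(0, 2, 3);  // hypothetical doc ids
        ThreadSafeMutableRoaringBitmap validDocIds = new ThreadSafeMutableRoaringBitmap(snapshot);
        System.out.println(validDocIds.getMutableRoaringBitmap());
      }
    }
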
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
index a9981de4ff..2e2a3f645a 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
@@ -60,6 +60,9 @@ public class UpsertConfig extends BaseJsonConfig {
   @JsonPropertyDescription("Whether to use snapshot for fast upsert metadata recovery")
   private boolean _enableSnapshot;
 
+  @JsonPropertyDescription("Whether to use TTL for upsert metadata cleanup, it uses the same unit as comparison col")
+  private double _metadataTTL;
+
   @JsonPropertyDescription("Whether to preload segments for fast upsert metadata recovery")
   private boolean _enablePreload;
 
@@ -111,6 +114,10 @@ public class UpsertConfig extends BaseJsonConfig {
     return _enableSnapshot;
   }
 
+  public double getMetadataTTL() {
+    return _metadataTTL;
+  }
+
   public boolean isEnablePreload() {
     return _enablePreload;
   }
@@ -179,6 +186,10 @@ public class UpsertConfig extends BaseJsonConfig {
     _enableSnapshot = enableSnapshot;
   }
 
+  public void setMetadataTTL(double metadataTTL) {
+    _metadataTTL = metadataTTL;
+  }
+
   public void setEnablePreload(boolean enablePreload) {
     _enablePreload = enablePreload;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org