Posted to commits@pinot.apache.org by "deemoliu (via GitHub)" <gi...@apache.org> on 2023/06/15 00:56:20 UTC

[GitHub] [pinot] deemoliu opened a new pull request, #10915: [Upsert TTL] Add Watermark for each partitions for Primary key cleanup

deemoliu opened a new pull request, #10915:
URL: https://github.com/apache/pinot/pull/10915

   `feature`: The watermark is the timestamp used to clean up the primary-key metadata in the previous cleanup round.
   
   Added TTLConfig to upsertConfig.
   Added an API in partitionManager for primary key cleanup when TTL is enabled.
   Added functions to persist/load the watermark.
   Persist/load the watermark when cleaning up primary keys.
   When servers restart and load segments, only add records that are newer than the watermark.
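A minimal sketch of the restart rule above (only re-adding records that are not older than the persisted watermark). It assumes a single numeric comparison column compared as `double`, and treats a record as expired when its comparison value is strictly below the watermark. `UpsertRecord`, `WatermarkFilterSketch` and `recordsToAdd` are hypothetical names, not Pinot APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical record type: a primary key plus its numeric comparison value.
final class UpsertRecord {
  private final String primaryKey;
  private final double comparisonValue;

  UpsertRecord(String primaryKey, double comparisonValue) {
    this.primaryKey = primaryKey;
    this.comparisonValue = comparisonValue;
  }

  String getPrimaryKey() { return primaryKey; }
  double getComparisonValue() { return comparisonValue; }
}

class WatermarkFilterSketch {
  // Returns the records to re-add to the primary-key map after a restart.
  // Assumption: a record is expired when its comparison value is strictly below the watermark.
  static List<UpsertRecord> recordsToAdd(List<UpsertRecord> segmentRecords, double watermark) {
    List<UpsertRecord> kept = new ArrayList<>();
    for (UpsertRecord upsertRecord : segmentRecords) {
      if (upsertRecord.getComparisonValue() >= watermark) {
        kept.add(upsertRecord);
      }
    }
    return kept;
  }
}
```

With a watermark of 150, records whose comparison values are 100, 200 and 150 would keep only the last two, in their original order.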
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #10915: [Upsert TTL] Add Watermark for each partitions for Primary key cleanup

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #10915:
URL: https://github.com/apache/pinot/pull/10915#discussion_r1262987390


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -367,6 +398,17 @@ public void replaceSegment(ImmutableSegment segment, @Nullable ThreadSafeMutable
     Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
     segmentLock.lock();
     try {
+      // Skip adding segments that has segment EndTime in the comparison cols earlier than (largestSeenTimestamp - TTL).
+      // Note: We only update largestSeenComparisonValue when addRecord, and access the value when addOrReplaceSegments.
+      // We only support single comparison column for TTL-enabled upsert tables.
+      if (_largestSeenComparisonValue > 0) {

Review Comment:
   This code path is also invoked during segment reload, so we need to handle that case as well.



[GitHub] [pinot] deemoliu commented on a diff in pull request #10915: [Upsert TTL] Add Watermark for each partitions for Primary key cleanup

Posted by "deemoliu (via GitHub)" <gi...@apache.org>.
deemoliu commented on code in PR #10915:
URL: https://github.com/apache/pinot/pull/10915#discussion_r1237416775


##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertTTLConfig.java:
##########
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.table;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyDescription;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+
+
+/**
+ * The {@code UpsertTTLConfig} class contains the upsert TTL (time-to-live) related configurations.
+ * Pinot upsert keeps track of all primary keys in heap, it's costly and also affects performance when table is large.
+ *
+ * If primary keys in the table have lifecycle, they won't get updated after a certain period time, then we can use the
+ * following configuration to enable upsert ttl feature. Pinot will only keeps track of alive primary keys in heap.

Review Comment:
   gotcha, thanks for the review.



[GitHub] [pinot] deemoliu commented on a diff in pull request #10915: [Upsert TTL] Add Watermark for each partitions for Primary key cleanup

Posted by "deemoliu (via GitHub)" <gi...@apache.org>.
deemoliu commented on code in PR #10915:
URL: https://github.com/apache/pinot/pull/10915#discussion_r1269975982


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -595,6 +595,39 @@ && isRoutingStrategyAllowedForUpsert(tableConfig.getRoutingConfig()),
       }
     }
     validateAggregateMetricsForUpsertConfig(tableConfig);
+    validateTTLForUpsertConfig(tableConfig, schema);
+  }
+
+  /**
+   * Validates whether the comparison columns is compatible with Upsert TTL feature.
+   * Validation fails when one of the comparison columns is not a numeric value.
+   */
+  @VisibleForTesting
+  static void validateTTLForUpsertConfig(TableConfig tableConfig, Schema schema) {
+    if (tableConfig.getUpsertMode() == UpsertConfig.Mode.NONE
+        || tableConfig.getUpsertConfig().getUpsertTTLInComparisonTimeUnit() == 0) {
+      return;
+    }
+
+    // comparison columns should hold timestamp values in numeric values
+    List<String> comparisonColumns = tableConfig.getUpsertConfig().getComparisonColumns();
+    if (comparisonColumns != null && !comparisonColumns.isEmpty()) {
+
+      // currently we only support 1 comparison column since we need to fetch endTime in comparisonValue time unit from
+      // columnMetadata. If we have multiple comparison columns, we can only use the first comparison column as filter.
+      Preconditions.checkState(comparisonColumns.size() <= 1,

Review Comment:
   gotcha, thanks for the good catch
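The validation rule in the hunk above (TTL only with a single, numeric comparison column) can be sketched as follows. This is an illustration under stated assumptions, not the Pinot code: `ColumnType` is a hypothetical stand-in for Pinot's data types, the schema is modelled as a plain map, and the checks throw `IllegalStateException`, which is also what Guava's `Preconditions.checkState` (used in the hunk) throws.

```java
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Pinot's column data types; only the numeric check matters here.
enum ColumnType { INT, LONG, FLOAT, DOUBLE, TIMESTAMP, STRING, BYTES }

class UpsertTtlValidationSketch {
  // Throws IllegalStateException when the TTL cannot be applied to the given comparison columns.
  static void validate(double metadataTtl, List<String> comparisonColumns, Map<String, ColumnType> schema) {
    if (metadataTtl <= 0) {
      return; // TTL disabled: nothing to validate
    }
    if (comparisonColumns == null || comparisonColumns.isEmpty()) {
      return; // mirrors the hunk: only explicitly configured comparison columns are checked here
    }
    if (comparisonColumns.size() != 1) {
      throw new IllegalStateException("Upsert TTL supports only a single comparison column");
    }
    ColumnType type = schema.get(comparisonColumns.get(0));
    boolean numeric = type == ColumnType.INT || type == ColumnType.LONG || type == ColumnType.FLOAT
        || type == ColumnType.DOUBLE || type == ColumnType.TIMESTAMP;
    if (!numeric) {
      throw new IllegalStateException("Upsert TTL requires a numeric comparison column, got: " + type);
    }
  }
}
```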



[GitHub] [pinot] deemoliu commented on pull request #10915: [Upsert TTL] Add Watermark for each partitions for Primary key cleanup

Posted by "deemoliu (via GitHub)" <gi...@apache.org>.
deemoliu commented on PR #10915:
URL: https://github.com/apache/pinot/pull/10915#issuecomment-1592175052

   cc: @Jackie-Jiang 


[GitHub] [pinot] deemoliu commented on a diff in pull request #10915: [Upsert TTL] Add Watermark for each partitions for Primary key cleanup

Posted by "deemoliu (via GitHub)" <gi...@apache.org>.
deemoliu commented on code in PR #10915:
URL: https://github.com/apache/pinot/pull/10915#discussion_r1237430802


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -395,6 +402,18 @@ protected void finishOperation() {
     }
   }
 
+  /**
+   * When TTL is enabled for upsert, this function is used to remove expired keys from the primary key indexes.

Review Comment:
   @chenboat this will be integrated once #10928 is merged.



[GitHub] [pinot] deemoliu commented on a diff in pull request #10915: [Upsert TTL] Add Watermark for each partitions for Primary key cleanup

Posted by "deemoliu (via GitHub)" <gi...@apache.org>.
deemoliu commented on code in PR #10915:
URL: https://github.com/apache/pinot/pull/10915#discussion_r1237426407


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -243,6 +295,47 @@ protected GenericRow doUpdateRecord(GenericRow record, RecordInfo recordInfo) {
     }
   }
 
+  protected long loadWatermark() {
+    File watermarkFile = getWatermarkFile();
+    if (watermarkFile.exists()) {

Review Comment:
   @chenboat What if an exception is thrown while loading the watermark? In that case we still want to return `Long.MIN_VALUE`, the same as when the watermark file doesn't exist.
   That is why I check for the file's existence first and return the loaded value.
   If the file doesn't exist, or an exception is thrown inside the try-catch, I return `Long.MIN_VALUE`.
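The fallback behaviour described in this reply can be sketched with plain JDK I/O. This is a minimal sketch, assuming the watermark is stored as a single 8-byte `long`; `WatermarkLoaderSketch` and `NO_WATERMARK` are hypothetical names, and `System.err` stands in for the manager's logger.

```java
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

class WatermarkLoaderSketch {
  // Sentinel meaning "no watermark": every record compares as newer than it.
  static final long NO_WATERMARK = Long.MIN_VALUE;

  // Returns the persisted watermark, or NO_WATERMARK if the file is missing or unreadable,
  // so a corrupt or truncated file degrades to the same behaviour as a fresh start.
  static long loadWatermark(File watermarkFile) {
    if (!watermarkFile.exists()) {
      return NO_WATERMARK;
    }
    try (DataInputStream in = new DataInputStream(new FileInputStream(watermarkFile))) {
      return in.readLong(); // throws EOFException (an IOException) if fewer than 8 bytes
    } catch (IOException e) {
      System.err.println("Failed to load watermark file " + watermarkFile + ", ignoring: " + e);
      return NO_WATERMARK;
    }
  }
}
```

Both the missing-file and the failed-read paths return the same sentinel, which is the equivalence the reply argues for.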



[GitHub] [pinot] deemoliu commented on a diff in pull request #10915: [Upsert TTL] Add Watermark for each partitions for Primary key cleanup

Posted by "deemoliu (via GitHub)" <gi...@apache.org>.
deemoliu commented on code in PR #10915:
URL: https://github.com/apache/pinot/pull/10915#discussion_r1237776033


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -395,6 +402,18 @@ protected void finishOperation() {
     }
   }
 
+  /**
+   * When TTL is enabled for upsert, this function is used to remove expired keys from the primary key indexes.
+   */
+  @Override
+  public void removeExpiredPrimaryKeys(Comparable watermark) {
+    if (_upsertTTLConfig.getTtlInMs() > 0) {
+      doRemoveExpiredPrimaryKeys((Long) watermark);

Review Comment:
   Gotcha, thanks for the review!
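The `removeExpiredPrimaryKeys` hunk above casts a `Comparable` watermark to `Long` before delegating. Below is a minimal sketch of the removal step. It assumes a `ConcurrentHashMap` keyed by primary key and a primitive `long` watermark, which sidesteps the cast. `ExpiredKeyCleanupSketch` and its nested `RecordLocation` are hypothetical simplifications, not the Pinot classes. Each key is removed through `computeIfPresent`, which re-checks the value atomically, so a key refreshed by a concurrent upsert after the scan saw it is not dropped.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ExpiredKeyCleanupSketch {
  // Hypothetical stand-in for the per-key record location; only the comparison value is kept.
  static final class RecordLocation {
    final long comparisonValue;

    RecordLocation(long comparisonValue) { this.comparisonValue = comparisonValue; }
  }

  final Map<Object, RecordLocation> primaryKeyToRecordLocation = new ConcurrentHashMap<>();
  final long ttl; // same unit as the comparison column; <= 0 disables TTL

  ExpiredKeyCleanupSketch(long ttl) { this.ttl = ttl; }

  // Drops every primary key whose latest comparison value is older than the watermark.
  void removeExpiredPrimaryKeys(long watermark) {
    if (ttl <= 0) {
      return;
    }
    for (Object key : primaryKeyToRecordLocation.keySet()) {
      // Returning null from the remapping function removes the entry.
      primaryKeyToRecordLocation.computeIfPresent(key,
          (k, loc) -> loc.comparisonValue < watermark ? null : loc);
    }
  }
}
```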



[GitHub] [pinot] deemoliu commented on a diff in pull request #10915: [Upsert TTL] Add Watermark for each partitions for Primary key cleanup

Posted by "deemoliu (via GitHub)" <gi...@apache.org>.
deemoliu commented on code in PR #10915:
URL: https://github.com/apache/pinot/pull/10915#discussion_r1237809232


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -55,12 +64,17 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
 
   // Reused for reading previous record during partial upsert
   private final GenericRow _reuse = new GenericRow();
+  private long _lastExpiredTimeMS;

Review Comment:
   This is the watermark value, which keeps track of the last time we cleaned up primary keys.



[GitHub] [pinot] chenboat commented on a diff in pull request #10915: [Upsert TTL] Add Watermark for each partitions for Primary key cleanup

Posted by "chenboat (via GitHub)" <gi...@apache.org>.
chenboat commented on code in PR #10915:
URL: https://github.com/apache/pinot/pull/10915#discussion_r1231681071


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -55,12 +64,17 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
 
   // Reused for reading previous record during partial upsert
   private final GenericRow _reuse = new GenericRow();
+  private long _lastExpiredTimeMS;

Review Comment:
   Javadoc? What does this variable mean?



[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #10915: [Upsert TTL] Add Watermark for each partitions for Primary key cleanup

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #10915:
URL: https://github.com/apache/pinot/pull/10915#discussion_r1256897188


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -472,6 +492,72 @@ protected void doTakeSnapshot() {
         numImmutableSegments, numTrackedSegments, System.currentTimeMillis() - startTimeMs);
   }
 
+  /**
+   * Note: Load watermark when the server is started/restarted.
+   * */
+  protected double loadWatermark() {
+    File watermarkFile = getWatermarkFile();
+    if (watermarkFile.exists()) {
+      try {
+        byte[] bytes = FileUtils.readFileToByteArray(watermarkFile);
+        double watermark = ByteBuffer.wrap(bytes).getDouble();
+        _logger.info("Loaded watermark {} from file for table: {} partition_id: {}", watermark, _tableNameWithType,
+            _partitionId);
+        return watermark;
+      } catch (Exception e) {
+        _logger.warn("Caught exception while loading watermark file: {}, skipping",
+            watermarkFile);
+      }
+    }
+    return Double.MIN_VALUE;
+  }
+
+  /**
+   * Note: Persist watermark when the expired primary keys are cleanup from upsertMetadata.
+   * */
+  protected void persistWatermark(double watermark) {
+    File watermarkFile = getWatermarkFile();
+    try {
+      if (watermarkFile.exists()) {
+        if (!FileUtils.deleteQuietly(watermarkFile)) {
+          _logger.warn("Cannot delete watermark file: {}, skipping", watermarkFile);
+          return;
+        }
+      }
+      try (OutputStream outputStream = new FileOutputStream(watermarkFile, false)) {
+        long longBits = Double.doubleToLongBits(watermark);
+        outputStream.write(Longs.toByteArray(longBits));
+      }
+      _logger.info("Persisted watermark {} to file for table: {} partition_id: {}", watermark,
+          _tableNameWithType, _partitionId);
+    } catch (Exception e) {
+      _logger.warn("Caught exception while persisting watermark file: {}, skipping",
+          watermarkFile);
+    }
+  }
+
+  /**
+   * Note: Watermark file need to be deleted when upsert TTL is disabled in the upsertConfig.
+   * */
+  protected void deleteWatermark() {
+    File watermarkFile = getWatermarkFile();
+    if (watermarkFile.exists()) {
+      try {
+        if (!FileUtils.deleteQuietly(watermarkFile)) {
+          _logger.info("Deleted watermarkFile for table: {} partition_id: {}", _tableNameWithType, _partitionId);
+          return;
+        }
+      } catch (Exception e) {
+        _logger.warn("Caught exception while deleting watermarkFile for table: {} partition_id: {}", _tableNameWithType,
+            _partitionId);
+      }
+    }
+  }
+
+  protected File getWatermarkFile() {
+    return new File(_tableIndexDir, _tableNameWithType + V1Constants.TTL_WATERMARK_TABLE_PARTITION + _partitionId);

Review Comment:
   Let's not prefix it with the table name, since it is already under the table directory.
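What the suggestion amounts to can be sketched as follows. The file name encodes only the partition id, because the table index directory already identifies the table. `WATERMARK_FILE_PREFIX` is a hypothetical value; the real constant (`V1Constants.TTL_WATERMARK_TABLE_PARTITION`) is referenced in the hunk, but its value is not shown.

```java
import java.io.File;

class WatermarkFileSketch {
  // Hypothetical prefix; the PR's V1Constants.TTL_WATERMARK_TABLE_PARTITION value is not shown.
  static final String WATERMARK_FILE_PREFIX = "ttl.watermark.partition.";

  // The table index directory already identifies the table, so only the partition id is encoded.
  static File getWatermarkFile(File tableIndexDir, int partitionId) {
    return new File(tableIndexDir, WATERMARK_FILE_PREFIX + partitionId);
  }
}
```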



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -76,23 +85,34 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
   protected final AtomicInteger _numPendingOperations = new AtomicInteger(1);
 
   protected long _lastOutOfOrderEventReportTimeNs = Long.MIN_VALUE;
+
+  // Used to maintain the largestSeenComparisonValue to avoid handling out-of-ttl segments/records.
+  // If upsertTTL enabled, we will keep track of largestSeenComparisonValue to compute expired segments.
+  protected volatile double _largestSeenComparisonValue;

Review Comment:
   (minor) Move this after `_numOutOfOrderEvents`. We want to keep variables for the same purpose together



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -60,7 +67,9 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
   protected final String _deleteRecordColumn;
   protected final HashFunction _hashFunction;
   protected final PartialUpsertHandler _partialUpsertHandler;
+  protected final double _upsertTTLInComparisonTimeUnit;

Review Comment:
   (minor) Move `_upsertTTLInComparisonTimeUnit` after `_enableSnapshot` to keep the variables for the same feature together



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -220,6 +258,13 @@ protected void doAddRecord(MutableSegment segment, RecordInfo recordInfo) {
     Comparable newComparisonValue = recordInfo.getComparisonValue();
     _primaryKeyToRecordLocationMap.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction),
         (primaryKey, currentRecordLocation) -> {
+          // Update the largestSeenComparisonValueMs when add new record. If records during addSegments has a newer
+          // comparison column values than addRecords, it's a bug and should not happen.
+          if (_upsertTTLInComparisonTimeUnit > 0) {
+            if (recordInfo.getComparisonValue().compareTo(_largestSeenComparisonValue) > 0) {
+              _largestSeenComparisonValue = (double) recordInfo.getComparisonValue();

Review Comment:
   The cast can throw an exception. Please add a test with a non-double comparison column.
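Casting a `Comparable` to `double` compiles as a cast to `Double` followed by unboxing, so it throws `ClassCastException` when the comparison column holds `Long` or `Integer` values. The sketch below widens through `Number.doubleValue()` instead. It is a hypothetical helper (`LargestSeenValueSketch`), not the PR's code. The method is `synchronized` because a `volatile` field alone does not make the read-compare-write atomic when several keys are updated concurrently.

```java
class LargestSeenValueSketch {
  private volatile double largestSeenComparisonValue = Double.NEGATIVE_INFINITY;

  // Comparison values may arrive as Integer, Long, Float or Double, so they are widened
  // through Number.doubleValue() instead of being cast to a specific boxed type.
  synchronized void update(Comparable<?> comparisonValue) {
    if (!(comparisonValue instanceof Number)) {
      throw new IllegalArgumentException("TTL requires a numeric comparison value, got: " + comparisonValue);
    }
    double value = ((Number) comparisonValue).doubleValue();
    if (value > largestSeenComparisonValue) {
      largestSeenComparisonValue = value;
    }
  }

  double get() {
    return largestSeenComparisonValue;
  }
}
```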



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -74,6 +75,16 @@ protected void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutab
       @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator,
       @Nullable IndexSegment oldSegment, @Nullable MutableRoaringBitmap validDocIdsForOldSegment) {
     String segmentName = segment.getSegmentName();
+    // Skip adding segments that has segment EndTime in the comparison cols earlier than (largestSeenTimestamp - TTL).
+    // Note: We only update largestSeenComparisonValueMs when addRecord, and access the value when addSegments.
+    // We only support single comparison column for TTL-enabled upsert tables.
+    if (_largestSeenComparisonValue > 0) {
+      Comparable endTime =

Review Comment:
   (MAJOR) This won't work if the end time is not of type double. You want to call `Number.doubleValue()` and then compare.
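The review point can be sketched as a segment-level check that converts the end time with `Number.doubleValue()` before comparing it against `(largestSeenComparisonValue - TTL)`, whatever boxed numeric type the column metadata returns. `SegmentTtlCheckSketch` and `isOutOfTtl` are hypothetical names. The choice to keep segments whose end time is not a `Number` is an assumption of this sketch.

```java
class SegmentTtlCheckSketch {
  // Returns true when every record in the segment is older than (largestSeen - ttl), so the
  // segment can be skipped when building the primary-key map.
  // endTime is the segment's max value of the comparison column (e.g. from column metadata).
  static boolean isOutOfTtl(Comparable<?> endTime, double largestSeenComparisonValue, double ttl) {
    if (ttl <= 0 || !(endTime instanceof Number)) {
      return false; // TTL disabled or non-numeric end time: keep the segment
    }
    return ((Number) endTime).doubleValue() < largestSeenComparisonValue - ttl;
  }
}
```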



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -472,6 +492,72 @@ protected void doTakeSnapshot() {
         numImmutableSegments, numTrackedSegments, System.currentTimeMillis() - startTimeMs);
   }
 
+  /**
+   * Note: Load watermark when the server is started/restarted.
+   * */
+  protected double loadWatermark() {
+    File watermarkFile = getWatermarkFile();
+    if (watermarkFile.exists()) {
+      try {
+        byte[] bytes = FileUtils.readFileToByteArray(watermarkFile);
+        double watermark = ByteBuffer.wrap(bytes).getDouble();
+        _logger.info("Loaded watermark {} from file for table: {} partition_id: {}", watermark, _tableNameWithType,
+            _partitionId);
+        return watermark;
+      } catch (Exception e) {
+        _logger.warn("Caught exception while loading watermark file: {}, skipping",
+            watermarkFile);
+      }
+    }
+    return Double.MIN_VALUE;
+  }
+
+  /**
+   * Note: Persist watermark when the expired primary keys are cleanup from upsertMetadata.
+   * */
+  protected void persistWatermark(double watermark) {
+    File watermarkFile = getWatermarkFile();
+    try {
+      if (watermarkFile.exists()) {
+        if (!FileUtils.deleteQuietly(watermarkFile)) {
+          _logger.warn("Cannot delete watermark file: {}, skipping", watermarkFile);
+          return;
+        }
+      }
+      try (OutputStream outputStream = new FileOutputStream(watermarkFile, false)) {

Review Comment:
   (minor) You may use `DataOutputStream` to write the `double` directly.
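A sketch of the `DataOutputStream` suggestion, paired with `DataInputStream` for loading. `DataOutputStream.writeDouble` writes `Double.doubleToLongBits(v)` as 8 bytes, high byte first. These are the same bytes the hunk produces with `Longs.toByteArray(...)`, and `ByteBuffer.getDouble()` reads them back by default, so files written either way stay compatible. `DoubleWatermarkIoSketch` is a hypothetical name. Opening the stream with `append = false` truncates an existing file, so the separate delete step is not needed here.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;

class DoubleWatermarkIoSketch {
  // Writes the watermark as 8 big-endian bytes (IEEE 754 bits of the double).
  static void persistWatermark(File file, double watermark) throws IOException {
    // FileOutputStream(file, false) truncates any existing file before writing.
    try (DataOutputStream out = new DataOutputStream(new FileOutputStream(file, false))) {
      out.writeDouble(watermark);
    }
  }

  // Reads back the 8-byte double written by persistWatermark.
  static double loadWatermark(File file) throws IOException {
    try (DataInputStream in = new DataInputStream(new FileInputStream(file))) {
      return in.readDouble();
    }
  }
}
```

If the double-based version needs a "no watermark" sentinel, note that `Double.MIN_VALUE` is the smallest positive `double`, not the most negative one. `Double.NEGATIVE_INFINITY` (or `-Double.MAX_VALUE`) is the analogue of `Long.MIN_VALUE`.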



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -472,6 +492,72 @@ protected void doTakeSnapshot() {
         numImmutableSegments, numTrackedSegments, System.currentTimeMillis() - startTimeMs);
   }
 
+  /**
+   * Note: Load watermark when the server is started/restarted.
+   * */
+  protected double loadWatermark() {
+    File watermarkFile = getWatermarkFile();
+    if (watermarkFile.exists()) {
+      try {
+        byte[] bytes = FileUtils.readFileToByteArray(watermarkFile);
+        double watermark = ByteBuffer.wrap(bytes).getDouble();
+        _logger.info("Loaded watermark {} from file for table: {} partition_id: {}", watermark, _tableNameWithType,
+            _partitionId);
+        return watermark;
+      } catch (Exception e) {
+        _logger.warn("Caught exception while loading watermark file: {}, skipping",
+            watermarkFile);
+      }
+    }
+    return Double.MIN_VALUE;
+  }
+
+  /**
+   * Note: Persist watermark when the expired primary keys are cleanup from upsertMetadata.
+   * */
+  protected void persistWatermark(double watermark) {
+    File watermarkFile = getWatermarkFile();
+    try {
+      if (watermarkFile.exists()) {
+        if (!FileUtils.deleteQuietly(watermarkFile)) {
+          _logger.warn("Cannot delete watermark file: {}, skipping", watermarkFile);
+          return;
+        }
+      }
+      try (OutputStream outputStream = new FileOutputStream(watermarkFile, false)) {
+        long longBits = Double.doubleToLongBits(watermark);
+        outputStream.write(Longs.toByteArray(longBits));
+      }
+      _logger.info("Persisted watermark {} to file for table: {} partition_id: {}", watermark,
+          _tableNameWithType, _partitionId);
+    } catch (Exception e) {
+      _logger.warn("Caught exception while persisting watermark file: {}, skipping",
+          watermarkFile);
+    }
+  }
+
+  /**
+   * Note: Watermark file need to be deleted when upsert TTL is disabled in the upsertConfig.
+   * */
+  protected void deleteWatermark() {
+    File watermarkFile = getWatermarkFile();
+    if (watermarkFile.exists()) {
+      try {
+        if (!FileUtils.deleteQuietly(watermarkFile)) {
+          _logger.info("Deleted watermarkFile for table: {} partition_id: {}", _tableNameWithType, _partitionId);

Review Comment:
   This is incorrect. We should log a warning that the file cannot be deleted (same as line 523).



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -84,6 +95,13 @@ protected void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutab
       _primaryKeyToRecordLocationMap.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction),
           (primaryKey, currentRecordLocation) -> {
             if (currentRecordLocation != null) {
+              // Skip the records that has comparisonValue timestamp earlier than (largestSeenTimestamp - TTL).

Review Comment:
   I don't follow this logic. Do we need this check?



##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java:
##########
@@ -60,6 +60,8 @@ public enum Strategy {
   @JsonPropertyDescription("Whether to use snapshot for fast upsert metadata recovery")
   private boolean _enableSnapshot;
 
+  private double _upsertTTLInComparisonTimeUnit;

Review Comment:
   This doesn't necessarily have to be a time, though. How about naming it `_metadataTTL` and also adding a description?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -76,23 +85,34 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
   protected final AtomicInteger _numPendingOperations = new AtomicInteger(1);
 
   protected long _lastOutOfOrderEventReportTimeNs = Long.MIN_VALUE;
+
+  // Used to maintain the largestSeenComparisonValue to avoid handling out-of-ttl segments/records.
+  // If upsertTTL enabled, we will keep track of largestSeenComparisonValue to compute expired segments.
+  protected volatile double _largestSeenComparisonValue;
   protected int _numOutOfOrderEvents = 0;
 
   protected BasePartitionUpsertMetadataManager(String tableNameWithType, int partitionId,
       List<String> primaryKeyColumns, List<String> comparisonColumns, @Nullable String deleteRecordColumn,
-      HashFunction hashFunction, @Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot,
-      ServerMetrics serverMetrics) {
+      HashFunction hashFunction, @Nullable PartialUpsertHandler partialUpsertHandler,
+      double upsertTTLInComparisonTimeUnit, boolean enableSnapshot, File tableIndexDir, ServerMetrics serverMetrics) {

Review Comment:
   (minor) Move `upsertTTLInComparisonTimeUnit` after `enableSnapshot`



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -74,6 +75,16 @@ protected void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutab
       @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator,
       @Nullable IndexSegment oldSegment, @Nullable MutableRoaringBitmap validDocIdsForOldSegment) {
     String segmentName = segment.getSegmentName();
+    // Skip adding segments that has segment EndTime in the comparison cols earlier than (largestSeenTimestamp - TTL).
+    // Note: We only update largestSeenComparisonValueMs when addRecord, and access the value when addSegments.
+    // We only support single comparison column for TTL-enabled upsert tables.
+    if (_largestSeenComparisonValue > 0) {

Review Comment:
   Can we move this logic into the base manager?
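One way to act on this suggestion is a template method. The base class owns the TTL skip check and subclasses implement only the map update. The classes below are hypothetical, heavily simplified stand-ins for `BasePartitionUpsertMetadataManager` and `ConcurrentMapPartitionUpsertMetadataManager`, with the segment reduced to a name and an end time.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for the base upsert metadata manager.
abstract class BaseManagerSketch {
  protected final double ttl;
  protected volatile double largestSeenComparisonValue = Double.NEGATIVE_INFINITY;

  BaseManagerSketch(double ttl) { this.ttl = ttl; }

  // The shared TTL check lives in the base class; subclasses only implement the map update.
  final void addOrReplaceSegment(String segmentName, double segmentEndTime) {
    if (ttl > 0 && segmentEndTime < largestSeenComparisonValue - ttl) {
      return; // every record in the segment is out of TTL
    }
    doAddOrReplaceSegment(segmentName);
  }

  protected abstract void doAddOrReplaceSegment(String segmentName);
}

// Hypothetical stand-in for the concurrent-map implementation.
class ConcurrentMapManagerSketch extends BaseManagerSketch {
  final List<String> addedSegments = new ArrayList<>();

  ConcurrentMapManagerSketch(double ttl) { super(ttl); }

  @Override
  protected void doAddOrReplaceSegment(String segmentName) {
    addedSegments.add(segmentName); // real code would index the segment's primary keys here
  }
}
```

Keeping the check in a `final` base method means every implementation (and every caller, including the reload path) goes through the same TTL logic.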



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -472,6 +492,72 @@ protected void doTakeSnapshot() {
         numImmutableSegments, numTrackedSegments, System.currentTimeMillis() - startTimeMs);
   }
 
+  /**
+   * Note: Load watermark when the server is started/restarted.
+   * */
+  protected double loadWatermark() {
+    File watermarkFile = getWatermarkFile();
+    if (watermarkFile.exists()) {
+      try {
+        byte[] bytes = FileUtils.readFileToByteArray(watermarkFile);
+        double watermark = ByteBuffer.wrap(bytes).getDouble();
+        _logger.info("Loaded watermark {} from file for table: {} partition_id: {}", watermark, _tableNameWithType,

Review Comment:
   (minor)
   ```suggestion
           _logger.info("Loaded watermark: {} from file for table: {} partition_id: {}", watermark, _tableNameWithType,
   ```



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -472,6 +492,72 @@ protected void doTakeSnapshot() {
         numImmutableSegments, numTrackedSegments, System.currentTimeMillis() - startTimeMs);
   }
 
+  /**
+   * Note: Load watermark when the server is started/restarted.
+   * */
+  protected double loadWatermark() {
+    File watermarkFile = getWatermarkFile();
+    if (watermarkFile.exists()) {
+      try {
+        byte[] bytes = FileUtils.readFileToByteArray(watermarkFile);
+        double watermark = ByteBuffer.wrap(bytes).getDouble();
+        _logger.info("Loaded watermark {} from file for table: {} partition_id: {}", watermark, _tableNameWithType,
+            _partitionId);
+        return watermark;
+      } catch (Exception e) {
+        _logger.warn("Caught exception while loading watermark file: {}, skipping",
+            watermarkFile);
+      }
+    }
+    return Double.MIN_VALUE;
+  }
+
+  /**
+   * Note: Persist watermark when the expired primary keys are cleanup from upsertMetadata.
+   * */
+  protected void persistWatermark(double watermark) {
+    File watermarkFile = getWatermarkFile();
+    try {
+      if (watermarkFile.exists()) {
+        if (!FileUtils.deleteQuietly(watermarkFile)) {
+          _logger.warn("Cannot delete watermark file: {}, skipping", watermarkFile);
+          return;
+        }
+      }
+      try (OutputStream outputStream = new FileOutputStream(watermarkFile, false)) {
+        long longBits = Double.doubleToLongBits(watermark);
+        outputStream.write(Longs.toByteArray(longBits));
+      }
+      _logger.info("Persisted watermark {} to file for table: {} partition_id: {}", watermark,
+          _tableNameWithType, _partitionId);
+    } catch (Exception e) {
+      _logger.warn("Caught exception while persisting watermark file: {}, skipping",
+          watermarkFile);
+    }
+  }
+
+  /**
+   * Note: Watermark file need to be deleted when upsert TTL is disabled in the upsertConfig.
+   * */
+  protected void deleteWatermark() {
+    File watermarkFile = getWatermarkFile();
+    if (watermarkFile.exists()) {
+      try {

Review Comment:
   (minor) This try-catch is not needed
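The diff above writes the watermark with `Longs.toByteArray(Double.doubleToLongBits(...))` and reads it back with `ByteBuffer.wrap(...).getDouble()`. Both use big-endian byte order, so the pair round-trips. Below is a minimal JDK-only sketch of the same 8-byte file format. The class `WatermarkFileSketch` is hypothetical and is not Pinot code. It also reflects two suggestions from this thread: a delete without a try-catch, and an early return when the file is missing. One caveat: the diff falls back to `Double.MIN_VALUE`, which is the smallest *positive* double rather than the most negative one. The sketch therefore assumes `Double.NEGATIVE_INFINITY` as the "no watermark" sentinel.

```java
import java.io.File;
import java.io.IOException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.file.Files;

// Hypothetical helper (not Pinot code) showing the 8-byte watermark file format
// used above: the big-endian IEEE 754 bits of a double.
final class WatermarkFileSketch {
  // "No watermark yet" sentinel. Note: Double.MIN_VALUE is the smallest POSITIVE
  // double (~4.9e-324), so it would not sort below 0 or negative values.
  static final double NO_WATERMARK = Double.NEGATIVE_INFINITY;

  private WatermarkFileSketch() {
  }

  static void persist(File file, double watermark) throws IOException {
    // ByteBuffer defaults to big-endian, the same byte order as Guava's
    // Longs.toByteArray(Double.doubleToLongBits(watermark)).
    byte[] bytes = ByteBuffer.allocate(Double.BYTES).putDouble(watermark).array();
    // Files.write creates or truncates the file, so no prior delete is needed.
    Files.write(file.toPath(), bytes);
  }

  static double load(File file) {
    if (!file.exists()) {
      return NO_WATERMARK; // early return keeps the main path unindented
    }
    try {
      return ByteBuffer.wrap(Files.readAllBytes(file.toPath())).getDouble();
    } catch (IOException | BufferUnderflowException e) {
      // Unreadable or truncated file: fall back to "no watermark".
      return NO_WATERMARK;
    }
  }

  static void delete(File file) {
    // File.delete() signals failure via its return value, not a checked
    // exception, so no try-catch is needed.
    if (file.exists() && !file.delete()) {
      System.err.println("Cannot delete watermark file: " + file);
    }
  }
}
```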



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -595,6 +595,39 @@ && isRoutingStrategyAllowedForUpsert(tableConfig.getRoutingConfig()),
       }
     }
     validateAggregateMetricsForUpsertConfig(tableConfig);
+    validateTTLForUpsertConfig(tableConfig, schema);
+  }
+
+  /**
+   * Validates whether the comparison columns is compatible with Upsert TTL feature.
+   * Validation fails when one of the comparison columns is not a numeric value.
+   */
+  @VisibleForTesting
+  static void validateTTLForUpsertConfig(TableConfig tableConfig, Schema schema) {
+    if (tableConfig.getUpsertMode() == UpsertConfig.Mode.NONE
+        || tableConfig.getUpsertConfig().getUpsertTTLInComparisonTimeUnit() == 0) {
+      return;
+    }
+
+    // comparison columns should hold timestamp values in numeric values
+    List<String> comparisonColumns = tableConfig.getUpsertConfig().getComparisonColumns();
+    if (comparisonColumns != null && !comparisonColumns.isEmpty()) {
+
+      // currently we only support 1 comparison column since we need to fetch endTime in comparisonValue time unit from
+      // columnMetadata. If we have multiple comparison columns, we can only use the first comparison column as filter.
+      Preconditions.checkState(comparisonColumns.size() <= 1,
+          String.format("Currently upsert TTL only support 1 comparison columns."));
+
+      for (String column : comparisonColumns) {

Review Comment:
   (minor) We can just check `comparisonColumns.get(0)`



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -595,6 +595,39 @@ && isRoutingStrategyAllowedForUpsert(tableConfig.getRoutingConfig()),
       }
     }
     validateAggregateMetricsForUpsertConfig(tableConfig);
+    validateTTLForUpsertConfig(tableConfig, schema);
+  }
+
+  /**
+   * Validates whether the comparison columns is compatible with Upsert TTL feature.
+   * Validation fails when one of the comparison columns is not a numeric value.
+   */
+  @VisibleForTesting
+  static void validateTTLForUpsertConfig(TableConfig tableConfig, Schema schema) {
+    if (tableConfig.getUpsertMode() == UpsertConfig.Mode.NONE
+        || tableConfig.getUpsertConfig().getUpsertTTLInComparisonTimeUnit() == 0) {
+      return;
+    }
+
+    // comparison columns should hold timestamp values in numeric values
+    List<String> comparisonColumns = tableConfig.getUpsertConfig().getComparisonColumns();
+    if (comparisonColumns != null && !comparisonColumns.isEmpty()) {
+
+      // currently we only support 1 comparison column since we need to fetch endTime in comparisonValue time unit from
+      // columnMetadata. If we have multiple comparison columns, we can only use the first comparison column as filter.
+      Preconditions.checkState(comparisonColumns.size() <= 1,

Review Comment:
   (minor)
   ```suggestion
         Preconditions.checkState(comparisonColumns.size() == 1,
   ```





[GitHub] [pinot] deemoliu commented on a diff in pull request #10915: [Upsert TTL] Add Watermark for each partitions for Primary key cleanup

Posted by "deemoliu (via GitHub)" <gi...@apache.org>.
deemoliu commented on code in PR #10915:
URL: https://github.com/apache/pinot/pull/10915#discussion_r1258696168


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -84,6 +95,13 @@ protected void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutab
       _primaryKeyToRecordLocationMap.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction),
           (primaryKey, currentRecordLocation) -> {
             if (currentRecordLocation != null) {
+              // Skip the records that has comparisonValue timestamp earlier than (largestSeenTimestamp - TTL).

Review Comment:
   @Jackie-Jiang I think this is optional. Do you suggest I remove it?
   
   For segments out of retention, we can skip them based on segmentMetadata.
   For segments within retention, some records have comparison column values out of retention; this logic skips those records.
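The record-level skip described above can be sketched with `ConcurrentHashMap.compute`. This is a minimal illustration under simplifying assumptions: the map stores comparison values instead of record locations, the watermark is passed in, and `RecordLevelSkipSketch` is a hypothetical name rather than Pinot code. An expired record leaves the current entry untouched. Because `compute` removes or omits the mapping when the function returns `null`, an expired record for an unseen key creates no entry.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch (not Pinot code): record-level TTL skip inside compute().
// Values are comparison values; a real map would hold record locations.
final class RecordLevelSkipSketch {
  private final ConcurrentHashMap<String, Double> _keyToComparisonValue = new ConcurrentHashMap<>();

  void addRecord(String primaryKey, double comparisonValue, double watermark) {
    _keyToComparisonValue.compute(primaryKey, (key, current) -> {
      if (comparisonValue < watermark) {
        // Expired record: leave the map untouched (returning null when there
        // is no current entry means no entry is created).
        return current;
      }
      // Otherwise keep the record with the larger comparison value.
      return (current == null || comparisonValue >= current) ? comparisonValue : current;
    });
  }

  Double get(String primaryKey) {
    return _keyToComparisonValue.get(primaryKey);
  }
}
```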





[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #10915: [Upsert TTL] Add Watermark for each partitions for Primary key cleanup

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #10915:
URL: https://github.com/apache/pinot/pull/10915#discussion_r1267660646


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -274,6 +309,13 @@ protected void addSegmentWithoutUpsert(ImmutableSegmentImpl segment, ThreadSafeM
     addOrReplaceSegment(segment, validDocIds, queryableDocIds, recordInfoIterator, null, null);
   }
 
+  protected void enableSegmentWithSnapshot(ImmutableSegmentImpl segment, ThreadSafeMutableRoaringBitmap validDocIds,
+      @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds) {
+    MutableRoaringBitmap validDocIdsSnapshot = segment.loadValidDocIdsFromSnapshot();
+    validDocIdsSnapshot.forEach((int docId) -> validDocIds.add(docId));

Review Comment:
   Add a check `if (validDocIdsSnapshot != null)`, and consider how to handle the case when the snapshot doesn't exist. In the normal case the snapshot should exist, but we don't want an NPE here.
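A minimal sketch of the null-safe snapshot path the reviewer asks for, assuming that a full rebuild of valid doc ids is an acceptable fallback when no snapshot exists. `java.util.BitSet` stands in for `MutableRoaringBitmap`. The `fullRebuild` callback and the class name are hypothetical.

```java
import java.util.BitSet;
import java.util.function.Consumer;

// Hypothetical sketch (not Pinot code). BitSet stands in for RoaringBitmap and
// fullRebuild stands in for "read the segment and recompute valid doc ids".
final class SnapshotSketch {
  private SnapshotSketch() {
  }

  /** Returns true if the snapshot was used, false if the fallback ran. */
  static boolean enableSegmentWithSnapshot(BitSet snapshot, BitSet validDocIds,
      Consumer<BitSet> fullRebuild) {
    if (snapshot == null) {
      // Unexpected but possible (e.g. snapshot file missing): rebuild instead
      // of throwing a NullPointerException.
      fullRebuild.accept(validDocIds);
      return false;
    }
    validDocIds.or(snapshot); // copy every snapshot doc id into validDocIds
    return true;
  }
}
```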



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -205,6 +206,18 @@ private static void removeDocId(IndexSegment segment, int docId) {
   @Override
   protected void removeSegment(IndexSegment segment, MutableRoaringBitmap validDocIds) {
     assert !validDocIds.isEmpty();
+
+    // Skip removing segments that has segment EndTime in the comparison cols earlier than (largestSeenTimestamp - TTL).

Review Comment:
   This part should be moved to `BasePartitionUpsertMetadataManager.removeSegment(ImmutableSegment segment)`, before acquiring the snapshot read lock.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -253,6 +273,21 @@ public void addSegment(ImmutableSegmentImpl segment, @Nullable ThreadSafeMutable
       if (queryableDocIds == null && _deleteRecordColumn != null) {
         queryableDocIds = new ThreadSafeMutableRoaringBitmap();
       }
+      // Skip adding segments that has segment EndTime in the comparison cols earlier than (largestSeenTimestamp - TTL).

Review Comment:
   This part should be moved to `BasePartitionUpsertMetadataManager.addSegment(ImmutableSegment segment)`, before acquiring the snapshot read lock. At this point it is already too late: we have already performed the expensive work.
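The ordering the reviewer asks for in both comments can be sketched as a cheap metadata check that runs before the lock and the per-record work. This sketch assumes that the segment's end value in the comparison column is already available from segment metadata and that the TTL is expressed in the same units. `SegmentTtlGate` and its methods are hypothetical names.

```java
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch (not Pinot code): skip segments whose end value in the
// comparison column is older than (largest seen - TTL) BEFORE taking the
// snapshot read lock or doing any per-record work.
final class SegmentTtlGate {
  private final double _ttl; // TTL in comparison-column units
  private final ReadWriteLock _snapshotLock = new ReentrantReadWriteLock();
  private double _largestSeenComparisonValue = Double.NEGATIVE_INFINITY;

  SegmentTtlGate(double ttl) {
    _ttl = ttl;
  }

  synchronized void observe(double comparisonValue) {
    _largestSeenComparisonValue = Math.max(_largestSeenComparisonValue, comparisonValue);
  }

  synchronized boolean isOutOfTtl(double segmentEndComparisonValue) {
    return segmentEndComparisonValue < _largestSeenComparisonValue - _ttl;
  }

  /** Shared by the add and remove paths; returns false if the segment was skipped. */
  boolean runUnlessOutOfTtl(double segmentEndComparisonValue, Runnable expensiveWork) {
    if (isOutOfTtl(segmentEndComparisonValue)) {
      return false; // cheap metadata check, no lock taken
    }
    _snapshotLock.readLock().lock();
    try {
      expensiveWork.run(); // e.g. iterate records, update the primary key map
    } finally {
      _snapshotLock.readLock().unlock();
    }
    return true;
  }
}
```

Running the same gate on both the add and the remove path keeps them symmetric. A segment that was never added because it was out of TTL is also not "removed" from the metadata later.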



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -595,6 +595,39 @@ && isRoutingStrategyAllowedForUpsert(tableConfig.getRoutingConfig()),
       }
     }
     validateAggregateMetricsForUpsertConfig(tableConfig);
+    validateTTLForUpsertConfig(tableConfig, schema);
+  }
+
+  /**
+   * Validates whether the comparison columns is compatible with Upsert TTL feature.
+   * Validation fails when one of the comparison columns is not a numeric value.
+   */
+  @VisibleForTesting
+  static void validateTTLForUpsertConfig(TableConfig tableConfig, Schema schema) {
+    if (tableConfig.getUpsertMode() == UpsertConfig.Mode.NONE
+        || tableConfig.getUpsertConfig().getUpsertTTLInComparisonTimeUnit() == 0) {
+      return;
+    }
+
+    // comparison columns should hold timestamp values in numeric values
+    List<String> comparisonColumns = tableConfig.getUpsertConfig().getComparisonColumns();
+    if (comparisonColumns != null && !comparisonColumns.isEmpty()) {
+
+      // currently we only support 1 comparison column since we need to fetch endTime in comparisonValue time unit from
+      // columnMetadata. If we have multiple comparison columns, we can only use the first comparison column as filter.
+      Preconditions.checkState(comparisonColumns.size() <= 1,

Review Comment:
   You already have that check on line 614





[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #10915: [Upsert TTL] Add Watermark for each partitions for Primary key cleanup

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #10915:
URL: https://github.com/apache/pinot/pull/10915#discussion_r1232972361


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -395,6 +402,18 @@ protected void finishOperation() {
     }
   }
 
+  /**
+   * When TTL is enabled for upsert, this function is used to remove expired keys from the primary key indexes.
+   */
+  @Override
+  public void removeExpiredPrimaryKeys(Comparable watermark) {

Review Comment:
   Why do we pass in the watermark value instead of calculating it within the metadata manager? Who is going to manage the watermark?



##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertTTLConfig.java:
##########
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.table;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyDescription;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+
+
+/**
+ * The {@code UpsertTTLConfig} class contains the upsert TTL (time-to-live) related configurations.
+ * Pinot upsert keeps track of all primary keys in heap, it's costly and also affects performance when table is large.
+ *
+ * If primary keys in the table have lifecycle, they won't get updated after a certain period time, then we can use the
+ * following configuration to enable upsert ttl feature. Pinot will only keeps track of alive primary keys in heap.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class UpsertTTLConfig {

Review Comment:
   To make it more general, shall we consider using an absolute long (or double) value as the TTL? To enable TTL, the comparison column must be a numeric type (we should also verify that when validating the table config), and we use the absolute value to determine whether a key is expired. E.g. if the latest comparison value we got is 10000 and the TTL is set to 1000, then any comparison value smaller than 9000 is expired, and we use 9000 as the watermark.
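The arithmetic in the example above can be written out as a minimal sketch. It assumes that the TTL is in the comparison column's own units and that a value exactly equal to the watermark is kept, matching "smaller than 9000 is expired". `AbsoluteTtl` is a hypothetical name.

```java
// Hypothetical sketch (not Pinot code) of the absolute-TTL rule:
// watermark = largest comparison value seen - TTL; values strictly below it are expired.
final class AbsoluteTtl {
  private AbsoluteTtl() {
  }

  static double watermark(double largestSeenComparisonValue, double ttl) {
    return largestSeenComparisonValue - ttl;
  }

  static boolean isExpired(double comparisonValue, double watermark) {
    return comparisonValue < watermark;
  }
}
```

With the numbers from the comment, `watermark(10000, 1000)` is 9000. Under this rule, 8999 is expired and 9000 is not.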



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -395,6 +402,18 @@ protected void finishOperation() {
     }
   }
 
+  /**
+   * When TTL is enabled for upsert, this function is used to remove expired keys from the primary key indexes.
+   */
+  @Override
+  public void removeExpiredPrimaryKeys(Comparable watermark) {
+    if (_upsertTTLConfig.getTtlInMs() > 0) {
+      doRemoveExpiredPrimaryKeys((Long) watermark);

Review Comment:
   We want to check the `_stopped` flag, and also use `startOperation()` and `finishOperation()` to prevent getting interrupted. You may refer to how other operations are handled.
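A minimal sketch of the guard pattern the reviewer describes. This is an assumption about its shape, not Pinot's actual implementation. `_stopped`, `startOperation()` and `finishOperation()` mirror the names in the comment. The counting and waiting logic, `stop()`, and `cleanupRuns()` are hypothetical. Cleanup is rejected after stop, and stop waits for in-flight cleanups to finish.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the stop/pending-operation guard the reviewer refers to;
// Pinot's real startOperation()/finishOperation() may be implemented differently.
final class GuardedCleanup {
  private boolean _stopped;
  private int _numPendingOperations;
  private final AtomicInteger _cleanupRuns = new AtomicInteger();

  private synchronized boolean startOperation() {
    if (_stopped) {
      return false; // reject new work once stop() has been called
    }
    _numPendingOperations++;
    return true;
  }

  private synchronized void finishOperation() {
    if (--_numPendingOperations == 0) {
      notifyAll(); // wake a thread waiting in stop()
    }
  }

  /** Marks the manager stopped and waits for in-flight operations to finish. */
  synchronized void stop() throws InterruptedException {
    _stopped = true;
    while (_numPendingOperations > 0) {
      wait();
    }
  }

  boolean removeExpiredPrimaryKeys() {
    if (!startOperation()) {
      return false;
    }
    try {
      _cleanupRuns.incrementAndGet(); // stand-in for doRemoveExpiredPrimaryKeys()
      return true;
    } finally {
      finishOperation();
    }
  }

  int cleanupRuns() {
    return _cleanupRuns.get();
  }
}
```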





[GitHub] [pinot] KKcorps commented on pull request #10915: [Upsert TTL] Add Watermark for each partitions for Primary key cleanup

Posted by "KKcorps (via GitHub)" <gi...@apache.org>.
KKcorps commented on PR #10915:
URL: https://github.com/apache/pinot/pull/10915#issuecomment-1647305458

   Hi @deemoliu, can we also add docs for this to the Pinot documentation?




[GitHub] [pinot] deemoliu commented on a diff in pull request #10915: [Upsert TTL] Add Watermark for each partitions for Primary key cleanup

Posted by "deemoliu (via GitHub)" <gi...@apache.org>.
deemoliu commented on code in PR #10915:
URL: https://github.com/apache/pinot/pull/10915#discussion_r1237845639


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -395,6 +402,18 @@ protected void finishOperation() {
     }
   }
 
+  /**
+   * When TTL is enabled for upsert, this function is used to remove expired keys from the primary key indexes.
+   */
+  @Override
+  public void removeExpiredPrimaryKeys(Comparable watermark) {

Review Comment:
   @Jackie-Jiang Good point. If I keep the interface `removeExpiredPrimaryKeys(Comparable watermark)`, I need to call it within the metadata manager to access largestSeenComparisonValue - ttl.
   
   Or I can call it from LLRealtimeDataManager with the interface
   `removeExpiredPrimaryKeys()`, since LLRealtimeDataManager does not have access to `largestSeenComparisonValue`.
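The second option, where the metadata manager owns the watermark, could look roughly like the sketch below. Types are simplified: primary keys map to comparison values, and `SelfWatermarkingManager` is a hypothetical name. The caller invokes the no-arg `removeExpiredPrimaryKeys()`. The manager derives the watermark from its own `largestSeenComparisonValue` and returns it, so the watermark could then be persisted.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch (not Pinot code): the manager tracks the largest
// comparison value itself, so callers need no access to it.
final class SelfWatermarkingManager {
  private final double _ttl; // TTL in comparison-column units
  private final Map<String, Double> _keyToComparisonValue = new ConcurrentHashMap<>();
  private double _largestSeenComparisonValue = Double.NEGATIVE_INFINITY;

  SelfWatermarkingManager(double ttl) {
    _ttl = ttl;
  }

  synchronized void upsert(String primaryKey, double comparisonValue) {
    // Keep the larger comparison value per key (simplified upsert rule).
    _keyToComparisonValue.merge(primaryKey, comparisonValue, Math::max);
    _largestSeenComparisonValue = Math.max(_largestSeenComparisonValue, comparisonValue);
  }

  /** Removes keys older than (largest seen - TTL) and returns the watermark used. */
  synchronized double removeExpiredPrimaryKeys() {
    double watermark = _largestSeenComparisonValue - _ttl;
    _keyToComparisonValue.values().removeIf(v -> v < watermark);
    return watermark;
  }

  int size() {
    return _keyToComparisonValue.size();
  }
}
```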





[GitHub] [pinot] deemoliu commented on a diff in pull request #10915: [Upsert TTL] Add Watermark for each partitions for Primary key cleanup

Posted by "deemoliu (via GitHub)" <gi...@apache.org>.
deemoliu commented on code in PR #10915:
URL: https://github.com/apache/pinot/pull/10915#discussion_r1258697995


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -595,6 +595,39 @@ && isRoutingStrategyAllowedForUpsert(tableConfig.getRoutingConfig()),
       }
     }
     validateAggregateMetricsForUpsertConfig(tableConfig);
+    validateTTLForUpsertConfig(tableConfig, schema);
+  }
+
+  /**
+   * Validates whether the comparison columns is compatible with Upsert TTL feature.
+   * Validation fails when one of the comparison columns is not a numeric value.
+   */
+  @VisibleForTesting
+  static void validateTTLForUpsertConfig(TableConfig tableConfig, Schema schema) {
+    if (tableConfig.getUpsertMode() == UpsertConfig.Mode.NONE
+        || tableConfig.getUpsertConfig().getUpsertTTLInComparisonTimeUnit() == 0) {
+      return;
+    }
+
+    // comparison columns should hold timestamp values in numeric values
+    List<String> comparisonColumns = tableConfig.getUpsertConfig().getComparisonColumns();
+    if (comparisonColumns != null && !comparisonColumns.isEmpty()) {
+
+      // currently we only support 1 comparison column since we need to fetch endTime in comparisonValue time unit from
+      // columnMetadata. If we have multiple comparison columns, we can only use the first comparison column as filter.
+      Preconditions.checkState(comparisonColumns.size() <= 1,

Review Comment:
   @Jackie-Jiang I think it's possible that the user doesn't configure any comparison column, in which case the timeColumn is used as the comparison column by default, so I used <= instead of ==.
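The validation rule discussed in this thread can be sketched as follows. With no comparison column configured, the time column is used. Otherwise exactly one comparison column is allowed, and whichever column applies must be numeric. This uses simplified stand-ins for `TableConfig` and `Schema`. The `DataType` enum, the class name, and `resolveTtlColumn` are hypothetical and are not Pinot's API.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch with simplified stand-ins for TableConfig/Schema.
final class UpsertTtlValidationSketch {
  enum DataType { INT, LONG, FLOAT, DOUBLE, STRING }

  private static final Set<DataType> NUMERIC =
      EnumSet.of(DataType.INT, DataType.LONG, DataType.FLOAT, DataType.DOUBLE);

  private UpsertTtlValidationSketch() {
  }

  /** Returns the column the TTL is evaluated on, or throws if the config is invalid. */
  static String resolveTtlColumn(List<String> comparisonColumns, String timeColumn,
      Map<String, DataType> columnTypes) {
    String column;
    if (comparisonColumns == null || comparisonColumns.isEmpty()) {
      column = timeColumn; // no comparison column configured: fall back to the time column
    } else if (comparisonColumns.size() == 1) {
      column = comparisonColumns.get(0);
    } else {
      throw new IllegalStateException("Upsert TTL supports only one comparison column");
    }
    // EnumSet.contains(null) is false, so unknown columns are rejected too.
    if (column == null || !NUMERIC.contains(columnTypes.get(column))) {
      throw new IllegalStateException("Upsert TTL requires a numeric comparison column: " + column);
    }
    return column;
  }
}
```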





[GitHub] [pinot] codecov-commenter commented on pull request #10915: [Upsert TTL] Add Watermark for each partitions for Primary key cleanup

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #10915:
URL: https://github.com/apache/pinot/pull/10915#issuecomment-1592201221

   ## [Codecov](https://app.codecov.io/gh/apache/pinot/pull/10915?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
   > Merging [#10915](https://app.codecov.io/gh/apache/pinot/pull/10915?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (3373984) into [master](https://app.codecov.io/gh/apache/pinot/commit/1d9682fc67aca813d7a14b3a7415f545f9420078?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (1d9682f) will **increase** coverage by `0.00%`.
   > The diff coverage is `0.00%`.
   
   ```diff
   @@            Coverage Diff            @@
   ##           master   #10915     +/-   ##
   =========================================
     Coverage    0.11%    0.11%             
   =========================================
     Files        2186     2135     -51     
     Lines      117492   115157   -2335     
     Branches    17760    17475    -285     
   =========================================
     Hits          137      137             
   + Misses     117335   115000   -2335     
     Partials       20       20             
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1temurin17 | `?` | |
   | integration1temurin20 | `?` | |
   | integration2temurin11 | `?` | |
   | integration2temurin17 | `?` | |
   | integration2temurin20 | `?` | |
   | unittests1temurin11 | `?` | |
   | unittests1temurin17 | `?` | |
   | unittests1temurin20 | `?` | |
   | unittests2temurin11 | `0.11% <0.00%> (-0.01%)` | :arrow_down: |
   | unittests2temurin17 | `0.11% <0.00%> (?)` | |
   | unittests2temurin20 | `0.11% <0.00%> (-0.01%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://app.codecov.io/gh/apache/pinot/pull/10915?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [...cal/upsert/BasePartitionUpsertMetadataManager.java](https://app.codecov.io/gh/apache/pinot/pull/10915?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC91cHNlcnQvQmFzZVBhcnRpdGlvblVwc2VydE1ldGFkYXRhTWFuYWdlci5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...t/local/upsert/BaseTableUpsertMetadataManager.java](https://app.codecov.io/gh/apache/pinot/pull/10915?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC91cHNlcnQvQmFzZVRhYmxlVXBzZXJ0TWV0YWRhdGFNYW5hZ2VyLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...t/ConcurrentMapPartitionUpsertMetadataManager.java](https://app.codecov.io/gh/apache/pinot/pull/10915?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC91cHNlcnQvQ29uY3VycmVudE1hcFBhcnRpdGlvblVwc2VydE1ldGFkYXRhTWFuYWdlci5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...psert/ConcurrentMapTableUpsertMetadataManager.java](https://app.codecov.io/gh/apache/pinot/pull/10915?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC91cHNlcnQvQ29uY3VycmVudE1hcFRhYmxlVXBzZXJ0TWV0YWRhdGFNYW5hZ2VyLmphdmE=) | `0.00% <ø> (ø)` | |
   | [...java/org/apache/pinot/segment/spi/V1Constants.java](https://app.codecov.io/gh/apache/pinot/pull/10915?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3Qtc2VnbWVudC1zcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3NlZ21lbnQvc3BpL1YxQ29uc3RhbnRzLmphdmE=) | `0.00% <ø> (ø)` | |
   | [...rg/apache/pinot/spi/config/table/UpsertConfig.java](https://app.codecov.io/gh/apache/pinot/pull/10915?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvY29uZmlnL3RhYmxlL1Vwc2VydENvbmZpZy5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...apache/pinot/spi/config/table/UpsertTTLConfig.java](https://app.codecov.io/gh/apache/pinot/pull/10915?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvY29uZmlnL3RhYmxlL1Vwc2VydFRUTENvbmZpZy5qYXZh) | `0.00% <0.00%> (ø)` | |
   
   ... and [72 files with indirect coverage changes](https://app.codecov.io/gh/apache/pinot/pull/10915/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
   
   




[GitHub] [pinot] chenboat commented on a diff in pull request #10915: [Upsert TTL] Add Watermark for each partitions for Primary key cleanup

Posted by "chenboat (via GitHub)" <gi...@apache.org>.
chenboat commented on code in PR #10915:
URL: https://github.com/apache/pinot/pull/10915#discussion_r1231674612


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -243,6 +295,47 @@ protected GenericRow doUpdateRecord(GenericRow record, RecordInfo recordInfo) {
     }
   }
 
+  protected long loadWatermark() {
+    File watermarkFile = getWatermarkFile();
+    if (watermarkFile.exists()) {

Review Comment:
   NIT: the code would be much easier to read if it were structured as
   if (!wmf.exists()) {
      return Long.MIN_VALUE;
   }
   ....
   





[GitHub] [pinot] deemoliu commented on a diff in pull request #10915: [Upsert TTL] Add Watermark for each partitions for Primary key cleanup

Posted by "deemoliu (via GitHub)" <gi...@apache.org>.
deemoliu commented on code in PR #10915:
URL: https://github.com/apache/pinot/pull/10915#discussion_r1237808398


##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertTTLConfig.java:
##########
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.table;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyDescription;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+
+
+/**
+ * The {@code UpsertTTLConfig} class contains the upsert TTL (time-to-live) related configurations.
+ * Pinot upsert keeps track of all primary keys in heap, it's costly and also affects performance when table is large.
+ *
+ * If primary keys in the table have lifecycle, they won't get updated after a certain period time, then we can use the
+ * following configuration to enable upsert ttl feature. Pinot will only keeps track of alive primary keys in heap.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class UpsertTTLConfig {

Review Comment:
   @Jackie-Jiang That's a good point. I have a `getTtlInMs()` in the function. Do you mean I should add a variable `_ttlInMs` and then get rid of the following?
   ```
     private TimeUnit _ttlTimeUnit;
     private long _ttlTimeValue;
   ```
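The difference between the two config shapes in this exchange can be sketched with `java.util.concurrent.TimeUnit`. The later `TableConfigUtils` diff in this thread calls `getUpsertTTLInComparisonTimeUnit()`, which suggests the single-number shape. This sketch assumes that the comparison column's unit is known when converting. `TtlConfigShapes` and its methods are hypothetical.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch (not Pinot code) contrasting the two config shapes.
final class TtlConfigShapes {
  private TtlConfigShapes() {
  }

  // Shape in this diff: a TimeUnit plus a value, converted to milliseconds.
  static long ttlInMs(TimeUnit ttlUnit, long ttlValue) {
    return ttlUnit.toMillis(ttlValue);
  }

  // Shape suggested by the reviewer: one absolute number in the comparison
  // column's own unit. If that column stores epoch seconds, a 1-hour TTL is
  // 3600, not 3_600_000, so the column's unit matters when writing the config.
  static long ttlInComparisonUnits(TimeUnit ttlUnit, long ttlValue, TimeUnit comparisonColumnUnit) {
    return comparisonColumnUnit.convert(ttlValue, ttlUnit);
  }
}
```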
   
   





[GitHub] [pinot] deemoliu commented on pull request #10915: [Upsert TTL] Add Watermark for each partitions for Primary key cleanup

Posted by "deemoliu (via GitHub)" <gi...@apache.org>.
deemoliu commented on PR #10915:
URL: https://github.com/apache/pinot/pull/10915#issuecomment-1635074890

   > When removing segment, should we check the watermark and skip removing metadata?
   
   I think so. Since we don't have any way to skip when replacing, we can add the check and skip removing the metadata.




[GitHub] [pinot] deemoliu commented on a diff in pull request #10915: [Upsert TTL] Add Watermark for each partitions for Primary key cleanup

Posted by "deemoliu (via GitHub)" <gi...@apache.org>.
deemoliu commented on code in PR #10915:
URL: https://github.com/apache/pinot/pull/10915#discussion_r1239008786


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -55,12 +64,17 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
 
   // Reused for reading previous record during partial upsert
   private final GenericRow _reuse = new GenericRow();
+  private long _lastExpiredTimeMS;

Review Comment:
   Renamed this field to _largestSeenComparisonValueMs and added a clearer definition.





[GitHub] [pinot] deemoliu commented on a diff in pull request #10915: [Upsert TTL] Add Watermark for each partitions for Primary key cleanup

Posted by "deemoliu (via GitHub)" <gi...@apache.org>.
deemoliu commented on code in PR #10915:
URL: https://github.com/apache/pinot/pull/10915#discussion_r1259555374


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -84,6 +95,13 @@ protected void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutab
       _primaryKeyToRecordLocationMap.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction),
           (primaryKey, currentRecordLocation) -> {
             if (currentRecordLocation != null) {
+              // Skip the records that has comparisonValue timestamp earlier than (largestSeenTimestamp - TTL).

Review Comment:
   @Jackie-Jiang I will remove it from this PR for now. Compared with the number of segments that are out of retention, the number of out-of-TTL records in valid segments (within TTL) is small.





[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #10915: [Upsert TTL] Add Watermark for each partitions for Primary key cleanup

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #10915:
URL: https://github.com/apache/pinot/pull/10915#discussion_r1262144652


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -247,6 +267,17 @@ public void addSegment(ImmutableSegmentImpl segment, @Nullable ThreadSafeMutable
     Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
     segmentLock.lock();
     try {
+      // Skip adding segments that has segment EndTime in the comparison cols earlier than (largestSeenTimestamp - TTL).

Review Comment:
   (MAJOR) Just realized a problem. If we return here, the valid doc ids and queryable doc ids won't be set in the segment. The correct solution should be:
   - If `delete` is not enabled, set valid doc ids snapshot into the segment
   - If `delete` is enabled, since we don't have snapshot for queryable doc ids, we need to loop over valid doc ids to create the queryable doc ids (find non-delete records)
   
   Can you add a test to catch this problem? With the current solution, all the invalid docs will show up again after a restart.
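The sketch below covers the two cases described above. It uses `java.util.BitSet` as a stand-in for Pinot's roaring bitmaps. `SkippedSegmentDocIds`, `fromSnapshot` and the `isDeleteRecord` predicate are hypothetical names, and the sketch assumes the valid doc ids have already been restored from the persisted snapshot.

```java
import java.util.BitSet;
import java.util.function.IntPredicate;

// Hypothetical helper: doc ids to set on a segment whose upsert metadata is skipped (out of TTL).
// java.util.BitSet stands in for ThreadSafeMutableRoaringBitmap.
final class SkippedSegmentDocIds {
  final BitSet validDocIds;
  final BitSet queryableDocIds; // null when delete is not enabled

  private SkippedSegmentDocIds(BitSet validDocIds, BitSet queryableDocIds) {
    this.validDocIds = validDocIds;
    this.queryableDocIds = queryableDocIds;
  }

  /**
   * @param validDocIdsSnapshot valid doc ids restored from the persisted snapshot
   * @param deleteEnabled whether the upsert table has a delete record column
   * @param isDeleteRecord returns true if the doc with the given id is a delete marker
   */
  static SkippedSegmentDocIds fromSnapshot(BitSet validDocIdsSnapshot, boolean deleteEnabled,
      IntPredicate isDeleteRecord) {
    BitSet validDocIds = (BitSet) validDocIdsSnapshot.clone();
    if (!deleteEnabled) {
      // Case 1: just use the valid doc ids snapshot.
      return new SkippedSegmentDocIds(validDocIds, null);
    }
    // Case 2: there is no snapshot for queryable doc ids, so derive them as valid docs that are not deletes.
    BitSet queryableDocIds = new BitSet();
    for (int docId = validDocIds.nextSetBit(0); docId >= 0; docId = validDocIds.nextSetBit(docId + 1)) {
      if (!isDeleteRecord.test(docId)) {
        queryableDocIds.set(docId);
      }
    }
    return new SkippedSegmentDocIds(validDocIds, queryableDocIds);
  }
}
```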



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -556,6 +659,32 @@ protected void finishOperation() {
     }
   }
 
+  /**
+   * When TTL is enabled for upsert, this function is used to remove expired keys from the primary key indexes.
+   * Primary keys that has comparison value earlier than largestSeenComparisonValue - TTL value will be removed.
+   */
+  @Override
+  public void removeExpiredPrimaryKeys() {
+    // If upsertTTL is enabled, we will remove expired primary keys from upsertMetadata after taking snapshot.
+    if (_metadataTTL == 0) {

Review Comment:
   To be more robust
   ```suggestion
       if (_metadataTTL <= 0) {
   ```



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -367,6 +398,17 @@ public void replaceSegment(ImmutableSegment segment, @Nullable ThreadSafeMutable
     Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
     segmentLock.lock();
     try {
+      // Skip adding segments that has segment EndTime in the comparison cols earlier than (largestSeenTimestamp - TTL).
+      // Note: We only update largestSeenComparisonValue when addRecord, and access the value when addOrReplaceSegments.
+      // We only support single comparison column for TTL-enabled upsert tables.
+      if (_largestSeenComparisonValue > 0) {

Review Comment:
   We cannot skip replacing a segment because the new segment doesn't have a snapshot, so we don't know which docs are valid. Unfortunately, I don't see a good way to handle the case where a user replaces a segment that is out of TTL, other than only allowing users to upload segments without any invalid docs. We need to add more comments explaining the logic.
   Also, we can move this check outside of the segment lock.
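This sketch moves the TTL check ahead of the lock on the add path. It assumes the check only reads segment metadata and a volatile watermark. `SegmentAdder` is hypothetical: it uses a single `ReentrantLock`, whereas Pinot looks up a lock per segment name through `SegmentLocks`. It also omits the valid doc id bookkeeping.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: evaluate the out-of-TTL check before acquiring the segment lock.
class SegmentAdder {
  private final Lock segmentLock = new ReentrantLock();
  private final double metadataTTL;
  private volatile double largestSeenComparisonValue;
  private int segmentsAdded; // guarded by segmentLock

  SegmentAdder(double metadataTTL, double largestSeenComparisonValue) {
    this.metadataTTL = metadataTTL;
    this.largestSeenComparisonValue = largestSeenComparisonValue;
  }

  boolean addSegment(double segmentMaxComparisonValue) {
    // Reads only segment metadata and the volatile watermark, so no lock is needed here.
    if (metadataTTL > 0 && segmentMaxComparisonValue < largestSeenComparisonValue - metadataTTL) {
      return false; // out of TTL: skipped without contending on the lock
    }
    segmentLock.lock();
    try {
      segmentsAdded++; // placeholder for the real metadata update
      return true;
    } finally {
      segmentLock.unlock();
    }
  }

  int getSegmentsAdded() {
    segmentLock.lock();
    try {
      return segmentsAdded;
    } finally {
      segmentLock.unlock();
    }
  }
}
```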





[GitHub] [pinot] chenboat commented on a diff in pull request #10915: [Upsert TTL] Add Watermark for each partitions for Primary key cleanup

Posted by "chenboat (via GitHub)" <gi...@apache.org>.
chenboat commented on code in PR #10915:
URL: https://github.com/apache/pinot/pull/10915#discussion_r1231673523


##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertTTLConfig.java:
##########
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.table;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyDescription;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+
+
+/**
+ * The {@code UpsertTTLConfig} class contains the upsert TTL (time-to-live) related configurations.
+ * Pinot upsert keeps track of all primary keys in heap, it's costly and also affects performance when table is large.
+ *
+ * If primary keys in the table have lifecycle, they won't get updated after a certain period time, then we can use the
+ * following configuration to enable upsert ttl feature. Pinot will only keeps track of alive primary keys in heap.

Review Comment:
   alive primary keys --> primary keys within ttl window





[GitHub] [pinot] chenboat commented on a diff in pull request #10915: [Upsert TTL] Add Watermark for each partitions for Primary key cleanup

Posted by "chenboat (via GitHub)" <gi...@apache.org>.
chenboat commented on code in PR #10915:
URL: https://github.com/apache/pinot/pull/10915#discussion_r1231686445


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -395,6 +402,18 @@ protected void finishOperation() {
     }
   }
 
+  /**
+   * When TTL is enabled for upsert, this function is used to remove expired keys from the primary key indexes.

Review Comment:
   When will this method be invoked? It seems that it is not used anywhere except in tests.





[GitHub] [pinot] deemoliu commented on pull request #10915: [Upsert TTL] Add Watermark for each partitions for Primary key cleanup

Posted by "deemoliu (via GitHub)" <gi...@apache.org>.
deemoliu commented on PR #10915:
URL: https://github.com/apache/pinot/pull/10915#issuecomment-1615301143

   Addressed @Jackie-Jiang's comments:
   
   1. Replaced upsertTTLConfig with a double value in upsertConfig, expressed in the comparison column's units, so it doesn't need to be a time in milliseconds.
   2. Persist the largest seen timestamp, which comes from addRecord, as the watermark per partition. It should be volatile.
   3. Delete the watermark when the upsertConfig is updated from TTL-enabled to TTL-disabled.
   4. Moved the watermark-related logic from BaseTableUpsertMetadataManager to BaseTableUpsertMetadataManager.
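The following sketches item 3. It assumes the watermark lives in one file per partition. `WatermarkFileReconciler` is a hypothetical name. This sketch does not show how Pinot locates the file; the diffs call `getWatermarkFile()` for that.

```java
import java.io.File;

// Hypothetical sketch: keep the per-partition watermark file only while TTL is enabled.
final class WatermarkFileReconciler {
  private WatermarkFileReconciler() {
  }

  /** Returns true if a leftover watermark file was deleted. */
  static boolean reconcile(File watermarkFile, double metadataTTL) {
    if (metadataTTL > 0) {
      return false; // TTL enabled: the watermark is still needed
    }
    // TTL disabled: the watermark is no longer meaningful, so remove it.
    return watermarkFile.exists() && watermarkFile.delete();
  }
}
```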




[GitHub] [pinot] deemoliu commented on a diff in pull request #10915: [Upsert TTL] Add Watermark for each partitions for Primary key cleanup

Posted by "deemoliu (via GitHub)" <gi...@apache.org>.
deemoliu commented on code in PR #10915:
URL: https://github.com/apache/pinot/pull/10915#discussion_r1259032378


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -74,6 +75,16 @@ protected void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutab
       @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator,
       @Nullable IndexSegment oldSegment, @Nullable MutableRoaringBitmap validDocIdsForOldSegment) {
     String segmentName = segment.getSegmentName();
+    // Skip adding segments that has segment EndTime in the comparison cols earlier than (largestSeenTimestamp - TTL).
+    // Note: We only update largestSeenComparisonValueMs when addRecord, and access the value when addSegments.
+    // We only support single comparison column for TTL-enabled upsert tables.
+    if (_largestSeenComparisonValue > 0) {

Review Comment:
   Sure. I added this to both addSegment and replaceSegment.
   





[GitHub] [pinot] chenboat commented on a diff in pull request #10915: [Upsert TTL] Add Watermark for each partitions for Primary key cleanup

Posted by "chenboat (via GitHub)" <gi...@apache.org>.
chenboat commented on code in PR #10915:
URL: https://github.com/apache/pinot/pull/10915#discussion_r1231680171


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -243,6 +295,47 @@ protected GenericRow doUpdateRecord(GenericRow record, RecordInfo recordInfo) {
     }
   }
 
+  protected long loadWatermark() {
+    File watermarkFile = getWatermarkFile();
+    if (watermarkFile.exists()) {
+      try {
+        byte[] bytes = FileUtils.readFileToByteArray(watermarkFile);
+        long watermark = ByteBuffer.wrap(bytes).getLong();
+        _logger.info("Loaded watermark {} from file for table: {} partition_id: {}", watermark, _tableNameWithType,
+            _partitionId);
+        return watermark;
+      } catch (Exception e) {
+        _logger.warn("Caught exception while deleting watermark file: {}, skipping",

Review Comment:
   deleting --> loading





[GitHub] [pinot] chenboat commented on a diff in pull request #10915: [Upsert TTL] Add Watermark for each partitions for Primary key cleanup

Posted by "chenboat (via GitHub)" <gi...@apache.org>.
chenboat commented on code in PR #10915:
URL: https://github.com/apache/pinot/pull/10915#discussion_r1231677527


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -181,6 +210,29 @@ protected void removeSegment(IndexSegment segment, MutableRoaringBitmap validDoc
     }
   }
 
+  /**
+   * When TTL is enabled for upsert, this function is used to remove expired keys from the primary key indexes.
+   * This function will be called before new consuming segment start to consume.
+   *
+   * @param watermark The watermark is the time used to clean up the metadata in the previous round

Review Comment:
   Can you define what a watermark is? Is it a time? What is the unit?





[GitHub] [pinot] deemoliu commented on a diff in pull request #10915: [Upsert TTL] Add Watermark for each partitions for Primary key cleanup

Posted by "deemoliu (via GitHub)" <gi...@apache.org>.
deemoliu commented on code in PR #10915:
URL: https://github.com/apache/pinot/pull/10915#discussion_r1237428186


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -181,6 +210,29 @@ protected void removeSegment(IndexSegment segment, MutableRoaringBitmap validDoc
     }
   }
 
+  /**
+   * When TTL is enabled for upsert, this function is used to remove expired keys from the primary key indexes.
+   * This function will be called before new consuming segment start to consume.
+   *
+   * @param watermark The watermark is the time used to clean up the metadata in the previous round

Review Comment:
   The watermark is the time used to clean up the metadata in the previous round.
   It's a Unix timestamp in milliseconds, stored as a long.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -243,6 +295,47 @@ protected GenericRow doUpdateRecord(GenericRow record, RecordInfo recordInfo) {
     }
   }
 
+  protected long loadWatermark() {
+    File watermarkFile = getWatermarkFile();
+    if (watermarkFile.exists()) {
+      try {
+        byte[] bytes = FileUtils.readFileToByteArray(watermarkFile);
+        long watermark = ByteBuffer.wrap(bytes).getLong();
+        _logger.info("Loaded watermark {} from file for table: {} partition_id: {}", watermark, _tableNameWithType,
+            _partitionId);
+        return watermark;
+      } catch (Exception e) {
+        _logger.warn("Caught exception while deleting watermark file: {}, skipping",

Review Comment:
   Got it, thanks for reviewing.





[GitHub] [pinot] deemoliu commented on a diff in pull request #10915: [Upsert TTL] Add Watermark for each partitions for Primary key cleanup

Posted by "deemoliu (via GitHub)" <gi...@apache.org>.
deemoliu commented on code in PR #10915:
URL: https://github.com/apache/pinot/pull/10915#discussion_r1239011520


##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertTTLConfig.java:
##########
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.table;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyDescription;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+
+
+/**
+ * The {@code UpsertTTLConfig} class contains the upsert TTL (time-to-live) related configurations.
+ * Pinot upsert keeps track of all primary keys in heap, it's costly and also affects performance when table is large.
+ *
+ * If primary keys in the table have lifecycle, they won't get updated after a certain period time, then we can use the
+ * following configuration to enable upsert ttl feature. Pinot will only keeps track of alive primary keys in heap.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class UpsertTTLConfig {

Review Comment:
   @Jackie-Jiang can you please confirm if you want me to make the above changes?





[GitHub] [pinot] deemoliu commented on a diff in pull request #10915: [Upsert TTL] Add Watermark for each partitions for Primary key cleanup

Posted by "deemoliu (via GitHub)" <gi...@apache.org>.
deemoliu commented on code in PR #10915:
URL: https://github.com/apache/pinot/pull/10915#discussion_r1259030502


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -74,6 +75,16 @@ protected void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutab
       @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator,
       @Nullable IndexSegment oldSegment, @Nullable MutableRoaringBitmap validDocIdsForOldSegment) {
     String segmentName = segment.getSegmentName();
+    // Skip adding segments that has segment EndTime in the comparison cols earlier than (largestSeenTimestamp - TTL).
+    // Note: We only update largestSeenComparisonValueMs when addRecord, and access the value when addSegments.
+    // We only support single comparison column for TTL-enabled upsert tables.
+    if (_largestSeenComparisonValue > 0) {
+      Comparable endTime =

Review Comment:
   Got it, thanks @Jackie-Jiang.
   Do you mean `Number.doubleValue()`?
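This sketches the `Number.doubleValue()` approach. It assumes the segment metadata exposes the comparison column's max value as a `Comparable` that is a boxed number for TTL-enabled tables. `ComparisonValues` is a hypothetical helper.

```java
// Hypothetical helper: normalize a comparison column's max value to double for the TTL check.
final class ComparisonValues {
  private ComparisonValues() {
  }

  static double toDouble(Comparable<?> maxValue) {
    if (!(maxValue instanceof Number)) {
      // Assumption: TTL is only meaningful for numeric comparison columns.
      throw new IllegalArgumentException("Expected a numeric comparison value, got: " + maxValue);
    }
    // Works uniformly for Integer, Long, Float and Double max values.
    return ((Number) maxValue).doubleValue();
  }

  static boolean isOutOfTtl(Comparable<?> segmentMaxValue, double largestSeen, double ttl) {
    return ttl > 0 && toDouble(segmentMaxValue) < largestSeen - ttl;
  }
}
```

A `double` represents every integer up to 2^53 exactly. Epoch-millisecond timestamps therefore survive the conversion without rounding.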





[GitHub] [pinot] Jackie-Jiang merged pull request #10915: [Upsert TTL] Add Watermark for each partitions for Primary key cleanup

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang merged PR #10915:
URL: https://github.com/apache/pinot/pull/10915




[GitHub] [pinot] deemoliu commented on pull request #10915: [Upsert TTL] Add Watermark for each partitions for Primary key cleanup

Posted by "deemoliu (via GitHub)" <gi...@apache.org>.
deemoliu commented on PR #10915:
URL: https://github.com/apache/pinot/pull/10915#issuecomment-1648338611

   > @deemoliu can we also add docs for this in the pinot?
   
   Thanks for asking, @KKcorps.
   No problem, the doc is https://docs.google.com/document/d/1AhkZPkf4St3hj96IXyNs6NOiItxm3LXw3NAgNbG3oRo/edit
   Should I add this to the OSS wiki?




[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #10915: [Upsert TTL] Add Watermark for each partitions for Primary key cleanup

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #10915:
URL: https://github.com/apache/pinot/pull/10915#discussion_r1260446634


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -367,6 +397,16 @@ public void replaceSegment(ImmutableSegment segment, @Nullable ThreadSafeMutable
     Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
     segmentLock.lock();
     try {
+      // Skip adding segments that has segment EndTime in the comparison cols earlier than (largestSeenTimestamp - TTL).
+      // Note: We only update largestSeenComparisonValueMs when addRecord, and access the value when addSegments.
+      // We only support single comparison column for TTL-enabled upsert tables.
+      if (_largestSeenComparisonValue > 0) {
+        Number endTime =
+            (Number) segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0)).getMaxValue();
+        if (endTime.doubleValue() < _largestSeenComparisonValue - _metadataTTL) {

Review Comment:
   Add some logs here



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -556,6 +657,28 @@ protected void finishOperation() {
     }
   }
 
+  /**
+   * When TTL is enabled for upsert, this function is used to remove expired keys from the primary key indexes.
+   * Primary keys that has comparison value earlier than largestSeenComparisonValue - TTL value will be removed.
+   */
+  @Override
+  public void removeExpiredPrimaryKeys() {
+    if (_stopped) {
+      _logger.debug("Skip removing expired primary keys because metadata manager is already stopped");
+      return;
+    }
+    startOperation();
+    try {
+      if (_metadataTTL > 0) {
+        doRemoveExpiredPrimaryKeys(_largestSeenComparisonValue);

Review Comment:
   (minor) No need to pass in the value since it is accessible to the child class



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -675,6 +675,10 @@ public void run() {
         // Take upsert snapshot before starting consuming events
         if (_partitionUpsertMetadataManager != null) {
           _partitionUpsertMetadataManager.takeSnapshot();
+          // If upsertTTL is enabled, we will remove expired primary keys from upsertMetadata after taking snapshot.
+          if (_tableConfig.getUpsertConfig().getMetadataTTL() > 0) {
+            _partitionUpsertMetadataManager.removeExpiredPrimaryKeys();

Review Comment:
   Similar to `takeSnapshot()`, we can perform the check inside the metadata manager



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -544,6 +584,67 @@ protected void doTakeSnapshot() {
         numImmutableSegments, numTrackedSegments, System.currentTimeMillis() - startTimeMs);
   }
 
+  /**
+   * Note: Load watermark when the server is started/restarted.
+   * */
+  protected double loadWatermark() {
+    File watermarkFile = getWatermarkFile();
+    if (watermarkFile.exists()) {
+      try {
+        byte[] bytes = FileUtils.readFileToByteArray(watermarkFile);
+        double watermark = ByteBuffer.wrap(bytes).getDouble();
+        _logger.info("Loaded watermark: {} from file for table: {} partition_id: {}", watermark, _tableNameWithType,
+            _partitionId);
+        return watermark;
+      } catch (Exception e) {
+        _logger.warn("Caught exception while loading watermark file: {}, skipping",
+            watermarkFile);
+      }
+    }
+    return Double.MIN_VALUE;
+  }
+
+  /**
+   * Note: Persist watermark when the expired primary keys are cleanup from upsertMetadata.
+   * */
+  protected void persistWatermark(double watermark) {
+    File watermarkFile = getWatermarkFile();
+    try {
+      if (watermarkFile.exists()) {
+        if (!FileUtils.deleteQuietly(watermarkFile)) {
+          _logger.warn("Cannot delete watermark file: {}, skipping", watermarkFile);
+          return;
+        }
+      }
+      try (OutputStream outputStream = new FileOutputStream(watermarkFile, false)) {
+        DataOutputStream dataOutputStream = new DataOutputStream(outputStream);
+        dataOutputStream.writeDouble(watermark);
+      }

Review Comment:
   ```suggestion
         try (OutputStream outputStream = new FileOutputStream(watermarkFile, false);
           DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
           dataOutputStream.writeDouble(watermark);
         }
   ```
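This sketches the persist/load round trip with the suggested try-with-resources. It assumes each file holds one 8-byte big-endian double. `WatermarkStore` and `NO_WATERMARK` are hypothetical names.

One detail in `loadWatermark()` above is worth checking. `Double.MIN_VALUE` is the smallest positive `double` (about 4.9e-324), not the most negative one. If the loaded value seeds `_largestSeenComparisonValue`, this default passes the `> 0` guards. The sketch therefore uses `Double.NEGATIVE_INFINITY` as its "no watermark" sentinel.

```java
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.file.Files;

// Hypothetical sketch of the watermark round trip; not the code merged in the PR.
final class WatermarkStore {
  // Sentinel for "no watermark". Double.MIN_VALUE would be > 0, so it is avoided here.
  static final double NO_WATERMARK = Double.NEGATIVE_INFINITY;

  private WatermarkStore() {
  }

  static void persist(File watermarkFile, double watermark) throws IOException {
    // Both streams are resources, so both are flushed and closed even if writeDouble throws.
    try (FileOutputStream outputStream = new FileOutputStream(watermarkFile, false);
        DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
      dataOutputStream.writeDouble(watermark); // 8 bytes, big-endian
    }
  }

  static double load(File watermarkFile) {
    if (!watermarkFile.exists()) {
      return NO_WATERMARK;
    }
    try {
      byte[] bytes = Files.readAllBytes(watermarkFile.toPath());
      // ByteBuffer defaults to big-endian, matching DataOutputStream.writeDouble.
      return ByteBuffer.wrap(bytes).getDouble();
    } catch (IOException | BufferUnderflowException e) {
      // Unreadable or truncated file: behave as if no watermark was persisted.
      return NO_WATERMARK;
    }
  }
}
```

`new FileOutputStream(file, false)` already truncates an existing file. The explicit delete before writing is therefore not strictly required for correctness.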



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -247,6 +267,16 @@ public void addSegment(ImmutableSegmentImpl segment, @Nullable ThreadSafeMutable
     Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
     segmentLock.lock();
     try {
+      // Skip adding segments that has segment EndTime in the comparison cols earlier than (largestSeenTimestamp - TTL).
+      // Note: We only update largestSeenComparisonValueMs when addRecord, and access the value when addSegments.
+      // We only support single comparison column for TTL-enabled upsert tables.
+      if (_largestSeenComparisonValue > 0) {
+        Number endTime =
+            (Number) segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0)).getMaxValue();
+        if (endTime.doubleValue() < _largestSeenComparisonValue - _metadataTTL) {

Review Comment:
   Add some log here indicating the segment is skipped



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -234,6 +255,14 @@ protected void doAddRecord(MutableSegment segment, RecordInfo recordInfo) {
     Comparable newComparisonValue = recordInfo.getComparisonValue();
     _primaryKeyToRecordLocationMap.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction),
         (primaryKey, currentRecordLocation) -> {
+          // Update the largestSeenComparisonValueMs when add new record. If records during addSegments has a newer
+          // comparison column values than addRecords, it's a bug and should not happen.
+          if (_metadataTTL > 0) {
+            Number recordComparisonValue = (Number) recordInfo.getComparisonValue();
+            if (recordComparisonValue.doubleValue() > _largestSeenComparisonValue) {
+              _largestSeenComparisonValue = recordComparisonValue.doubleValue();
+            }

Review Comment:
   (nit)
   ```suggestion
               _largestSeenComparisonValue = Math.max(_largestSeenComparisonValue, recordComparisonValue.doubleValue());
   ```
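The suggestion keeps the watermark monotonic with a single `Math.max` call. Below is a sketch of that update in isolation, assuming numeric comparison values and that a non-positive TTL disables tracking. The excerpt does not say whether records for one partition can be added from several threads, so this variant (a swapped-in technique, not the PR's code) applies the same `Math.max` through an `AtomicLong` holding the double's bits; `WatermarkTracker` and its methods are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical tracker for the per-partition watermark (largest comparison value seen so far).
public final class WatermarkTracker {
  private final double _metadataTtl;
  // Holds the raw bits of a double so the max update can be applied atomically.
  private final AtomicLong _largestSeenBits = new AtomicLong(Double.doubleToLongBits(0.0));

  public WatermarkTracker(double metadataTtl) {
    _metadataTtl = metadataTtl;
  }

  public void onRecordAdded(Number comparisonValue) {
    // Only track the watermark when TTL is enabled, mirroring the `_metadataTTL > 0` guard in the diff.
    if (_metadataTtl <= 0) {
      return;
    }
    double value = comparisonValue.doubleValue();
    // Same Math.max update as the suggestion; updateAndGet retries if another thread raced this one.
    _largestSeenBits.updateAndGet(bits -> Double.doubleToLongBits(Math.max(Double.longBitsToDouble(bits), value)));
  }

  public double getLargestSeenComparisonValue() {
    return Double.longBitsToDouble(_largestSeenBits.get());
  }
}
```

If updates for a partition are known to be single-threaded, the plain field assignment from the suggestion is sufficient and simpler.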



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -556,6 +657,28 @@ protected void finishOperation() {
     }
   }
 
+  /**
+   * When TTL is enabled for upsert, this function is used to remove expired keys from the primary key indexes.
+   * Primary keys whose comparison value is earlier than (largestSeenComparisonValue - TTL) will be removed.
+   */
+  @Override
+  public void removeExpiredPrimaryKeys() {
+    if (_stopped) {
+      _logger.debug("Skip removing expired primary keys because metadata manager is already stopped");
+      return;
+    }
+    startOperation();
+    try {
+      if (_metadataTTL > 0) {

Review Comment:
   Similar to `takeSnapshot()`, perform this TTL check as the first step.
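A simplified sketch of the cleanup with the TTL check moved to the front, as suggested. It models the primary key index as a `ConcurrentHashMap` from key to a location carrying only a comparison value; `ExpiredKeyRemover` and `RecordLocation` are hypothetical stand-ins, not Pinot's classes, and `startOperation()`/`finishOperation()` are omitted.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified model of a partition's primary key index; not Pinot's actual classes.
public final class ExpiredKeyRemover {
  // Stand-in for a record location: only the comparison value matters for TTL cleanup.
  public static final class RecordLocation {
    public final double comparisonValue;

    public RecordLocation(double comparisonValue) {
      this.comparisonValue = comparisonValue;
    }
  }

  public final Map<Object, RecordLocation> primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
  public volatile double largestSeenComparisonValue;
  public volatile boolean stopped;
  private final double metadataTtl;

  public ExpiredKeyRemover(double metadataTtl) {
    this.metadataTtl = metadataTtl;
  }

  /** Removes keys whose comparison value is earlier than (largestSeenComparisonValue - TTL); returns how many. */
  public int removeExpiredPrimaryKeys() {
    // TTL check first: when TTL is disabled there is nothing to do, so return before any other work.
    if (metadataTtl <= 0) {
      return 0;
    }
    if (stopped) {
      return 0;
    }
    double cutoff = largestSeenComparisonValue - metadataTtl;
    int removed = 0;
    for (Map.Entry<Object, RecordLocation> entry : primaryKeyToRecordLocationMap.entrySet()) {
      RecordLocation location = entry.getValue();
      // remove(key, value) succeeds only if the key still maps to this exact location,
      // so a location concurrently replaced by a newer record is kept.
      if (location.comparisonValue < cutoff && primaryKeyToRecordLocationMap.remove(entry.getKey(), location)) {
        removed++;
      }
    }
    return removed;
  }
}
```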



##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java:
##########
@@ -26,6 +26,7 @@ private V1Constants() {
   public static final String INDEX_MAP_FILE_NAME = "index_map";
   public static final String INDEX_FILE_NAME = "columns.psf";
   public static final String VALID_DOC_IDS_SNAPSHOT_FILE_NAME = "validdocids.bitmap.snapshot";
+  public static final String TTL_WATERMARK_TABLE_PARTITION = ".ttl.watermark.partition.";

Review Comment:
   Remove the leading `.`
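A sketch of persisting and loading a per-partition watermark using the constant without its leading `.`. The file naming (constant followed by the partition id, e.g. `ttl.watermark.partition.3`), the binary encoding via `writeDouble`, and the `WatermarkFile` helper are all assumptions for illustration; the excerpt does not show how the PR builds the file name or stores the value.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper for a per-partition TTL watermark file; not Pinot's actual API.
public final class WatermarkFile {
  // Same value as the constant under review, with the leading '.' removed as requested.
  public static final String TTL_WATERMARK_TABLE_PARTITION = "ttl.watermark.partition.";

  private WatermarkFile() {
  }

  // Assumed naming scheme: one file per partition inside the table's index directory.
  public static Path watermarkPath(Path tableIndexDir, int partitionId) {
    return tableIndexDir.resolve(TTL_WATERMARK_TABLE_PARTITION + partitionId);
  }

  public static void persistWatermark(Path tableIndexDir, int partitionId, double watermark) throws IOException {
    Path file = watermarkPath(tableIndexDir, partitionId);
    Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
    try (DataOutputStream out = new DataOutputStream(Files.newOutputStream(tmp))) {
      out.writeDouble(watermark);
    }
    // Write to a temp file, then move it over the old one so a crash never leaves a half-written watermark.
    Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
  }

  public static double loadWatermark(Path tableIndexDir, int partitionId) throws IOException {
    Path file = watermarkPath(tableIndexDir, partitionId);
    if (!Files.exists(file)) {
      // No watermark persisted yet for this partition.
      return 0;
    }
    try (DataInputStream in = new DataInputStream(Files.newInputStream(file))) {
      return in.readDouble();
    }
  }
}
```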




[GitHub] [pinot] deemoliu commented on a diff in pull request #10915: [Upsert TTL] Add Watermark for each partitions for Primary key cleanup

Posted by "deemoliu (via GitHub)" <gi...@apache.org>.
deemoliu commented on code in PR #10915:
URL: https://github.com/apache/pinot/pull/10915#discussion_r1262966256


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -367,6 +398,17 @@ public void replaceSegment(ImmutableSegment segment, @Nullable ThreadSafeMutable
     Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
     segmentLock.lock();
     try {
+      // Skip adding segments that have a segment EndTime in the comparison column earlier than (largestSeenTimestamp - TTL).
+      // Note: We only update largestSeenComparisonValue in addRecord, and only read the value in addOrReplaceSegments.
+      // We only support single comparison column for TTL-enabled upsert tables.
+      if (_largestSeenComparisonValue > 0) {

Review Comment:
   Thanks @Jackie-Jiang. Let me remove this check for now.
   I will add a TODO to allow users to skip loading segments without any invalid docs.





[GitHub] [pinot] Jackie-Jiang commented on pull request #10915: [Upsert TTL] Add Watermark for each partitions for Primary key cleanup

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on PR #10915:
URL: https://github.com/apache/pinot/pull/10915#issuecomment-1634810675

   When removing a segment, should we check the watermark and skip removing its metadata?

