Posted to commits@pinot.apache.org by "Jackie-Jiang (via GitHub)" <gi...@apache.org> on 2023/12/05 01:23:07 UTC

Re: [PR] Add support for retention on deleted keys of upsert tables [pinot]

Jackie-Jiang commented on code in PR #12037:
URL: https://github.com/apache/pinot/pull/12037#discussion_r1414717992


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -731,6 +731,21 @@ static void validateUpsertAndDedupConfig(TableConfig tableConfig, Schema schema)
             "The delete record column must be a single-valued BOOLEAN column");
       }
 
+      double deletedKeysTTL = upsertConfig.getDeletedKeysTTL();

Review Comment:
   Shall we move this next to the `metadataTTL` validation to avoid duplicating the `comparisonColumns` check?
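   A minimal sketch of the consolidation, assuming that both TTLs impose the same requirement (exactly one numeric comparison column), as the duplicated check suggests. The class name, the method name and the numeric type list are all hypothetical. Column types are passed as a plain map rather than Pinot's `Schema`.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical validation helper; not Pinot's TableConfigUtils API.
class UpsertTtlValidationSketch {
  // Hypothetical list of type names accepted as numeric for TTL comparisons.
  private static final Set<String> NUMERIC_TYPES = Set.of("INT", "LONG", "FLOAT", "DOUBLE");

  // One shared comparison-column check for both TTLs, instead of one copy per TTL.
  static void validateTtlConfigs(double metadataTTL, double deletedKeysTTL, List<String> comparisonColumns,
      Map<String, String> columnTypes) {
    if (metadataTTL <= 0 && deletedKeysTTL <= 0) {
      return; // No TTL enabled: the comparison column can be of any type.
    }
    if (comparisonColumns.size() != 1) {
      throw new IllegalStateException("TTL requires exactly one comparison column, got: " + comparisonColumns);
    }
    String type = columnTypes.get(comparisonColumns.get(0));
    if (type == null || !NUMERIC_TYPES.contains(type)) {
      throw new IllegalStateException("TTL requires a numeric comparison column, got type: " + type);
    }
  }
}
```

   With this shape, adding a third TTL later would not require another copy of the comparison-column check.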



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -231,13 +233,49 @@ protected void removeSegment(IndexSegment segment, MutableRoaringBitmap validDoc
 
   @Override
   public void doRemoveExpiredPrimaryKeys() {
-    double threshold = _largestSeenComparisonValue - _metadataTTL;
+    AtomicInteger numDeletedKeys = new AtomicInteger();
+    long startTime = System.currentTimeMillis();
+    double metadataTTLKeysThreshold;
+    if (_metadataTTL > 0) {
+      metadataTTLKeysThreshold = _largestSeenComparisonValue - _metadataTTL;
+    } else {
+      metadataTTLKeysThreshold = Double.MIN_VALUE;
+    }
+
+    double deletedKeysThreshold;
+
+    if (_deletedKeysTTL > 0) {
+      deletedKeysThreshold = _largestSeenComparisonValue - _deletedKeysTTL;
+    } else {
+      deletedKeysThreshold = Double.MIN_VALUE;
+    }
     _primaryKeyToRecordLocationMap.forEach((primaryKey, recordLocation) -> {
-      if (((Number) recordLocation.getComparisonValue()).doubleValue() < threshold) {
+      if (_metadataTTL > 0 && ((Number) recordLocation.getComparisonValue()).doubleValue() < metadataTTLKeysThreshold) {
         _primaryKeyToRecordLocationMap.remove(primaryKey, recordLocation);
+        numDeletedKeys.getAndIncrement();
+      } else if (_deletedKeysTTL > 0
+          && ((Number) recordLocation.getComparisonValue()).doubleValue() < deletedKeysThreshold) {
+        ThreadSafeMutableRoaringBitmap currentQueryableDocIds = recordLocation.getSegment().getQueryableDocIds();
+        // if key not part of queryable doc id, it means it is deleted
+        if (currentQueryableDocIds != null && !currentQueryableDocIds.contains(recordLocation.getDocId())) {

Review Comment:
   (minor) The `currentQueryableDocIds != null` check should be redundant.



##########
pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java:
##########
@@ -47,7 +47,10 @@ public enum ServerTimer implements AbstractMetrics.Timer {
   SEGMENT_UPLOAD_TIME_MS("milliseconds", false),
 
   TOTAL_CPU_TIME_NS("nanoseconds", false, "Total query cost (thread cpu time + system "
-      + "activities cpu time + response serialization cpu time) for query processing on server.");
+      + "activities cpu time + response serialization cpu time) for query processing on server."),
+
+  EXPIRED_PRIMARY_KEYS_DELETION_TIME_MS("milliseconds", false, "Total time taken to delete expired primary keys based "

Review Comment:
   (minor) Suggest renaming it to `UPSERT_REMOVE_EXPIRED_PRIMARY_KEYS_TIME_MS` to match the function name.
   
   As a follow-up, we can add timers for other operations as well.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -231,13 +233,49 @@ protected void removeSegment(IndexSegment segment, MutableRoaringBitmap validDoc
 
   @Override
   public void doRemoveExpiredPrimaryKeys() {
-    double threshold = _largestSeenComparisonValue - _metadataTTL;
+    AtomicInteger numDeletedKeys = new AtomicInteger();
+    long startTime = System.currentTimeMillis();
+    double metadataTTLKeysThreshold;
+    if (_metadataTTL > 0) {
+      metadataTTLKeysThreshold = _largestSeenComparisonValue - _metadataTTL;
+    } else {
+      metadataTTLKeysThreshold = Double.MIN_VALUE;
+    }
+
+    double deletedKeysThreshold;
+
+    if (_deletedKeysTTL > 0) {
+      deletedKeysThreshold = _largestSeenComparisonValue - _deletedKeysTTL;
+    } else {
+      deletedKeysThreshold = Double.MIN_VALUE;
+    }
     _primaryKeyToRecordLocationMap.forEach((primaryKey, recordLocation) -> {
-      if (((Number) recordLocation.getComparisonValue()).doubleValue() < threshold) {
+      if (_metadataTTL > 0 && ((Number) recordLocation.getComparisonValue()).doubleValue() < metadataTTLKeysThreshold) {

Review Comment:
   (minor) Cache `((Number) recordLocation.getComparisonValue()).doubleValue()` in a local variable, since both branches compute it.
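   A minimal sketch of the loop with the comparison value converted once per entry. It uses a simplified, hypothetical `Location` class in place of Pinot's `RecordLocation`, and a `BitSet` in place of the segment's queryable doc ids. The `removeDocId` call from the diff is omitted.

```java
import java.util.BitSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class ExpiredKeySweepSketch {
  // Hypothetical, simplified stand-in for a record location.
  static final class Location {
    final Number comparisonValue;
    final int docId;
    final BitSet queryableDocIds; // stands in for the owning segment's queryable doc ids

    Location(Number comparisonValue, int docId, BitSet queryableDocIds) {
      this.comparisonValue = comparisonValue;
      this.docId = docId;
      this.queryableDocIds = queryableDocIds;
    }
  }

  // Removes keys older than metadataTTL, and deleted (non-queryable) keys older than deletedKeysTTL.
  static int removeExpired(ConcurrentHashMap<String, Location> map, double metadataTTL, double deletedKeysTTL,
      double largestSeenComparisonValue) {
    double metadataThreshold = largestSeenComparisonValue - metadataTTL;
    double deletedThreshold = largestSeenComparisonValue - deletedKeysTTL;
    AtomicInteger numRemoved = new AtomicInteger();
    // ConcurrentHashMap iteration is weakly consistent, so removing entries while iterating is safe.
    map.forEach((key, location) -> {
      // Convert once and reuse the value in both checks.
      double comparisonValue = location.comparisonValue.doubleValue();
      boolean expiredByMetadataTTL = metadataTTL > 0 && comparisonValue < metadataThreshold;
      boolean expiredDeletedKey = deletedKeysTTL > 0 && comparisonValue < deletedThreshold
          && !location.queryableDocIds.get(location.docId);
      // Count only entries that were actually removed (remove(key, value) fails if the entry changed).
      if ((expiredByMetadataTTL || expiredDeletedKey) && map.remove(key, location)) {
        numRemoved.getAndIncrement();
      }
    });
    return numRemoved.get();
  }
}
```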



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -231,13 +233,49 @@ protected void removeSegment(IndexSegment segment, MutableRoaringBitmap validDoc
 
   @Override
   public void doRemoveExpiredPrimaryKeys() {
-    double threshold = _largestSeenComparisonValue - _metadataTTL;
+    AtomicInteger numDeletedKeys = new AtomicInteger();
+    long startTime = System.currentTimeMillis();
+    double metadataTTLKeysThreshold;
+    if (_metadataTTL > 0) {
+      metadataTTLKeysThreshold = _largestSeenComparisonValue - _metadataTTL;
+    } else {
+      metadataTTLKeysThreshold = Double.MIN_VALUE;
+    }
+
+    double deletedKeysThreshold;
+
+    if (_deletedKeysTTL > 0) {
+      deletedKeysThreshold = _largestSeenComparisonValue - _deletedKeysTTL;
+    } else {
+      deletedKeysThreshold = Double.MIN_VALUE;
+    }
     _primaryKeyToRecordLocationMap.forEach((primaryKey, recordLocation) -> {
-      if (((Number) recordLocation.getComparisonValue()).doubleValue() < threshold) {
+      if (_metadataTTL > 0 && ((Number) recordLocation.getComparisonValue()).doubleValue() < metadataTTLKeysThreshold) {
         _primaryKeyToRecordLocationMap.remove(primaryKey, recordLocation);
+        numDeletedKeys.getAndIncrement();
+      } else if (_deletedKeysTTL > 0
+          && ((Number) recordLocation.getComparisonValue()).doubleValue() < deletedKeysThreshold) {
+        ThreadSafeMutableRoaringBitmap currentQueryableDocIds = recordLocation.getSegment().getQueryableDocIds();
+        // if key not part of queryable doc id, it means it is deleted
+        if (currentQueryableDocIds != null && !currentQueryableDocIds.contains(recordLocation.getDocId())) {
+          _primaryKeyToRecordLocationMap.remove(primaryKey, recordLocation);
+          removeDocId(recordLocation.getSegment(), recordLocation.getDocId());
+          numDeletedKeys.getAndIncrement();
+        }
       }
     });
-    persistWatermark(_largestSeenComparisonValue);
+    if (_metadataTTL > 0) {
+      persistWatermark(_largestSeenComparisonValue);
+    }
+    long duration = System.currentTimeMillis() - startTime;
+    int numDeletedTTLKeys = numDeletedKeys.get();
+    if (numDeletedTTLKeys > 0) {
+      _logger.info("Deleted {} primary deleted keys in the table {}", numDeletedTTLKeys, _tableNameWithType);
+      _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.DELETED_KEYS_TTL_PRIMARY_KEY_REMOVED,
+          numDeletedTTLKeys);
+    }
+    _serverMetrics.addTimedTableValue(_tableNameWithType, ServerTimer.EXPIRED_PRIMARY_KEYS_DELETION_TIME_MS, duration,

Review Comment:
   (minor) This can be added to `BasePartitionUpsertMetadataManager` so that other implementations can also record the time.
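   A minimal sketch of the template-method shape this suggests. The base class times a call to the abstract `doRemoveExpiredPrimaryKeys()`, so every subclass reports the duration without duplicating code. The class name and the `LongConsumer` recorder are hypothetical; the recorder stands in for a `_serverMetrics.addTimedTableValue(...)` call.

```java
import java.util.function.LongConsumer;

// Hypothetical base class; only the timing hook is sketched here.
abstract class BaseUpsertManagerSketch {
  private final LongConsumer _removeExpiredTimeMsRecorder;

  BaseUpsertManagerSketch(LongConsumer removeExpiredTimeMsRecorder) {
    _removeExpiredTimeMsRecorder = removeExpiredTimeMsRecorder;
  }

  // Final template method: subclasses implement the work, the base class records the time.
  public final void removeExpiredPrimaryKeys() {
    long startTimeMs = System.currentTimeMillis();
    try {
      doRemoveExpiredPrimaryKeys();
    } finally {
      // Recorded even if the removal throws, so slow failures stay visible in metrics.
      _removeExpiredTimeMsRecorder.accept(System.currentTimeMillis() - startTimeMs);
    }
  }

  protected abstract void doRemoveExpiredPrimaryKeys();
}
```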



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -157,15 +160,20 @@ public void addSegment(ImmutableSegment segment) {
         "Got unsupported segment implementation: {} for segment: {}, table: {}", segment.getClass(), segmentName,
         _tableNameWithType);
     ImmutableSegmentImpl immutableSegment = (ImmutableSegmentImpl) segment;
+    double maxComparisonValue =

Review Comment:
   (MAJOR) We cannot read the value here because there is no guarantee that the comparison column is numeric when TTL is not enabled.
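   A minimal sketch of a guarded read: the value is only interpreted as a number when a TTL is enabled, since only then is the comparison column validated to be numeric. The helper name and the `maxValue` argument (for example, a column max taken from segment metadata) are hypothetical.

```java
// Hypothetical helper; not Pinot's API.
class ComparisonValueSketch {
  // Sentinel for "no value": Double.NEGATIVE_INFINITY, because Double.MIN_VALUE is the smallest *positive* double.
  static final double NO_VALUE = Double.NEGATIVE_INFINITY;

  static double maxComparisonValueIfTtlEnabled(double metadataTTL, double deletedKeysTTL, Object maxValue) {
    if (metadataTTL <= 0 && deletedKeysTTL <= 0) {
      return NO_VALUE; // TTL disabled: the column may be non-numeric (e.g. STRING), so don't read it.
    }
    if (!(maxValue instanceof Number)) {
      throw new IllegalStateException("Expected a numeric comparison value but got: " + maxValue);
    }
    return ((Number) maxValue).doubleValue();
  }
}
```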



##########
pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java:
##########
@@ -48,6 +48,7 @@ public enum ServerMeter implements AbstractMetrics.Meter {
   PARTIAL_UPSERT_OUT_OF_ORDER("rows", false),
   PARTIAL_UPSERT_KEYS_NOT_REPLACED("rows", false),
   UPSERT_OUT_OF_ORDER("rows", false),
+  DELETED_KEYS_TTL_PRIMARY_KEY_REMOVED("rows", false),

Review Comment:
   +1 on adding this meter. Shall we track keys removed by metadata TTL and by deleted-keys TTL separately?
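   A minimal sketch of keeping the two removal counts apart, keyed by a hypothetical `Reason` enum. In Pinot this would more likely be two distinct `ServerMeter` constants; the in-memory map here only illustrates the split.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical per-reason counters for removed primary keys.
class RemovedKeysMeterSketch {
  enum Reason { METADATA_TTL, DELETED_KEYS_TTL }

  // Fully populated in the constructor and never structurally modified afterwards,
  // so concurrent readers only touch the thread-safe AtomicLong values.
  private final Map<Reason, AtomicLong> _counts = new EnumMap<>(Reason.class);

  RemovedKeysMeterSketch() {
    for (Reason reason : Reason.values()) {
      _counts.put(reason, new AtomicLong());
    }
  }

  void recordRemoved(Reason reason, long numKeys) {
    _counts.get(reason).addAndGet(numKeys);
  }

  long count(Reason reason) {
    return _counts.get(reason).get();
  }
}
```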



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
For additional commands, e-mail: commits-help@pinot.apache.org