You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by "Jackie-Jiang (via GitHub)" <gi...@apache.org> on 2023/07/10 01:24:08 UTC

[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #10915: [Upsert TTL] Add Watermark for each partitions for Primary key cleanup

Jackie-Jiang commented on code in PR #10915:
URL: https://github.com/apache/pinot/pull/10915#discussion_r1256897188


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -472,6 +492,72 @@ protected void doTakeSnapshot() {
         numImmutableSegments, numTrackedSegments, System.currentTimeMillis() - startTimeMs);
   }
 
+  /**
+   * Note: Load watermark when the server is started/restarted.
+   * */
+  protected double loadWatermark() {
+    File watermarkFile = getWatermarkFile();
+    if (watermarkFile.exists()) {
+      try {
+        byte[] bytes = FileUtils.readFileToByteArray(watermarkFile);
+        double watermark = ByteBuffer.wrap(bytes).getDouble();
+        _logger.info("Loaded watermark {} from file for table: {} partition_id: {}", watermark, _tableNameWithType,
+            _partitionId);
+        return watermark;
+      } catch (Exception e) {
+        _logger.warn("Caught exception while loading watermark file: {}, skipping",
+            watermarkFile);
+      }
+    }
+    return Double.MIN_VALUE;
+  }
+
+  /**
+   * Note: Persist watermark when the expired primary keys are cleanup from upsertMetadata.
+   * */
+  protected void persistWatermark(double watermark) {
+    File watermarkFile = getWatermarkFile();
+    try {
+      if (watermarkFile.exists()) {
+        if (!FileUtils.deleteQuietly(watermarkFile)) {
+          _logger.warn("Cannot delete watermark file: {}, skipping", watermarkFile);
+          return;
+        }
+      }
+      try (OutputStream outputStream = new FileOutputStream(watermarkFile, false)) {
+        long longBits = Double.doubleToLongBits(watermark);
+        outputStream.write(Longs.toByteArray(longBits));
+      }
+      _logger.info("Persisted watermark {} to file for table: {} partition_id: {}", watermark,
+          _tableNameWithType, _partitionId);
+    } catch (Exception e) {
+      _logger.warn("Caught exception while persisting watermark file: {}, skipping",
+          watermarkFile);
+    }
+  }
+
+  /**
+   * Note: Watermark file need to be deleted when upsert TTL is disabled in the upsertConfig.
+   * */
+  protected void deleteWatermark() {
+    File watermarkFile = getWatermarkFile();
+    if (watermarkFile.exists()) {
+      try {
+        if (!FileUtils.deleteQuietly(watermarkFile)) {
+          _logger.info("Deleted watermarkFile for table: {} partition_id: {}", _tableNameWithType, _partitionId);
+          return;
+        }
+      } catch (Exception e) {
+        _logger.warn("Caught exception while deleting watermarkFile for table: {} partition_id: {}", _tableNameWithType,
+            _partitionId);
+      }
+    }
+  }
+
+  protected File getWatermarkFile() {
+    return new File(_tableIndexDir, _tableNameWithType + V1Constants.TTL_WATERMARK_TABLE_PARTITION + _partitionId);

Review Comment:
   Let's not prefix it with table name since it is already under the table dir



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -76,23 +85,34 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
   protected final AtomicInteger _numPendingOperations = new AtomicInteger(1);
 
   protected long _lastOutOfOrderEventReportTimeNs = Long.MIN_VALUE;
+
+  // Used to maintain the largestSeenComparisonValue to avoid handling out-of-ttl segments/records.
+  // If upsertTTL enabled, we will keep track of largestSeenComparisonValue to compute expired segments.
+  protected volatile double _largestSeenComparisonValue;

Review Comment:
   (minor) Move this after `_numOutOfOrderEvents`. We want to keep variables for the same purpose together



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -60,7 +67,9 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
   protected final String _deleteRecordColumn;
   protected final HashFunction _hashFunction;
   protected final PartialUpsertHandler _partialUpsertHandler;
+  protected final double _upsertTTLInComparisonTimeUnit;

Review Comment:
   (minor) Move `_upsertTTLInComparisonTimeUnit` after `_enableSnapshot` to keep the variables for the same feature together



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -220,6 +258,13 @@ protected void doAddRecord(MutableSegment segment, RecordInfo recordInfo) {
     Comparable newComparisonValue = recordInfo.getComparisonValue();
     _primaryKeyToRecordLocationMap.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction),
         (primaryKey, currentRecordLocation) -> {
+          // Update the largestSeenComparisonValueMs when add new record. If records during addSegments has a newer
+          // comparison column values than addRecords, it's a bug and should not happen.
+          if (_upsertTTLInComparisonTimeUnit > 0) {
+            if (recordInfo.getComparisonValue().compareTo(_largestSeenComparisonValue) > 0) {
+              _largestSeenComparisonValue = (double) recordInfo.getComparisonValue();

Review Comment:
   The cast can throw an exception. Please add a test with a non-double comparison column



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -74,6 +75,16 @@ protected void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutab
       @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator,
       @Nullable IndexSegment oldSegment, @Nullable MutableRoaringBitmap validDocIdsForOldSegment) {
     String segmentName = segment.getSegmentName();
+    // Skip adding segments that has segment EndTime in the comparison cols earlier than (largestSeenTimestamp - TTL).
+    // Note: We only update largestSeenComparisonValueMs when addRecord, and access the value when addSegments.
+    // We only support single comparison column for TTL-enabled upsert tables.
+    if (_largestSeenComparisonValue > 0) {
+      Comparable endTime =

Review Comment:
   (MAJOR) This won't work if the end time is not of double type. You want to call `Number.doubleValue()` and then compare



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -472,6 +492,72 @@ protected void doTakeSnapshot() {
         numImmutableSegments, numTrackedSegments, System.currentTimeMillis() - startTimeMs);
   }
 
+  /**
+   * Note: Load watermark when the server is started/restarted.
+   * */
+  protected double loadWatermark() {
+    File watermarkFile = getWatermarkFile();
+    if (watermarkFile.exists()) {
+      try {
+        byte[] bytes = FileUtils.readFileToByteArray(watermarkFile);
+        double watermark = ByteBuffer.wrap(bytes).getDouble();
+        _logger.info("Loaded watermark {} from file for table: {} partition_id: {}", watermark, _tableNameWithType,
+            _partitionId);
+        return watermark;
+      } catch (Exception e) {
+        _logger.warn("Caught exception while loading watermark file: {}, skipping",
+            watermarkFile);
+      }
+    }
+    return Double.MIN_VALUE;
+  }
+
+  /**
+   * Note: Persist watermark when the expired primary keys are cleanup from upsertMetadata.
+   * */
+  protected void persistWatermark(double watermark) {
+    File watermarkFile = getWatermarkFile();
+    try {
+      if (watermarkFile.exists()) {
+        if (!FileUtils.deleteQuietly(watermarkFile)) {
+          _logger.warn("Cannot delete watermark file: {}, skipping", watermarkFile);
+          return;
+        }
+      }
+      try (OutputStream outputStream = new FileOutputStream(watermarkFile, false)) {

Review Comment:
   (minor) You may use `DataOutputStream` to directly write the `double`



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -472,6 +492,72 @@ protected void doTakeSnapshot() {
         numImmutableSegments, numTrackedSegments, System.currentTimeMillis() - startTimeMs);
   }
 
+  /**
+   * Note: Load watermark when the server is started/restarted.
+   * */
+  protected double loadWatermark() {
+    File watermarkFile = getWatermarkFile();
+    if (watermarkFile.exists()) {
+      try {
+        byte[] bytes = FileUtils.readFileToByteArray(watermarkFile);
+        double watermark = ByteBuffer.wrap(bytes).getDouble();
+        _logger.info("Loaded watermark {} from file for table: {} partition_id: {}", watermark, _tableNameWithType,
+            _partitionId);
+        return watermark;
+      } catch (Exception e) {
+        _logger.warn("Caught exception while loading watermark file: {}, skipping",
+            watermarkFile);
+      }
+    }
+    return Double.MIN_VALUE;
+  }
+
+  /**
+   * Note: Persist watermark when the expired primary keys are cleanup from upsertMetadata.
+   * */
+  protected void persistWatermark(double watermark) {
+    File watermarkFile = getWatermarkFile();
+    try {
+      if (watermarkFile.exists()) {
+        if (!FileUtils.deleteQuietly(watermarkFile)) {
+          _logger.warn("Cannot delete watermark file: {}, skipping", watermarkFile);
+          return;
+        }
+      }
+      try (OutputStream outputStream = new FileOutputStream(watermarkFile, false)) {
+        long longBits = Double.doubleToLongBits(watermark);
+        outputStream.write(Longs.toByteArray(longBits));
+      }
+      _logger.info("Persisted watermark {} to file for table: {} partition_id: {}", watermark,
+          _tableNameWithType, _partitionId);
+    } catch (Exception e) {
+      _logger.warn("Caught exception while persisting watermark file: {}, skipping",
+          watermarkFile);
+    }
+  }
+
+  /**
+   * Note: Watermark file need to be deleted when upsert TTL is disabled in the upsertConfig.
+   * */
+  protected void deleteWatermark() {
+    File watermarkFile = getWatermarkFile();
+    if (watermarkFile.exists()) {
+      try {
+        if (!FileUtils.deleteQuietly(watermarkFile)) {
+          _logger.info("Deleted watermarkFile for table: {} partition_id: {}", _tableNameWithType, _partitionId);

Review Comment:
   This is incorrect. We should log warning that the file cannot be deleted (same as line 523)



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -84,6 +95,13 @@ protected void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutab
       _primaryKeyToRecordLocationMap.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction),
           (primaryKey, currentRecordLocation) -> {
             if (currentRecordLocation != null) {
+              // Skip the records that has comparisonValue timestamp earlier than (largestSeenTimestamp - TTL).

Review Comment:
   I don't follow this logic. Do we need this check?



##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java:
##########
@@ -60,6 +60,8 @@ public enum Strategy {
   @JsonPropertyDescription("Whether to use snapshot for fast upsert metadata recovery")
   private boolean _enableSnapshot;
 
+  private double _upsertTTLInComparisonTimeUnit;

Review Comment:
   This does not necessarily have to be time, though. How about naming it `_metadataTTL` and also adding a description



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -76,23 +85,34 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
   protected final AtomicInteger _numPendingOperations = new AtomicInteger(1);
 
   protected long _lastOutOfOrderEventReportTimeNs = Long.MIN_VALUE;
+
+  // Used to maintain the largestSeenComparisonValue to avoid handling out-of-ttl segments/records.
+  // If upsertTTL enabled, we will keep track of largestSeenComparisonValue to compute expired segments.
+  protected volatile double _largestSeenComparisonValue;
   protected int _numOutOfOrderEvents = 0;
 
   protected BasePartitionUpsertMetadataManager(String tableNameWithType, int partitionId,
       List<String> primaryKeyColumns, List<String> comparisonColumns, @Nullable String deleteRecordColumn,
-      HashFunction hashFunction, @Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot,
-      ServerMetrics serverMetrics) {
+      HashFunction hashFunction, @Nullable PartialUpsertHandler partialUpsertHandler,
+      double upsertTTLInComparisonTimeUnit, boolean enableSnapshot, File tableIndexDir, ServerMetrics serverMetrics) {

Review Comment:
   (minor) Move `upsertTTLInComparisonTimeUnit` after `enableSnapshot`



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -74,6 +75,16 @@ protected void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutab
       @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator,
       @Nullable IndexSegment oldSegment, @Nullable MutableRoaringBitmap validDocIdsForOldSegment) {
     String segmentName = segment.getSegmentName();
+    // Skip adding segments that has segment EndTime in the comparison cols earlier than (largestSeenTimestamp - TTL).
+    // Note: We only update largestSeenComparisonValueMs when addRecord, and access the value when addSegments.
+    // We only support single comparison column for TTL-enabled upsert tables.
+    if (_largestSeenComparisonValue > 0) {

Review Comment:
   Can we move this logic into the base manager?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -472,6 +492,72 @@ protected void doTakeSnapshot() {
         numImmutableSegments, numTrackedSegments, System.currentTimeMillis() - startTimeMs);
   }
 
+  /**
+   * Note: Load watermark when the server is started/restarted.
+   * */
+  protected double loadWatermark() {
+    File watermarkFile = getWatermarkFile();
+    if (watermarkFile.exists()) {
+      try {
+        byte[] bytes = FileUtils.readFileToByteArray(watermarkFile);
+        double watermark = ByteBuffer.wrap(bytes).getDouble();
+        _logger.info("Loaded watermark {} from file for table: {} partition_id: {}", watermark, _tableNameWithType,

Review Comment:
   (minor)
   ```suggestion
           _logger.info("Loaded watermark: {} from file for table: {} partition_id: {}", watermark, _tableNameWithType,
   ```



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -472,6 +492,72 @@ protected void doTakeSnapshot() {
         numImmutableSegments, numTrackedSegments, System.currentTimeMillis() - startTimeMs);
   }
 
+  /**
+   * Note: Load watermark when the server is started/restarted.
+   * */
+  protected double loadWatermark() {
+    File watermarkFile = getWatermarkFile();
+    if (watermarkFile.exists()) {
+      try {
+        byte[] bytes = FileUtils.readFileToByteArray(watermarkFile);
+        double watermark = ByteBuffer.wrap(bytes).getDouble();
+        _logger.info("Loaded watermark {} from file for table: {} partition_id: {}", watermark, _tableNameWithType,
+            _partitionId);
+        return watermark;
+      } catch (Exception e) {
+        _logger.warn("Caught exception while loading watermark file: {}, skipping",
+            watermarkFile);
+      }
+    }
+    return Double.MIN_VALUE;
+  }
+
+  /**
+   * Note: Persist watermark when the expired primary keys are cleanup from upsertMetadata.
+   * */
+  protected void persistWatermark(double watermark) {
+    File watermarkFile = getWatermarkFile();
+    try {
+      if (watermarkFile.exists()) {
+        if (!FileUtils.deleteQuietly(watermarkFile)) {
+          _logger.warn("Cannot delete watermark file: {}, skipping", watermarkFile);
+          return;
+        }
+      }
+      try (OutputStream outputStream = new FileOutputStream(watermarkFile, false)) {
+        long longBits = Double.doubleToLongBits(watermark);
+        outputStream.write(Longs.toByteArray(longBits));
+      }
+      _logger.info("Persisted watermark {} to file for table: {} partition_id: {}", watermark,
+          _tableNameWithType, _partitionId);
+    } catch (Exception e) {
+      _logger.warn("Caught exception while persisting watermark file: {}, skipping",
+          watermarkFile);
+    }
+  }
+
+  /**
+   * Note: Watermark file need to be deleted when upsert TTL is disabled in the upsertConfig.
+   * */
+  protected void deleteWatermark() {
+    File watermarkFile = getWatermarkFile();
+    if (watermarkFile.exists()) {
+      try {

Review Comment:
   (minor) This try-catch is not needed



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -595,6 +595,39 @@ && isRoutingStrategyAllowedForUpsert(tableConfig.getRoutingConfig()),
       }
     }
     validateAggregateMetricsForUpsertConfig(tableConfig);
+    validateTTLForUpsertConfig(tableConfig, schema);
+  }
+
+  /**
+   * Validates whether the comparison columns is compatible with Upsert TTL feature.
+   * Validation fails when one of the comparison columns is not a numeric value.
+   */
+  @VisibleForTesting
+  static void validateTTLForUpsertConfig(TableConfig tableConfig, Schema schema) {
+    if (tableConfig.getUpsertMode() == UpsertConfig.Mode.NONE
+        || tableConfig.getUpsertConfig().getUpsertTTLInComparisonTimeUnit() == 0) {
+      return;
+    }
+
+    // comparison columns should hold timestamp values in numeric values
+    List<String> comparisonColumns = tableConfig.getUpsertConfig().getComparisonColumns();
+    if (comparisonColumns != null && !comparisonColumns.isEmpty()) {
+
+      // currently we only support 1 comparison column since we need to fetch endTime in comparisonValue time unit from
+      // columnMetadata. If we have multiple comparison columns, we can only use the first comparison column as filter.
+      Preconditions.checkState(comparisonColumns.size() <= 1,
+          String.format("Currently upsert TTL only support 1 comparison columns."));
+
+      for (String column : comparisonColumns) {

Review Comment:
   (minor) We can just check `comparisonColumns.get(0)`



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -595,6 +595,39 @@ && isRoutingStrategyAllowedForUpsert(tableConfig.getRoutingConfig()),
       }
     }
     validateAggregateMetricsForUpsertConfig(tableConfig);
+    validateTTLForUpsertConfig(tableConfig, schema);
+  }
+
+  /**
+   * Validates whether the comparison columns is compatible with Upsert TTL feature.
+   * Validation fails when one of the comparison columns is not a numeric value.
+   */
+  @VisibleForTesting
+  static void validateTTLForUpsertConfig(TableConfig tableConfig, Schema schema) {
+    if (tableConfig.getUpsertMode() == UpsertConfig.Mode.NONE
+        || tableConfig.getUpsertConfig().getUpsertTTLInComparisonTimeUnit() == 0) {
+      return;
+    }
+
+    // comparison columns should hold timestamp values in numeric values
+    List<String> comparisonColumns = tableConfig.getUpsertConfig().getComparisonColumns();
+    if (comparisonColumns != null && !comparisonColumns.isEmpty()) {
+
+      // currently we only support 1 comparison column since we need to fetch endTime in comparisonValue time unit from
+      // columnMetadata. If we have multiple comparison columns, we can only use the first comparison column as filter.
+      Preconditions.checkState(comparisonColumns.size() <= 1,

Review Comment:
   (minor)
   ```suggestion
         Preconditions.checkState(comparisonColumns.size() == 1,
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org