You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/07/24 04:34:28 UTC
[pinot] branch master updated: [Upsert TTL] Add Watermark for each partitions for Primary key cleanup (#10915)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 5eafe76ee8 [Upsert TTL] Add Watermark for each partitions for Primary key cleanup (#10915)
5eafe76ee8 is described below
commit 5eafe76ee89adfd12007db17de7a50c5dd7a43ac
Author: deemoliu <qi...@uber.com>
AuthorDate: Sun Jul 23 21:34:21 2023 -0700
[Upsert TTL] Add Watermark for each partitions for Primary key cleanup (#10915)
---
.../realtime/LLRealtimeSegmentDataManager.java | 2 +
...adataAndDictionaryAggregationPlanMakerTest.java | 4 +-
.../upsert/BasePartitionUpsertMetadataManager.java | 143 ++++++++++-
.../upsert/BaseTableUpsertMetadataManager.java | 4 +
...oncurrentMapPartitionUpsertMetadataManager.java | 22 +-
.../ConcurrentMapTableUpsertMetadataManager.java | 4 +-
.../upsert/PartitionUpsertMetadataManager.java | 5 +
.../segment/local/utils/TableConfigUtils.java | 29 +++
...rrentMapPartitionUpsertMetadataManagerTest.java | 279 +++++++++++++++++++--
.../segment/local/utils/TableConfigUtilsTest.java | 85 +++++++
.../org/apache/pinot/segment/spi/V1Constants.java | 1 +
.../mutable/ThreadSafeMutableRoaringBitmap.java | 4 +
.../pinot/spi/config/table/UpsertConfig.java | 11 +
13 files changed, 570 insertions(+), 23 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 3f655728bc..5a2d075cae 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -675,6 +675,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
// Take upsert snapshot before starting consuming events
if (_partitionUpsertMetadataManager != null) {
_partitionUpsertMetadataManager.takeSnapshot();
+ // If upsertTTL is enabled, we will remove expired primary keys from upsertMetadata after taking snapshot.
+ _partitionUpsertMetadataManager.removeExpiredPrimaryKeys();
}
while (!_state.isFinal()) {
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
index 47459fb691..874c8a6bed 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
@@ -131,8 +131,8 @@ public class MetadataAndDictionaryAggregationPlanMakerTest {
_upsertIndexSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.heap);
((ImmutableSegmentImpl) _upsertIndexSegment).enableUpsert(
new ConcurrentMapPartitionUpsertMetadataManager("testTable_REALTIME", 0, Collections.singletonList("column6"),
- Collections.singletonList("daysSinceEpoch"), null, HashFunction.NONE, null, false, serverMetrics),
- new ThreadSafeMutableRoaringBitmap(), null);
+ Collections.singletonList("daysSinceEpoch"), null, HashFunction.NONE, null, false, 0, INDEX_DIR,
+ serverMetrics), new ThreadSafeMutableRoaringBitmap(), null);
}
@AfterClass
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index 4632047989..6dc9d2bb7f 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -20,7 +20,12 @@ package org.apache.pinot.segment.local.upsert;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
@@ -32,6 +37,7 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
+import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
@@ -41,6 +47,7 @@ import org.apache.pinot.segment.local.utils.SegmentLocks;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.MutableSegment;
+import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.spi.config.table.HashFunction;
import org.apache.pinot.spi.data.readers.GenericRow;
@@ -61,6 +68,8 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
protected final HashFunction _hashFunction;
protected final PartialUpsertHandler _partialUpsertHandler;
protected final boolean _enableSnapshot;
+ protected final double _metadataTTL;
+ protected final File _tableIndexDir;
protected final ServerMetrics _serverMetrics;
protected final Logger _logger;
@@ -78,10 +87,14 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
protected long _lastOutOfOrderEventReportTimeNs = Long.MIN_VALUE;
protected int _numOutOfOrderEvents = 0;
+ // Used to maintain the largestSeenComparisonValue to avoid handling out-of-ttl segments/records.
+ // If upsertTTL enabled, we will keep track of largestSeenComparisonValue to compute expired segments.
+ protected volatile double _largestSeenComparisonValue;
+
protected BasePartitionUpsertMetadataManager(String tableNameWithType, int partitionId,
List<String> primaryKeyColumns, List<String> comparisonColumns, @Nullable String deleteRecordColumn,
HashFunction hashFunction, @Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot,
- ServerMetrics serverMetrics) {
+ double metadataTTL, File tableIndexDir, ServerMetrics serverMetrics) {
_tableNameWithType = tableNameWithType;
_partitionId = partitionId;
_primaryKeyColumns = primaryKeyColumns;
@@ -90,9 +103,16 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
_hashFunction = hashFunction;
_partialUpsertHandler = partialUpsertHandler;
_enableSnapshot = enableSnapshot;
+ _metadataTTL = metadataTTL;
+ _tableIndexDir = tableIndexDir;
_snapshotLock = enableSnapshot ? new ReentrantReadWriteLock() : null;
_serverMetrics = serverMetrics;
_logger = LoggerFactory.getLogger(tableNameWithType + "-" + partitionId + "-" + getClass().getSimpleName());
+ if (metadataTTL > 0) {
+ _largestSeenComparisonValue = loadWatermark();
+ } else {
+ deleteWatermark();
+ }
}
@Override
@@ -114,13 +134,36 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl,
"Got unsupported segment implementation: {} for segment: {}, table: {}", segment.getClass(), segmentName,
_tableNameWithType);
+ ImmutableSegmentImpl immutableSegment = (ImmutableSegmentImpl) segment;
+
+ // Skip adding segment that has max comparison value smaller than (largestSeenComparisonValue - TTL)
+ if (_largestSeenComparisonValue > 0) {
+ Preconditions.checkState(_enableSnapshot, "Upsert TTL must have snapshot enabled");
+ Preconditions.checkState(_comparisonColumns.size() == 1,
+ "Upsert TTL does not work with multiple comparison columns");
+ // TODO: Support deletion for TTL. Need to construct queryableDocIds when adding segments out of TTL.
+ Preconditions.checkState(_deleteRecordColumn == null, "Upsert TTL doesn't work with record deletion");
+ Number maxComparisonValue =
+ (Number) segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0)).getMaxValue();
+ if (maxComparisonValue.doubleValue() < _largestSeenComparisonValue - _metadataTTL) {
+ _logger.info("Skip adding segment: {} because it's out of TTL", segmentName);
+ MutableRoaringBitmap validDocIdsSnapshot = immutableSegment.loadValidDocIdsFromSnapshot();
+ if (validDocIdsSnapshot != null) {
+ immutableSegment.enableUpsert(this, new ThreadSafeMutableRoaringBitmap(validDocIdsSnapshot), null);
+ } else {
+ _logger.warn("Failed to find snapshot from segment: {} which is out of TTL, treating all documents as valid",
+ segmentName);
+ }
+ return;
+ }
+ }
if (_enableSnapshot) {
_snapshotLock.readLock().lock();
}
startOperation();
try {
- doAddSegment((ImmutableSegmentImpl) segment);
+ doAddSegment(immutableSegment);
_trackedSegments.add(segment);
} finally {
finishOperation();
@@ -365,6 +408,12 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
IndexSegment oldSegment) {
String segmentName = segment.getSegmentName();
Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
+
+ // Currently, when TTL is enabled, we do not support skipping the load of an out-of-TTL segment that has a
+ // snapshot, since we don't know which docs are valid in the new segment.
+ // TODO: When TTL is enabled, we can allow:
+ // (1) skipping the load of segments without any invalid docs;
+ // (2) assigning the invalid docs from the replaced segment to the new segment.
segmentLock.lock();
try {
MutableRoaringBitmap validDocIdsForOldSegment =
@@ -419,6 +468,16 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
return;
}
+ // Skip removing segment that has max comparison value smaller than (largestSeenComparisonValue - TTL)
+ if (_largestSeenComparisonValue > 0) {
+ Number maxComparisonValue =
+ (Number) segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0)).getMaxValue();
+ if (maxComparisonValue.doubleValue() < _largestSeenComparisonValue - _metadataTTL) {
+ _logger.info("Skip removing segment: {} because it's out of TTL", segmentName);
+ return;
+ }
+ }
+
if (_enableSnapshot) {
_snapshotLock.readLock().lock();
}
@@ -544,6 +603,63 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
numImmutableSegments, numTrackedSegments, System.currentTimeMillis() - startTimeMs);
}
+ /**
+  * Loads the watermark (largest seen comparison value) of this partition from the watermark file, if it exists.
+  *
+  * <p>The file is expected to start with an 8-byte big-endian double, which is the layout produced by
+  * {@code DataOutputStream.writeDouble()} and consumed here via {@code ByteBuffer.getDouble()}.
+  *
+  * <p>NOTE: The fallback value is {@link Double#MIN_VALUE}, which is the smallest <em>positive</em> double (not a
+  * negative sentinel). Checks of the form {@code _largestSeenComparisonValue > 0} therefore evaluate to true for the
+  * fallback value; keep that in mind before changing it.
+  *
+  * @return the watermark read from the file, or {@link Double#MIN_VALUE} when the file does not exist or cannot be
+  *         read
+  */
+ protected double loadWatermark() {
+   File watermarkFile = getWatermarkFile();
+   if (watermarkFile.exists()) {
+     try {
+       byte[] bytes = FileUtils.readFileToByteArray(watermarkFile);
+       double watermark = ByteBuffer.wrap(bytes).getDouble();
+       _logger.info("Loaded watermark: {} from file for table: {} partition_id: {}", watermark, _tableNameWithType,
+           _partitionId);
+       return watermark;
+     } catch (Exception e) {
+       // Best-effort: fall through to the default value, but pass the exception as the trailing argument so that
+       // the cause (e.g. I/O error, truncated file) and its stack trace are not lost.
+       _logger.warn("Caught exception while loading watermark file: {}, skipping", watermarkFile, e);
+     }
+   }
+   return Double.MIN_VALUE;
+ }
+
+ /**
+  * Persists the given watermark to the watermark file of this partition, replacing any existing file.
+  *
+  * <p>The value is written as a single 8-byte double via {@code DataOutputStream.writeDouble()}. This is a
+  * best-effort operation: failures are logged and otherwise ignored, and when an existing file cannot be deleted
+  * the new value is not written.
+  *
+  * @param watermark the watermark value to persist
+  */
+ protected void persistWatermark(double watermark) {
+   File watermarkFile = getWatermarkFile();
+   try {
+     if (watermarkFile.exists()) {
+       if (!FileUtils.deleteQuietly(watermarkFile)) {
+         _logger.warn("Cannot delete watermark file: {}, skipping", watermarkFile);
+         return;
+       }
+     }
+     try (OutputStream outputStream = new FileOutputStream(watermarkFile, false);
+         DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
+       dataOutputStream.writeDouble(watermark);
+     }
+     _logger.info("Persisted watermark: {} to file: {}", watermark, watermarkFile);
+   } catch (Exception e) {
+     // Best-effort: do not propagate, but pass the exception as the trailing argument so that the cause and its
+     // stack trace are logged.
+     _logger.warn("Caught exception while persisting watermark file: {}, skipping", watermarkFile, e);
+   }
+ }
+
+ /**
+  * Removes the watermark file of this partition if one is present. A failed deletion is only logged.
+  */
+ protected void deleteWatermark() {
+   File file = getWatermarkFile();
+   // deleteQuietly() is only attempted when the file exists (short-circuit evaluation)
+   if (file.exists() && !FileUtils.deleteQuietly(file)) {
+     _logger.warn("Cannot delete watermark file: {}, skipping", file);
+   }
+ }
+
+ /**
+  * Returns the watermark file of this partition, located directly under the table index directory and named with
+  * the watermark prefix followed by the partition id.
+  */
+ protected File getWatermarkFile() {
+   String watermarkFileName = V1Constants.TTL_WATERMARK_TABLE_PARTITION + _partitionId;
+   return new File(_tableIndexDir, watermarkFileName);
+ }
+
protected void startOperation() {
_numPendingOperations.getAndIncrement();
}
@@ -556,6 +672,29 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
}
}
+ @Override
+ public void removeExpiredPrimaryKeys() {
+   // Nothing to clean up unless a positive metadata TTL is configured
+   if (_metadataTTL <= 0) {
+     return;
+   }
+   if (!_stopped) {
+     // Track the cleanup as a pending operation for its whole duration
+     startOperation();
+     try {
+       doRemoveExpiredPrimaryKeys();
+     } finally {
+       finishOperation();
+     }
+     return;
+   }
+   _logger.info("Skip removing expired primary keys because metadata manager is already stopped");
+ }
+
+ /**
+ * Removes all primary keys that have comparison value smaller than (largestSeenComparisonValue - TTL).
+ *
+ * Invoked by removeExpiredPrimaryKeys() between startOperation() and finishOperation(), and only after the
+ * metadata TTL and stopped-state guards in that method have passed.
+ */
+ protected abstract void doRemoveExpiredPrimaryKeys();
+
@Override
public void stop() {
_stopped = true;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
index c6434ec487..e23689ad09 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
@@ -67,6 +67,8 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
protected HashFunction _hashFunction;
protected PartialUpsertHandler _partialUpsertHandler;
protected boolean _enableSnapshot;
+ protected double _metadataTTL;
+ protected File _tableIndexDir;
protected ServerMetrics _serverMetrics;
private volatile boolean _isPreloading = false;
@@ -104,6 +106,8 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
}
_enableSnapshot = upsertConfig.isEnableSnapshot();
+ _metadataTTL = upsertConfig.getMetadataTTL();
+ _tableIndexDir = tableDataManager.getTableDataDir();
_serverMetrics = serverMetrics;
if (_enableSnapshot && segmentPreloadExecutor != null && upsertConfig.isEnablePreload()) {
// Preloading the segments with snapshots for fast upsert metadata recovery.
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index cae8092b2f..3ce09412e3 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -19,6 +19,7 @@
package org.apache.pinot.segment.local.upsert;
import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
@@ -59,9 +60,9 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
public ConcurrentMapPartitionUpsertMetadataManager(String tableNameWithType, int partitionId,
List<String> primaryKeyColumns, List<String> comparisonColumns, @Nullable String deleteRecordColumn,
HashFunction hashFunction, @Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot,
- ServerMetrics serverMetrics) {
+ double metadataTTL, File tableIndexDir, ServerMetrics serverMetrics) {
super(tableNameWithType, partitionId, primaryKeyColumns, comparisonColumns, deleteRecordColumn, hashFunction,
- partialUpsertHandler, enableSnapshot, serverMetrics);
+ partialUpsertHandler, enableSnapshot, metadataTTL, tableIndexDir, serverMetrics);
}
@Override
@@ -205,6 +206,7 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
@Override
protected void removeSegment(IndexSegment segment, MutableRoaringBitmap validDocIds) {
assert !validDocIds.isEmpty();
+
PrimaryKey primaryKey = new PrimaryKey(new Object[_primaryKeyColumns.size()]);
PeekableIntIterator iterator = validDocIds.getIntIterator();
try (
@@ -226,6 +228,17 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
}
}
+ /**
+  * Removes every primary key whose record has a comparison value smaller than
+  * (largestSeenComparisonValue - TTL), then persists the watermark that was used for the cleanup.
+  */
+ @Override
+ public void doRemoveExpiredPrimaryKeys() {
+   // Read the volatile watermark exactly once: it can be advanced concurrently while records are being added, and
+   // the value persisted below must be the same one the expiration threshold was derived from.
+   double largestSeenComparisonValue = _largestSeenComparisonValue;
+   double threshold = largestSeenComparisonValue - _metadataTTL;
+   _primaryKeyToRecordLocationMap.forEach((primaryKey, recordLocation) -> {
+     if (((Number) recordLocation.getComparisonValue()).doubleValue() < threshold) {
+       // Conditional remove: the entry is only dropped if it still maps to the record location inspected above
+       _primaryKeyToRecordLocationMap.remove(primaryKey, recordLocation);
+     }
+   });
+   persistWatermark(largestSeenComparisonValue);
+ }
+
@Override
protected void doAddRecord(MutableSegment segment, RecordInfo recordInfo) {
ThreadSafeMutableRoaringBitmap validDocIds = Objects.requireNonNull(segment.getValidDocIds());
@@ -234,6 +247,11 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
Comparable newComparisonValue = recordInfo.getComparisonValue();
_primaryKeyToRecordLocationMap.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction),
(primaryKey, currentRecordLocation) -> {
+ // Update largestSeenComparisonValue when adding new record
+ if (_metadataTTL > 0) {
+ double comparisonValue = ((Number) recordInfo.getComparisonValue()).doubleValue();
+ _largestSeenComparisonValue = Math.max(_largestSeenComparisonValue, comparisonValue);
+ }
if (currentRecordLocation != null) {
// Existing primary key
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
index b08ea591d1..3380203656 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
@@ -36,8 +36,8 @@ public class ConcurrentMapTableUpsertMetadataManager extends BaseTableUpsertMeta
public ConcurrentMapPartitionUpsertMetadataManager getOrCreatePartitionManager(int partitionId) {
return _partitionMetadataManagerMap.computeIfAbsent(partitionId,
k -> new ConcurrentMapPartitionUpsertMetadataManager(_tableNameWithType, k, _primaryKeyColumns,
- _comparisonColumns, _deleteRecordColumn, _hashFunction, _partialUpsertHandler, _enableSnapshot,
- _serverMetrics));
+ _comparisonColumns, _deleteRecordColumn, _hashFunction, _partialUpsertHandler,
+ _enableSnapshot, _metadataTTL, _tableIndexDir, _serverMetrics));
}
@Override
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
index fbf864bd58..6ebd08c74c 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
@@ -97,6 +97,11 @@ public interface PartitionUpsertMetadataManager extends Closeable {
*/
void takeSnapshot();
+ /**
+ * Remove the expired primary keys from the metadata when TTL is enabled.
+ *
+ * LLRealtimeSegmentDataManager calls this right after takeSnapshot(), before it starts consuming events.
+ * BasePartitionUpsertMetadataManager returns without doing anything when the metadata TTL is not positive or the
+ * manager has already been stopped.
+ */
+ void removeExpiredPrimaryKeys();
+
/**
* Stops the metadata manager. After invoking this method, no access to the metadata will be accepted.
*/
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 76cba334e5..d5741fa0b7 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -611,6 +611,35 @@ public final class TableConfigUtils {
}
}
validateAggregateMetricsForUpsertConfig(tableConfig);
+ validateTTLForUpsertConfig(tableConfig, schema);
+ }
+
+ /**
+ * Validates the upsert config related to TTL.
+ */
+ @VisibleForTesting
+ static void validateTTLForUpsertConfig(TableConfig tableConfig, Schema schema) {
+ UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
+ if (upsertConfig == null || upsertConfig.getMetadataTTL() == 0) {
+ return;
+ }
+
+ List<String> comparisonColumns = upsertConfig.getComparisonColumns();
+ if (CollectionUtils.isNotEmpty(comparisonColumns)) {
+ Preconditions.checkState(comparisonColumns.size() == 1,
+ "Upsert TTL does not work with multiple comparison columns");
+ String comparisonColumn = comparisonColumns.get(0);
+ DataType comparisonColumnDataType = schema.getFieldSpecFor(comparisonColumn).getDataType();
+ Preconditions.checkState(comparisonColumnDataType.isNumeric(),
+ "Upsert TTL must have comparison column: %s in numeric type, found: %s", comparisonColumn,
+ comparisonColumnDataType);
+ }
+
+ Preconditions.checkState(upsertConfig.isEnableSnapshot(), "Upsert TTL must have snapshot enabled");
+
+ // TODO: Support deletion for TTL. Need to construct queryableDocIds when adding segments out of TTL.
+ Preconditions.checkState(upsertConfig.getDeleteRecordColumn() == null,
+ "Upsert TTL doesn't work with record deletion");
}
/**
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index 466f1885d5..fe3907ada3 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.segment.local.upsert;
+import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -25,15 +26,19 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TreeMap;
import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
import org.apache.pinot.segment.local.upsert.ConcurrentMapPartitionUpsertMetadataManager.RecordLocation;
import org.apache.pinot.segment.local.utils.HashUtils;
+import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.MutableSegment;
+import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
@@ -45,7 +50,7 @@ import org.apache.pinot.spi.utils.ByteArray;
import org.apache.pinot.spi.utils.BytesUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.roaringbitmap.buffer.MutableRoaringBitmap;
-import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.mockito.ArgumentMatchers.any;
@@ -56,11 +61,21 @@ import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertSame;
+import static org.testng.Assert.assertTrue;
public class ConcurrentMapPartitionUpsertMetadataManagerTest {
private static final String RAW_TABLE_NAME = "testTable";
private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
+ private static final File INDEX_DIR =
+ new File(FileUtils.getTempDirectory(), "ConcurrentMapPartitionUpsertMetadataManagerTest");
+
+ @BeforeClass
+ public void setup()
+ throws Exception {
+ FileUtils.deleteQuietly(INDEX_DIR); // drop anything left in the directory, e.g. from a previous run
+ FileUtils.forceMkdir(INDEX_DIR); // re-create the (now empty) directory handed to the metadata managers
+ }
@Test
public void testAddReplaceRemoveSegment()
@@ -73,12 +88,27 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
verifyAddReplaceRemoveSegment(HashFunction.MURMUR3, true);
}
+ /**
+  * Exercises the TTL-related behavior (expired primary key removal, watermark persist/load, and adding segments
+  * within and out of TTL) with comparison values of each numeric box type.
+  */
+ @Test
+ public void testUpsertMetadataCleanupWithTTLConfig() {
+   // Use the valueOf() factories: the boxed-primitive constructors are deprecated
+   verifyRemoveExpiredPrimaryKeys(Integer.valueOf(80), Integer.valueOf(120));
+   verifyRemoveExpiredPrimaryKeys(Float.valueOf(80f), Float.valueOf(120f));
+   verifyRemoveExpiredPrimaryKeys(Double.valueOf(80d), Double.valueOf(120d));
+   verifyRemoveExpiredPrimaryKeys(Long.valueOf(80L), Long.valueOf(120L));
+   verifyPersistAndLoadWatermark();
+   verifyAddSegmentForTTL(Integer.valueOf(80));
+   verifyAddSegmentForTTL(Float.valueOf(80f));
+   verifyAddSegmentForTTL(Double.valueOf(80d));
+   verifyAddSegmentForTTL(Long.valueOf(80L));
+   verifyAddOutOfTTLSegment();
+ }
+
private void verifyAddReplaceRemoveSegment(HashFunction hashFunction, boolean enableSnapshot)
throws IOException {
String comparisonColumn = "timeCol";
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- Collections.singletonList(comparisonColumn), null, hashFunction, null, false, mock(ServerMetrics.class));
+ Collections.singletonList(comparisonColumn), null, hashFunction, null, false, 0, INDEX_DIR,
+ mock(ServerMetrics.class));
Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;
@@ -240,7 +270,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
String deleteRecordColumn = "deleteCol";
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- Collections.singletonList(comparisonColumn), deleteRecordColumn, hashFunction, null, false,
+ Collections.singletonList(comparisonColumn), deleteRecordColumn, hashFunction, null, false, 0, INDEX_DIR,
mock(ServerMetrics.class));
Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;
@@ -309,7 +339,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
- Assert.assertTrue(queryableDocIds1.getMutableRoaringBitmap().isEmpty());
+ assertTrue(queryableDocIds1.getMutableRoaringBitmap().isEmpty());
assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 3});
// Add an empty segment
@@ -324,7 +354,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
- Assert.assertTrue(queryableDocIds1.getMutableRoaringBitmap().isEmpty());
+ assertTrue(queryableDocIds1.getMutableRoaringBitmap().isEmpty());
assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 3});
// Replace (reload) the first segment
@@ -347,9 +377,9 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
- Assert.assertTrue(queryableDocIds1.getMutableRoaringBitmap().isEmpty());
+ assertTrue(queryableDocIds1.getMutableRoaringBitmap().isEmpty());
assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 3});
- Assert.assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());
+ assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());
// Remove the original segment1
upsertMetadataManager.removeSegment(segment1);
@@ -363,9 +393,9 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
- Assert.assertTrue(queryableDocIds1.getMutableRoaringBitmap().isEmpty());
+ assertTrue(queryableDocIds1.getMutableRoaringBitmap().isEmpty());
assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 3});
- Assert.assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());
+ assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());
// Remove the empty segment
upsertMetadataManager.removeSegment(emptySegment);
@@ -379,7 +409,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 3});
- Assert.assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());
+ assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());
// Remove segment2
upsertMetadataManager.removeSegment(segment2);
@@ -391,7 +421,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
assertEquals(trackedSegments, Collections.singleton(newSegment1));
assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 3});
- Assert.assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());
+ assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());
// Stop the metadata manager
upsertMetadataManager.stop();
@@ -403,7 +433,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
assertEquals(trackedSegments, Collections.singleton(newSegment1));
- Assert.assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());
+ assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());
// Close the metadata manager
upsertMetadataManager.close();
@@ -458,6 +488,25 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
return segment;
}
+ /**
+  * Mocks an immutable segment (via {@code mockImmutableSegment}) and additionally stubs its metadata and snapshot.
+  *
+  * <p>The segment metadata is a mock whose column metadata map has a single entry: the first element of
+  * {@code comparisonColumns}, mapped to a column metadata mock that reports {@code endTime} as its max value. Any
+  * further comparison columns are ignored.
+  *
+  * @param endTime value returned by {@code getMaxValue()} of the first comparison column
+  * @param snapshot bitmap returned by {@code loadValidDocIdsFromSnapshot()}; when {@code null}, the bitmap obtained
+  *                 from {@code validDocIds} is returned instead
+  */
+ private static ImmutableSegmentImpl mockImmutableSegmentWithEndTime(int sequenceNumber,
+     ThreadSafeMutableRoaringBitmap validDocIds, @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds,
+     List<PrimaryKey> primaryKeys, List<String> comparisonColumns, Comparable endTime,
+     @Nullable MutableRoaringBitmap snapshot) {
+   ImmutableSegmentImpl segment = mockImmutableSegment(sequenceNumber, validDocIds, queryableDocIds, primaryKeys);
+   SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
+   when(segment.getSegmentMetadata()).thenReturn(segmentMetadata);
+   ColumnMetadata columnMetadata = mock(ColumnMetadata.class);
+   // Use a typed map instead of a raw, double-brace-initialized anonymous TreeMap subclass
+   TreeMap<String, ColumnMetadata> columnMetadataMap = new TreeMap<>();
+   columnMetadataMap.put(comparisonColumns.get(0), columnMetadata);
+   when(segmentMetadata.getColumnMetadataMap()).thenReturn(columnMetadataMap);
+   when(columnMetadata.getMaxValue()).thenReturn(endTime);
+   // Resolve the bitmap before stubbing so that no other call happens between when() and thenReturn()
+   MutableRoaringBitmap snapshotToLoad = snapshot != null ? snapshot : validDocIds.getMutableRoaringBitmap();
+   when(segment.loadValidDocIdsFromSnapshot()).thenReturn(snapshotToLoad);
+   return segment;
+ }
+
private static EmptyIndexSegment mockEmptySegment(int sequenceNumber) {
SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
when(segmentMetadata.getName()).thenReturn(getSegmentName(sequenceNumber));
@@ -504,7 +553,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
String comparisonColumn = "timeCol";
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- Collections.singletonList(comparisonColumn), null, hashFunction, null, false, mock(ServerMetrics.class));
+ Collections.singletonList(comparisonColumn), null, hashFunction, null, false, 0, INDEX_DIR,
+ mock(ServerMetrics.class));
Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
// Add the first segment
@@ -594,7 +644,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
String comparisonColumn = "timeCol";
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- Collections.singletonList(comparisonColumn), null, hashFunction, null, false, mock(ServerMetrics.class));
+ Collections.singletonList(comparisonColumn), null, hashFunction, null, false, 0, INDEX_DIR,
+ mock(ServerMetrics.class));
Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
// Add the first segment
@@ -642,13 +693,14 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
verifyAddRecordWithDeleteColumn(HashFunction.MD5);
verifyAddRecordWithDeleteColumn(HashFunction.MURMUR3);
}
+
private void verifyAddRecordWithDeleteColumn(HashFunction hashFunction)
throws IOException {
String comparisonColumn = "timeCol";
String deleteColumn = "deleteCol";
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- Collections.singletonList(comparisonColumn), deleteColumn, hashFunction, null, false,
+ Collections.singletonList(comparisonColumn), deleteColumn, hashFunction, null, false, 0, INDEX_DIR,
mock(ServerMetrics.class));
Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
@@ -744,6 +796,203 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
upsertMetadataManager.close();
}
+ /**
+  * Verifies that {@code removeExpiredPrimaryKeys()} removes only the primary keys that fell out of the TTL window
+  * from the record location map, and that the valid doc ids of the affected segment are left untouched.
+  *
+  * <p>The manager is created with 30 as the TTL argument. The assertions below hard-code the comparison values 80
+  * and 120, so callers are expected to pass an {@code earlierComparisonValue} numerically equal to 80 and a
+  * {@code largerComparisonValue} numerically equal to 120.
+  *
+  * @param earlierComparisonValue comparison value of the first record; also used as the end time of the segment
+  * @param largerComparisonValue comparison value of the record added after the segment is loaded
+  */
+ private void verifyRemoveExpiredPrimaryKeys(Comparable earlierComparisonValue, Comparable largerComparisonValue) {
+   File tableDir = new File(INDEX_DIR, REALTIME_TABLE_NAME);
+
+   ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+       new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
+           Collections.singletonList("timeCol"), null, HashFunction.NONE, null, false, 30, tableDir,
+           mock(ServerMetrics.class));
+   Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+
+   // Add record to update largestSeenTimestamp, largest seen timestamp: earlierComparisonValue
+   ThreadSafeMutableRoaringBitmap validDocIds0 = new ThreadSafeMutableRoaringBitmap();
+   MutableSegment segment0 = mockMutableSegment(1, validDocIds0, null);
+   upsertMetadataManager.addRecord(segment0, new RecordInfo(makePrimaryKey(10), 1, earlierComparisonValue, false));
+   checkRecordLocationForTTL(recordLocationMap, 10, segment0, 1, 80, HashFunction.NONE);
+
+   // Add a segment with segmentEndTime = earlierComparisonValue, so it will not be skipped
+   int numRecords = 4;
+   int[] primaryKeys = new int[]{0, 1, 2, 3};
+   Number[] timestamps = new Number[]{100, 100, 120, 80};
+   ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
+   List<PrimaryKey> primaryKeys1 = getPrimaryKeyList(numRecords, primaryKeys);
+   ImmutableSegmentImpl segment1 =
+       mockImmutableSegmentWithEndTime(1, validDocIds1, null, primaryKeys1, Collections.singletonList("timeCol"),
+           earlierComparisonValue, null);
+
+   // Load segment1
+   upsertMetadataManager.addSegment(segment1, validDocIds1, null,
+       getRecordInfoListForTTL(numRecords, primaryKeys, timestamps).iterator());
+   assertEquals(recordLocationMap.size(), 5);
+   checkRecordLocationForTTL(recordLocationMap, 0, segment1, 0, 100, HashFunction.NONE);
+   checkRecordLocationForTTL(recordLocationMap, 1, segment1, 1, 100, HashFunction.NONE);
+   checkRecordLocationForTTL(recordLocationMap, 2, segment1, 2, 120, HashFunction.NONE);
+   checkRecordLocationForTTL(recordLocationMap, 3, segment1, 3, 80, HashFunction.NONE);
+   checkRecordLocationForTTL(recordLocationMap, 10, segment0, 1, 80, HashFunction.NONE);
+
+   // Add record to update largestSeenTimestamp, largest seen timestamp: largerComparisonValue
+   upsertMetadataManager.addRecord(segment0, new RecordInfo(makePrimaryKey(10), 0, largerComparisonValue, false));
+   assertEquals(recordLocationMap.size(), 5);
+   checkRecordLocationForTTL(recordLocationMap, 0, segment1, 0, 100, HashFunction.NONE);
+   checkRecordLocationForTTL(recordLocationMap, 1, segment1, 1, 100, HashFunction.NONE);
+   checkRecordLocationForTTL(recordLocationMap, 2, segment1, 2, 120, HashFunction.NONE);
+   checkRecordLocationForTTL(recordLocationMap, 3, segment1, 3, 80, HashFunction.NONE);
+   checkRecordLocationForTTL(recordLocationMap, 10, segment0, 0, 120, HashFunction.NONE);
+
+   // Records before (largest seen timestamp - TTL) are expired and removed from the upsert metadata:
+   // primary key 3 (comparison value 80) is dropped, the others are kept
+   upsertMetadataManager.removeExpiredPrimaryKeys();
+   assertEquals(recordLocationMap.size(), 4);
+   checkRecordLocationForTTL(recordLocationMap, 0, segment1, 0, 100, HashFunction.NONE);
+   checkRecordLocationForTTL(recordLocationMap, 1, segment1, 1, 100, HashFunction.NONE);
+   checkRecordLocationForTTL(recordLocationMap, 2, segment1, 2, 120, HashFunction.NONE);
+   checkRecordLocationForTTL(recordLocationMap, 10, segment0, 0, 120, HashFunction.NONE);
+
+   // ValidDocIds for out-of-TTL records should not be removed
+   assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 2, 3});
+ }
+
+ /**
+  * Verifies segment loading around the TTL boundary: a segment whose end time (80) equals the largest seen
+  * comparison value (80) is loaded, while a segment with the same end time added after the largest seen comparison
+  * value has advanced to 120 (TTL argument: 30) does not add any entry to the record location map.
+  */
+ private void verifyAddOutOfTTLSegment() {
+   File tableDir = new File(INDEX_DIR, REALTIME_TABLE_NAME);
+
+   ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+       new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
+           Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, 30, tableDir,
+           mock(ServerMetrics.class));
+   Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+
+   // Add record to update largestSeenTimestamp, largest seen timestamp: 80
+   ThreadSafeMutableRoaringBitmap validDocIds0 = new ThreadSafeMutableRoaringBitmap();
+   MutableSegment segment0 = mockMutableSegment(1, validDocIds0, null);
+   upsertMetadataManager.addRecord(segment0, new RecordInfo(makePrimaryKey(10), 1, Double.valueOf(80), false));
+   checkRecordLocationForTTL(recordLocationMap, 10, segment0, 1, 80, HashFunction.NONE);
+
+   // Add a segment with segmentEndTime = 80, so it will not be skipped
+   int numRecords = 4;
+   int[] primaryKeys = new int[]{0, 1, 2, 3};
+   Number[] timestamps = new Number[]{100, 100, 120, 80};
+   ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
+   List<PrimaryKey> primaryKeys1 = getPrimaryKeyList(numRecords, primaryKeys);
+   ImmutableSegmentImpl segment1 =
+       mockImmutableSegmentWithEndTime(1, validDocIds1, null, primaryKeys1, Collections.singletonList("timeCol"),
+           Double.valueOf(80), null);
+
+   // Load segment1 with segmentEndTime: 80, largest seen timestamp: 80. The segment will be loaded.
+   upsertMetadataManager.addSegment(segment1, validDocIds1, null,
+       getRecordInfoListForTTL(numRecords, primaryKeys, timestamps).iterator());
+   assertEquals(recordLocationMap.size(), 5);
+   checkRecordLocationForTTL(recordLocationMap, 0, segment1, 0, 100, HashFunction.NONE);
+   checkRecordLocationForTTL(recordLocationMap, 1, segment1, 1, 100, HashFunction.NONE);
+   checkRecordLocationForTTL(recordLocationMap, 2, segment1, 2, 120, HashFunction.NONE);
+   checkRecordLocationForTTL(recordLocationMap, 3, segment1, 3, 80, HashFunction.NONE);
+   checkRecordLocationForTTL(recordLocationMap, 10, segment0, 1, 80, HashFunction.NONE);
+   assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 2, 3});
+
+   // Add record to update largestSeenTimestamp, largest seen timestamp: 120
+   upsertMetadataManager.addRecord(segment0, new RecordInfo(makePrimaryKey(0), 0, Double.valueOf(120), false));
+   assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{1, 2, 3});
+   assertEquals(recordLocationMap.size(), 5);
+   checkRecordLocationForTTL(recordLocationMap, 0, segment0, 0, 120, HashFunction.NONE);
+   checkRecordLocationForTTL(recordLocationMap, 1, segment1, 1, 100, HashFunction.NONE);
+   checkRecordLocationForTTL(recordLocationMap, 2, segment1, 2, 120, HashFunction.NONE);
+   checkRecordLocationForTTL(recordLocationMap, 3, segment1, 3, 80, HashFunction.NONE);
+   checkRecordLocationForTTL(recordLocationMap, 10, segment0, 1, 80, HashFunction.NONE);
+
+   // Add an out-of-TTL segment and verify that none of its docs show up in the record location map.
+   // segmentEndTime: 80, largest seen timestamp: 120, so the segment will be skipped.
+   List<PrimaryKey> primaryKeys2 = getPrimaryKeyList(numRecords, new int[]{100, 101, 102, 103});
+   int[] docIds2 = new int[]{0, 1};
+   MutableRoaringBitmap validDocIdsSnapshot2 = new MutableRoaringBitmap();
+   validDocIdsSnapshot2.add(docIds2);
+   ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
+   ImmutableSegmentImpl segment2 =
+       mockImmutableSegmentWithEndTime(1, validDocIds2, null, primaryKeys2, Collections.singletonList("timeCol"),
+           Double.valueOf(80), validDocIdsSnapshot2);
+   upsertMetadataManager.addSegment(segment2);
+   // Out-of-TTL segment should not be added to recordLocationMap
+   assertEquals(recordLocationMap.size(), 5);
+ }
+
+ /**
+  * Verifies that adding a segment whose end time (-1) is out of TTL leaves the record location map unchanged.
+  *
+  * <p>The assertions hard-code the comparison value 80, so callers are expected to pass a {@code comparisonValue}
+  * numerically equal to 80.
+  *
+  * @param comparisonValue comparison value of the single record added before the segment
+  */
+ private void verifyAddSegmentForTTL(Comparable comparisonValue) {
+   File tableDir = new File(INDEX_DIR, REALTIME_TABLE_NAME);
+
+   ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+       new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
+           Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, 30, tableDir,
+           mock(ServerMetrics.class));
+   Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+
+   // Add record to update largestSeenTimestamp, largest seen timestamp: comparisonValue
+   ThreadSafeMutableRoaringBitmap validDocIds0 = new ThreadSafeMutableRoaringBitmap();
+   MutableSegment segment0 = mockMutableSegment(1, validDocIds0, null);
+   upsertMetadataManager.addRecord(segment0, new RecordInfo(makePrimaryKey(10), 1, comparisonValue, false));
+   checkRecordLocationForTTL(recordLocationMap, 10, segment0, 1, 80, HashFunction.NONE);
+
+   // Add a segment with segmentEndTime = -1, so it will be skipped since it is out of TTL
+   int numRecords = 4;
+   int[] primaryKeys = new int[]{0, 1, 2, 3};
+   ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
+   List<PrimaryKey> primaryKeys1 = getPrimaryKeyList(numRecords, primaryKeys);
+   ImmutableSegmentImpl segment1 =
+       mockImmutableSegmentWithEndTime(1, validDocIds1, null, primaryKeys1, Collections.singletonList("timeCol"), -1,
+           null);
+
+   // Load segment1; only the record added above should remain in the map
+   upsertMetadataManager.addSegment(segment1);
+   assertEquals(recordLocationMap.size(), 1);
+   checkRecordLocationForTTL(recordLocationMap, 10, segment0, 1, 80, HashFunction.NONE);
+ }
+
+ /**
+  * Builds the record infos used by the TTL tests. Record {@code i} uses {@code primaryKeys[i]} as its primary key,
+  * {@code i} as its doc id and {@code timestamps[i]} converted to a {@code Double} as its comparison value.
+  *
+  * @param numRecords number of records to build; must not exceed the length of either array
+  */
+ private List<RecordInfo> getRecordInfoListForTTL(int numRecords, int[] primaryKeys, Number[] timestamps) {
+   List<RecordInfo> recordInfoList = new ArrayList<>();
+   for (int i = 0; i < numRecords; i++) {
+     // Double.valueOf replaces the deprecated Double(double) constructor
+     Double comparisonValue = Double.valueOf(timestamps[i].doubleValue());
+     recordInfoList.add(new RecordInfo(makePrimaryKey(primaryKeys[i]), i, comparisonValue, false));
+   }
+   return recordInfoList;
+ }
+
+ /**
+  * Asserts that the record location stored for primary key {@code keyValue} (hashed with {@code hashFunction})
+  * exists and points to the given segment and doc id. Comparison values are compared as doubles, so the stored
+  * comparison value must be a {@link Number}.
+  */
+ private static void checkRecordLocationForTTL(Map<Object, RecordLocation> recordLocationMap, int keyValue,
+     IndexSegment segment, int docId, Number comparisonValue, HashFunction hashFunction) {
+   Object hashedKey = HashUtils.hashPrimaryKey(makePrimaryKey(keyValue), hashFunction);
+   RecordLocation location = recordLocationMap.get(hashedKey);
+   assertNotNull(location);
+   assertSame(location.getSegment(), segment);
+   assertEquals(location.getDocId(), docId);
+   double actualComparisonValue = ((Number) location.getComparisonValue()).doubleValue();
+   assertEquals(actualComparisonValue, comparisonValue.doubleValue());
+ }
+
+ /**
+  * Persists a watermark through the metadata manager, checks that the watermark file
+  * ({@code V1Constants.TTL_WATERMARK_TABLE_PARTITION + 0}) exists under {@code INDEX_DIR}, and checks that loading
+  * the watermark returns the persisted value.
+  */
+ private void verifyPersistAndLoadWatermark() {
+   ConcurrentMapPartitionUpsertMetadataManager manager =
+       new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
+           Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, 10, INDEX_DIR,
+           mock(ServerMetrics.class));
+
+   // Persist the current wall-clock time as the watermark
+   double persistedWatermark = System.currentTimeMillis();
+   manager.persistWatermark(persistedWatermark);
+   File watermarkFile = new File(INDEX_DIR, V1Constants.TTL_WATERMARK_TABLE_PARTITION + 0);
+   assertTrue(watermarkFile.exists());
+
+   // Reading it back must yield exactly the persisted value
+   double loadedWatermark = manager.loadWatermark();
+   assertEquals(loadedWatermark, persistedWatermark);
+ }
+
@Test
public void testHashPrimaryKey() {
PrimaryKey pk = new PrimaryKey(new Object[]{"uuid-1", "uuid-2", "uuid-3"});
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index 89f380d049..1f6834f62c 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -1810,6 +1810,91 @@ public class TableConfigUtilsTest {
}
}
+ /**
+  * Exercises {@code TableConfigUtils.validateTTLForUpsertConfig}: one config that must pass validation, followed by
+  * four configs that must be rejected with an {@link IllegalStateException}.
+  */
+ @Test
+ public void testValidateTTLConfigForUpsertConfig() {
+   // Valid: default comparison column (timestamp), snapshot enabled
+   Schema defaultSchema =
+       new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
+           .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
+           .setPrimaryKeyColumns(Lists.newArrayList("myCol")).build();
+   UpsertConfig defaultComparisonConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+   defaultComparisonConfig.setMetadataTTL(3600);
+   defaultComparisonConfig.setEnableSnapshot(true);
+   TableConfig tableConfigWithoutComparisonColumn =
+       new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN)
+           .setUpsertConfig(defaultComparisonConfig).build();
+   TableConfigUtils.validateTTLForUpsertConfig(tableConfigWithoutComparisonColumn, defaultSchema);
+
+   // Invalid comparison columns: "myCol"
+   UpsertConfig stringComparisonConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+   stringComparisonConfig.setComparisonColumns(Collections.singletonList("myCol"));
+   stringComparisonConfig.setEnableSnapshot(true);
+   stringComparisonConfig.setMetadataTTL(3600);
+   TableConfig tableConfigWithInvalidComparisonColumn =
+       new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN)
+           .setUpsertConfig(stringComparisonConfig).build();
+   assertTTLValidationFails(tableConfigWithInvalidComparisonColumn, defaultSchema);
+
+   // Invalid comparison columns: multiple comparison columns are not supported for TTL-enabled upsert table.
+   UpsertConfig multiComparisonConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+   multiComparisonConfig.setComparisonColumns(Lists.newArrayList(TIME_COLUMN, "myCol"));
+   multiComparisonConfig.setEnableSnapshot(true);
+   multiComparisonConfig.setMetadataTTL(3600);
+   TableConfig tableConfigWithInvalidComparisonColumn2 =
+       new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN)
+           .setUpsertConfig(multiComparisonConfig).build();
+   assertTTLValidationFails(tableConfigWithInvalidComparisonColumn2, defaultSchema);
+
+   // Invalid config with TTLConfig but Snapshot is not enabled
+   Schema schemaForNoSnapshot =
+       new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
+           .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
+           .setPrimaryKeyColumns(Lists.newArrayList("myCol")).build();
+   UpsertConfig noSnapshotConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+   noSnapshotConfig.setMetadataTTL(3600);
+   noSnapshotConfig.setEnableSnapshot(false);
+   TableConfig tableConfigWithInvalidTTLConfig =
+       new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN)
+           .setUpsertConfig(noSnapshotConfig).build();
+   assertTTLValidationFails(tableConfigWithInvalidTTLConfig, schemaForNoSnapshot);
+
+   // Invalid config with both delete and TTL enabled
+   String delCol = "myDelCol";
+   Schema schemaWithDeleteColumn =
+       new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
+           .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
+           .addSingleValueDimension(delCol, FieldSpec.DataType.STRING)
+           .setPrimaryKeyColumns(Lists.newArrayList("myCol")).build();
+   UpsertConfig deleteAndTTLConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+   deleteAndTTLConfig.setMetadataTTL(3600);
+   deleteAndTTLConfig.setEnableSnapshot(true);
+   deleteAndTTLConfig.setDeleteRecordColumn(delCol);
+   TableConfig tableConfigWithBothDeleteAndTTL =
+       new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN)
+           .setUpsertConfig(deleteAndTTLConfig).build();
+   assertTTLValidationFails(tableConfigWithBothDeleteAndTTL, schemaWithDeleteColumn);
+ }
+
+ /**
+  * Fails the test unless {@code TableConfigUtils.validateTTLForUpsertConfig} throws an
+  * {@link IllegalStateException} for the given table config and schema.
+  */
+ private static void assertTTLValidationFails(TableConfig tableConfig, Schema schema) {
+   try {
+     TableConfigUtils.validateTTLForUpsertConfig(tableConfig, schema);
+     Assert.fail();
+   } catch (IllegalStateException e) {
+     // Expected
+   }
+ }
+
@Test
public void testValidatePartitionedReplicaGroupInstance() {
String partitionColumn = "testPartitionCol";
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
index 38d2e21264..9f2c02fddd 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
@@ -26,6 +26,7 @@ public class V1Constants {
public static final String INDEX_MAP_FILE_NAME = "index_map";
public static final String INDEX_FILE_NAME = "columns.psf";
public static final String VALID_DOC_IDS_SNAPSHOT_FILE_NAME = "validdocids.bitmap.snapshot";
+ public static final String TTL_WATERMARK_TABLE_PARTITION = "ttl.watermark.partition.";
public static class Str {
public static final char DEFAULT_STRING_PAD_CHAR = '\0';
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/ThreadSafeMutableRoaringBitmap.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/ThreadSafeMutableRoaringBitmap.java
index 58631531f3..05ab86628d 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/ThreadSafeMutableRoaringBitmap.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/ThreadSafeMutableRoaringBitmap.java
@@ -36,6 +36,10 @@ public class ThreadSafeMutableRoaringBitmap {
_mutableRoaringBitmap.add(firstDocId);
}
+ /**
+  * Creates a wrapper backed by the given bitmap. The bitmap is stored by reference and is not copied.
+  */
+ public ThreadSafeMutableRoaringBitmap(MutableRoaringBitmap mutableRoaringBitmap) {
+   _mutableRoaringBitmap = mutableRoaringBitmap;
+ }
+
public synchronized void add(int docId) {
_mutableRoaringBitmap.add(docId);
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
index a9981de4ff..2e2a3f645a 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
@@ -60,6 +60,9 @@ public class UpsertConfig extends BaseJsonConfig {
@JsonPropertyDescription("Whether to use snapshot for fast upsert metadata recovery")
private boolean _enableSnapshot;
+ @JsonPropertyDescription("Whether to use TTL for upsert metadata cleanup, it uses the same unit as comparison col")
+ private double _metadataTTL;
+
@JsonPropertyDescription("Whether to preload segments for fast upsert metadata recovery")
private boolean _enablePreload;
@@ -111,6 +114,10 @@ public class UpsertConfig extends BaseJsonConfig {
return _enableSnapshot;
}
+ /**
+  * Returns the configured upsert metadata TTL. Per the property description, it is expressed in the same unit as
+  * the comparison column.
+  */
+ public double getMetadataTTL() {
+   return _metadataTTL;
+ }
+
public boolean isEnablePreload() {
return _enablePreload;
}
@@ -179,6 +186,10 @@ public class UpsertConfig extends BaseJsonConfig {
_enableSnapshot = enableSnapshot;
}
+ /**
+  * Sets the upsert metadata TTL. Per the property description, it is expressed in the same unit as the comparison
+  * column.
+  */
+ public void setMetadataTTL(double metadataTTL) {
+   _metadataTTL = metadataTTL;
+ }
+
public void setEnablePreload(boolean enablePreload) {
_enablePreload = enablePreload;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org