You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/11/03 06:25:59 UTC
(pinot) branch master updated: Support Upsert deletion for TTL: construct queryableDocIds when adding segments out of TTL (#11791)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new cf41a0e5c7 Support Upsert deletion for TTL: construct queryableDocIds when adding segments out of TTL (#11791)
cf41a0e5c7 is described below
commit cf41a0e5c76b1f9c33a5fa7a3ac362b52bee968c
Author: deemoliu <qi...@uber.com>
AuthorDate: Thu Nov 2 23:25:53 2023 -0700
Support Upsert deletion for TTL: construct queryableDocIds when adding segments out of TTL (#11791)
---
.../upsert/BasePartitionUpsertMetadataManager.java | 31 +++-
.../segment/local/utils/TableConfigUtils.java | 4 -
...rrentMapPartitionUpsertMetadataManagerTest.java | 206 ++++++++++++++++++++-
.../segment/local/utils/TableConfigUtilsTest.java | 21 ---
4 files changed, 229 insertions(+), 33 deletions(-)
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index ea9aa7f582..7d3de0346e 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -42,6 +42,7 @@ import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
+import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
import org.apache.pinot.segment.local.utils.SegmentLocks;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
@@ -50,6 +51,8 @@ import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.spi.config.table.HashFunction;
import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.BooleanUtils;
+import org.roaringbitmap.PeekableIntIterator;
import org.roaringbitmap.buffer.MutableRoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -123,6 +126,28 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
return _primaryKeyColumns;
}
+ @Nullable
+ protected MutableRoaringBitmap getQueryableDocIds(IndexSegment segment, MutableRoaringBitmap validDocIds) {
+ if (_deleteRecordColumn == null) {
+ return null;
+ }
+ MutableRoaringBitmap queryableDocIds = new MutableRoaringBitmap();
+ try (PinotSegmentColumnReader deleteRecordColumnReader = new PinotSegmentColumnReader(segment,
+ _deleteRecordColumn)) {
+ PeekableIntIterator docIdIterator = validDocIds.getIntIterator();
+ while (docIdIterator.hasNext()) {
+ int docId = docIdIterator.next();
+ if (!BooleanUtils.toBoolean(deleteRecordColumnReader.getValue(docId))) {
+ queryableDocIds.add(docId);
+ }
+ }
+ } catch (IOException e) {
+ _logger.error("Failed to close column reader for delete record column: {} for segment: {} ", _deleteRecordColumn,
+ segment.getSegmentName(), e);
+ }
+ return queryableDocIds;
+ }
+
@Override
public void addSegment(ImmutableSegment segment) {
String segmentName = segment.getSegmentName();
@@ -140,15 +165,15 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
Preconditions.checkState(_enableSnapshot, "Upsert TTL must have snapshot enabled");
Preconditions.checkState(_comparisonColumns.size() == 1,
"Upsert TTL does not work with multiple comparison columns");
- // TODO: Support deletion for TTL. Need to construct queryableDocIds when adding segments out of TTL.
- Preconditions.checkState(_deleteRecordColumn == null, "Upsert TTL doesn't work with record deletion");
Number maxComparisonValue =
(Number) segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0)).getMaxValue();
if (maxComparisonValue.doubleValue() < _largestSeenComparisonValue - _metadataTTL) {
_logger.info("Skip adding segment: {} because it's out of TTL", segmentName);
MutableRoaringBitmap validDocIdsSnapshot = immutableSegment.loadValidDocIdsFromSnapshot();
if (validDocIdsSnapshot != null) {
- immutableSegment.enableUpsert(this, new ThreadSafeMutableRoaringBitmap(validDocIdsSnapshot), null);
+ MutableRoaringBitmap queryableDocIds = getQueryableDocIds(segment, validDocIdsSnapshot);
+ immutableSegment.enableUpsert(this, new ThreadSafeMutableRoaringBitmap(validDocIdsSnapshot),
+ new ThreadSafeMutableRoaringBitmap(queryableDocIds));
} else {
_logger.warn("Failed to find snapshot from segment: {} which is out of TTL, treating all documents as valid",
segmentName);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 903d8aa1ac..f6d9ea957b 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -755,10 +755,6 @@ public final class TableConfigUtils {
}
Preconditions.checkState(upsertConfig.isEnableSnapshot(), "Upsert TTL must have snapshot enabled");
-
- // TODO: Support deletion for TTL. Need to construct queryableDocIds when adding segments out of TTL.
- Preconditions.checkState(upsertConfig.getDeleteRecordColumn() == null,
- "Upsert TTL doesn't work with record deletion");
}
/**
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index 963fae0474..feaa8da304 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -36,6 +36,7 @@ import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
+import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
import org.apache.pinot.segment.local.upsert.ConcurrentMapPartitionUpsertMetadataManager.RecordLocation;
import org.apache.pinot.segment.local.utils.HashUtils;
import org.apache.pinot.segment.spi.ColumnMetadata;
@@ -53,6 +54,7 @@ import org.apache.pinot.spi.utils.ByteArray;
import org.apache.pinot.spi.utils.BytesUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
+import org.mockito.MockedConstruction;
import org.roaringbitmap.buffer.MutableRoaringBitmap;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -62,6 +64,7 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockConstruction;
import static org.mockito.Mockito.when;
import static org.testng.Assert.*;
@@ -145,7 +148,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
}
@Test
- public void testUpsertMetadataCleanupWithTTLConfig() {
+ public void testUpsertMetadataCleanupWithTTLConfig()
+ throws IOException {
verifyRemoveExpiredPrimaryKeys(new Integer(80), new Integer(120));
verifyRemoveExpiredPrimaryKeys(new Float(80), new Float(120));
verifyRemoveExpiredPrimaryKeys(new Double(80), new Double(120));
@@ -156,6 +160,44 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
verifyAddSegmentForTTL(new Double(80));
verifyAddSegmentForTTL(new Long(80));
verifyAddOutOfTTLSegment();
+ verifyAddOutOfTTLSegmentWithRecordDelete();
+ }
+
+ @Test
+ public void testGetQueryableDocIds() {
+ boolean[] deleteFlags1 = new boolean[]{false, false, false, true, true, false};
+ int[] docIds1 = new int[]{2, 4, 5};
+ MutableRoaringBitmap validDocIdsSnapshot1 = new MutableRoaringBitmap();
+ validDocIdsSnapshot1.add(docIds1);
+ MutableRoaringBitmap queryableDocIds1 = new MutableRoaringBitmap();
+ queryableDocIds1.add(new int[]{2, 5});
+ verifyGetQueryableDocIds(false, deleteFlags1, validDocIdsSnapshot1, queryableDocIds1);
+
+ // all records are not deleted
+ boolean[] deleteFlags2 = new boolean[]{false, false, false, false, false, false};
+ int[] docIds2 = new int[]{2, 4, 5};
+ MutableRoaringBitmap validDocIdsSnapshot2 = new MutableRoaringBitmap();
+ validDocIdsSnapshot2.add(docIds2);
+ MutableRoaringBitmap queryableDocIds2 = new MutableRoaringBitmap();
+ queryableDocIds2.add(docIds2);
+ verifyGetQueryableDocIds(false, deleteFlags2, validDocIdsSnapshot2, queryableDocIds2);
+
+ // delete column has null values
+ boolean[] deleteFlags3 = new boolean[]{false, false, false, false, false, false};
+ int[] docIds3 = new int[]{2, 4, 5};
+ MutableRoaringBitmap validDocIdsSnapshot3 = new MutableRoaringBitmap();
+ validDocIdsSnapshot3.add(docIds3);
+ MutableRoaringBitmap queryableDocIds3 = new MutableRoaringBitmap();
+ queryableDocIds3.add(docIds3);
+ verifyGetQueryableDocIds(true, deleteFlags3, validDocIdsSnapshot3, queryableDocIds3);
+
+ // All records are deleted record.
+ boolean[] deleteFlags4 = new boolean[]{true, true, true, true, true, true};
+ int[] docIds4 = new int[]{2, 4, 5};
+ MutableRoaringBitmap validDocIdsSnapshot4 = new MutableRoaringBitmap();
+ validDocIdsSnapshot4.add(docIds4);
+ MutableRoaringBitmap queryableDocIds4 = new MutableRoaringBitmap();
+ verifyGetQueryableDocIds(false, deleteFlags4, validDocIdsSnapshot4, queryableDocIds4);
}
private void verifyAddReplaceRemoveSegment(HashFunction hashFunction, boolean enableSnapshot)
@@ -563,6 +605,15 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
return segment;
}
+ private static ImmutableSegmentImpl mockImmutableSegmentWithSegmentMetadata(int sequenceNumber,
+ ThreadSafeMutableRoaringBitmap validDocIds, @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds,
+ List<PrimaryKey> primaryKeys, SegmentMetadataImpl segmentMetadata, MutableRoaringBitmap snapshot) {
+ ImmutableSegmentImpl segment = mockImmutableSegment(sequenceNumber, validDocIds, queryableDocIds, primaryKeys);
+ when(segment.getSegmentMetadata()).thenReturn(segmentMetadata);
+ when(segment.loadValidDocIdsFromSnapshot()).thenReturn(snapshot);
+ return segment;
+ }
+
private static EmptyIndexSegment mockEmptySegment(int sequenceNumber) {
SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
when(segmentMetadata.getName()).thenReturn(getSegmentName(sequenceNumber));
@@ -923,7 +974,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
upsertMetadataManager.close();
}
- private void verifyRemoveExpiredPrimaryKeys(Comparable earlierComparisonValue, Comparable largerComparisonValue) {
+ private void verifyRemoveExpiredPrimaryKeys(Comparable earlierComparisonValue, Comparable largerComparisonValue)
+ throws IOException {
File tableDir = new File(INDEX_DIR, REALTIME_TABLE_NAME);
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
@@ -982,9 +1034,16 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
// ValidDocIds for out-of-ttl records should not be removed.
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 2, 3});
+
+ // Stop the metadata manager
+ upsertMetadataManager.stop();
+
+ // Close the metadata manager
+ upsertMetadataManager.close();
}
- private void verifyAddOutOfTTLSegment() {
+ private void verifyAddOutOfTTLSegment()
+ throws IOException {
File tableDir = new File(INDEX_DIR, REALTIME_TABLE_NAME);
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
@@ -1048,9 +1107,133 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
upsertMetadataManager.addSegment(segment2);
// out of ttl segment should not be added to recordLocationMap
assertEquals(recordLocationMap.size(), 5);
+
+ // Stop the metadata manager
+ upsertMetadataManager.stop();
+
+ // Close the metadata manager
+ upsertMetadataManager.close();
}
- private void verifyAddSegmentForTTL(Comparable comparisonValue) {
+ private void verifyAddOutOfTTLSegmentWithRecordDelete()
+ throws IOException {
+ String comparisonColumn = "timeCol";
+ String deleteRecordColumn = "deleteCol";
+ ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
+ Collections.singletonList(comparisonColumn), deleteRecordColumn, HashFunction.NONE, null, true, false, 30,
+ INDEX_DIR, mock(ServerMetrics.class));
+ Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+ Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;
+
+ // Add the first segment, it will not be skipped
+ int numRecords = 6;
+ int[] primaryKeys = new int[]{0, 1, 2, 0, 1, 0};
+ int[] timestamps = new int[]{100, 100, 100, 80, 120, 100};
+ boolean[] deleteFlags = new boolean[]{false, false, false, true, true, false};
+ ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
+ ThreadSafeMutableRoaringBitmap queryableDocIds1 = new ThreadSafeMutableRoaringBitmap();
+ List<PrimaryKey> primaryKeys1 = getPrimaryKeyList(numRecords, primaryKeys);
+
+ int[] docIds1 = new int[]{2, 4, 5};
+ MutableRoaringBitmap validDocIdsSnapshot1 = new MutableRoaringBitmap();
+ validDocIdsSnapshot1.add(docIds1);
+ ImmutableSegmentImpl segment1 = mockImmutableSegmentWithEndTime(1, validDocIds1, queryableDocIds1, primaryKeys1,
+ Collections.singletonList(comparisonColumn), new Double(120), validDocIdsSnapshot1);
+
+ // get recordInfo from validDocIdSnapshot.
+ // segment1 snapshot: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
+ List<RecordInfo> recordInfoList1;
+ recordInfoList1 = getRecordInfoList(validDocIdsSnapshot1, primaryKeys, timestamps, deleteFlags);
+
+ upsertMetadataManager.addSegment(segment1, validDocIds1, queryableDocIds1, recordInfoList1.iterator());
+ trackedSegments.add(segment1);
+ // segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
+ assertEquals(recordLocationMap.size(), 3);
+ checkRecordLocation(recordLocationMap, 0, segment1, 5, 100, HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, HashFunction.NONE);
+ assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{2, 4, 5});
+ assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new int[]{2, 5});
+
+ // Add the second segment, it will be skipped.
+ numRecords = 5;
+ primaryKeys = new int[]{0, 1, 2, 3, 4};
+ timestamps = new int[]{40, 40, 40, 40, 40};
+ deleteFlags = new boolean[]{false, false, true, false, true};
+ ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
+ ThreadSafeMutableRoaringBitmap queryableDocIds2 = new ThreadSafeMutableRoaringBitmap();
+ MutableRoaringBitmap validDocIdsSnapshot2 = new MutableRoaringBitmap();
+
+ int[] docIds2 = new int[]{3, 4};
+ validDocIdsSnapshot2.add(docIds2);
+ ImmutableSegmentImpl segment2 =
+ mockImmutableSegmentWithEndTime(2, validDocIds2, queryableDocIds2, getPrimaryKeyList(numRecords, primaryKeys),
+ Collections.singletonList(comparisonColumn), new Double(40), validDocIdsSnapshot2);
+
+ // get recordInfo from validDocIdSnapshot.
+ // segment2 snapshot: 3 -> {3, 40}, 4 -> {4, 40}
+ // segment1 snapshot: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
+ List<RecordInfo> recordInfoList2;
+ recordInfoList2 = getRecordInfoList(validDocIdsSnapshot2, primaryKeys, timestamps, deleteFlags);
+
+ upsertMetadataManager.addSegment(segment2, validDocIds2, queryableDocIds2, recordInfoList2.iterator());
+ trackedSegments.add(segment2);
+
+ // segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
+ // segment2: 3 -> {3, 40}, 4 -> {4, 40}
+ assertEquals(recordLocationMap.size(), 5);
+ checkRecordLocation(recordLocationMap, 0, segment1, 5, 100, HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 3, segment2, 3, 40, HashFunction.NONE);
+ checkRecordLocation(recordLocationMap, 4, segment2, 4, 40, HashFunction.NONE);
+ assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{2, 4, 5});
+ assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{3, 4});
+ assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new int[]{2, 5});
+ assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{3});
+
+ // Stop the metadata manager
+ upsertMetadataManager.stop();
+
+ // Close the metadata manager
+ upsertMetadataManager.close();
+ }
+
+ public void verifyGetQueryableDocIds(boolean isDeleteColumnNull, boolean[] deleteFlags,
+ MutableRoaringBitmap validDocIdsSnapshot, MutableRoaringBitmap queryableDocIds) {
+ String comparisonColumn = "timeCol";
+ String deleteRecordColumn = "deleteCol";
+ ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
+ Collections.singletonList(comparisonColumn), deleteRecordColumn, HashFunction.NONE, null, true, false, 30,
+ INDEX_DIR, mock(ServerMetrics.class));
+
+ try (MockedConstruction<PinotSegmentColumnReader> deleteColReader = mockConstruction(PinotSegmentColumnReader.class,
+ (mockReader, context) -> {
+ for (int i = 0; i < deleteFlags.length; i++) {
+ when(mockReader.isNull(i)).thenReturn(isDeleteColumnNull);
+ when(mockReader.getValue(i)).thenReturn(deleteFlags[i]);
+ }
+ })) {
+
+ SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
+ ColumnMetadata columnMetadata = mock(ColumnMetadata.class);
+ when(segmentMetadata.getTotalDocs()).thenReturn(deleteFlags.length);
+ when(segmentMetadata.getColumnMetadataMap()).thenReturn(new TreeMap() {{
+ this.put(comparisonColumn, columnMetadata);
+ }});
+ when(columnMetadata.getMaxValue()).thenReturn(null);
+
+ ImmutableSegmentImpl segment =
+ mockImmutableSegmentWithSegmentMetadata(1, new ThreadSafeMutableRoaringBitmap(), null, null, segmentMetadata,
+ validDocIdsSnapshot);
+ assertEquals(upsertMetadataManager.getQueryableDocIds(segment, validDocIdsSnapshot), queryableDocIds);
+ }
+ }
+
+ private void verifyAddSegmentForTTL(Comparable comparisonValue)
+ throws IOException {
File tableDir = new File(INDEX_DIR, REALTIME_TABLE_NAME);
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
@@ -1083,6 +1266,12 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
upsertMetadataManager.addSegment(segment1);
assertEquals(recordLocationMap.size(), 1);
checkRecordLocationForTTL(recordLocationMap, 10, segment0, 1, 80, HashFunction.NONE);
+
+ // Stop the metadata manager
+ upsertMetadataManager.stop();
+
+ // Close the metadata manager
+ upsertMetadataManager.close();
}
// Add the following utils function since the Comparison column is a long value for TTL enabled upsert table.
@@ -1106,7 +1295,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
assertEquals(((Number) recordLocation.getComparisonValue()).doubleValue(), comparisonValue.doubleValue());
}
- private void verifyPersistAndLoadWatermark() {
+ private void verifyPersistAndLoadWatermark()
+ throws IOException {
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, false, 10, INDEX_DIR,
@@ -1118,6 +1308,12 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
double watermark = upsertMetadataManager.loadWatermark();
assertEquals(watermark, currentTimeMs);
+
+ // Stop the metadata manager
+ upsertMetadataManager.stop();
+
+ // Close the metadata manager
+ upsertMetadataManager.close();
}
@Test
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index 9655361b4c..1ad4cc74d6 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -2054,27 +2054,6 @@ public class TableConfigUtilsTest {
} catch (IllegalStateException e) {
// Expected
}
-
- // Invalid config with both delete and TTL enabled
- String delCol = "myDelCol";
- schema =
- new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
- .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
- .addSingleValueDimension(delCol, FieldSpec.DataType.STRING)
- .setPrimaryKeyColumns(Lists.newArrayList("myCol")).build();
- upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
- upsertConfig.setMetadataTTL(3600);
- upsertConfig.setEnableSnapshot(true);
- upsertConfig.setDeleteRecordColumn(delCol);
- TableConfig tableConfigWithBothDeleteAndTTL =
- new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN)
- .setUpsertConfig(upsertConfig).build();
- try {
- TableConfigUtils.validateTTLForUpsertConfig(tableConfigWithBothDeleteAndTTL, schema);
- Assert.fail();
- } catch (IllegalStateException e) {
- // Expected
- }
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org