You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/11/03 06:25:59 UTC

(pinot) branch master updated: Support Upsert deletion for TTL: construct queryableDocIds when adding segments out of TTL (#11791)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new cf41a0e5c7 Support Upsert deletion for TTL: construct queryableDocIds when adding segments out of TTL (#11791)
cf41a0e5c7 is described below

commit cf41a0e5c76b1f9c33a5fa7a3ac362b52bee968c
Author: deemoliu <qi...@uber.com>
AuthorDate: Thu Nov 2 23:25:53 2023 -0700

    Support Upsert deletion for TTL: construct queryableDocIds when adding segments out of TTL (#11791)
---
 .../upsert/BasePartitionUpsertMetadataManager.java |  31 +++-
 .../segment/local/utils/TableConfigUtils.java      |   4 -
 ...rrentMapPartitionUpsertMetadataManagerTest.java | 206 ++++++++++++++++++++-
 .../segment/local/utils/TableConfigUtilsTest.java  |  21 ---
 4 files changed, 229 insertions(+), 33 deletions(-)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index ea9aa7f582..7d3de0346e 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -42,6 +42,7 @@ import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
 import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
+import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
 import org.apache.pinot.segment.local.utils.SegmentLocks;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
@@ -50,6 +51,8 @@ import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
 import org.apache.pinot.spi.config.table.HashFunction;
 import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.BooleanUtils;
+import org.roaringbitmap.PeekableIntIterator;
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -123,6 +126,28 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
     return _primaryKeyColumns;
   }
 
+  @Nullable // Builds the queryable doc ids of a segment: the given valid doc ids whose delete-record value is not true.
+  protected MutableRoaringBitmap getQueryableDocIds(IndexSegment segment, MutableRoaringBitmap validDocIds) {
+    if (_deleteRecordColumn == null) { // No delete-record column configured, so no separate queryable set is built.
+      return null; // NOTE(review): addSegment wraps this result in ThreadSafeMutableRoaringBitmap without a null check - confirm.
+    }
+    MutableRoaringBitmap queryableDocIds = new MutableRoaringBitmap();
+    try (PinotSegmentColumnReader deleteRecordColumnReader = new PinotSegmentColumnReader(segment,
+        _deleteRecordColumn)) {
+      PeekableIntIterator docIdIterator = validDocIds.getIntIterator();
+      while (docIdIterator.hasNext()) { // Only valid doc ids are inspected; every other doc stays out of the result.
+        int docId = docIdIterator.next();
+        if (!BooleanUtils.toBoolean(deleteRecordColumnReader.getValue(docId))) { // Keep docs not flagged as deleted.
+          queryableDocIds.add(docId);
+        }
+      }
+    } catch (IOException e) { // Logged, not rethrown: the bitmap built so far is still returned below.
+      _logger.error("Failed to close column reader for delete record column: {} for segment: {} ", _deleteRecordColumn,
+          segment.getSegmentName(), e);
+    }
+    return queryableDocIds;
+  }
+
   @Override
   public void addSegment(ImmutableSegment segment) {
     String segmentName = segment.getSegmentName();
@@ -140,15 +165,15 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
       Preconditions.checkState(_enableSnapshot, "Upsert TTL must have snapshot enabled");
       Preconditions.checkState(_comparisonColumns.size() == 1,
           "Upsert TTL does not work with multiple comparison columns");
-      // TODO: Support deletion for TTL. Need to construct queryableDocIds when adding segments out of TTL.
-      Preconditions.checkState(_deleteRecordColumn == null, "Upsert TTL doesn't work with record deletion");
       Number maxComparisonValue =
           (Number) segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0)).getMaxValue();
       if (maxComparisonValue.doubleValue() < _largestSeenComparisonValue - _metadataTTL) {
         _logger.info("Skip adding segment: {} because it's out of TTL", segmentName);
         MutableRoaringBitmap validDocIdsSnapshot = immutableSegment.loadValidDocIdsFromSnapshot();
         if (validDocIdsSnapshot != null) {
-          immutableSegment.enableUpsert(this, new ThreadSafeMutableRoaringBitmap(validDocIdsSnapshot), null);
+          MutableRoaringBitmap queryableDocIds = getQueryableDocIds(segment, validDocIdsSnapshot);
+          immutableSegment.enableUpsert(this, new ThreadSafeMutableRoaringBitmap(validDocIdsSnapshot),
+              new ThreadSafeMutableRoaringBitmap(queryableDocIds));
         } else {
           _logger.warn("Failed to find snapshot from segment: {} which is out of TTL, treating all documents as valid",
               segmentName);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 903d8aa1ac..f6d9ea957b 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -755,10 +755,6 @@ public final class TableConfigUtils {
     }
 
     Preconditions.checkState(upsertConfig.isEnableSnapshot(), "Upsert TTL must have snapshot enabled");
-
-    // TODO: Support deletion for TTL. Need to construct queryableDocIds when adding segments out of TTL.
-    Preconditions.checkState(upsertConfig.getDeleteRecordColumn() == null,
-        "Upsert TTL doesn't work with record deletion");
   }
 
   /**
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index 963fae0474..feaa8da304 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -36,6 +36,7 @@ import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
 import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
+import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
 import org.apache.pinot.segment.local.upsert.ConcurrentMapPartitionUpsertMetadataManager.RecordLocation;
 import org.apache.pinot.segment.local.utils.HashUtils;
 import org.apache.pinot.segment.spi.ColumnMetadata;
@@ -53,6 +54,7 @@ import org.apache.pinot.spi.utils.ByteArray;
 import org.apache.pinot.spi.utils.BytesUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.util.TestUtils;
+import org.mockito.MockedConstruction;
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -62,6 +64,7 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockConstruction;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.*;
 
@@ -145,7 +148,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
   }
 
   @Test
-  public void testUpsertMetadataCleanupWithTTLConfig() {
+  public void testUpsertMetadataCleanupWithTTLConfig()
+      throws IOException {
     verifyRemoveExpiredPrimaryKeys(new Integer(80), new Integer(120));
     verifyRemoveExpiredPrimaryKeys(new Float(80), new Float(120));
     verifyRemoveExpiredPrimaryKeys(new Double(80), new Double(120));
@@ -156,6 +160,44 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     verifyAddSegmentForTTL(new Double(80));
     verifyAddSegmentForTTL(new Long(80));
     verifyAddOutOfTTLSegment();
+    verifyAddOutOfTTLSegmentWithRecordDelete();
+  }
+
+  @Test
+  public void testGetQueryableDocIds() { // Exercises getQueryableDocIds with four delete-flag scenarios over valid docs {2, 4, 5}.
+    boolean[] deleteFlags1 = new boolean[]{false, false, false, true, true, false}; // Mixed: of the valid docs, only doc 4 is deleted.
+    int[] docIds1 = new int[]{2, 4, 5};
+    MutableRoaringBitmap validDocIdsSnapshot1 = new MutableRoaringBitmap();
+    validDocIdsSnapshot1.add(docIds1);
+    MutableRoaringBitmap queryableDocIds1 = new MutableRoaringBitmap();
+    queryableDocIds1.add(new int[]{2, 5}); // Expected result: valid docs minus the deleted doc 4.
+    verifyGetQueryableDocIds(false, deleteFlags1, validDocIdsSnapshot1, queryableDocIds1);
+
+    // None of the records are deleted: every valid doc id is expected to be queryable.
+    boolean[] deleteFlags2 = new boolean[]{false, false, false, false, false, false};
+    int[] docIds2 = new int[]{2, 4, 5};
+    MutableRoaringBitmap validDocIdsSnapshot2 = new MutableRoaringBitmap();
+    validDocIdsSnapshot2.add(docIds2);
+    MutableRoaringBitmap queryableDocIds2 = new MutableRoaringBitmap();
+    queryableDocIds2.add(docIds2);
+    verifyGetQueryableDocIds(false, deleteFlags2, validDocIdsSnapshot2, queryableDocIds2);
+
+    // Delete column is mocked as null for every doc (isNull stubbed to true); all flags are false, so all valid docs are expected.
+    boolean[] deleteFlags3 = new boolean[]{false, false, false, false, false, false};
+    int[] docIds3 = new int[]{2, 4, 5};
+    MutableRoaringBitmap validDocIdsSnapshot3 = new MutableRoaringBitmap();
+    validDocIdsSnapshot3.add(docIds3);
+    MutableRoaringBitmap queryableDocIds3 = new MutableRoaringBitmap();
+    queryableDocIds3.add(docIds3);
+    verifyGetQueryableDocIds(true, deleteFlags3, validDocIdsSnapshot3, queryableDocIds3);
+
+    // All records are deleted: the expected queryable set is empty.
+    boolean[] deleteFlags4 = new boolean[]{true, true, true, true, true, true};
+    int[] docIds4 = new int[]{2, 4, 5};
+    MutableRoaringBitmap validDocIdsSnapshot4 = new MutableRoaringBitmap();
+    validDocIdsSnapshot4.add(docIds4);
+    MutableRoaringBitmap queryableDocIds4 = new MutableRoaringBitmap();
+    verifyGetQueryableDocIds(false, deleteFlags4, validDocIdsSnapshot4, queryableDocIds4);
   }
 
   private void verifyAddReplaceRemoveSegment(HashFunction hashFunction, boolean enableSnapshot)
@@ -563,6 +605,15 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     return segment;
   }
 
+  private static ImmutableSegmentImpl mockImmutableSegmentWithSegmentMetadata(int sequenceNumber, // Test helper, see below.
+      ThreadSafeMutableRoaringBitmap validDocIds, @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds,
+      List<PrimaryKey> primaryKeys, SegmentMetadataImpl segmentMetadata, MutableRoaringBitmap snapshot) {
+    ImmutableSegmentImpl segment = mockImmutableSegment(sequenceNumber, validDocIds, queryableDocIds, primaryKeys); // Base mock.
+    when(segment.getSegmentMetadata()).thenReturn(segmentMetadata); // Additionally stub the caller-supplied segment metadata.
+    when(segment.loadValidDocIdsFromSnapshot()).thenReturn(snapshot); // Additionally stub the valid-doc-ids snapshot.
+    return segment;
+  }
+
   private static EmptyIndexSegment mockEmptySegment(int sequenceNumber) {
     SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
     when(segmentMetadata.getName()).thenReturn(getSegmentName(sequenceNumber));
@@ -923,7 +974,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     upsertMetadataManager.close();
   }
 
-  private void verifyRemoveExpiredPrimaryKeys(Comparable earlierComparisonValue, Comparable largerComparisonValue) {
+  private void verifyRemoveExpiredPrimaryKeys(Comparable earlierComparisonValue, Comparable largerComparisonValue)
+      throws IOException {
     File tableDir = new File(INDEX_DIR, REALTIME_TABLE_NAME);
 
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
@@ -982,9 +1034,16 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
 
     // ValidDocIds for out-of-ttl records should not be removed.
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 2, 3});
+
+    // Stop the metadata manager
+    upsertMetadataManager.stop();
+
+    // Close the metadata manager
+    upsertMetadataManager.close();
   }
 
-  private void verifyAddOutOfTTLSegment() {
+  private void verifyAddOutOfTTLSegment()
+      throws IOException {
     File tableDir = new File(INDEX_DIR, REALTIME_TABLE_NAME);
 
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
@@ -1048,9 +1107,133 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     upsertMetadataManager.addSegment(segment2);
     // out of ttl segment should not be added to recordLocationMap
     assertEquals(recordLocationMap.size(), 5);
+
+    // Stop the metadata manager
+    upsertMetadataManager.stop();
+
+    // Close the metadata manager
+    upsertMetadataManager.close();
   }
 
-  private void verifyAddSegmentForTTL(Comparable comparisonValue) {
+  private void verifyAddOutOfTTLSegmentWithRecordDelete() // Adds two segments with a delete-record column and checks both bitmaps.
+      throws IOException {
+    String comparisonColumn = "timeCol";
+    String deleteRecordColumn = "deleteCol";
+    ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
+            Collections.singletonList(comparisonColumn), deleteRecordColumn, HashFunction.NONE, null, true, false, 30,
+            INDEX_DIR, mock(ServerMetrics.class));
+    Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+    Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;
+
+    // Add the first segment, it will not be skipped
+    int numRecords = 6;
+    int[] primaryKeys = new int[]{0, 1, 2, 0, 1, 0};
+    int[] timestamps = new int[]{100, 100, 100, 80, 120, 100};
+    boolean[] deleteFlags = new boolean[]{false, false, false, true, true, false};
+    ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
+    ThreadSafeMutableRoaringBitmap queryableDocIds1 = new ThreadSafeMutableRoaringBitmap();
+    List<PrimaryKey> primaryKeys1 = getPrimaryKeyList(numRecords, primaryKeys);
+
+    int[] docIds1 = new int[]{2, 4, 5};
+    MutableRoaringBitmap validDocIdsSnapshot1 = new MutableRoaringBitmap();
+    validDocIdsSnapshot1.add(docIds1);
+    ImmutableSegmentImpl segment1 = mockImmutableSegmentWithEndTime(1, validDocIds1, queryableDocIds1, primaryKeys1,
+        Collections.singletonList(comparisonColumn), new Double(120), validDocIdsSnapshot1);
+
+    // get recordInfo from validDocIdSnapshot.
+    // segment1 snapshot: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
+    List<RecordInfo> recordInfoList1;
+    recordInfoList1 = getRecordInfoList(validDocIdsSnapshot1, primaryKeys, timestamps, deleteFlags);
+
+    upsertMetadataManager.addSegment(segment1, validDocIds1, queryableDocIds1, recordInfoList1.iterator());
+    trackedSegments.add(segment1);
+    // segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
+    assertEquals(recordLocationMap.size(), 3);
+    checkRecordLocation(recordLocationMap, 0, segment1, 5, 100, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, HashFunction.NONE);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{2, 4, 5});
+    assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new int[]{2, 5}); // Doc 4 is valid but flagged deleted.
+
+    // Add the second segment. NOTE(review): this said "it will be skipped", but the assertions below expect its records - confirm.
+    numRecords = 5;
+    primaryKeys = new int[]{0, 1, 2, 3, 4};
+    timestamps = new int[]{40, 40, 40, 40, 40};
+    deleteFlags = new boolean[]{false, false, true, false, true};
+    ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
+    ThreadSafeMutableRoaringBitmap queryableDocIds2 = new ThreadSafeMutableRoaringBitmap();
+    MutableRoaringBitmap validDocIdsSnapshot2 = new MutableRoaringBitmap();
+
+    int[] docIds2 = new int[]{3, 4};
+    validDocIdsSnapshot2.add(docIds2);
+    ImmutableSegmentImpl segment2 =
+        mockImmutableSegmentWithEndTime(2, validDocIds2, queryableDocIds2, getPrimaryKeyList(numRecords, primaryKeys),
+            Collections.singletonList(comparisonColumn), new Double(40), validDocIdsSnapshot2);
+
+    // get recordInfo from validDocIdSnapshot.
+    // segment2 snapshot: 3 -> {3, 40}, 4 -> {4, 40}
+    // segment1 snapshot: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
+    List<RecordInfo> recordInfoList2;
+    recordInfoList2 = getRecordInfoList(validDocIdsSnapshot2, primaryKeys, timestamps, deleteFlags);
+
+    upsertMetadataManager.addSegment(segment2, validDocIds2, queryableDocIds2, recordInfoList2.iterator());
+    trackedSegments.add(segment2);
+
+    // segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
+    // segment2: 3 -> {3, 40}, 4 -> {4, 40}
+    assertEquals(recordLocationMap.size(), 5);
+    checkRecordLocation(recordLocationMap, 0, segment1, 5, 100, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 3, segment2, 3, 40, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 4, segment2, 4, 40, HashFunction.NONE);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{2, 4, 5});
+    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{3, 4});
+    assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new int[]{2, 5});
+    assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{3}); // Doc 4 of segment2 is flagged deleted.
+
+    // Stop the metadata manager
+    upsertMetadataManager.stop();
+
+    // Close the metadata manager
+    upsertMetadataManager.close();
+  }
+
+  public void verifyGetQueryableDocIds(boolean isDeleteColumnNull, boolean[] deleteFlags, // Helper for testGetQueryableDocIds.
+      MutableRoaringBitmap validDocIdsSnapshot, MutableRoaringBitmap queryableDocIds) { // queryableDocIds is the expected result.
+    String comparisonColumn = "timeCol";
+    String deleteRecordColumn = "deleteCol";
+    ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
+            Collections.singletonList(comparisonColumn), deleteRecordColumn, HashFunction.NONE, null, true, false, 30,
+            INDEX_DIR, mock(ServerMetrics.class));
+
+    try (MockedConstruction<PinotSegmentColumnReader> deleteColReader = mockConstruction(PinotSegmentColumnReader.class,
+        (mockReader, context) -> { // Stubs each reader constructed inside this try block, one stub pair per doc id.
+          for (int i = 0; i < deleteFlags.length; i++) {
+            when(mockReader.isNull(i)).thenReturn(isDeleteColumnNull); // NOTE(review): getQueryableDocIds calls getValue only - confirm.
+            when(mockReader.getValue(i)).thenReturn(deleteFlags[i]); // Doc id i reports deleteFlags[i] as its delete value.
+          }
+        })) {
+
+      SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
+      ColumnMetadata columnMetadata = mock(ColumnMetadata.class);
+      when(segmentMetadata.getTotalDocs()).thenReturn(deleteFlags.length);
+      when(segmentMetadata.getColumnMetadataMap()).thenReturn(new TreeMap() {{
+        this.put(comparisonColumn, columnMetadata);
+      }});
+      when(columnMetadata.getMaxValue()).thenReturn(null); // Max value is stubbed to null; the method is called directly below.
+
+      ImmutableSegmentImpl segment =
+          mockImmutableSegmentWithSegmentMetadata(1, new ThreadSafeMutableRoaringBitmap(), null, null, segmentMetadata,
+              validDocIdsSnapshot);
+      assertEquals(upsertMetadataManager.getQueryableDocIds(segment, validDocIdsSnapshot), queryableDocIds);
+    }
+  }
+
+  private void verifyAddSegmentForTTL(Comparable comparisonValue)
+      throws IOException {
     File tableDir = new File(INDEX_DIR, REALTIME_TABLE_NAME);
 
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
@@ -1083,6 +1266,12 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     upsertMetadataManager.addSegment(segment1);
     assertEquals(recordLocationMap.size(), 1);
     checkRecordLocationForTTL(recordLocationMap, 10, segment0, 1, 80, HashFunction.NONE);
+
+    // Stop the metadata manager
+    upsertMetadataManager.stop();
+
+    // Close the metadata manager
+    upsertMetadataManager.close();
   }
 
   // Add the following utils function since the Comparison column is a long value for TTL enabled upsert table.
@@ -1106,7 +1295,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     assertEquals(((Number) recordLocation.getComparisonValue()).doubleValue(), comparisonValue.doubleValue());
   }
 
-  private void verifyPersistAndLoadWatermark() {
+  private void verifyPersistAndLoadWatermark()
+      throws IOException {
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
             Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, false, 10, INDEX_DIR,
@@ -1118,6 +1308,12 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
 
     double watermark = upsertMetadataManager.loadWatermark();
     assertEquals(watermark, currentTimeMs);
+
+    // Stop the metadata manager
+    upsertMetadataManager.stop();
+
+    // Close the metadata manager
+    upsertMetadataManager.close();
   }
 
   @Test
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index 9655361b4c..1ad4cc74d6 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -2054,27 +2054,6 @@ public class TableConfigUtilsTest {
     } catch (IllegalStateException e) {
       // Expected
     }
-
-    // Invalid config with both delete and TTL enabled
-    String delCol = "myDelCol";
-    schema =
-        new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
-            .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
-            .addSingleValueDimension(delCol, FieldSpec.DataType.STRING)
-            .setPrimaryKeyColumns(Lists.newArrayList("myCol")).build();
-    upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
-    upsertConfig.setMetadataTTL(3600);
-    upsertConfig.setEnableSnapshot(true);
-    upsertConfig.setDeleteRecordColumn(delCol);
-    TableConfig tableConfigWithBothDeleteAndTTL =
-        new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN)
-            .setUpsertConfig(upsertConfig).build();
-    try {
-      TableConfigUtils.validateTTLForUpsertConfig(tableConfigWithBothDeleteAndTTL, schema);
-      Assert.fail();
-    } catch (IllegalStateException e) {
-      // Expected
-    }
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org