You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/10/03 22:14:22 UTC

[pinot] branch master updated: [Upsert] persist validDocsIndex snapshot for Pinot upsert optimization (#9062)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 38ac70a9c1 [Upsert] persist validDocsIndex snapshot for Pinot upsert optimization (#9062)
38ac70a9c1 is described below

commit 38ac70a9c17c647dbbad090ca9fd60faebac5418
Author: deemoliu <qi...@uber.com>
AuthorDate: Mon Oct 3 15:14:16 2022 -0700

    [Upsert] persist validDocsIndex snapshot for Pinot upsert optimization (#9062)
---
 ...adataAndDictionaryAggregationPlanMakerTest.java |   2 +-
 .../immutable/ImmutableSegmentImpl.java            |  62 +++++++++
 .../upsert/BasePartitionUpsertMetadataManager.java |  34 ++++-
 .../upsert/BaseTableUpsertMetadataManager.java     |   3 +
 ...oncurrentMapPartitionUpsertMetadataManager.java |   4 +-
 .../ConcurrentMapTableUpsertMetadataManager.java   |   2 +-
 .../pinot/segment/local/upsert/UpsertUtils.java    |  45 +++++--
 .../ImmutableSegmentImplUpsertSnapshotTest.java    | 148 +++++++++++++++++++++
 ...rrentMapPartitionUpsertMetadataManagerTest.java |  58 ++++++--
 .../org/apache/pinot/segment/spi/V1Constants.java  |   1 +
 .../pinot/spi/config/table/UpsertConfig.java       |  11 ++
 11 files changed, 347 insertions(+), 23 deletions(-)

diff --git a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
index a887d3751b..f6df4e9b4e 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
@@ -130,7 +130,7 @@ public class MetadataAndDictionaryAggregationPlanMakerTest {
     _upsertIndexSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.heap);
     ((ImmutableSegmentImpl) _upsertIndexSegment).enableUpsert(
         new ConcurrentMapPartitionUpsertMetadataManager("testTable_REALTIME", 0, Collections.singletonList("column6"),
-            "daysSinceEpoch", HashFunction.NONE, null, serverMetrics), new ThreadSafeMutableRoaringBitmap());
+            "daysSinceEpoch", HashFunction.NONE, null, false, serverMetrics), new ThreadSafeMutableRoaringBitmap());
   }
 
   @AfterClass
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
index c84454a041..de6555ff43 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
@@ -19,12 +19,17 @@
 package org.apache.pinot.segment.local.indexsegment.immutable;
 
 import com.google.common.base.Preconditions;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.utils.HashUtil;
 import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager;
 import org.apache.pinot.segment.local.segment.index.datasource.ImmutableDataSource;
@@ -34,6 +39,7 @@ import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.FetchContext;
 import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.datasource.DataSource;
 import org.apache.pinot.segment.spi.index.column.ColumnIndexContainer;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
@@ -43,7 +49,10 @@ import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
 import org.apache.pinot.segment.spi.index.reader.InvertedIndexReader;
 import org.apache.pinot.segment.spi.index.startree.StarTreeV2;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
+import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
 import org.apache.pinot.spi.data.readers.GenericRow;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -93,6 +102,59 @@ public class ImmutableSegmentImpl implements ImmutableSegment {
     _validDocIds = validDocIds;
   }
 
+  @Nullable
+  public MutableRoaringBitmap loadValidDocIdsFromSnapshot() {
+    File validDocIdsSnapshotFile = getValidDocIdsSnapshotFile();
+    if (validDocIdsSnapshotFile.exists()) {
+      try {
+        byte[] bytes = FileUtils.readFileToByteArray(validDocIdsSnapshotFile);
+        MutableRoaringBitmap validDocIds = new ImmutableRoaringBitmap(ByteBuffer.wrap(bytes)).toMutableRoaringBitmap();
+        LOGGER.info("Loaded valid doc ids for segment: {} with: {} valid docs", getSegmentName(),
+            validDocIds.getCardinality());
+        return validDocIds;
+      } catch (Exception e) { // best-effort: log the cause and fall through to return null
+        LOGGER.warn("Caught exception while loading valid doc ids from snapshot file: {}, ignoring the snapshot",
+            validDocIdsSnapshotFile, e);
+      }
+    }
+    return null; // no snapshot file, or it could not be read
+  }
+
+  public void persistValidDocIdsSnapshot(MutableRoaringBitmap validDocIds) {
+    File validDocIdsSnapshotFile = getValidDocIdsSnapshotFile();
+    try {
+      if (validDocIdsSnapshotFile.exists()) { // replace any snapshot left by a previous persist
+        FileUtils.delete(validDocIdsSnapshotFile);
+      }
+      try (DataOutputStream dataOutputStream = new DataOutputStream(new FileOutputStream(validDocIdsSnapshotFile))) {
+        validDocIds.serialize(dataOutputStream);
+      }
+      LOGGER.info("Persisted valid doc ids for segment: {} with: {} valid docs", getSegmentName(),
+          validDocIds.getCardinality());
+    } catch (Exception e) { // best-effort: a failed persist is logged with its cause and otherwise ignored
+      LOGGER.warn("Caught exception while persisting valid doc ids to snapshot file: {}, skipping",
+          validDocIdsSnapshotFile, e);
+    }
+  }
+
+  public void deleteValidDocIdsSnapshot() {
+    File validDocIdsSnapshotFile = getValidDocIdsSnapshotFile();
+    if (validDocIdsSnapshotFile.exists()) { // nothing to do when no snapshot was ever written
+      try {
+        FileUtils.delete(validDocIdsSnapshotFile);
+        LOGGER.info("Deleted valid doc ids snapshot for segment: {}", getSegmentName());
+      } catch (Exception e) { // best-effort: a failed delete is logged with its cause and otherwise ignored
+        LOGGER.warn("Caught exception while deleting valid doc ids snapshot file: {}, skipping",
+            validDocIdsSnapshotFile, e);
+      }
+    }
+  }
+
+  private File getValidDocIdsSnapshotFile() {
+    File segmentDir = SegmentDirectoryPaths.findSegmentDirectory(_segmentMetadata.getIndexDir());
+    return new File(segmentDir, V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME);
+  }
+
   @Override
   public Dictionary getDictionary(String column) {
     ColumnIndexContainer container = _indexContainerMap.get(column);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index 41f6fa9f3c..7bcb33cf19 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -54,6 +54,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
   protected final String _comparisonColumn;
   protected final HashFunction _hashFunction;
   protected final PartialUpsertHandler _partialUpsertHandler;
+  protected final boolean _enableSnapshot;
   protected final ServerMetrics _serverMetrics;
   protected final Logger _logger;
 
@@ -65,13 +66,14 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
 
   protected BasePartitionUpsertMetadataManager(String tableNameWithType, int partitionId,
       List<String> primaryKeyColumns, String comparisonColumn, HashFunction hashFunction,
-      @Nullable PartialUpsertHandler partialUpsertHandler, ServerMetrics serverMetrics) {
+      @Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot, ServerMetrics serverMetrics) {
     _tableNameWithType = tableNameWithType;
     _partitionId = partitionId;
     _primaryKeyColumns = primaryKeyColumns;
     _comparisonColumn = comparisonColumn;
     _hashFunction = hashFunction;
     _partialUpsertHandler = partialUpsertHandler;
+    _enableSnapshot = enableSnapshot;
     _serverMetrics = serverMetrics;
     _logger = LoggerFactory.getLogger(tableNameWithType + "-" + partitionId + "-" + getClass().getSimpleName());
   }
@@ -83,9 +85,25 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
 
   @Override
   public void addSegment(ImmutableSegment segment) {
-    addSegment(segment, null, null);
+    Iterator<RecordInfo> recordInfoIterator = null;
+    if (segment instanceof ImmutableSegmentImpl) {
+      if (_enableSnapshot) {
+        MutableRoaringBitmap validDocIds = ((ImmutableSegmentImpl) segment).loadValidDocIdsFromSnapshot();
+        if (validDocIds != null) {
+          recordInfoIterator =
+              UpsertUtils.getRecordInfoIterator(segment, _primaryKeyColumns, _comparisonColumn, validDocIds);
+        }
+      } else {
+        ((ImmutableSegmentImpl) segment).deleteValidDocIdsSnapshot();
+      }
+    }
+    addSegment(segment, null, recordInfoIterator);
   }
 
+  /**
+   * NOTE: We allow passing in validDocIds here so that the value can be easily accessed from the tests. The passed in
+   *       validDocIds should always be empty.
+   */
   @VisibleForTesting
   public void addSegment(ImmutableSegment segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds,
       @Nullable Iterator<RecordInfo> recordInfoIterator) {
@@ -133,6 +151,10 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
     replaceSegment(segment, null, null, oldSegment);
   }
 
+  /**
+   * NOTE: We allow passing in validDocIds here so that the value can be easily accessed from the tests. The passed in
+   *       validDocIds should always be empty.
+   */
   @VisibleForTesting
   public void replaceSegment(ImmutableSegment segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds,
       @Nullable Iterator<RecordInfo> recordInfoIterator, IndexSegment oldSegment) {
@@ -157,6 +179,9 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
         if (validDocIds == null) {
           validDocIds = new ThreadSafeMutableRoaringBitmap();
         }
+        // New segment doesn't necessarily have the same docs as the old segment.
+        // Even for consuming segment, we might re-order the docs.
+        // As a result, we iterate all docIds of the new segment instead of loading it from old segment's snapshot.
         if (recordInfoIterator == null) {
           recordInfoIterator = UpsertUtils.getRecordInfoIterator(segment, _primaryKeyColumns, _comparisonColumn);
         }
@@ -215,6 +240,11 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
     try {
       MutableRoaringBitmap validDocIds =
           segment.getValidDocIds() != null ? segment.getValidDocIds().getMutableRoaringBitmap() : null;
+
+      if (_enableSnapshot && segment instanceof ImmutableSegmentImpl && validDocIds != null) {
+        ((ImmutableSegmentImpl) segment).persistValidDocIdsSnapshot(validDocIds);
+      }
+
       if (validDocIds == null || validDocIds.isEmpty()) {
         _logger.info("Skip removing segment without valid docs: {}", segmentName);
         return;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
index 95666d3ea2..7147341b69 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
@@ -38,6 +38,7 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
   protected String _comparisonColumn;
   protected HashFunction _hashFunction;
   protected PartialUpsertHandler _partialUpsertHandler;
+  protected boolean _enableSnapshot;
   protected ServerMetrics _serverMetrics;
 
   @Override
@@ -69,6 +70,8 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
               _comparisonColumn);
     }
 
+    _enableSnapshot = upsertConfig.isEnableSnapshot();
+
     _serverMetrics = serverMetrics;
   }
 
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index 125f61fe47..611153fd13 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -58,9 +58,9 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
 
   public ConcurrentMapPartitionUpsertMetadataManager(String tableNameWithType, int partitionId,
       List<String> primaryKeyColumns, String comparisonColumn, HashFunction hashFunction,
-      @Nullable PartialUpsertHandler partialUpsertHandler, ServerMetrics serverMetrics) {
+      @Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot, ServerMetrics serverMetrics) {
     super(tableNameWithType, partitionId, primaryKeyColumns, comparisonColumn, hashFunction, partialUpsertHandler,
-        serverMetrics);
+        enableSnapshot, serverMetrics);
   }
 
   @Override
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
index 9c22316703..3f830bb3a7 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
@@ -36,7 +36,7 @@ public class ConcurrentMapTableUpsertMetadataManager extends BaseTableUpsertMeta
   public ConcurrentMapPartitionUpsertMetadataManager getOrCreatePartitionManager(int partitionId) {
     return _partitionMetadataManagerMap.computeIfAbsent(partitionId,
         k -> new ConcurrentMapPartitionUpsertMetadataManager(_tableNameWithType, k, _primaryKeyColumns,
-            _comparisonColumn, _hashFunction, _partialUpsertHandler, _serverMetrics));
+            _comparisonColumn, _hashFunction, _partialUpsertHandler, _enableSnapshot, _serverMetrics));
   }
 
   @Override
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
index a7ea1b92f7..a1ee7b3266 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
@@ -24,6 +24,8 @@ import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.spi.data.readers.PrimaryKey;
 import org.apache.pinot.spi.utils.ByteArray;
+import org.roaringbitmap.PeekableIntIterator;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
 
 
 @SuppressWarnings("rawtypes")
@@ -32,7 +34,7 @@ public class UpsertUtils {
   }
 
   /**
-   * Returns an iterator of {@link RecordInfo} from the segment.
+   * Returns an iterator of {@link RecordInfo} for all the documents from the segment.
    */
   public static Iterator<RecordInfo> getRecordInfoIterator(ImmutableSegment segment, List<String> primaryKeyColumns,
       String comparisonColumn) {
@@ -47,18 +49,45 @@ public class UpsertUtils {
 
       @Override
       public RecordInfo next() {
-        PrimaryKey primaryKey = new PrimaryKey(new Object[primaryKeyColumns.size()]);
-        getPrimaryKey(segment, primaryKeyColumns, _docId, primaryKey);
+        return getRecordInfo(segment, primaryKeyColumns, comparisonColumn, _docId++);
+      }
+    };
+  }
+
+  /**
+   * Returns an iterator of {@link RecordInfo} for the valid documents from the segment.
+   */
+  public static Iterator<RecordInfo> getRecordInfoIterator(ImmutableSegment segment, List<String> primaryKeyColumns,
+      String comparisonColumn, MutableRoaringBitmap validDocIds) {
+    return new Iterator<RecordInfo>() {
+      private final PeekableIntIterator _docIdIterator = validDocIds.getIntIterator();
 
-        Object comparisonValue = segment.getValue(_docId, comparisonColumn);
-        if (comparisonValue instanceof byte[]) {
-          comparisonValue = new ByteArray((byte[]) comparisonValue);
-        }
-        return new RecordInfo(primaryKey, _docId++, (Comparable) comparisonValue);
+      @Override
+      public boolean hasNext() {
+        return _docIdIterator.hasNext();
+      }
+
+      @Override
+      public RecordInfo next() {
+        return getRecordInfo(segment, primaryKeyColumns, comparisonColumn, _docIdIterator.next());
       }
     };
   }
 
+  /**
+   * Reads the {@link RecordInfo} (primary key, doc id and comparison value) for the given doc id from the segment.
+   */
+  public static RecordInfo getRecordInfo(ImmutableSegment segment, List<String> primaryKeyColumns,
+      String comparisonColumn, int docId) {
+    PrimaryKey primaryKey = new PrimaryKey(new Object[primaryKeyColumns.size()]);
+    getPrimaryKey(segment, primaryKeyColumns, docId, primaryKey);
+    Object comparisonValue = segment.getValue(docId, comparisonColumn);
+    if (comparisonValue instanceof byte[]) { // byte[] is not Comparable; wrap it before the cast below
+      comparisonValue = new ByteArray((byte[]) comparisonValue);
+    }
+    return new RecordInfo(primaryKey, docId, (Comparable) comparisonValue);
+  }
+
   /**
    * Reads a primary key from the segment.
    */
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImplUpsertSnapshotTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImplUpsertSnapshotTest.java
new file mode 100644
index 0000000000..6c31c3981f
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImplUpsertSnapshotTest.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.indexsegment.immutable;
+
+import java.io.File;
+import java.net.URL;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.segment.local.segment.creator.SegmentTestUtils;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.local.upsert.ConcurrentMapPartitionUpsertMetadataManager;
+import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
+import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
+import org.apache.pinot.segment.spi.store.SegmentDirectory;
+import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
+import org.apache.pinot.spi.config.table.HashFunction;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.mockito.Mockito;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+public class ImmutableSegmentImplUpsertSnapshotTest {
+  private static final String AVRO_FILE = "data/test_data-mv.avro";
+  private static final String SCHEMA = "data/testDataMVSchema.json";
+  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "ImmutableSegmentImplTest");
+
+  private SegmentDirectory _segmentDirectory;
+  private SegmentMetadataImpl _segmentMetadata;
+  private PinotConfiguration _configuration;
+  private TableConfig _tableConfig;
+  private Schema _schema;
+
+  private PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
+  private ImmutableSegmentImpl _immutableSegmentImpl;
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    FileUtils.deleteQuietly(INDEX_DIR); // start from a clean index directory
+
+    Map<String, Object> props = new HashMap<>();
+    props.put(IndexLoadingConfig.READ_MODE_KEY, ReadMode.mmap.toString());
+    _configuration = new PinotConfiguration(props);
+
+    _segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader().load(INDEX_DIR.toURI(),
+        new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
+
+    URL resourceUrl = ImmutableSegmentImpl.class.getClassLoader().getResource(AVRO_FILE);
+    Assert.assertNotNull(resourceUrl);
+    File avroFile = new File(resourceUrl.getFile());
+
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setRowTimeValueCheck(false);
+    ingestionConfig.setSegmentTimeValueCheck(false);
+
+    UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+    upsertConfig.setEnableSnapshot(true);
+
+    _tableConfig =
+        new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName("daysSinceEpoch")
+            .setIngestionConfig(ingestionConfig).setUpsertConfig(upsertConfig).build();
+
+    resourceUrl = ImmutableSegmentImpl.class.getClassLoader().getResource(SCHEMA);
+    _schema = Schema.fromFile(new File(resourceUrl.getFile()));
+
+    SegmentGeneratorConfig config =
+        SegmentTestUtils.getSegmentGeneratorConfigWithSchema(avroFile, INDEX_DIR, "testTable", _tableConfig, _schema);
+
+    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+    driver.init(config);
+    driver.build();
+    _segmentMetadata = Mockito.mock(SegmentMetadataImpl.class); // only the two getters below are stubbed
+    Mockito.when(_segmentMetadata.getColumnMetadataMap()).thenReturn(new HashMap<>());
+    Mockito.when(_segmentMetadata.getIndexDir()).thenReturn(INDEX_DIR);
+    _immutableSegmentImpl = new ImmutableSegmentImpl(_segmentDirectory, _segmentMetadata, new HashMap<>(), null);
+
+    ServerMetrics serverMetrics = Mockito.mock(ServerMetrics.class);
+    _partitionUpsertMetadataManager =
+        new ConcurrentMapPartitionUpsertMetadataManager("testTable_REALTIME", 0, Collections.singletonList("column6"),
+            "daysSinceEpoch", HashFunction.NONE, null, true, serverMetrics);
+
+    _immutableSegmentImpl.enableUpsert(_partitionUpsertMetadataManager, new ThreadSafeMutableRoaringBitmap());
+  }
+
+  @Test
+  public void testPersistValidDocIdsSnapshot() {
+    int[] docIds1 = new int[]{1, 4, 6, 10, 15, 17, 18, 20};
+    MutableRoaringBitmap validDocIds = new MutableRoaringBitmap();
+    validDocIds.add(docIds1);
+
+    _immutableSegmentImpl.persistValidDocIdsSnapshot(validDocIds); // round trip: persist, load, then delete
+    assertTrue(new File(SegmentDirectoryPaths.findSegmentDirectory(_segmentMetadata.getIndexDir()),
+        V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME).exists());
+
+    MutableRoaringBitmap bitmap = _immutableSegmentImpl.loadValidDocIdsFromSnapshot();
+    assertEquals(bitmap.toArray(), docIds1);
+
+    _immutableSegmentImpl.deleteValidDocIdsSnapshot();
+    assertFalse(new File(SegmentDirectoryPaths.findSegmentDirectory(_segmentMetadata.getIndexDir()),
+        V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME).exists());
+  }
+
+  @AfterMethod // NOTE(review): setUp is @BeforeClass; use @AfterClass if more tests get added
+  public void tearDown() {
+    FileUtils.deleteQuietly(INDEX_DIR);
+  }
+}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index 5a6e45573c..3df1f45caa 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.pinot.segment.local.upsert;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import org.apache.pinot.common.metrics.ServerMetrics;
@@ -37,6 +38,7 @@ import org.apache.pinot.spi.data.readers.PrimaryKey;
 import org.apache.pinot.spi.utils.ByteArray;
 import org.apache.pinot.spi.utils.BytesUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
 import org.testng.annotations.Test;
 
 import static org.mockito.ArgumentMatchers.anyInt;
@@ -55,15 +57,18 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
 
   @Test
   public void testAddReplaceRemoveSegment() {
-    verifyAddReplaceRemoveSegment(HashFunction.NONE);
-    verifyAddReplaceRemoveSegment(HashFunction.MD5);
-    verifyAddReplaceRemoveSegment(HashFunction.MURMUR3);
+    verifyAddReplaceRemoveSegment(HashFunction.NONE, false);
+    verifyAddReplaceRemoveSegment(HashFunction.MD5, false);
+    verifyAddReplaceRemoveSegment(HashFunction.MURMUR3, false);
+    verifyAddReplaceRemoveSegment(HashFunction.NONE, true);
+    verifyAddReplaceRemoveSegment(HashFunction.MD5, true);
+    verifyAddReplaceRemoveSegment(HashFunction.MURMUR3, true);
   }
 
-  private void verifyAddReplaceRemoveSegment(HashFunction hashFunction) {
+  private void verifyAddReplaceRemoveSegment(HashFunction hashFunction, boolean enableSnapshot) {
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
-            "timeCol", hashFunction, null, mock(ServerMetrics.class));
+            "timeCol", hashFunction, null, false, mock(ServerMetrics.class));
     Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
 
     // Add the first segment
@@ -73,7 +78,18 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
     List<PrimaryKey> primaryKeys1 = getPrimaryKeyList(numRecords, primaryKeys);
     ImmutableSegmentImpl segment1 = mockImmutableSegment(1, validDocIds1, primaryKeys1);
-    List<RecordInfo> recordInfoList1 = getRecordInfoList(numRecords, primaryKeys, timestamps);
+    List<RecordInfo> recordInfoList1;
+    if (enableSnapshot) {
+      // get recordInfo from validDocIdSnapshot.
+      // segment1 snapshot: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
+      int[] docIds1 = new int[]{2, 4, 5};
+      MutableRoaringBitmap validDocIdsSnapshot1 = new MutableRoaringBitmap();
+      validDocIdsSnapshot1.add(docIds1);
+      recordInfoList1 = getRecordInfoList(validDocIdsSnapshot1, primaryKeys, timestamps);
+    } else {
+      // get recordInfo by iterating all records.
+      recordInfoList1 = getRecordInfoList(numRecords, primaryKeys, timestamps);
+    }
     upsertMetadataManager.addSegment(segment1, validDocIds1, recordInfoList1.iterator());
     // segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
     assertEquals(recordLocationMap.size(), 3);
@@ -88,8 +104,20 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     timestamps = new int[]{100, 100, 120, 80, 80};
     ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
     ImmutableSegmentImpl segment2 = mockImmutableSegment(2, validDocIds2, getPrimaryKeyList(numRecords, primaryKeys));
-    upsertMetadataManager.addSegment(segment2, validDocIds2,
-        getRecordInfoList(numRecords, primaryKeys, timestamps).iterator());
+    List<RecordInfo> recordInfoList2;
+    if (enableSnapshot) {
+      // get recordInfo from validDocIdSnapshot.
+      // segment2 snapshot: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
+      // segment1 snapshot: 1 -> {4, 120}
+      MutableRoaringBitmap validDocIdsSnapshot2 = new MutableRoaringBitmap();
+      validDocIdsSnapshot2.add(new int[]{0, 2, 3});
+      recordInfoList2 = getRecordInfoList(validDocIdsSnapshot2, primaryKeys, timestamps);
+    } else {
+      // get recordInfo by iterating all records.
+      recordInfoList2 = getRecordInfoList(numRecords, primaryKeys, timestamps);
+    }
+    upsertMetadataManager.addSegment(segment2, validDocIds2, recordInfoList2.iterator());
+
     // segment1: 1 -> {4, 120}
     // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
     assertEquals(recordLocationMap.size(), 4);
@@ -174,6 +202,18 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     return recordInfoList;
   }
 
+  /**
+   * Builds the RecordInfo list for only the doc ids set in validDocIdsSnapshot (enableSnapshot = true case).
+   */
+  private List<RecordInfo> getRecordInfoList(MutableRoaringBitmap validDocIdsSnapshot, int[] primaryKeys,
+      int[] timestamps) {
+    List<RecordInfo> recordInfoList = new ArrayList<>();
+    Iterator<Integer> validDocIdsIterator = validDocIdsSnapshot.iterator();
+    validDocIdsIterator.forEachRemaining((docId) -> recordInfoList.add(
+        new RecordInfo(makePrimaryKey(primaryKeys[docId]), docId, new IntWrapper(timestamps[docId]))));
+    return recordInfoList;
+  }
+
   private List<PrimaryKey> getPrimaryKeyList(int numRecords, int[] primaryKeys) {
     List<PrimaryKey> primaryKeyList = new ArrayList<>();
     for (int i = 0; i < numRecords; i++) {
@@ -233,7 +273,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
   private void verifyAddRecord(HashFunction hashFunction) {
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
-            "timeCol", hashFunction, null, mock(ServerMetrics.class));
+            "timeCol", hashFunction, null, false, mock(ServerMetrics.class));
     Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
 
     // Add the first segment
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
index 3a99c99346..c9d4213b63 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
@@ -25,6 +25,7 @@ public class V1Constants {
   public static final String SEGMENT_CREATION_META = "creation.meta";
   public static final String INDEX_MAP_FILE_NAME = "index_map";
   public static final String INDEX_FILE_NAME = "columns.psf";
+  public static final String VALID_DOC_IDS_SNAPSHOT_FILE_NAME = "validdocids.bitmap.snapshot";
 
   public static class Str {
     public static final char DEFAULT_STRING_PAD_CHAR = '\0';
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
index 162e6b3030..ae0522f6cd 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
@@ -54,6 +54,9 @@ public class UpsertConfig extends BaseJsonConfig {
   @JsonPropertyDescription("Column for upsert comparison, default to time column")
   private String _comparisonColumn;
 
+  @JsonPropertyDescription("Whether to use snapshot for fast upsert metadata recovery")
+  private boolean _enableSnapshot;
+
   @JsonPropertyDescription("Custom class for upsert metadata manager")
   private String _metadataManagerClass;
 
@@ -111,6 +114,10 @@ public class UpsertConfig extends BaseJsonConfig {
     return _comparisonColumn;
   }
 
+  public boolean isEnableSnapshot() {
+    return _enableSnapshot;
+  }
+
   @Nullable
   public String getMetadataManagerClass() {
     return _metadataManagerClass;
@@ -153,6 +160,10 @@ public class UpsertConfig extends BaseJsonConfig {
     _comparisonColumn = comparisonColumn;
   }
 
+  public void setEnableSnapshot(boolean enableSnapshot) {
+    _enableSnapshot = enableSnapshot;
+  }
+
   public void setMetadataManagerClass(String metadataManagerClass) {
     _metadataManagerClass = metadataManagerClass;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org