You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2024/02/02 22:03:48 UTC

(pinot) branch master updated: Cover the race condition for upsert compaction (#12346)

This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ab525ee929 Cover the race condition for upsert compaction (#12346)
ab525ee929 is described below

commit ab525ee9296fec122135fb5f877c45fa37f9c376
Author: Seunghyun Lee <se...@startree.ai>
AuthorDate: Fri Feb 2 14:03:43 2024 -0800

    Cover the race condition for upsert compaction (#12346)
    
    * Cover the race condition for upsert compaction
    
    The current code can hit a race condition when the
    following conditions are met:
    
    1. Segment upload is complete for a segment replacement
       and the segment ZK metadata has been updated to point
       to the new segment.
    2. The server is still loading the segment (e.g. if off-heap
       upsert is enabled, the segment reload can take a while).
    3. If the minion picks up the compaction task during the
       above steps, it will download the new (replacement)
       segment while fetching the validDocIds bitmap of the
       old segment from the server.
    
    To avoid this scenario, we added an extra CRC check
    on the minion side. The minion checks whether the CRC of
    the downloaded segment matches the CRC returned by the
    server API together with the validDocIds bitmap.
    
    Changes from the PR:
    1. Added a new API for fetching the validDocIds bitmap.
       Its response is JSON so that it can carry extra
       information such as the segment CRC.
    2. Wired up the new API and added tests
    3. Added the CRC check to the minion task executor
    4. Added the CRC check to the task generator
    
    * Add the 'validDocIdType' across all APIs
    
    * Changed 'validDocIdType' to 'validDocIdsType'
    
    * Add validDocIdsType enum
    
    * Addressing the comments to use enum everywhere
---
 .../restlet/resources/ValidDocIdMetadataInfo.java  |  15 ++-
 ...ataInfo.java => ValidDocIdsBitmapResponse.java} |  34 +++--
 .../common/restlet/resources/ValidDocIdsType.java  |  39 ++++++
 .../api/resources/PinotTableRestletResource.java   |   6 +-
 .../util/ServerSegmentMetadataReader.java          |  66 +++++++--
 .../pinot/controller/util/TableMetadataReader.java |   7 +-
 .../apache/pinot/core/common/MinionConstants.java  |   5 +
 .../tests/BaseClusterIntegrationTest.java          |   1 +
 .../tests/UpsertTableIntegrationTest.java          |  12 +-
 .../pinot/plugin/minion/tasks/MinionTaskUtils.java |   9 +-
 .../UpsertCompactionTaskExecutor.java              |  32 ++++-
 .../UpsertCompactionTaskGenerator.java             |  41 +++++-
 .../UpsertCompactionTaskGeneratorTest.java         |  30 ++++-
 .../segment/local/utils/TableConfigUtils.java      |  15 +++
 .../segment/local/utils/TableConfigUtilsTest.java  |   6 +-
 .../pinot/server/api/resources/TablesResource.java | 149 ++++++++++++++++-----
 .../pinot/server/api/TablesResourceTest.java       | 136 +++++++++++++++++--
 17 files changed, 512 insertions(+), 91 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdMetadataInfo.java b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdMetadataInfo.java
index 475ba91432..ddec1df4db 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdMetadataInfo.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdMetadataInfo.java
@@ -28,14 +28,19 @@ public class ValidDocIdMetadataInfo {
   private final long _totalValidDocs;
   private final long _totalInvalidDocs;
   private final long _totalDocs;
+  private final String _segmentCrc;
+  private final ValidDocIdsType _validDocIdsType;
 
   public ValidDocIdMetadataInfo(@JsonProperty("segmentName") String segmentName,
       @JsonProperty("totalValidDocs") long totalValidDocs, @JsonProperty("totalInvalidDocs") long totalInvalidDocs,
-      @JsonProperty("totalDocs") long totalDocs) {
+      @JsonProperty("totalDocs") long totalDocs, @JsonProperty("segmentCrc") String segmentCrc,
+      @JsonProperty("validDocIdsType") ValidDocIdsType validDocIdsType) {
     _segmentName = segmentName;
     _totalValidDocs = totalValidDocs;
     _totalInvalidDocs = totalInvalidDocs;
     _totalDocs = totalDocs;
+    _segmentCrc = segmentCrc;
+    _validDocIdsType = validDocIdsType;
   }
 
   public String getSegmentName() {
@@ -53,4 +58,12 @@ public class ValidDocIdMetadataInfo {
   public long getTotalDocs() {
     return _totalDocs;
   }
+
+  public String getSegmentCrc() {
+    return _segmentCrc;
+  }
+
+  public ValidDocIdsType getValidDocIdsType() {
+    return _validDocIdsType;
+  }
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdMetadataInfo.java b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdsBitmapResponse.java
similarity index 57%
copy from pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdMetadataInfo.java
copy to pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdsBitmapResponse.java
index 475ba91432..9b54774c66 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdMetadataInfo.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdsBitmapResponse.java
@@ -18,39 +18,37 @@
  */
 package org.apache.pinot.common.restlet.resources;
 
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
 
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class ValidDocIdMetadataInfo {
+public class ValidDocIdsBitmapResponse {
   private final String _segmentName;
-  private final long _totalValidDocs;
-  private final long _totalInvalidDocs;
-  private final long _totalDocs;
+  private final String _segmentCrc;
+  private final ValidDocIdsType _validDocIdsType;
+  private final byte[] _bitmap;
 
-  public ValidDocIdMetadataInfo(@JsonProperty("segmentName") String segmentName,
-      @JsonProperty("totalValidDocs") long totalValidDocs, @JsonProperty("totalInvalidDocs") long totalInvalidDocs,
-      @JsonProperty("totalDocs") long totalDocs) {
+  public ValidDocIdsBitmapResponse(@JsonProperty("segmentName") String segmentName,
+      @JsonProperty("segmentCrc") String crc, @JsonProperty("validDocIdsType") ValidDocIdsType validDocIdsType,
+      @JsonProperty("bitmap") byte[] bitmap) {
     _segmentName = segmentName;
-    _totalValidDocs = totalValidDocs;
-    _totalInvalidDocs = totalInvalidDocs;
-    _totalDocs = totalDocs;
+    _segmentCrc = crc;
+    _validDocIdsType = validDocIdsType;
+    _bitmap = bitmap;
   }
 
   public String getSegmentName() {
     return _segmentName;
   }
 
-  public long getTotalValidDocs() {
-    return _totalValidDocs;
+  public String getSegmentCrc() {
+    return _segmentCrc;
   }
 
-  public long getTotalInvalidDocs() {
-    return _totalInvalidDocs;
+  public ValidDocIdsType getValidDocIdsType() {
+    return _validDocIdsType;
   }
 
-  public long getTotalDocs() {
-    return _totalDocs;
+  public byte[] getBitmap() {
+    return _bitmap;
   }
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdsType.java b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdsType.java
new file mode 100644
index 0000000000..a8fbb129dc
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdsType.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.restlet.resources;
+
+public enum ValidDocIdsType {
+  // Default validDocIds type. This indicates that the validDocIds bitmap is loaded from the snapshot from the
+  // Pinot segment. UpsertConfig's 'enableSnapshot' must be enabled for this type.
+  SNAPSHOT,
+
+  // This indicates that the validDocIds bitmap is loaded from the real-time server's in-memory.
+  //
+  // NOTE: Using in-memory based validDocids bitmap is a bit dangerous as it will not give us the consistency in some
+  // cases (e.g. fetching validDocIds bitmap while the server is restarting & updating validDocIds).
+  IN_MEMORY,
+
+  // This indicates that the validDocIds bitmap is read from the real-time server's in-memory. The valid document ids
+  // here does take account into the deleted records. UpsertConfig's 'deleteRecordColumn' must be provided for this
+  // type.
+  //
+  // NOTE: Using in-memory based validDocids bitmap is a bit dangerous as it will not give us the consistency in some
+  // cases (e.g. fetching validDocIds bitmap while the server is restarting & updating validDocIds).
+  IN_MEMORY_WITH_DELETE;
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index d7894c08f7..4505ff0bc4 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -960,7 +960,9 @@ public class PinotTableRestletResource {
       @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
       @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr,
       @ApiParam(value = "A list of segments", allowMultiple = true) @QueryParam("segmentNames")
-      List<String> segmentNames) {
+      List<String> segmentNames,
+      @ApiParam(value = "Valid doc id type", example = "SNAPSHOT|IN_MEMORY|IN_MEMORY_WITH_DELETE")
+      @QueryParam("validDocIdsType") String validDocIdsType) {
     LOGGER.info("Received a request to fetch aggregate valid doc id metadata for a table {}", tableName);
     TableType tableType = Constants.validateTableType(tableTypeStr);
     if (tableType == TableType.OFFLINE) {
@@ -975,7 +977,7 @@ public class PinotTableRestletResource {
       TableMetadataReader tableMetadataReader =
           new TableMetadataReader(_executor, _connectionManager, _pinotHelixResourceManager);
       JsonNode segmentsMetadataJson =
-          tableMetadataReader.getAggregateValidDocIdMetadata(tableNameWithType, segmentNames,
+          tableMetadataReader.getAggregateValidDocIdMetadata(tableNameWithType, segmentNames, validDocIdsType,
               _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
       validDocIdMetadata = JsonUtils.objectToPrettyString(segmentsMetadataJson);
     } catch (InvalidConfigException e) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
index de1cae193c..fad0559b76 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
@@ -35,6 +35,7 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import javax.annotation.Nullable;
 import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.http.conn.HttpClientConnectionManager;
@@ -42,6 +43,7 @@ import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
 import org.apache.pinot.common.restlet.resources.TableMetadataInfo;
 import org.apache.pinot.common.restlet.resources.TableSegments;
 import org.apache.pinot.common.restlet.resources.ValidDocIdMetadataInfo;
+import org.apache.pinot.common.restlet.resources.ValidDocIdsBitmapResponse;
 import org.apache.pinot.common.utils.RoaringBitmapUtils;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.glassfish.jersey.client.ClientConfig;
@@ -209,7 +211,7 @@ public class ServerSegmentMetadataReader {
    */
   public List<ValidDocIdMetadataInfo> getValidDocIdMetadataFromServer(String tableNameWithType,
       Map<String, List<String>> serverToSegmentsMap, BiMap<String, String> serverToEndpoints,
-      @Nullable List<String> segmentNames, int timeoutMs) {
+      @Nullable List<String> segmentNames, int timeoutMs, String validDocIdsType) {
     List<Pair<String, String>> serverURLsAndBodies = new ArrayList<>();
     for (Map.Entry<String, List<String>> serverToSegments : serverToSegmentsMap.entrySet()) {
       List<String> segmentsForServer = serverToSegments.getValue();
@@ -224,7 +226,7 @@ public class ServerSegmentMetadataReader {
           }
         }
       }
-      serverURLsAndBodies.add(generateValidDocIdMetadataURL(tableNameWithType, segmentsToQuery,
+      serverURLsAndBodies.add(generateValidDocIdMetadataURL(tableNameWithType, segmentsToQuery, validDocIdsType,
           serverToEndpoints.get(serverToSegments.getKey())));
     }
 
@@ -273,10 +275,11 @@ public class ServerSegmentMetadataReader {
    *
    * @return a bitmap of validDocIds
    */
-  public RoaringBitmap getValidDocIdsFromServer(String tableNameWithType, String segmentName, String endpoint,
-      int timeoutMs) {
+  @Deprecated
+  public RoaringBitmap getValidDocIdsFromServer(String tableNameWithType, String segmentName, String validDocIdsType,
+      String endpoint, int timeoutMs) {
     // Build the endpoint url
-    String url = generateValidDocIdsURL(tableNameWithType, segmentName, endpoint);
+    String url = generateValidDocIdsURL(tableNameWithType, segmentName, validDocIdsType, endpoint);
 
     // Set timeout
     ClientConfig clientConfig = new ClientConfig();
@@ -290,6 +293,29 @@ public class ServerSegmentMetadataReader {
     return RoaringBitmapUtils.deserialize(validDocIds);
   }
 
+  /**
+   * This method is called when the API request is to fetch validDocIds for a segment of the given table. This method
+   * will pick a server that hosts the target segment and fetch the validDocIds result.
+   *
+   * @return a bitmap of validDocIds
+   */
+  public ValidDocIdsBitmapResponse getValidDocIdsBitmapFromServer(String tableNameWithType, String segmentName,
+      String endpoint, String validDocIdsType, int timeoutMs) {
+    // Build the endpoint url
+    String url = generateValidDocIdsBitmapURL(tableNameWithType, segmentName, validDocIdsType, endpoint);
+
+    // Set timeout
+    ClientConfig clientConfig = new ClientConfig();
+    clientConfig.property(ClientProperties.CONNECT_TIMEOUT, timeoutMs);
+    clientConfig.property(ClientProperties.READ_TIMEOUT, timeoutMs);
+
+    ValidDocIdsBitmapResponse response =
+        ClientBuilder.newClient(clientConfig).target(url).request(MediaType.APPLICATION_JSON)
+            .get(ValidDocIdsBitmapResponse.class);
+    Preconditions.checkNotNull(response, "Unable to retrieve validDocIdsBitmap from %s", url);
+    return response;
+  }
+
   private String generateAggregateSegmentMetadataServerURL(String tableNameWithType, List<String> columns,
       String endpoint) {
     tableNameWithType = URLEncoder.encode(tableNameWithType, StandardCharsets.UTF_8);
@@ -305,15 +331,32 @@ public class ServerSegmentMetadataReader {
     return String.format("%s/tables/%s/segments/%s/metadata?%s", endpoint, tableNameWithType, segmentName, paramsStr);
   }
 
-  private String generateValidDocIdsURL(String tableNameWithType, String segmentName, String endpoint) {
+  @Deprecated
+  private String generateValidDocIdsURL(String tableNameWithType, String segmentName, String validDocIdsType,
+      String endpoint) {
     tableNameWithType = URLEncoder.encode(tableNameWithType, StandardCharsets.UTF_8);
     segmentName = URLEncoder.encode(segmentName, StandardCharsets.UTF_8);
-    return String.format("%s/segments/%s/%s/validDocIds", endpoint, tableNameWithType, segmentName);
+    String url = String.format("%s/segments/%s/%s/validDocIds", endpoint, tableNameWithType, segmentName);
+    if (validDocIdsType != null) {
+      url = url + "?validDocIdsType=" + validDocIdsType;
+    }
+    return url;
   }
 
-  private Pair<String, String> generateValidDocIdMetadataURL(String tableNameWithType, List<String> segmentNames,
+  private String generateValidDocIdsBitmapURL(String tableNameWithType, String segmentName, String validDocIdsType,
       String endpoint) {
     tableNameWithType = URLEncoder.encode(tableNameWithType, StandardCharsets.UTF_8);
+    segmentName = URLEncoder.encode(segmentName, StandardCharsets.UTF_8);
+    String url = String.format("%s/segments/%s/%s/validDocIdsBitmap", endpoint, tableNameWithType, segmentName);
+    if (validDocIdsType != null) {
+      url = url + "?validDocIdsType=" + validDocIdsType;
+    }
+    return url;
+  }
+
+  private Pair<String, String> generateValidDocIdMetadataURL(String tableNameWithType, List<String> segmentNames,
+      String validDocIdsType, String endpoint) {
+    tableNameWithType = URLEncoder.encode(tableNameWithType, StandardCharsets.UTF_8);
     TableSegments tableSegments = new TableSegments(segmentNames);
     String jsonTableSegments;
     try {
@@ -322,8 +365,11 @@ public class ServerSegmentMetadataReader {
       LOGGER.error("Failed to convert segment names to json request body: segmentNames={}", segmentNames);
       throw new RuntimeException(e);
     }
-    return Pair.of(
-        String.format("%s/tables/%s/validDocIdMetadata", endpoint, tableNameWithType), jsonTableSegments);
+    String url = String.format("%s/tables/%s/validDocIdMetadata", endpoint, tableNameWithType);
+    if (validDocIdsType != null) {
+      url = url + "?validDocIdsType=" + validDocIdsType;
+    }
+    return Pair.of(url, jsonTableSegments);
   }
 
   private String generateColumnsParam(List<String> columns) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
index f92747b497..25ca6fc7f9 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
@@ -140,7 +140,7 @@ public class TableMetadataReader {
    */
   public JsonNode getAggregateTableMetadata(String tableNameWithType, List<String> columns, int numReplica,
       int timeoutMs)
-      throws InvalidConfigException, IOException {
+      throws InvalidConfigException {
     final Map<String, List<String>> serverToSegments =
         _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
     BiMap<String, String> endpoints =
@@ -158,7 +158,8 @@ public class TableMetadataReader {
    * This method retrieves the aggregated valid doc id metadata for a given table.
    * @return a list of ValidDocIdMetadataInfo
    */
-  public JsonNode getAggregateValidDocIdMetadata(String tableNameWithType, List<String> segmentNames, int timeoutMs)
+  public JsonNode getAggregateValidDocIdMetadata(String tableNameWithType, List<String> segmentNames,
+      String validDocIdsType, int timeoutMs)
       throws InvalidConfigException {
     final Map<String, List<String>> serverToSegments =
         _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
@@ -169,7 +170,7 @@ public class TableMetadataReader {
 
     List<ValidDocIdMetadataInfo> aggregateTableMetadataInfo =
         serverSegmentMetadataReader.getValidDocIdMetadataFromServer(tableNameWithType, serverToSegments, endpoints,
-            segmentNames, timeoutMs);
+            segmentNames, timeoutMs, validDocIdsType);
     return JsonUtils.objectToJsonNode(aggregateTableMetadataInfo);
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index 4193984559..a02b2876e3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -164,5 +164,10 @@ public class MinionConstants {
      * e.g. if the count surpasses 100k, then the segment may be compacted
      */
     public static final String INVALID_RECORDS_THRESHOLD_COUNT = "invalidRecordsThresholdCount";
+
+    /**
+     * Valid doc id type
+     */
+    public static final String VALID_DOC_IDS_TYPE = "validDocIdsType";
   }
 }
diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index 1a4b9691ac..c5d5021cfe 100644
--- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -405,6 +405,7 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
 
     if (upsertConfig == null) {
       upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+      upsertConfig.setEnableSnapshot(true);
     }
     if (kafkaTopicName == null) {
       kafkaTopicName = getKafkaTopic();
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
index 3125079f74..342a8b01f3 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
@@ -35,6 +35,7 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.task.TaskState;
 import org.apache.pinot.client.ResultSet;
+import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
 import org.apache.pinot.common.utils.config.TagNameUtils;
 import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
@@ -458,11 +459,13 @@ public class UpsertTableIntegrationTest extends BaseClusterIntegrationTestSet {
       throws Exception {
     final UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
     upsertConfig.setDeleteRecordColumn(DELETE_COL);
+    upsertConfig.setEnableSnapshot(true);
     String tableName = "gameScoresWithCompaction";
     TableConfig tableConfig =
         setupTable(tableName, getKafkaTopic() + "-with-compaction", INPUT_DATA_LARGE_TAR_FILE, upsertConfig);
     tableConfig.setTaskConfig(getCompactionTaskConfig());
     updateTableConfig(tableConfig);
+
     waitForAllDocsLoaded(tableName, 600_000L, 1000);
     assertEquals(getScore(tableName), 3692);
     waitForNumQueriedSegmentsToConverge(tableName, 10_000L, 3);
@@ -483,6 +486,7 @@ public class UpsertTableIntegrationTest extends BaseClusterIntegrationTestSet {
       throws Exception {
     final UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
     upsertConfig.setDeleteRecordColumn(DELETE_COL);
+    upsertConfig.setEnableSnapshot(true);
     String tableName = "gameScoresWithCompactionDeleteSegments";
     String kafkaTopicName = getKafkaTopic() + "-with-compaction-segment-delete";
     TableConfig tableConfig = setupTable(tableName, kafkaTopicName, INPUT_DATA_LARGE_TAR_FILE, upsertConfig);
@@ -514,7 +518,13 @@ public class UpsertTableIntegrationTest extends BaseClusterIntegrationTestSet {
     String tableName = "gameScoresWithCompactionWithSoftDelete";
     String kafkaTopicName = getKafkaTopic() + "-with-compaction-delete";
     TableConfig tableConfig = setupTable(tableName, kafkaTopicName, INPUT_DATA_LARGE_TAR_FILE, upsertConfig);
-    tableConfig.setTaskConfig(getCompactionTaskConfig());
+    TableTaskConfig taskConfig = getCompactionTaskConfig();
+    Map<String, String> compactionTaskConfig =
+        taskConfig.getConfigsForTaskType(MinionConstants.UpsertCompactionTask.TASK_TYPE);
+    compactionTaskConfig.put("validDocIdsType", ValidDocIdsType.IN_MEMORY_WITH_DELETE.toString());
+    taskConfig = new TableTaskConfig(
+        Collections.singletonMap(MinionConstants.UpsertCompactionTask.TASK_TYPE, compactionTaskConfig));
+    tableConfig.setTaskConfig(taskConfig);
     updateTableConfig(tableConfig);
 
     // Push data one more time
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
index 31f5d6039c..97a572a939 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.restlet.resources.ValidDocIdsBitmapResponse;
 import org.apache.pinot.common.utils.config.InstanceUtils;
 import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
 import org.apache.pinot.controller.util.ServerSegmentMetadataReader;
@@ -37,7 +38,6 @@ import org.apache.pinot.spi.plugin.PluginManager;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.roaringbitmap.RoaringBitmap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -139,8 +139,8 @@ public class MinionTaskUtils {
     return dirInStr;
   }
 
-  public static RoaringBitmap getValidDocIds(String tableNameWithType, String segmentName, Map<String, String> configs,
-      MinionContext minionContext) {
+  public static ValidDocIdsBitmapResponse getValidDocIdsBitmap(String tableNameWithType, String segmentName,
+      String validDocIdsType, MinionContext minionContext) {
     HelixAdmin helixAdmin = minionContext.getHelixManager().getClusterManagmentTool();
     String clusterName = minionContext.getHelixManager().getClusterName();
 
@@ -151,7 +151,8 @@ public class MinionTaskUtils {
     // We only need aggregated table size and the total number of docs/rows. Skipping column related stats, by
     // passing an empty list.
     ServerSegmentMetadataReader serverSegmentMetadataReader = new ServerSegmentMetadataReader();
-    return serverSegmentMetadataReader.getValidDocIdsFromServer(tableNameWithType, segmentName, endpoint, 60_000);
+    return serverSegmentMetadataReader.getValidDocIdsBitmapFromServer(tableNameWithType, segmentName, endpoint,
+        validDocIdsType, 60_000);
   }
 
   public static String getServer(String segmentName, String tableNameWithType, HelixAdmin helixAdmin,
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java
index 4c200b9606..cd84b5463f 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java
@@ -23,6 +23,9 @@ import java.util.Collections;
 import java.util.Map;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
+import org.apache.pinot.common.restlet.resources.ValidDocIdsBitmapResponse;
+import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
+import org.apache.pinot.common.utils.RoaringBitmapUtils;
 import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.core.minion.PinotTaskConfig;
 import org.apache.pinot.plugin.minion.tasks.BaseSingleSegmentConversionExecutor;
@@ -53,7 +56,33 @@ public class UpsertCompactionTaskExecutor extends BaseSingleSegmentConversionExe
 
     String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
     TableConfig tableConfig = getTableConfig(tableNameWithType);
-    RoaringBitmap validDocIds = MinionTaskUtils.getValidDocIds(tableNameWithType, segmentName, configs, MINION_CONTEXT);
+
+    String validDocIdsTypeStr =
+        configs.getOrDefault(MinionConstants.UpsertCompactionTask.VALID_DOC_IDS_TYPE, ValidDocIdsType.SNAPSHOT.name());
+    ValidDocIdsType validDocIdsType = ValidDocIdsType.valueOf(validDocIdsTypeStr.toUpperCase());
+    ValidDocIdsBitmapResponse validDocIdsBitmapResponse =
+        MinionTaskUtils.getValidDocIdsBitmap(tableNameWithType, segmentName, validDocIdsType.toString(),
+            MINION_CONTEXT);
+
+    // Check crc from the downloaded segment against the crc returned from the server along with the valid doc id
+    // bitmap. If this doesn't match, this means that we are hitting the race condition where the segment has been
+    // uploaded successfully while the server is still reloading the segment. Reloading can take a while when the
+    // offheap upsert is used because we will need to delete & add all primary keys.
+    // `BaseSingleSegmentConversionExecutor.executeTask()` already checks for the crc from the task generator
+    // against the crc from the current segment zk metadata, so we don't need to check that here.
+    SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir);
+    String originalSegmentCrcFromTaskGenerator = configs.get(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY);
+    String crcFromDeepStorageSegment = segmentMetadata.getCrc();
+    String crcFromValidDocIdsBitmap = validDocIdsBitmapResponse.getSegmentCrc();
+    if (!originalSegmentCrcFromTaskGenerator.equals(crcFromDeepStorageSegment)
+        || !originalSegmentCrcFromTaskGenerator.equals(crcFromValidDocIdsBitmap)) {
+      LOGGER.warn("CRC mismatch for segment: {}, expected: {}, actual crc from server: {}", segmentName,
+          crcFromDeepStorageSegment, validDocIdsBitmapResponse.getSegmentCrc());
+      return new SegmentConversionResult.Builder().setTableNameWithType(tableNameWithType).setSegmentName(segmentName)
+          .build();
+    }
+
+    RoaringBitmap validDocIds = RoaringBitmapUtils.deserialize(validDocIdsBitmapResponse.getBitmap());
 
     if (validDocIds.isEmpty()) {
       // prevents empty segment generation
@@ -68,7 +97,6 @@ public class UpsertCompactionTaskExecutor extends BaseSingleSegmentConversionExe
           .build();
     }
 
-    SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir);
     try (CompactedPinotSegmentRecordReader compactedRecordReader = new CompactedPinotSegmentRecordReader(indexDir,
         validDocIds)) {
       SegmentGeneratorConfig config = getSegmentGeneratorConfig(workingDir, tableConfig, segmentMetadata, segmentName);
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
index 188d90606b..ad005a1ca2 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.plugin.minion.tasks.upsertcompaction;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.BiMap;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -31,6 +32,7 @@ import org.apache.helix.task.TaskState;
 import org.apache.pinot.common.exception.InvalidConfigException;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.restlet.resources.ValidDocIdMetadataInfo;
+import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator;
 import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorUtils;
@@ -41,6 +43,7 @@ import org.apache.pinot.core.minion.PinotTaskConfig;
 import org.apache.pinot.spi.annotations.minion.TaskGenerator;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.TimeUtils;
 import org.slf4j.Logger;
@@ -49,6 +52,7 @@ import org.slf4j.LoggerFactory;
 
 @TaskGenerator
 public class UpsertCompactionTaskGenerator extends BaseTaskGenerator {
+
   private static final Logger LOGGER = LoggerFactory.getLogger(UpsertCompactionTaskGenerator.class);
   private static final String DEFAULT_BUFFER_PERIOD = "7d";
   private static final double DEFAULT_INVALID_RECORDS_THRESHOLD_PERCENT = 0.0;
@@ -125,9 +129,31 @@ public class UpsertCompactionTaskGenerator extends BaseTaskGenerator {
 
       // TODO: currently, we put segmentNames=null to get metadata for all segments. We can change this to get
       // valid doc id metadata in batches with the loop.
+
+      // By default, we use 'snapshot' for validDocIdsType. This means that we will use the validDocIds bitmap from
+      // the snapshot from Pinot segment. This will require 'enableSnapshot' from UpsertConfig to be set to true.
+      String validDocIdsTypeStr =
+          taskConfigs.getOrDefault(UpsertCompactionTask.VALID_DOC_IDS_TYPE, ValidDocIdsType.SNAPSHOT.toString());
+      ValidDocIdsType validDocIdsType = ValidDocIdsType.valueOf(validDocIdsTypeStr.toUpperCase());
+
+      // Validate that 'enableSnapshot' is set in UpsertConfig when validDocIdsType is SNAPSHOT
+      if (validDocIdsType == ValidDocIdsType.SNAPSHOT) {
+        UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
+        Preconditions.checkNotNull(upsertConfig, "UpsertConfig must be provided for UpsertCompactionTask");
+        Preconditions.checkState(upsertConfig.isEnableSnapshot(), String.format(
+            "'enableSnapshot' from UpsertConfig must be enabled for UpsertCompactionTask with validDocIdsType = %s",
+            validDocIdsType));
+      } else if (validDocIdsType == ValidDocIdsType.IN_MEMORY_WITH_DELETE) {
+        UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
+        Preconditions.checkNotNull(upsertConfig, "UpsertConfig must be provided for UpsertCompactionTask");
+        Preconditions.checkNotNull(upsertConfig.getDeleteRecordColumn(),
+            String.format("deleteRecordColumn must be provided for " + "UpsertCompactionTask with validDocIdsType = %s",
+                validDocIdsType));
+      }
+
       List<ValidDocIdMetadataInfo> validDocIdMetadataList =
           serverSegmentMetadataReader.getValidDocIdMetadataFromServer(tableNameWithType, serverToSegments,
-              serverToEndpoints, null, 60_000);
+              serverToEndpoints, null, 60_000, validDocIdsType.toString());
 
       Map<String, SegmentZKMetadata> completedSegmentsMap =
           completedSegments.stream().collect(Collectors.toMap(SegmentZKMetadata::getSegmentName, Function.identity()));
@@ -138,7 +164,9 @@ public class UpsertCompactionTaskGenerator extends BaseTaskGenerator {
       if (!segmentSelectionResult.getSegmentsForDeletion().isEmpty()) {
         pinotHelixResourceManager.deleteSegments(tableNameWithType, segmentSelectionResult.getSegmentsForDeletion(),
             "0d");
-        LOGGER.info("Deleted segments containing only invalid records for table: {}", tableNameWithType);
+        LOGGER.info(
+            "Deleted segments containing only invalid records for table: {}, number of segments to be deleted: {}",
+            tableNameWithType, segmentSelectionResult.getSegmentsForDeletion());
       }
 
       int numTasks = 0;
@@ -157,6 +185,7 @@ public class UpsertCompactionTaskGenerator extends BaseTaskGenerator {
         configs.put(MinionConstants.DOWNLOAD_URL_KEY, segment.getDownloadUrl());
         configs.put(MinionConstants.UPLOAD_URL_KEY, _clusterInfoAccessor.getVipUrl() + "/segments");
         configs.put(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY, String.valueOf(segment.getCrc()));
+        configs.put(UpsertCompactionTask.VALID_DOC_IDS_TYPE, validDocIdsType.toString());
         pinotTaskConfigs.add(new PinotTaskConfig(UpsertCompactionTask.TASK_TYPE, configs));
         numTasks++;
       }
@@ -179,7 +208,15 @@ public class UpsertCompactionTaskGenerator extends BaseTaskGenerator {
     for (ValidDocIdMetadataInfo validDocIdMetadata : validDocIdMetadataInfoList) {
       long totalInvalidDocs = validDocIdMetadata.getTotalInvalidDocs();
       String segmentName = validDocIdMetadata.getSegmentName();
+
+      // Skip the segment if the CRC in ZK metadata differs from the server's; the server may still be reloading it.
       SegmentZKMetadata segment = completedSegmentsMap.get(segmentName);
+      if (segment.getCrc() != Long.parseLong(validDocIdMetadata.getSegmentCrc())) {
+        LOGGER.warn(
+            "CRC mismatch for segment: {}, skipping it for compaction (segmentZKMetadata={}, validDocIdMetadata={})",
+            segmentName, segment.getCrc(), validDocIdMetadata.getSegmentCrc());
+        continue;
+      }
       long totalDocs = validDocIdMetadata.getTotalDocs();
       double invalidRecordPercent = ((double) totalInvalidDocs / totalDocs) * 100;
       if (totalInvalidDocs == totalDocs) {
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
index 7da9a7f1e8..d30107056b 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
@@ -41,6 +41,7 @@ import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.TimeUtils;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 import org.testng.collections.Lists;
@@ -184,11 +185,11 @@ public class UpsertCompactionTaskGeneratorTest {
   public void testProcessValidDocIdMetadata()
       throws IOException {
     Map<String, String> compactionConfigs = getCompactionConfigs("1", "10");
-    List<ValidDocIdMetadataInfo> validDocIdMetadataInfoList = new ArrayList<>();
     String json = "[{" + "\"totalValidDocs\" : 50," + "\"totalInvalidDocs\" : 50," + "\"segmentName\" : \""
-        + _completedSegment.getSegmentName() + "\"," + "\"totalDocs\" : 100" + "}," + "{" + "\"totalValidDocs\" : 0,"
-        + "\"totalInvalidDocs\" : 10," + "\"segmentName\" : \"" + _completedSegment2.getSegmentName() + "\","
-        + "\"totalDocs\" : 10" + "}]";
+        + _completedSegment.getSegmentName() + "\"," + "\"totalDocs\" : 100" + ", \"segmentCrc\": \""
+        + _completedSegment.getCrc() + "\"}," + "{" + "\"totalValidDocs\" : 0," + "\"totalInvalidDocs\" : 10,"
+        + "\"segmentName\" : \"" + _completedSegment2.getSegmentName() + "\", " + "\"segmentCrc\" : \""
+        + _completedSegment2.getCrc() + "\"," + "\"totalDocs\" : 10" + "}]";
     List<ValidDocIdMetadataInfo> validDocIdMetadataInfo =
         JsonUtils.stringToObject(json, new TypeReference<ArrayList<ValidDocIdMetadataInfo>>() {
         });
@@ -221,6 +222,27 @@ public class UpsertCompactionTaskGeneratorTest {
             validDocIdMetadataInfo);
     assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(0).getSegmentName(),
         _completedSegment.getSegmentName());
+
+    // Test the case where the completedSegment from api has different crc than segment from zk metadata.
+    json = "[{" + "\"totalValidDocs\" : 50," + "\"totalInvalidDocs\" : 50," + "\"segmentName\" : \""
+        + _completedSegment.getSegmentName() + "\"," + "\"totalDocs\" : 100" + ", \"segmentCrc\": \""
+        + "1234567890" + "\"}," + "{" + "\"totalValidDocs\" : 0," + "\"totalInvalidDocs\" : 10,"
+        + "\"segmentName\" : \"" + _completedSegment2.getSegmentName() + "\", " + "\"segmentCrc\" : \""
+        + _completedSegment2.getCrc() + "\","
+        + "\"totalDocs\" : 10" + "}]";
+    validDocIdMetadataInfo = JsonUtils.stringToObject(json, new TypeReference<ArrayList<ValidDocIdMetadataInfo>>() {
+    });
+    segmentSelectionResult =
+        UpsertCompactionTaskGenerator.processValidDocIdMetadata(compactionConfigs, _completedSegmentsMap,
+            validDocIdMetadataInfo);
+
+    // completedSegment is supposed to be filtered out
+    Assert.assertEquals(segmentSelectionResult.getSegmentsForCompaction().size(), 0);
+
+    // completedSegment2 is still supposed to be deleted
+    Assert.assertEquals(segmentSelectionResult.getSegmentsForDeletion().size(), 1);
+    assertEquals(segmentSelectionResult.getSegmentsForDeletion().get(0),
+        _completedSegment2.getSegmentName());
   }
 
   private Map<String, String> getCompactionConfigs(String invalidRecordsThresholdPercent,
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 60b0bf6809..99ca28263c 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -40,6 +40,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.request.context.FunctionContext;
 import org.apache.pinot.common.request.context.RequestContextUtils;
+import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
 import org.apache.pinot.common.tier.TierFactory;
 import org.apache.pinot.common.utils.config.TagNameUtils;
 import org.apache.pinot.segment.local.function.FunctionEvaluator;
@@ -669,6 +670,20 @@ public final class TableConfigUtils {
               taskTypeConfig.containsKey("invalidRecordsThresholdPercent") || taskTypeConfig.containsKey(
                   "invalidRecordsThresholdCount"),
               "invalidRecordsThresholdPercent or invalidRecordsThresholdCount or both must be provided");
+          String validDocIdsType = taskTypeConfig.getOrDefault("validDocIdsType", "snapshot");
+          if (validDocIdsType.equals(ValidDocIdsType.SNAPSHOT.toString())) {
+            UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
+            Preconditions.checkNotNull(upsertConfig, "UpsertConfig must be provided for UpsertCompactionTask");
+            Preconditions.checkState(upsertConfig.isEnableSnapshot(), String.format(
+                "'enableSnapshot' from UpsertConfig must be enabled for UpsertCompactionTask with validDocIdsType = "
+                    + "%s", validDocIdsType));
+          } else if (validDocIdsType.equals(ValidDocIdsType.IN_MEMORY_WITH_DELETE.toString())) {
+            UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
+            Preconditions.checkNotNull(upsertConfig, "UpsertConfig must be provided for UpsertCompactionTask");
+            Preconditions.checkNotNull(upsertConfig.getDeleteRecordColumn(), String.format(
+                "deleteRecordColumn must be provided for " + "UpsertCompactionTask with validDocIdsType = %s",
+                validDocIdsType));
+          }
         }
       }
     }
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index 055951dd6f..1c9b52c4cf 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -2300,8 +2300,10 @@ public class TableConfigUtilsTest {
     Map<String, String> upsertCompactionTaskConfig =
         ImmutableMap.of("bufferTimePeriod", "5d", "invalidRecordsThresholdPercent", "1", "invalidRecordsThresholdCount",
             "1");
+    UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+    upsertConfig.setEnableSnapshot(true);
     TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
-        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL))
+        .setUpsertConfig(upsertConfig)
         .setTaskConfig(new TableTaskConfig(ImmutableMap.of("UpsertCompactionTask", upsertCompactionTaskConfig)))
         .build();
 
@@ -2310,7 +2312,7 @@ public class TableConfigUtilsTest {
     // test with invalid invalidRecordsThresholdPercents
     upsertCompactionTaskConfig = ImmutableMap.of("invalidRecordsThresholdPercent", "0");
     TableConfig zeroPercentTableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
-        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL))
+        .setUpsertConfig(upsertConfig)
         .setTaskConfig(new TableTaskConfig(ImmutableMap.of("UpsertCompactionTask", upsertCompactionTaskConfig)))
         .build();
     Assert.assertThrows(IllegalStateException.class,
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
index 20bba92ffb..52702c831f 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
@@ -59,6 +59,7 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.StreamingOutput;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.helix.model.IdealState;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
@@ -69,6 +70,8 @@ import org.apache.pinot.common.restlet.resources.TableMetadataInfo;
 import org.apache.pinot.common.restlet.resources.TableSegmentValidationInfo;
 import org.apache.pinot.common.restlet.resources.TableSegments;
 import org.apache.pinot.common.restlet.resources.TablesList;
+import org.apache.pinot.common.restlet.resources.ValidDocIdsBitmapResponse;
+import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.common.utils.RoaringBitmapUtils;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
@@ -465,10 +468,65 @@ public class TablesResource {
     }
   }
 
+  /**
+   * Returns the validDocIds bitmap of an immutable segment as JSON, together with the segment CRC and the
+   * validDocIds type that was actually used, so callers can check the bitmap matches the segment copy they hold.
+   */
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/segments/{tableNameWithType}/{segmentName}/validDocIdsBitmap")
+  @ApiOperation(value = "Download validDocIds bitmap for an REALTIME immutable segment", notes =
+      "Download validDocIds for " + "an immutable segment in bitmap format.")
+  public ValidDocIdsBitmapResponse downloadValidDocIdsBitmap(
+      @ApiParam(value = "Name of the table with type REALTIME", required = true, example = "myTable_REALTIME")
+      @PathParam("tableNameWithType") String tableNameWithType,
+      @ApiParam(value = "Valid doc id type", example = "SNAPSHOT|IN_MEMORY|IN_MEMORY_WITH_DELETE")
+      @QueryParam("validDocIdsType") String validDocIdsType,
+      @ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") @Encoded String segmentName,
+      @Context HttpHeaders httpHeaders) {
+    String decodedSegmentName = URIUtils.decode(segmentName);
+    LOGGER.info("Received a request to download validDocIds for segment {} table {}", decodedSegmentName,
+        tableNameWithType);
+    // Check data access before acquiring the segment.
+    ServerResourceUtils.validateDataAccess(_accessControlFactory, tableNameWithType, httpHeaders);
+    TableDataManager tableDataManager =
+        ServerResourceUtils.checkGetTableDataManager(_serverInstance, tableNameWithType);
+    SegmentDataManager acquiredSegment = tableDataManager.acquireSegment(decodedSegmentName);
+    if (acquiredSegment == null) {
+      throw new WebApplicationException(
+          String.format("Table %s segment %s does not exist", tableNameWithType, decodedSegmentName),
+          Response.Status.NOT_FOUND);
+    }
+    // Release the acquired segment on every exit path, including the error responses thrown below.
+    try {
+      IndexSegment indexSegment = acquiredSegment.getSegment();
+      if (!(indexSegment instanceof ImmutableSegmentImpl)) {
+        throw new WebApplicationException(
+            String.format("Table %s segment %s is not a immutable segment", tableNameWithType, decodedSegmentName),
+            Response.Status.BAD_REQUEST);
+      }
+      // Left: the validDocIds type that was actually used; right: the bitmap, or null when it is unavailable.
+      Pair<ValidDocIdsType, MutableRoaringBitmap> typeAndBitmap = getValidDocIds(indexSegment, validDocIdsType);
+      MutableRoaringBitmap bitmap = typeAndBitmap.getRight();
+      if (bitmap == null) {
+        String msg = String.format("Missing validDocIds for table %s segment %s does not exist", tableNameWithType,
+            acquiredSegment.getSegmentName());
+        LOGGER.warn(msg);
+        throw new WebApplicationException(msg, Response.Status.NOT_FOUND);
+      }
+      byte[] serializedBitmap = RoaringBitmapUtils.serialize(bitmap);
+      return new ValidDocIdsBitmapResponse(decodedSegmentName, indexSegment.getSegmentMetadata().getCrc(),
+          typeAndBitmap.getLeft(), serializedBitmap);
+    } finally {
+      tableDataManager.releaseSegment(acquiredSegment);
+    }
+  }
+
   /**
    * Download snapshot for the given immutable segment for upsert table. This endpoint is used when get snapshot from
    * peer to avoid recompute when reload segments.
    */
+  @Deprecated
   @GET
   @Produces(MediaType.APPLICATION_OCTET_STREAM)
   @Path("/segments/{tableNameWithType}/{segmentName}/validDocIds")
@@ -478,7 +536,8 @@ public class TablesResource {
       @ApiParam(value = "Name of the table with type REALTIME", required = true, example = "myTable_REALTIME")
       @PathParam("tableNameWithType") String tableNameWithType,
       @ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") @Encoded String segmentName,
-      @Context HttpHeaders httpHeaders) {
+      @ApiParam(value = "Valid doc id type", example = "SNAPSHOT|IN_MEMORY|IN_MEMORY_WITH_DELETE")
+      @QueryParam("validDocIdsType") String validDocIdsType, @Context HttpHeaders httpHeaders) {
     segmentName = URIUtils.decode(segmentName);
     LOGGER.info("Received a request to download validDocIds for segment {} table {}", segmentName, tableNameWithType);
     // Validate data access
@@ -501,18 +560,14 @@ public class TablesResource {
             Response.Status.BAD_REQUEST);
       }
 
-      // Adopt the same logic as the query execution to get the valid doc ids. 'FilterPlanNode.run()'
-      // If the queryableDocId is available (upsert delete is enabled), we read the valid doc ids from it.
-      // Otherwise, we read the valid doc ids.
-      final MutableRoaringBitmap validDocIdSnapshot;
-      if (indexSegment.getQueryableDocIds() != null) {
-        validDocIdSnapshot = indexSegment.getQueryableDocIds().getMutableRoaringBitmap();
-      } else if (indexSegment.getValidDocIds() != null) {
-        validDocIdSnapshot = indexSegment.getValidDocIds().getMutableRoaringBitmap();
-      } else {
-        throw new WebApplicationException(
-            String.format("Missing validDocIds for table %s segment %s does not exist", tableNameWithType, segmentName),
-            Response.Status.NOT_FOUND);
+      final Pair<ValidDocIdsType, MutableRoaringBitmap> validDocIdSnapshotPair =
+          getValidDocIds(indexSegment, validDocIdsType);
+      MutableRoaringBitmap validDocIdSnapshot = validDocIdSnapshotPair.getRight();
+      if (validDocIdSnapshot == null) {
+        String msg = String.format("Missing validDocIds for table %s segment %s does not exist", tableNameWithType,
+            segmentDataManager.getSegmentName());
+        LOGGER.warn(msg);
+        throw new WebApplicationException(msg, Response.Status.NOT_FOUND);
       }
 
       byte[] validDocIdsBytes = RoaringBitmapUtils.serialize(validDocIdSnapshot);
@@ -529,16 +584,18 @@ public class TablesResource {
   @Produces(MediaType.APPLICATION_JSON)
   @ApiOperation(value = "Provides segment validDocId metadata", notes = "Provides segment validDocId metadata")
   @ApiResponses(value = {
-      @ApiResponse(code = 200, message = "Success"),
-      @ApiResponse(code = 500, message = "Internal server error", response = ErrorInfo.class),
-      @ApiResponse(code = 404, message = "Table or segment not found", response = ErrorInfo.class)
+      @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, message = "Internal server error",
+      response = ErrorInfo.class), @ApiResponse(code = 404, message = "Table or segment not found", response =
+      ErrorInfo.class)
   })
   public String getValidDocIdMetadata(
       @ApiParam(value = "Table name including type", required = true, example = "myTable_REALTIME")
       @PathParam("tableNameWithType") String tableNameWithType,
-      @ApiParam(value = "Segment name", allowMultiple = true) @QueryParam("segmentNames")
-      List<String> segmentNames) {
-    return ResourceUtils.convertToJsonString(processValidDocIdMetadata(tableNameWithType, segmentNames));
+      @ApiParam(value = "Valid doc id type", example = "SNAPSHOT|IN_MEMORY|IN_MEMORY_WITH_DELETE")
+      @QueryParam("validDocIdsType") String validDocIdsType,
+      @ApiParam(value = "Segment name", allowMultiple = true) @QueryParam("segmentNames") List<String> segmentNames) {
+    return ResourceUtils.convertToJsonString(
+        processValidDocIdMetadata(tableNameWithType, segmentNames, validDocIdsType));
   }
 
   @POST
@@ -546,18 +603,22 @@ public class TablesResource {
   @Produces(MediaType.APPLICATION_JSON)
   @ApiOperation(value = "Provides segment validDocId metadata", notes = "Provides segment validDocId metadata")
   @ApiResponses(value = {
-      @ApiResponse(code = 200, message = "Success"),
-      @ApiResponse(code = 500, message = "Internal server error", response = ErrorInfo.class),
-      @ApiResponse(code = 404, message = "Table or segment not found", response = ErrorInfo.class)
+      @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, message = "Internal server error",
+      response = ErrorInfo.class), @ApiResponse(code = 404, message = "Table or segment not found", response =
+      ErrorInfo.class)
   })
   public String getValidDocIdMetadata(
       @ApiParam(value = "Table name including type", required = true, example = "myTable_REALTIME")
-      @PathParam("tableNameWithType") String tableNameWithType, TableSegments tableSegments) {
+      @PathParam("tableNameWithType") String tableNameWithType,
+      @ApiParam(value = "Valid doc id type", example = "SNAPSHOT|IN_MEMORY|IN_MEMORY_WITH_DELETE")
+      @QueryParam("validDocIdsType") String validDocIdsType, TableSegments tableSegments) {
     List<String> segmentNames = tableSegments.getSegments();
-    return ResourceUtils.convertToJsonString(processValidDocIdMetadata(tableNameWithType, segmentNames));
+    return ResourceUtils.convertToJsonString(
+        processValidDocIdMetadata(tableNameWithType, segmentNames, validDocIdsType));
   }
 
-  private List<Map<String, Object>> processValidDocIdMetadata(String tableNameWithType, List<String> segments) {
+  private List<Map<String, Object>> processValidDocIdMetadata(String tableNameWithType, List<String> segments,
+      String validDocIdsType) {
     TableDataManager tableDataManager =
         ServerResourceUtils.checkGetTableDataManager(_serverInstance, tableNameWithType);
     List<String> missingSegments = new ArrayList<>();
@@ -588,15 +649,11 @@ public class TablesResource {
           continue;
         }
 
-        // Adopt the same logic as the query execution to get the valid doc ids. 'FilterPlanNode.run()'
-        // If the queryableDocId is available (upsert delete is enabled), we read the valid doc ids from it.
-        // Otherwise, we read the valid doc ids.
-        final MutableRoaringBitmap validDocIdSnapshot;
-        if (indexSegment.getQueryableDocIds() != null) {
-          validDocIdSnapshot = indexSegment.getQueryableDocIds().getMutableRoaringBitmap();
-        } else if (indexSegment.getValidDocIds() != null) {
-          validDocIdSnapshot = indexSegment.getValidDocIds().getMutableRoaringBitmap();
-        } else {
+        final Pair<ValidDocIdsType, MutableRoaringBitmap> validDocIdSnapshotPair =
+            getValidDocIds(indexSegment, validDocIdsType);
+        String finalValidDocIdsType = validDocIdSnapshotPair.getLeft().toString();
+        MutableRoaringBitmap validDocIdSnapshot = validDocIdSnapshotPair.getRight();
+        if (validDocIdSnapshot == null) {
           String msg = String.format("Missing validDocIds for table %s segment %s does not exist", tableNameWithType,
               segmentDataManager.getSegmentName());
           LOGGER.warn(msg);
@@ -611,6 +668,8 @@ public class TablesResource {
         validDocIdMetadata.put("totalDocs", totalDocs);
         validDocIdMetadata.put("totalValidDocs", totalValidDocs);
         validDocIdMetadata.put("totalInvalidDocs", totalInvalidDocs);
+        validDocIdMetadata.put("segmentCrc", indexSegment.getSegmentMetadata().getCrc());
+        validDocIdMetadata.put("validDocIdsType", finalValidDocIdsType);
         allValidDocIdMetadata.add(validDocIdMetadata);
       } finally {
         tableDataManager.releaseSegment(segmentDataManager);
@@ -619,6 +678,28 @@ public class TablesResource {
     return allValidDocIdMetadata;
   }
 
+  /**
+   * Resolves the validDocIds bitmap of {@code indexSegment} for the requested validDocIds type.
+   *
+   * @param indexSegment segment to read; the SNAPSHOT type casts it to {@link ImmutableSegmentImpl}
+   * @param validDocIdsTypeStr case-insensitive {@link ValidDocIdsType} name; {@code null} selects SNAPSHOT
+   * @return pair of (validDocIds type actually used, bitmap); the bitmap is {@code null} when the segment has no
+   *         bitmap of that type, which callers treat as missing validDocIds
+   * @throws WebApplicationException with BAD_REQUEST when {@code validDocIdsTypeStr} is not a known type
+   */
+  private Pair<ValidDocIdsType, MutableRoaringBitmap> getValidDocIds(IndexSegment indexSegment,
+      String validDocIdsTypeStr) {
+    // By default, we read the valid doc ids from snapshot.
+    ValidDocIdsType validDocIdsType = ValidDocIdsType.SNAPSHOT;
+    if (validDocIdsTypeStr != null) {
+      try {
+        validDocIdsType = ValidDocIdsType.valueOf(validDocIdsTypeStr.toUpperCase(java.util.Locale.ROOT));
+      } catch (IllegalArgumentException e) {
+        // Report a bad query parameter as a client error instead of letting it surface as a 500.
+        throw new WebApplicationException(
+            String.format("Invalid validDocIdsType: %s. Supported types: %s", validDocIdsTypeStr,
+                java.util.Arrays.toString(ValidDocIdsType.values())), Response.Status.BAD_REQUEST);
+      }
+    }
+    switch (validDocIdsType) {
+      case SNAPSHOT:
+        return Pair.of(validDocIdsType, ((ImmutableSegmentImpl) indexSegment).loadValidDocIdsFromSnapshot());
+      case IN_MEMORY:
+        // The in-memory bitmaps can be absent; return a null bitmap so callers answer NOT_FOUND instead of an NPE.
+        return Pair.of(validDocIdsType, indexSegment.getValidDocIds() != null
+            ? indexSegment.getValidDocIds().getMutableRoaringBitmap() : null);
+      case IN_MEMORY_WITH_DELETE:
+        return Pair.of(validDocIdsType, indexSegment.getQueryableDocIds() != null
+            ? indexSegment.getQueryableDocIds().getMutableRoaringBitmap() : null);
+      default:
+        // Reachable only for ValidDocIdsType constants not handled above; fall back to the snapshot.
+        LOGGER.warn("Invalid validDocIdsType: {}. Using default validDocIdsType: {}", validDocIdsType,
+            ValidDocIdsType.SNAPSHOT);
+        return Pair.of(ValidDocIdsType.SNAPSHOT, ((ImmutableSegmentImpl) indexSegment).loadValidDocIdsFromSnapshot());
+    }
+  }
+
   /**
    * Upload a low level consumer segment to segment store and return the segment download url. This endpoint is used
    * when segment store copy is unavailable for committed low level consumer segments.
diff --git a/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java b/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
index 79b17396de..bca644fb16 100644
--- a/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
+++ b/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
@@ -33,6 +33,9 @@ import org.apache.pinot.common.response.server.TableIndexMetadataResponse;
 import org.apache.pinot.common.restlet.resources.TableMetadataInfo;
 import org.apache.pinot.common.restlet.resources.TableSegments;
 import org.apache.pinot.common.restlet.resources.TablesList;
+import org.apache.pinot.common.restlet.resources.ValidDocIdsBitmapResponse;
+import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
+import org.apache.pinot.common.utils.RoaringBitmapUtils;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
 import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
@@ -45,6 +48,7 @@ import org.apache.pinot.segment.spi.index.IndexService;
 import org.apache.pinot.segment.spi.index.StandardIndexes;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -280,6 +284,8 @@ public class TablesResourceTest extends BaseResourceTest {
     // Verify the content of the downloaded snapshot from a realtime table.
     downLoadAndVerifyValidDocIdsSnapshot(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME),
         (ImmutableSegmentImpl) _realtimeIndexSegments.get(0));
+    downLoadAndVerifyValidDocIdsSnapshotBitmap(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME),
+        (ImmutableSegmentImpl) _realtimeIndexSegments.get(0));
 
     // Verify non-existent table and segment download return NOT_FOUND status.
     Response response =
@@ -299,6 +305,8 @@ public class TablesResourceTest extends BaseResourceTest {
     // Verify the content of the downloaded snapshot from a realtime table.
     downLoadAndVerifyValidDocIdsSnapshot(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME),
         (ImmutableSegmentImpl) segment);
+    downLoadAndVerifyValidDocIdsSnapshotBitmap(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME),
+        (ImmutableSegmentImpl) segment);
 
     String validDocIdMetadataPath =
         "/tables/" + TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME) + "/validDocIdMetadata";
@@ -310,6 +318,8 @@ public class TablesResourceTest extends BaseResourceTest {
     Assert.assertEquals(validDocIdMetadata.get("totalDocs").asInt(), 100000);
     Assert.assertEquals(validDocIdMetadata.get("totalValidDocs").asInt(), 8);
     Assert.assertEquals(validDocIdMetadata.get("totalInvalidDocs").asInt(), 99992);
+    Assert.assertEquals(validDocIdMetadata.get("segmentCrc").asText(), "1265679343");
+    Assert.assertEquals(validDocIdMetadata.get("validDocIdsType").asText(), "SNAPSHOT");
   }
 
   @Test
@@ -319,6 +329,9 @@ public class TablesResourceTest extends BaseResourceTest {
     // Verify the content of the downloaded snapshot from a realtime table.
     downLoadAndVerifyValidDocIdsSnapshot(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME),
         (ImmutableSegmentImpl) segment);
+    downLoadAndVerifyValidDocIdsSnapshotBitmap(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME),
+        (ImmutableSegmentImpl) segment);
+
     List<String> segments = List.of(segment.getSegmentName());
     TableSegments tableSegments = new TableSegments(segments);
     String validDocIdMetadataPath =
@@ -331,6 +344,8 @@ public class TablesResourceTest extends BaseResourceTest {
     Assert.assertEquals(validDocIdMetadata.get("totalDocs").asInt(), 100000);
     Assert.assertEquals(validDocIdMetadata.get("totalValidDocs").asInt(), 8);
     Assert.assertEquals(validDocIdMetadata.get("totalInvalidDocs").asInt(), 99992);
+    Assert.assertEquals(validDocIdMetadata.get("segmentCrc").asText(), "1265679343");
+    Assert.assertEquals(validDocIdMetadata.get("validDocIdsType").asText(), "SNAPSHOT");
   }
 
   // Verify metadata file from segments.
@@ -364,26 +379,131 @@ public class TablesResourceTest extends BaseResourceTest {
   // Verify metadata file from segments.
   private void downLoadAndVerifyValidDocIdsSnapshot(String tableNameWithType, ImmutableSegmentImpl segment)
       throws IOException {
-
     String snapshotPath = "/segments/" + tableNameWithType + "/" + segment.getSegmentName() + "/validDocIds";

     PartitionUpsertMetadataManager upsertMetadataManager = mock(PartitionUpsertMetadataManager.class);
     ThreadSafeMutableRoaringBitmap validDocIds = new ThreadSafeMutableRoaringBitmap();
+    ThreadSafeMutableRoaringBitmap queryableDocIds = new ThreadSafeMutableRoaringBitmap();
+    ThreadSafeMutableRoaringBitmap validDocIdsSnapshot = new ThreadSafeMutableRoaringBitmap();
+    // Offset each bitmap (+0/+1/+2) so that every validDocIdsType returns distinguishable content.
     int[] docIds = new int[]{1, 4, 6, 10, 15, 17, 18, 20};
     for (int docId : docIds) {
       validDocIds.add(docId);
+      queryableDocIds.add(docId + 1);
+      validDocIdsSnapshot.add(docId + 2);
     }
-    segment.enableUpsert(upsertMetadataManager, validDocIds, null);
-
-    // Download the snapshot in byte[] format.
+    segment.enableUpsert(upsertMetadataManager, validDocIds, queryableDocIds);
+    File validDocIdsSnapshotFile =
+        new File(SegmentDirectoryPaths.findSegmentDirectory(segment.getSegmentMetadata().getIndexDir()),
+            V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME);
+    FileUtils.writeByteArrayToFile(validDocIdsSnapshotFile,
+        RoaringBitmapUtils.serialize(validDocIdsSnapshot.getMutableRoaringBitmap()));
+
+    // Check no type: the server should default to SNAPSHOT and return the snapshot file written above.
     Response response = _webTarget.path(snapshotPath).request().get(Response.class);
     Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());
-    byte[] snapshot = response.readEntity(byte[].class);
+    byte[] validDocIdsSnapshotBitmap = response.readEntity(byte[].class);
+    Assert.assertNotNull(validDocIdsSnapshotBitmap);
+    Assert.assertEquals(new ImmutableRoaringBitmap(ByteBuffer.wrap(validDocIdsSnapshotBitmap)).toMutableRoaringBitmap(),
+        validDocIdsSnapshot.getMutableRoaringBitmap());
+
+    // Check explicit SNAPSHOT type: expects the contents of the snapshot file.
+    response =
+        _webTarget.path(snapshotPath).queryParam("validDocIdsType", ValidDocIdsType.SNAPSHOT.toString()).request()
+            .get(Response.class);
+    Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());
+    validDocIdsSnapshotBitmap = response.readEntity(byte[].class);
+    Assert.assertNotNull(validDocIdsSnapshotBitmap);
+    Assert.assertEquals(new ImmutableRoaringBitmap(ByteBuffer.wrap(validDocIdsSnapshotBitmap)).toMutableRoaringBitmap(),
+        validDocIdsSnapshot.getMutableRoaringBitmap());

-    // Load the snapshot file.
-    Assert.assertNotNull(snapshot);
-    Assert.assertEquals(new ImmutableRoaringBitmap(ByteBuffer.wrap(snapshot)).toMutableRoaringBitmap(),
+    // Check IN_MEMORY type (expects validDocIds). NOTE(review): enum passed without .toString() unlike the others.
+    response = _webTarget.path(snapshotPath).queryParam("validDocIdsType", ValidDocIdsType.IN_MEMORY).request()
+        .get(Response.class);
+    Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());
+    validDocIdsSnapshotBitmap = response.readEntity(byte[].class);
+    Assert.assertNotNull(validDocIdsSnapshotBitmap);
+    Assert.assertEquals(new ImmutableRoaringBitmap(ByteBuffer.wrap(validDocIdsSnapshotBitmap)).toMutableRoaringBitmap(),
         validDocIds.getMutableRoaringBitmap());
+
+    // Check IN_MEMORY_WITH_DELETE type: expects the queryableDocIds bitmap passed to enableUpsert.
+    response =
+        _webTarget.path(snapshotPath).queryParam("validDocIdsType", ValidDocIdsType.IN_MEMORY_WITH_DELETE.toString())
+            .request().get(Response.class);
+    Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());
+    validDocIdsSnapshotBitmap = response.readEntity(byte[].class);
+    Assert.assertNotNull(validDocIdsSnapshotBitmap);
+    Assert.assertEquals(new ImmutableRoaringBitmap(ByteBuffer.wrap(validDocIdsSnapshotBitmap)).toMutableRoaringBitmap(),
+        queryableDocIds.getMutableRoaringBitmap());
+  }
+
+  private void downLoadAndVerifyValidDocIdsSnapshotBitmap(String tableNameWithType, ImmutableSegmentImpl segment)
+      throws IOException {
+    String snapshotPath = "/segments/" + tableNameWithType + "/" + segment.getSegmentName() + "/validDocIdsBitmap";
+    // NOTE(review): setup duplicates downLoadAndVerifyValidDocIdsSnapshot; consider a shared helper.
+    PartitionUpsertMetadataManager upsertMetadataManager = mock(PartitionUpsertMetadataManager.class);
+    ThreadSafeMutableRoaringBitmap validDocIds = new ThreadSafeMutableRoaringBitmap();
+    ThreadSafeMutableRoaringBitmap queryableDocIds = new ThreadSafeMutableRoaringBitmap();
+    ThreadSafeMutableRoaringBitmap validDocIdsSnapshot = new ThreadSafeMutableRoaringBitmap();
+    // Offset each bitmap (+0/+1/+2) so that every validDocIdsType returns distinguishable content.
+    int[] docIds = new int[]{1, 4, 6, 10, 15, 17, 18, 20};
+    for (int docId : docIds) {
+      validDocIds.add(docId);
+      queryableDocIds.add(docId + 1);
+      validDocIdsSnapshot.add(docId + 2);
+    }
+    segment.enableUpsert(upsertMetadataManager, validDocIds, queryableDocIds);
+    File validDocIdsSnapshotFile =
+        new File(SegmentDirectoryPaths.findSegmentDirectory(segment.getSegmentMetadata().getIndexDir()),
+            V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME);
+    FileUtils.writeByteArrayToFile(validDocIdsSnapshotFile,
+        RoaringBitmapUtils.serialize(validDocIdsSnapshot.getMutableRoaringBitmap()));
+
+    // Check no type: defaults to SNAPSHOT. Every type should report the same segmentCrc and segmentName.
+    ValidDocIdsBitmapResponse response = _webTarget.path(snapshotPath).request().get(ValidDocIdsBitmapResponse.class);
+    Assert.assertNotNull(response);
+    Assert.assertEquals(response.getSegmentCrc(), "1265679343");
+    Assert.assertEquals(response.getSegmentName(), segment.getSegmentName());
+    byte[] validDocIdsSnapshotBitmap = response.getBitmap();
+    Assert.assertNotNull(validDocIdsSnapshotBitmap);
+    Assert.assertEquals(new ImmutableRoaringBitmap(ByteBuffer.wrap(validDocIdsSnapshotBitmap)).toMutableRoaringBitmap(),
+        validDocIdsSnapshot.getMutableRoaringBitmap());
+
+    // Check explicit SNAPSHOT type: expects the contents of the snapshot file.
+    response =
+        _webTarget.path(snapshotPath).queryParam("validDocIdsType", ValidDocIdsType.SNAPSHOT.toString()).request()
+            .get(ValidDocIdsBitmapResponse.class);
+    Assert.assertNotNull(response);
+    Assert.assertEquals(response.getSegmentCrc(), "1265679343");
+    Assert.assertEquals(response.getSegmentName(), segment.getSegmentName());
+    validDocIdsSnapshotBitmap = response.getBitmap();
+    Assert.assertNotNull(validDocIdsSnapshotBitmap);
+    Assert.assertEquals(new ImmutableRoaringBitmap(ByteBuffer.wrap(validDocIdsSnapshotBitmap)).toMutableRoaringBitmap(),
+        validDocIdsSnapshot.getMutableRoaringBitmap());
+
+    // Check IN_MEMORY type: expects the validDocIds bitmap passed to enableUpsert.
+    response =
+        _webTarget.path(snapshotPath).queryParam("validDocIdsType", ValidDocIdsType.IN_MEMORY.toString()).request()
+            .get(ValidDocIdsBitmapResponse.class);
+    Assert.assertNotNull(response);
+    Assert.assertEquals(response.getSegmentCrc(), "1265679343");
+    Assert.assertEquals(response.getSegmentName(), segment.getSegmentName());
+    validDocIdsSnapshotBitmap = response.getBitmap();
+    Assert.assertNotNull(validDocIdsSnapshotBitmap);
+    Assert.assertEquals(new ImmutableRoaringBitmap(ByteBuffer.wrap(validDocIdsSnapshotBitmap)).toMutableRoaringBitmap(),
+        validDocIds.getMutableRoaringBitmap());
+
+    // Check IN_MEMORY_WITH_DELETE type: expects the queryableDocIds bitmap passed to enableUpsert.
+    response =
+        _webTarget.path(snapshotPath).queryParam("validDocIdsType", ValidDocIdsType.IN_MEMORY_WITH_DELETE.toString())
+            .request().get(ValidDocIdsBitmapResponse.class);
+    Assert.assertNotNull(response);
+    Assert.assertEquals(response.getSegmentCrc(), "1265679343");
+    Assert.assertEquals(response.getSegmentName(), segment.getSegmentName());
+    validDocIdsSnapshotBitmap = response.getBitmap();
+    Assert.assertNotNull(validDocIdsSnapshotBitmap);
+    Assert.assertEquals(new ImmutableRoaringBitmap(ByteBuffer.wrap(validDocIdsSnapshotBitmap)).toMutableRoaringBitmap(),
+        queryableDocIds.getMutableRoaringBitmap());
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org