Posted to commits@pinot.apache.org by ja...@apache.org on 2023/06/23 18:59:49 UTC

[pinot] branch master updated: Add delete support to upsert tables (#10703)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 608cb8de2b Add delete support to upsert tables (#10703)
608cb8de2b is described below

commit 608cb8de2bd19c8497b03be07a087591bfd1c3bd
Author: Navina Ramesh <na...@apache.org>
AuthorDate: Fri Jun 23 11:59:43 2023 -0700

    Add delete support to upsert tables (#10703)
---
 .../realtime/LLRealtimeSegmentDataManager.java     |   1 +
 .../org/apache/pinot/core/plan/FilterPlanNode.java |  34 +-
 ...adataAndDictionaryAggregationPlanMakerTest.java |   8 +-
 .../plan/maker/QueryOverrideWithHintsTest.java     |   6 +
 .../tests/BaseClusterIntegrationTest.java          | 127 ++++++-
 .../tests/ClusterIntegrationTestUtils.java         |  74 ++++
 .../tests/UpsertTableIntegrationTest.java          | 372 +++++++++++++++++++++
 .../UpsertTableSegmentUploadIntegrationTest.java   |   6 +-
 .../src/test/resources/gameScores_csv.tar.gz       | Bin 0 -> 266 bytes
 .../resources/gameScores_partial_upsert_csv.tar.gz | Bin 0 -> 297 bytes
 .../resources/partial_upsert_table_test.schema     |  35 ++
 .../src/test/resources/upsert_table_test.schema    |  41 +--
 ...st.schema => upsert_upload_segment_test.schema} |   0
 ...st.tar.gz => upsert_upload_segment_test.tar.gz} | Bin
 .../indexsegment/immutable/EmptyIndexSegment.java  |   6 +
 .../immutable/ImmutableSegmentImpl.java            |  10 +-
 .../indexsegment/mutable/MutableSegmentImpl.java   |  81 ++---
 .../local/realtime/impl/RealtimeSegmentConfig.java |  19 +-
 .../upsert/BasePartitionUpsertMetadataManager.java |  52 +--
 .../upsert/BaseTableUpsertMetadataManager.java     |   3 +-
 ...oncurrentMapPartitionUpsertMetadataManager.java | 113 +++++--
 .../ConcurrentMapTableUpsertMetadataManager.java   |   3 +-
 .../pinot/segment/local/upsert/RecordInfo.java     |   8 +-
 .../pinot/segment/local/upsert/UpsertUtils.java    |  40 +--
 .../segment/local/utils/TableConfigUtils.java      |  21 +-
 .../dedup/PartitionDedupMetadataManagerTest.java   |  11 +-
 ...rrentMapPartitionUpsertMetadataManagerTest.java | 360 ++++++++++++++++++--
 .../segment/local/utils/TableConfigUtilsTest.java  |  40 ++-
 .../org/apache/pinot/segment/spi/IndexSegment.java |  11 +
 .../pinot/server/api/TablesResourceTest.java       |   2 +-
 .../apache/pinot/spi/config/table/TableConfig.java |   6 +
 .../pinot/spi/config/table/UpsertConfig.java       |  14 +
 32 files changed, 1297 insertions(+), 207 deletions(-)
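
Delete support is driven by table config: UpsertConfig gains a deleteRecordColumn, a boolean column whose true value marks a primary key as deleted and hides it from queries. A minimal sketch of enabling it (table and column names are illustrative, not taken from this commit):

    UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
    upsertConfig.setDeleteRecordColumn("deleted");   // boolean column in the table schema
    TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME)
        .setTableName("gameScores")
        .setUpsertConfig(upsertConfig)
        .build();

A record with the flag set still goes through normal upsert resolution; it is only excluded from the queryable doc ids until a newer record revives the key.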

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 99f3c6d802..3f655728bc 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -1425,6 +1425,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
             .setPartitionUpsertMetadataManager(partitionUpsertMetadataManager)
             .setPartitionDedupMetadataManager(partitionDedupMetadataManager)
             .setUpsertComparisonColumns(tableConfig.getUpsertComparisonColumns())
+            .setUpsertDeleteRecordColumn(tableConfig.getUpsertDeleteRecordColumn())
             .setFieldConfigList(tableConfig.getFieldConfigList());
 
     // Create message decoder
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
index 588929f2ea..82c9d51a1e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
@@ -81,24 +81,33 @@ public class FilterPlanNode implements PlanNode {
 
   @Override
   public BaseFilterOperator run() {
-    // NOTE: Snapshot the validDocIds before reading the numDocs to prevent the latest updates getting lost
-    ThreadSafeMutableRoaringBitmap validDocIds = _indexSegment.getValidDocIds();
-    MutableRoaringBitmap validDocIdsSnapshot =
-        validDocIds != null && !_queryContext.isSkipUpsert() ? validDocIds.getMutableRoaringBitmap() : null;
+    // NOTE: Snapshot the queryableDocIds before reading the numDocs to prevent the latest updates getting lost
+    MutableRoaringBitmap queryableDocIdSnapshot = null;
+    if (!_queryContext.isSkipUpsert()) {
+      ThreadSafeMutableRoaringBitmap queryableDocIds = _indexSegment.getQueryableDocIds();
+      if (queryableDocIds != null) {
+        queryableDocIdSnapshot = queryableDocIds.getMutableRoaringBitmap();
+      } else {
+        ThreadSafeMutableRoaringBitmap validDocIds = _indexSegment.getValidDocIds();
+        if (validDocIds != null) {
+          queryableDocIdSnapshot = validDocIds.getMutableRoaringBitmap();
+        }
+      }
+    }
     int numDocs = _indexSegment.getSegmentMetadata().getTotalDocs();
 
     FilterContext filter = _filter != null ? _filter : _queryContext.getFilter();
     if (filter != null) {
       BaseFilterOperator filterOperator = constructPhysicalOperator(filter, numDocs);
-      if (validDocIdsSnapshot != null) {
-        BaseFilterOperator validDocFilter = new BitmapBasedFilterOperator(validDocIdsSnapshot, false, numDocs);
+      if (queryableDocIdSnapshot != null) {
+        BaseFilterOperator validDocFilter = new BitmapBasedFilterOperator(queryableDocIdSnapshot, false, numDocs);
         return FilterOperatorUtils.getAndFilterOperator(_queryContext, Arrays.asList(filterOperator, validDocFilter),
             numDocs);
       } else {
         return filterOperator;
       }
-    } else if (validDocIdsSnapshot != null) {
-      return new BitmapBasedFilterOperator(validDocIdsSnapshot, false, numDocs);
+    } else if (queryableDocIdSnapshot != null) {
+      return new BitmapBasedFilterOperator(queryableDocIdSnapshot, false, numDocs);
     } else {
       return new MatchAllFilterOperator(numDocs);
     }
@@ -250,9 +259,8 @@ public class FilterPlanNode implements PlanNode {
               return new TextContainsFilterOperator(textIndexReader, (TextContainsPredicate) predicate, numDocs);
             case TEXT_MATCH:
               textIndexReader = dataSource.getTextIndex();
-              Preconditions
-                  .checkState(textIndexReader != null, "Cannot apply TEXT_MATCH on column: %s without text index",
-                      column);
+              Preconditions.checkState(textIndexReader != null,
+                  "Cannot apply TEXT_MATCH on column: %s without text index", column);
               // We could check for real time and segment Lucene reader, but easier to check the other way round
               if (textIndexReader instanceof NativeTextIndexReader
                   || textIndexReader instanceof NativeMutableTextIndex) {
@@ -300,8 +308,8 @@ public class FilterPlanNode implements PlanNode {
                 return new MatchAllFilterOperator(numDocs);
               }
             default:
-              predicateEvaluator = PredicateEvaluatorProvider.getPredicateEvaluator(predicate, dataSource,
-                  _queryContext);
+              predicateEvaluator =
+                  PredicateEvaluatorProvider.getPredicateEvaluator(predicate, dataSource, _queryContext);
               _predicateEvaluators.add(Pair.of(predicate, predicateEvaluator));
               return FilterOperatorUtils.getLeafFilterOperator(predicateEvaluator, dataSource, numDocs,
                   _queryContext.isNullHandlingEnabled());
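
At query time the filter plan now prefers the queryable doc ids, which exclude deleted keys, and falls back to the plain valid doc ids when the table has no delete column; skipUpsert bypasses both views. A short sketch of the two query flavors, assuming the getPinotConnection() helper used by the integration tests in this commit:

    // Upsert view: one row per primary key, deleted keys hidden
    long visible = getPinotConnection()
        .execute("SELECT COUNT(*) FROM gameScores").getResultSet(0).getLong(0);
    // Raw view: every ingested record, including delete markers and older versions
    long all = getPinotConnection()
        .execute("SELECT COUNT(*) FROM gameScores OPTION(skipUpsert=true)").getResultSet(0).getLong(0);
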
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
index 00fec795dc..47459fb691 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
@@ -114,8 +114,8 @@ public class MetadataAndDictionaryAggregationPlanMakerTest {
     segmentGeneratorConfig.setTableName("testTable");
     segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
     segmentGeneratorConfig.setOutDir(INDEX_DIR.getAbsolutePath());
-    segmentGeneratorConfig.setIndexOn(StandardIndexes.inverted(), IndexConfig.ENABLED,
-        "column6", "column7", "column11", "column17", "column18");
+    segmentGeneratorConfig.setIndexOn(StandardIndexes.inverted(), IndexConfig.ENABLED, "column6", "column7", "column11",
+        "column17", "column18");
 
     // Build the index segment.
     SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
@@ -131,8 +131,8 @@ public class MetadataAndDictionaryAggregationPlanMakerTest {
     _upsertIndexSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.heap);
     ((ImmutableSegmentImpl) _upsertIndexSegment).enableUpsert(
         new ConcurrentMapPartitionUpsertMetadataManager("testTable_REALTIME", 0, Collections.singletonList("column6"),
-            Collections.singletonList("daysSinceEpoch"), HashFunction.NONE, null, false, serverMetrics),
-        new ThreadSafeMutableRoaringBitmap());
+            Collections.singletonList("daysSinceEpoch"), null, HashFunction.NONE, null, false, serverMetrics),
+        new ThreadSafeMutableRoaringBitmap(), null);
   }
 
   @AfterClass
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/QueryOverrideWithHintsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/QueryOverrideWithHintsTest.java
index 35ae0d685d..57f169258c 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/QueryOverrideWithHintsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/QueryOverrideWithHintsTest.java
@@ -89,6 +89,12 @@ public class QueryOverrideWithHintsTest {
       return null;
     }
 
+    @Nullable
+    @Override
+    public ThreadSafeMutableRoaringBitmap getQueryableDocIds() {
+      return null;
+    }
+
     @Override
     public GenericRow getRecord(int docId, GenericRow reuse) {
       return null;
diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index 3d32305d76..6a6225bea5 100644
--- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -37,6 +37,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.pinot.client.ConnectionFactory;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.common.utils.config.TagNameUtils;
+import org.apache.pinot.plugin.inputformat.csv.CSVMessageDecoder;
 import org.apache.pinot.plugin.stream.kafka.KafkaStreamConfigProperties;
 import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
 import org.apache.pinot.spi.config.table.DedupConfig;
@@ -54,6 +55,8 @@ import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.apache.pinot.spi.stream.StreamDataProducer;
+import org.apache.pinot.spi.stream.StreamDataProvider;
 import org.apache.pinot.spi.stream.StreamDataServerStartable;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
@@ -396,11 +399,15 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
   /**
    * Creates a new Upsert enabled table config.
    */
-  protected TableConfig createUpsertTableConfig(File sampleAvroFile, String primaryKeyColumn, int numPartitions) {
+  protected TableConfig createUpsertTableConfig(File sampleAvroFile, String primaryKeyColumn, String deleteColumn,
+      int numPartitions) {
     AvroFileSchemaKafkaAvroMessageDecoder._avroFile = sampleAvroFile;
     Map<String, ColumnPartitionConfig> columnPartitionConfigMap = new HashMap<>();
     columnPartitionConfigMap.put(primaryKeyColumn, new ColumnPartitionConfig("Murmur", numPartitions));
 
+    UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+    upsertConfig.setDeleteRecordColumn(deleteColumn);
+
     return new TableConfigBuilder(TableType.REALTIME).setTableName(getTableName()).setSchemaName(getSchemaName())
         .setTimeColumnName(getTimeColumnName()).setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas())
         .setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode()).setTaskConfig(getTaskConfig())
@@ -409,7 +416,62 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
         .setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
         .setSegmentPartitionConfig(new SegmentPartitionConfig(columnPartitionConfigMap))
         .setReplicaGroupStrategyConfig(new ReplicaGroupStrategyConfig(primaryKeyColumn, 1))
-        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL)).build();
+        .setUpsertConfig(upsertConfig).build();
+  }
+
+  protected Map<String, String> getCSVDecoderProperties(@Nullable String delimiter,
+      @Nullable String csvHeaderProperty) {
+    String streamType = "kafka";
+    Map<String, String> csvDecoderProperties = new HashMap<>();
+    csvDecoderProperties.put(
+        StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
+        CSVMessageDecoder.class.getName());
+    if (delimiter != null) {
+      csvDecoderProperties.put(StreamConfigProperties.constructStreamProperty(streamType, "decoder.prop.delimiter"),
+          delimiter);
+    }
+    if (csvHeaderProperty != null) {
+      csvDecoderProperties.put(StreamConfigProperties.constructStreamProperty(streamType, "decoder.prop.header"),
+          csvHeaderProperty);
+    }
+    return csvDecoderProperties;
+  }
+
+  /**
+   * Creates a new Upsert enabled table config.
+   */
+  protected TableConfig createCSVUpsertTableConfig(String tableName, @Nullable String schemaName,
+      @Nullable String kafkaTopicName, int numPartitions, Map<String, String> streamDecoderProperties,
+      UpsertConfig upsertConfig, String primaryKeyColumn) {
+    if (schemaName == null) {
+      schemaName = getSchemaName();
+    }
+    Map<String, ColumnPartitionConfig> columnPartitionConfigMap = new HashMap<>();
+    columnPartitionConfigMap.put(primaryKeyColumn, new ColumnPartitionConfig("Murmur", numPartitions));
+
+    if (upsertConfig == null) {
+      upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+    }
+    if (kafkaTopicName == null) {
+      kafkaTopicName = getKafkaTopic();
+    }
+
+    Map<String, String> streamConfigsMap = getStreamConfigMap();
+    streamConfigsMap.put(
+        StreamConfigProperties.constructStreamProperty("kafka", StreamConfigProperties.STREAM_TOPIC_NAME),
+        kafkaTopicName);
+    streamConfigsMap.putAll(streamDecoderProperties);
+
+    return new TableConfigBuilder(TableType.REALTIME).setTableName(tableName).setSchemaName(schemaName)
+        .setTimeColumnName(getTimeColumnName()).setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas())
+        .setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode()).setTaskConfig(getTaskConfig())
+        .setBrokerTenant(getBrokerTenant()).setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig())
+        .setLLC(useLlc()).setStreamConfigs(streamConfigsMap)
+        .setNullHandlingEnabled(UpsertConfig.Mode.PARTIAL.equals(upsertConfig.getMode()) || getNullHandlingEnabled())
+        .setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+        .setSegmentPartitionConfig(new SegmentPartitionConfig(columnPartitionConfigMap))
+        .setReplicaGroupStrategyConfig(new ReplicaGroupStrategyConfig(primaryKeyColumn, 1))
+        .setUpsertConfig(upsertConfig).build();
   }
 
   /**
@@ -498,21 +560,26 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
     _queryGenerator = new QueryGenerator(avroFiles, tableName, tableName);
   }
 
+  protected List<File> unpackAvroData(File outputDir)
+      throws Exception {
+    return unpackTarData(getAvroTarFileName(), outputDir);
+  }
+
   /**
-   * Unpack the tarred Avro data into the given directory.
+   * Unpack the tarred data into the given directory.
    *
+   * @param tarFileName Input tar filename
    * @param outputDir Output directory
    * @return List of files unpacked.
    * @throws Exception
    */
-  protected List<File> unpackAvroData(File outputDir)
+  protected List<File> unpackTarData(String tarFileName, File outputDir)
       throws Exception {
     InputStream inputStream =
-        BaseClusterIntegrationTest.class.getClassLoader().getResourceAsStream(getAvroTarFileName());
+        BaseClusterIntegrationTest.class.getClassLoader().getResourceAsStream(tarFileName);
     Assert.assertNotNull(inputStream);
     return TarGzCompressionUtils.untar(inputStream, outputDir);
   }
-
   /**
    * Pushes the data in the given Avro files into a Kafka stream.
    *
@@ -520,11 +587,57 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
    */
   protected void pushAvroIntoKafka(List<File> avroFiles)
       throws Exception {
-
     ClusterIntegrationTestUtils.pushAvroIntoKafka(avroFiles, "localhost:" + getKafkaPort(), getKafkaTopic(),
         getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), getPartitionColumn(), injectTombstones());
   }
 
+  /**
+   * Pushes the data in the given CSV file into a Kafka stream.
+   *
+   * @param csvFile CSV file to read records from
+   */
+  protected void pushCsvIntoKafka(File csvFile, String kafkaTopic, @Nullable Integer partitionColumnIndex)
+      throws Exception {
+    String kafkaBroker = "localhost:" + getKafkaPort();
+    StreamDataProducer producer = null;
+    try {
+      producer =
+        StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME,
+            getDefaultKafkaProducerProperties(kafkaBroker));
+      ClusterIntegrationTestUtils.pushCsvIntoKafka(csvFile, kafkaTopic, partitionColumnIndex, injectTombstones(),
+          producer);
+    } catch (Exception e) {
+      if (producer != null) {
+        producer.close();
+      }
+      throw e;
+    }
+  }
+
+  protected void pushCsvIntoKafka(List<String> csvRecords, String kafkaTopic, @Nullable Integer partitionColumnIndex) {
+    String kafkaBroker = "localhost:" + getKafkaPort();
+    StreamDataProducer producer = null;
+    try {
+      producer =
+          StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME,
+              getDefaultKafkaProducerProperties(kafkaBroker));
+      ClusterIntegrationTestUtils.pushCsvIntoKafka(csvRecords, kafkaTopic, partitionColumnIndex, injectTombstones(),
+          producer);
+    } catch (Exception e) {
+      if (producer != null) {
+        producer.close();
+      }
+      // Rethrow so ingestion failures surface in the test instead of being silently swallowed
+      throw new RuntimeException(e);
+    }
+  }
+
+  private Properties getDefaultKafkaProducerProperties(String kafkaBroker) {
+    Properties properties = new Properties();
+    properties.put("metadata.broker.list", kafkaBroker);
+    properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
+    properties.put("request.required.acks", "1");
+    properties.put("partitioner.class", "kafka.producer.ByteArrayPartitioner");
+    return properties;
+  }
+
   protected boolean injectTombstones() {
     return false;
   }
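
The new CSV helpers let tests stream delete markers directly: with the test header playerId,name,game,score,timestampInEpoch,deleted, the last column carries the delete flag. A sketch using the helper above (topic name is illustrative):

    // Mark primary key 102 as deleted by streaming a record with deleted=true
    List<String> deleteRecord = Collections.singletonList("102,Clifford,counter-strike,102,1681054200000,true");
    pushCsvIntoKafka(deleteRecord, "gameScores-topic", 0);
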
diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
index 189931988e..204303e05e 100644
--- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
+++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
@@ -25,6 +25,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -55,6 +56,9 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.util.Utf8;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVRecord;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
@@ -337,6 +341,76 @@ public class ClusterIntegrationTestUtils {
     TarGzCompressionUtils.createTarGzFile(indexDir, segmentTarFile);
   }
 
+  /**
+   * Push the records from the given CSV file into a Kafka stream.
+   *
+   * @param csvFile CSV file to read records from
+   * @param kafkaTopic Kafka topic
+   * @param partitionColumnIndex Optional index of the partition column
+   * @throws Exception
+   */
+  public static void pushCsvIntoKafka(File csvFile, String kafkaTopic,
+      @Nullable Integer partitionColumnIndex, boolean injectTombstones, StreamDataProducer producer)
+      throws Exception {
+
+    if (injectTombstones) {
+      // publish lots of tombstones to livelock the consumer if it can't handle this properly
+      for (int i = 0; i < 1000; i++) {
+        // publish a tombstone first
+        producer.produce(kafkaTopic, Longs.toByteArray(System.currentTimeMillis()), null);
+      }
+    }
+    CSVFormat csvFormat = CSVFormat.DEFAULT.withSkipHeaderRecord(true);
+    try (CSVParser parser = CSVParser.parse(csvFile, StandardCharsets.UTF_8, csvFormat)) {
+      for (CSVRecord csv : parser) {
+        byte[] keyBytes = (partitionColumnIndex == null) ? Longs.toByteArray(System.currentTimeMillis())
+            : csv.get(partitionColumnIndex).getBytes(StandardCharsets.UTF_8);
+        List<String> cols = new ArrayList<>();
+        for (String col : csv) {
+          cols.add(col);
+        }
+        byte[] bytes = String.join(",", cols).getBytes(StandardCharsets.UTF_8);
+        producer.produce(kafkaTopic, keyBytes, bytes);
+      }
+    }
+  }
+
+  /**
+   * Push the given CSV records into a Kafka stream.
+   *
+   * @param csvRecords List of CSV record strings
+   * @param kafkaTopic Kafka topic
+   * @param partitionColumnIndex Optional index of the partition column
+   * @throws Exception
+   */
+  public static void pushCsvIntoKafka(List<String> csvRecords, String kafkaTopic,
+      @Nullable Integer partitionColumnIndex, boolean injectTombstones, StreamDataProducer producer)
+      throws Exception {
+
+    if (injectTombstones) {
+      // publish lots of tombstones to livelock the consumer if it can't handle this properly
+      for (int i = 0; i < 1000; i++) {
+        // publish a tombstone first
+        producer.produce(kafkaTopic, Longs.toByteArray(System.currentTimeMillis()), null);
+      }
+    }
+    CSVFormat csvFormat = CSVFormat.DEFAULT.withSkipHeaderRecord(true);
+    for (String recordCsv: csvRecords) {
+      try (CSVParser parser = CSVParser.parse(recordCsv, csvFormat)) {
+        for (CSVRecord csv : parser) {
+          byte[] keyBytes = (partitionColumnIndex == null) ? Longs.toByteArray(System.currentTimeMillis())
+              : csv.get(partitionColumnIndex).getBytes(StandardCharsets.UTF_8);
+          List<String> cols = new ArrayList<>();
+          for (String col : csv) {
+            cols.add(col);
+          }
+          byte[] bytes = String.join(",", cols).getBytes(StandardCharsets.UTF_8);
+          producer.produce(kafkaTopic, keyBytes, bytes);
+        }
+      }
+    }
+  }
+
   /**
    * Push the records from the given Avro files into a Kafka stream.
    *
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
new file mode 100644
index 0000000000..5eb5469432
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
@@ -0,0 +1,372 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.client.ResultSet;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+/**
+ * Input data - Scores of players
+ * Schema
+ *  - Dimension fields: playerId:int (primary key), name:string, game:string, deleted:boolean
+ *  - Metric fields: score:float
+ *  - DateTime fields: timestampInEpoch:long
+ */
+public class UpsertTableIntegrationTest extends BaseClusterIntegrationTestSet {
+  private static final String INPUT_DATA_TAR_FILE = "gameScores_csv.tar.gz";
+  private static final String CSV_SCHEMA_HEADER = "playerId,name,game,score,timestampInEpoch,deleted";
+  private static final String PARTIAL_UPSERT_TABLE_SCHEMA = "partial_upsert_table_test.schema";
+  private static final String CSV_DELIMITER = ",";
+  private static final String TABLE_NAME = "gameScores";
+  private static final int NUM_SERVERS = 2;
+  private static final String PRIMARY_KEY_COL = "playerId";
+  protected static final String DELETE_COL = "deleted";
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+    // Start the Pinot cluster
+    startZk();
+    // Start a customized controller with more frequent realtime segment validation
+    startController();
+    startBroker();
+    startServers(NUM_SERVERS);
+
+    // Start Kafka and push data into Kafka
+    startKafka();
+
+    List<File> unpackDataFiles = unpackTarData(INPUT_DATA_TAR_FILE, _tempDir);
+    pushCsvIntoKafka(unpackDataFiles.get(0), getKafkaTopic(), 0);  // TODO: Fix
+
+    // Create and upload schema and table config
+    Schema schema = createSchema();
+    addSchema(schema);
+
+    Map<String, String> csvDecoderProperties = getCSVDecoderProperties(CSV_DELIMITER, CSV_SCHEMA_HEADER);
+    TableConfig tableConfig = createCSVUpsertTableConfig(getTableName(), getSchemaName(), getKafkaTopic(),
+        getNumKafkaPartitions(), csvDecoderProperties, null, PRIMARY_KEY_COL);
+    addTableConfig(tableConfig);
+
+    // Wait for all documents loaded
+    waitForAllDocsLoaded(600_000L);
+
+    // Create partial upsert table schema
+    Schema partialUpsertSchema = createSchema(PARTIAL_UPSERT_TABLE_SCHEMA);
+    addSchema(partialUpsertSchema);
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws IOException {
+    String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(getTableName());
+    dropRealtimeTable(realtimeTableName);
+    stopServer();
+    stopBroker();
+    stopController();
+    stopKafka();
+    stopZk();
+    FileUtils.deleteDirectory(_tempDir);
+  }
+
+  @Override
+  protected String getSchemaFileName() {
+    return "upsert_table_test.schema";
+  }
+
+  @Override
+  protected String getSchemaName() {
+    return "playerScores";
+  }
+
+  @Nullable
+  @Override
+  protected String getTimeColumnName() {
+    return "timestampInEpoch";
+  }
+
+  @Override
+  protected String getPartitionColumn() {
+    return PRIMARY_KEY_COL;
+  }
+
+  @Override
+  protected String getTableName() {
+    return TABLE_NAME;
+  }
+
+  @Override
+  protected long getCountStarResult() {
+    // Three distinct records are expected with pk values of 100, 101, 102
+    return 3;
+  }
+
+  private Schema createSchema(String schemaFileName)
+      throws IOException {
+    InputStream inputStream =
+        BaseClusterIntegrationTest.class.getClassLoader().getResourceAsStream(schemaFileName);
+    Assert.assertNotNull(inputStream);
+    return Schema.fromInputStream(inputStream);
+  }
+
+  private long queryCountStarWithoutUpsert(String tableName) {
+    return getPinotConnection().execute("SELECT COUNT(*) FROM " + tableName + " OPTION(skipUpsert=true)")
+        .getResultSet(0).getLong(0);
+  }
+
+  private long getCountStarResultWithoutUpsert() {
+    return 10;
+  }
+
+  @Override
+  protected void waitForAllDocsLoaded(long timeoutMs)
+      throws Exception {
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        return queryCountStarWithoutUpsert(getTableName()) == getCountStarResultWithoutUpsert();
+      } catch (Exception e) {
+        return null;
+      }
+    }, 100L, timeoutMs, "Failed to load all documents");
+    assertEquals(getCurrentCountStarResult(), getCountStarResult());
+  }
+
+  @Test
+  protected void testDeleteWithFullUpsert()
+      throws Exception {
+    final UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+    upsertConfig.setDeleteRecordColumn(DELETE_COL);
+
+    testDeleteWithFullUpsert(getKafkaTopic() + "-with-deletes", "gameScoresWithDelete", upsertConfig);
+  }
+
+  protected void testDeleteWithFullUpsert(String kafkaTopicName, String tableName, UpsertConfig upsertConfig)
+      throws Exception {
+    // SETUP
+    // Create table with delete Record column
+    Map<String, String> csvDecoderProperties = getCSVDecoderProperties(CSV_DELIMITER, CSV_SCHEMA_HEADER);
+    TableConfig tableConfig = createCSVUpsertTableConfig(tableName, getSchemaName(), kafkaTopicName,
+        getNumKafkaPartitions(), csvDecoderProperties, upsertConfig, PRIMARY_KEY_COL);
+    addTableConfig(tableConfig);
+
+    // Push initial 10 upsert records - 3 pks 100, 101 and 102
+    List<File> dataFiles = unpackTarData(INPUT_DATA_TAR_FILE, _tempDir);
+    pushCsvIntoKafka(dataFiles.get(0), kafkaTopicName, 0);
+
+    // TEST 1: Delete existing primary key
+    // Push 2 records with deleted = true - deletes pks 100 and 102
+    List<String> deleteRecords = List.of("102,Clifford,counter-strike,102,1681054200000,true",
+        "100,Zook,counter-strike,2050,1681377200000,true");
+    pushCsvIntoKafka(deleteRecords, kafkaTopicName, 0);
+
+    // Wait for all docs (12 with skipUpsert=true) to be loaded
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        return queryCountStarWithoutUpsert(tableName) == 12;
+      } catch (Exception e) {
+        return null;
+      }
+    }, 100L, 600_000L, "Failed to load all upsert records for testDeleteWithFullUpsert");
+
+    // Query for number of records in the table - should only return 1
+    ResultSet rs = getPinotConnection().execute("SELECT * FROM " + tableName).getResultSet(0);
+    Assert.assertEquals(rs.getRowCount(), 1);
+
+    // pk 101 - not deleted - only record available
+    int columnCount = rs.getColumnCount();
+    int playerIdColumnIndex = -1;
+    for (int i = 0; i < columnCount; i++) {
+      String columnName = rs.getColumnName(i);
+      if ("playerId".equalsIgnoreCase(columnName)) {
+        playerIdColumnIndex = i;
+        break;
+      }
+    }
+    Assert.assertNotEquals(playerIdColumnIndex, -1);
+    Assert.assertEquals(rs.getString(0, playerIdColumnIndex), "101");
+
+    // Validate deleted records
+    rs = getPinotConnection()
+        .execute("SELECT playerId FROM " + tableName
+            + " WHERE deleted = true OPTION(skipUpsert=true)").getResultSet(0);
+    Assert.assertEquals(rs.getRowCount(), 2);
+    for (int i = 0; i < rs.getRowCount(); i++) {
+      String playerId = rs.getString(i, 0);
+      Assert.assertTrue("100".equalsIgnoreCase(playerId) || "102".equalsIgnoreCase(playerId));
+    }
+
+    // TEST 2: Revive a previously deleted primary key
+    // Revive pk - 100 by adding a record with a newer timestamp
+    List<String> revivedRecord = Collections.singletonList("100,Zook-New,,0.0,1684707335000,false");
+    pushCsvIntoKafka(revivedRecord, kafkaTopicName, 0);
+    // Wait for the new record (13 with skipUpsert=true) to be indexed
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        return queryCountStarWithoutUpsert(tableName) == 13;
+      } catch (Exception e) {
+        return null;
+      }
+    }, 100L, 600_000L, "Failed to load all upsert records for testDeleteWithFullUpsert");
+
+
+    // Validate: pk is queryable and all columns are overwritten with new value
+    rs = getPinotConnection()
+        .execute("SELECT playerId, name, game FROM " + tableName + " WHERE playerId = 100").getResultSet(0);
+    Assert.assertEquals(rs.getRowCount(), 1);
+    Assert.assertEquals(rs.getInt(0, 0), 100);
+    Assert.assertEquals(rs.getString(0, 1), "Zook-New");
+    Assert.assertEquals(rs.getString(0, 2), "null");
+
+    // Validate: pk lineage still exists
+    rs = getPinotConnection()
+        .execute("SELECT playerId, name FROM " + tableName
+            + " WHERE playerId = 100 OPTION(skipUpsert=true)").getResultSet(0);
+
+    Assert.assertTrue(rs.getRowCount() > 1);
+
+    // TEARDOWN
+    dropRealtimeTable(tableName);
+  }
+
+  @Test
+  public void testDeleteWithPartialUpsert()
+      throws Exception {
+    final UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.PARTIAL);
+    upsertConfig.setDeleteRecordColumn(DELETE_COL);
+
+    testDeleteWithPartialUpsert(getKafkaTopic() + "-partial-upsert-with-deletes",
+        "gameScoresPartialUpsertWithDelete", upsertConfig);
+  }
+
+  protected void testDeleteWithPartialUpsert(String kafkaTopicName, String tableName, UpsertConfig upsertConfig)
+      throws Exception {
+    final String partialUpsertSchemaName = "playerScoresPartialUpsert";
+    final String inputDataTarFile = "gameScores_partial_upsert_csv.tar.gz";
+
+    Map<String, UpsertConfig.Strategy> partialUpsertStrategies = new HashMap<>();
+    partialUpsertStrategies.put("game", UpsertConfig.Strategy.UNION);
+    partialUpsertStrategies.put("score", UpsertConfig.Strategy.INCREMENT);
+    partialUpsertStrategies.put(DELETE_COL, UpsertConfig.Strategy.OVERWRITE);
+    upsertConfig.setPartialUpsertStrategies(partialUpsertStrategies);
+
+    // Create table with delete Record column
+    Map<String, String> csvDecoderProperties = getCSVDecoderProperties(CSV_DELIMITER, CSV_SCHEMA_HEADER);
+    TableConfig tableConfig = createCSVUpsertTableConfig(tableName, partialUpsertSchemaName, kafkaTopicName,
+        getNumKafkaPartitions(), csvDecoderProperties, upsertConfig, PRIMARY_KEY_COL);
+    addTableConfig(tableConfig);
+
+    // Push initial 10 upsert records - 3 pks 100, 101 and 102
+    List<File> dataFiles = unpackTarData(inputDataTarFile, _tempDir);
+    pushCsvIntoKafka(dataFiles.get(0), kafkaTopicName, 0);
+
+    // TEST 1: Delete existing primary key
+    // Push 2 records with deleted = true - deletes pks 100 and 102
+    List<String> deleteRecords = List.of("102,Clifford,counter-strike,102,1681054200000,true",
+        "100,Zook,counter-strike,2050,1681377200000,true");
+    pushCsvIntoKafka(deleteRecords, kafkaTopicName, 0);
+
+    // Wait for all docs (12 with skipUpsert=true) to be loaded
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        return queryCountStarWithoutUpsert(tableName) == 12;
+      } catch (Exception e) {
+        return null;
+      }
+    }, 100L, 600_000L, "Failed to load all upsert records for testDeleteWithPartialUpsert");
+
+    // Query for number of records in the table - should only return 1
+    ResultSet rs = getPinotConnection().execute("SELECT * FROM " + tableName).getResultSet(0);
+    Assert.assertEquals(rs.getRowCount(), 1);
+
+    // pk 101 - not deleted - only record available
+    int columnCount = rs.getColumnCount();
+    int playerIdColumnIndex = -1;
+    for (int i = 0; i < columnCount; i++) {
+      String columnName = rs.getColumnName(i);
+      if ("playerId".equalsIgnoreCase(columnName)) {
+        playerIdColumnIndex = i;
+        break;
+      }
+    }
+    Assert.assertNotEquals(playerIdColumnIndex, -1);
+    Assert.assertEquals(rs.getString(0, playerIdColumnIndex), "101");
+
+    // Validate deleted records
+    rs = getPinotConnection()
+        .execute("SELECT playerId FROM " + tableName
+            + " WHERE deleted = true OPTION(skipUpsert=true)").getResultSet(0);
+    Assert.assertEquals(rs.getRowCount(), 2);
+    for (int i = 0; i < rs.getRowCount(); i++) {
+      String playerId = rs.getString(i, 0);
+      Assert.assertTrue("100".equalsIgnoreCase(playerId) || "102".equalsIgnoreCase(playerId));
+    }
+
+    // TEST 2: Revive a previously deleted primary key
+    // Revive pk - 100 by adding a record with a newer timestamp
+    List<String> revivedRecord = Collections.singletonList("100,Zook,,0.0,1684707335000,false");
+    pushCsvIntoKafka(revivedRecord, kafkaTopicName, 0);
+    // Wait for the new record (13 with skipUpsert=true) to be indexed
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        return queryCountStarWithoutUpsert(tableName) == 13;
+      } catch (Exception e) {
+        return null;
+      }
+    }, 100L, 600_000L, "Failed to load all upsert records for testDeleteWithPartialUpsert");
+
+    // Validate: pk is queryable and all columns are overwritten with new value
+    rs = getPinotConnection()
+        .execute("SELECT playerId, name, game FROM " + tableName
+            + " WHERE playerId = 100").getResultSet(0);
+    Assert.assertEquals(rs.getRowCount(), 1);
+    Assert.assertEquals(rs.getInt(0, 0), 100);
+    Assert.assertEquals(rs.getString(0, 1), "Zook");
+    Assert.assertEquals(rs.getString(0, 2), "[\"null\"]");
+
+    // Validate: pk lineage still exists
+    rs = getPinotConnection()
+        .execute("SELECT playerId, name FROM " + tableName
+            + " WHERE playerId = 100 OPTION(skipUpsert=true)").getResultSet(0);
+
+    Assert.assertTrue(rs.getRowCount() > 1);
+
+    // TEARDOWN
+    dropRealtimeTable(tableName);
+  }
+}
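
For partial upsert, the test above gives the delete column an explicit OVERWRITE strategy so the flag always reflects the latest record, while the other columns keep their merge strategies. A condensed sketch of that configuration:

    UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.PARTIAL);
    upsertConfig.setDeleteRecordColumn("deleted");
    Map<String, UpsertConfig.Strategy> strategies = new HashMap<>();
    strategies.put("game", UpsertConfig.Strategy.UNION);         // merge multi-value games
    strategies.put("score", UpsertConfig.Strategy.INCREMENT);    // accumulate scores
    strategies.put("deleted", UpsertConfig.Strategy.OVERWRITE);  // latest flag wins
    upsertConfig.setPartialUpsertStrategies(strategies);
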
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java
index e7966e3608..7fddd5e43c 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java
@@ -74,7 +74,7 @@ public class UpsertTableSegmentUploadIntegrationTest extends BaseClusterIntegrat
     // Create and upload schema and table config
     Schema schema = createSchema();
     addSchema(schema);
-    TableConfig tableConfig = createUpsertTableConfig(avroFiles.get(0), PRIMARY_KEY_COL, getNumKafkaPartitions());
+    TableConfig tableConfig = createUpsertTableConfig(avroFiles.get(0), PRIMARY_KEY_COL, null, getNumKafkaPartitions());
     addTableConfig(tableConfig);
 
     // Create and upload segments
@@ -116,7 +116,7 @@ public class UpsertTableSegmentUploadIntegrationTest extends BaseClusterIntegrat
 
   @Override
   protected String getSchemaFileName() {
-    return "upsert_table_test.schema";
+    return "upsert_upload_segment_test.schema";
   }
 
   @Override
@@ -126,7 +126,7 @@ public class UpsertTableSegmentUploadIntegrationTest extends BaseClusterIntegrat
 
   @Override
   protected String getAvroTarFileName() {
-    return "upsert_test.tar.gz";
+    return "upsert_upload_segment_test.tar.gz";
   }
 
   @Override
diff --git a/pinot-integration-tests/src/test/resources/gameScores_csv.tar.gz b/pinot-integration-tests/src/test/resources/gameScores_csv.tar.gz
new file mode 100644
index 0000000000..48dc5163c6
Binary files /dev/null and b/pinot-integration-tests/src/test/resources/gameScores_csv.tar.gz differ
diff --git a/pinot-integration-tests/src/test/resources/gameScores_partial_upsert_csv.tar.gz b/pinot-integration-tests/src/test/resources/gameScores_partial_upsert_csv.tar.gz
new file mode 100644
index 0000000000..a6fd014e96
Binary files /dev/null and b/pinot-integration-tests/src/test/resources/gameScores_partial_upsert_csv.tar.gz differ
diff --git a/pinot-integration-tests/src/test/resources/partial_upsert_table_test.schema b/pinot-integration-tests/src/test/resources/partial_upsert_table_test.schema
new file mode 100644
index 0000000000..ceb2f2d287
--- /dev/null
+++ b/pinot-integration-tests/src/test/resources/partial_upsert_table_test.schema
@@ -0,0 +1,35 @@
+{
+  "schemaName": "playerScoresPartialUpsert",
+  "dimensionFieldSpecs": [
+    {
+      "name": "playerId",
+      "dataType": "INT"
+    },
+    {
+      "name": "name",
+      "dataType": "STRING"
+    },
+    {
+      "name": "game",
+      "dataType": "STRING",
+      "singleValueField": "false"
+    },
+    {
+      "name": "deleted",
+      "dataType": "BOOLEAN"
+    }
+  ],
+  "metricFieldSpecs": [
+    {
+      "name": "score",
+      "dataType": "FLOAT"
+    }
+  ],
+  "dateTimeFieldSpecs": [{
+    "name": "timestampInEpoch",
+    "dataType": "LONG",
+    "format" : "1:MILLISECONDS:EPOCH",
+    "granularity": "1:MILLISECONDS"
+  }],
+  "primaryKeyColumns": [ "playerId" ]
+}
diff --git a/pinot-integration-tests/src/test/resources/upsert_table_test.schema b/pinot-integration-tests/src/test/resources/upsert_table_test.schema
index 3f656f7936..c0bcebfaae 100644
--- a/pinot-integration-tests/src/test/resources/upsert_table_test.schema
+++ b/pinot-integration-tests/src/test/resources/upsert_table_test.schema
@@ -1,33 +1,34 @@
 {
+  "schemaName": "playerScores",
   "dimensionFieldSpecs": [
     {
-      "dataType": "INT",
-      "singleValueField": true,
-      "name": "clientId"
+      "name": "playerId",
+      "dataType": "INT"
     },
     {
-      "dataType": "STRING",
-      "singleValueField": true,
-      "name": "city"
+      "name": "name",
+      "dataType": "STRING"
     },
     {
-      "dataType": "STRING",
-      "singleValueField": true,
-      "name": "description"
+      "name": "game",
+      "dataType": "STRING"
     },
     {
-      "dataType": "INT",
-      "singleValueField": true,
-      "name": "salary"
+      "name": "deleted",
+      "dataType": "BOOLEAN"
     }
   ],
-  "timeFieldSpec": {
-    "incomingGranularitySpec": {
-      "timeType": "DAYS",
-      "dataType": "INT",
-      "name": "DaysSinceEpoch"
+  "metricFieldSpecs": [
+    {
+      "name": "score",
+      "dataType": "FLOAT"
     }
-  },
-  "primaryKeyColumns": ["clientId"],
-  "schemaName": "upsertSchema"
+  ],
+  "dateTimeFieldSpecs": [{
+    "name": "timestampInEpoch",
+    "dataType": "LONG",
+    "format" : "1:MILLISECONDS:EPOCH",
+    "granularity": "1:MILLISECONDS"
+  }],
+  "primaryKeyColumns": [ "playerId" ]
 }
diff --git a/pinot-integration-tests/src/test/resources/upsert_table_test.schema b/pinot-integration-tests/src/test/resources/upsert_upload_segment_test.schema
similarity index 100%
copy from pinot-integration-tests/src/test/resources/upsert_table_test.schema
copy to pinot-integration-tests/src/test/resources/upsert_upload_segment_test.schema
diff --git a/pinot-integration-tests/src/test/resources/upsert_test.tar.gz b/pinot-integration-tests/src/test/resources/upsert_upload_segment_test.tar.gz
similarity index 100%
rename from pinot-integration-tests/src/test/resources/upsert_test.tar.gz
rename to pinot-integration-tests/src/test/resources/upsert_upload_segment_test.tar.gz
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/EmptyIndexSegment.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/EmptyIndexSegment.java
index 20c0c764b4..681e93a0e0 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/EmptyIndexSegment.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/EmptyIndexSegment.java
@@ -91,6 +91,12 @@ public class EmptyIndexSegment implements ImmutableSegment {
     return null;
   }
 
+  @Nullable
+  @Override
+  public ThreadSafeMutableRoaringBitmap getQueryableDocIds() {
+    return null;
+  }
+
   @Override
   public GenericRow getRecord(int docId, GenericRow reuse) {
     throw new UnsupportedOperationException("Cannot read record from empty segment");
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
index 66875eb726..dcf9d65362 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
@@ -76,6 +76,7 @@ public class ImmutableSegmentImpl implements ImmutableSegment {
   // For upsert
   private PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
   private ThreadSafeMutableRoaringBitmap _validDocIds;
+  private ThreadSafeMutableRoaringBitmap _queryableDocIds;
 
   public ImmutableSegmentImpl(SegmentDirectory segmentDirectory, SegmentMetadataImpl segmentMetadata,
       Map<String, ColumnIndexContainer> columnIndexContainerMap,
@@ -100,9 +101,10 @@ public class ImmutableSegmentImpl implements ImmutableSegment {
    * Enables upsert for this segment. It should be called before the segment getting queried.
    */
   public void enableUpsert(PartitionUpsertMetadataManager partitionUpsertMetadataManager,
-      ThreadSafeMutableRoaringBitmap validDocIds) {
+      ThreadSafeMutableRoaringBitmap validDocIds, @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds) {
     _partitionUpsertMetadataManager = partitionUpsertMetadataManager;
     _validDocIds = validDocIds;
+    _queryableDocIds = queryableDocIds;
   }
 
   @Nullable
@@ -289,6 +291,12 @@ public class ImmutableSegmentImpl implements ImmutableSegment {
     return _validDocIds;
   }
 
+  @Nullable
+  @Override
+  public ThreadSafeMutableRoaringBitmap getQueryableDocIds() {
+    return _queryableDocIds;
+  }
+
   @Override
   public GenericRow getRecord(int docId, GenericRow reuse) {
     try (PinotSegmentRecordReader recordReader = new PinotSegmentRecordReader()) {
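
enableUpsert now takes a third, nullable bitmap for the queryable doc ids; callers pass null when the table has no delete column, as the updated plan-maker test does. A sketch with the segment and metadata manager assumed to be in scope:

    // validDocIds tracks the latest doc per key; queryableDocIds additionally excludes deleted keys
    immutableSegment.enableUpsert(partitionUpsertMetadataManager,
        new ThreadSafeMutableRoaringBitmap(),    // validDocIds
        new ThreadSafeMutableRoaringBitmap());   // queryableDocIds, or null when deletes are disabled
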
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index 590f8a124c..2c6885ace5 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -92,7 +92,6 @@ import org.apache.pinot.segment.spi.partition.PartitionFunction;
 import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
 import org.apache.pinot.spi.config.table.IndexConfig;
 import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
-import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.config.table.ingestion.AggregationConfig;
 import org.apache.pinot.spi.data.DimensionFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -102,6 +101,7 @@ import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.PrimaryKey;
 import org.apache.pinot.spi.stream.RowMetadata;
+import org.apache.pinot.spi.utils.BooleanUtils;
 import org.apache.pinot.spi.utils.ByteArray;
 import org.apache.pinot.spi.utils.FixedIntArray;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -159,10 +159,11 @@ public class MutableSegmentImpl implements MutableSegment {
 
   private RealtimeLuceneIndexRefreshState.RealtimeLuceneReaders _realtimeLuceneReaders;
 
-  private final UpsertConfig.Mode _upsertMode;
-  private final List<String> _upsertComparisonColumns;
-  private final PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
   private final PartitionDedupMetadataManager _partitionDedupMetadataManager;
+
+  private final PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
+  private final List<String> _upsertComparisonColumns;
+  private final String _deleteRecordColumn;
   // The valid doc ids are maintained locally instead of in the upsert metadata manager because:
   // 1. There is only one consuming segment per partition, the committed segments do not need to modify the valid doc
   //    ids for the consuming segment.
@@ -172,6 +173,7 @@ public class MutableSegmentImpl implements MutableSegment {
   //        consumption with newer timestamp (late event in consuming segment), the record location will be updated, but
   //        the valid doc ids won't be updated.
   private final ThreadSafeMutableRoaringBitmap _validDocIds;
+  private final ThreadSafeMutableRoaringBitmap _queryableDocIds;
 
   public MutableSegmentImpl(RealtimeSegmentConfig config, @Nullable ServerMetrics serverMetrics) {
     _serverMetrics = serverMetrics;
@@ -356,22 +358,27 @@ public class MutableSegmentImpl implements MutableSegment {
       realtimeLuceneIndexRefreshState.addRealtimeReadersToQueue(_realtimeLuceneReaders);
     }
 
-    // init upsert-related data structure
-    _upsertMode = config.getUpsertMode();
     _partitionDedupMetadataManager = config.getPartitionDedupMetadataManager();
 
-    if (isUpsertEnabled()) {
+    _partitionUpsertMetadataManager = config.getPartitionUpsertMetadataManager();
+    if (_partitionUpsertMetadataManager != null) {
       Preconditions.checkState(!isAggregateMetricsEnabled(),
           "Metrics aggregation and upsert cannot be enabled together");
-      _partitionUpsertMetadataManager = config.getPartitionUpsertMetadataManager();
-      _validDocIds = new ThreadSafeMutableRoaringBitmap();
       List<String> upsertComparisonColumns = config.getUpsertComparisonColumns();
       _upsertComparisonColumns =
           upsertComparisonColumns != null ? upsertComparisonColumns : Collections.singletonList(_timeColumnName);
+      _deleteRecordColumn = config.getUpsertDeleteRecordColumn();
+      _validDocIds = new ThreadSafeMutableRoaringBitmap();
+      if (_deleteRecordColumn != null) {
+        _queryableDocIds = new ThreadSafeMutableRoaringBitmap();
+      } else {
+        _queryableDocIds = null;
+      }
     } else {
-      _partitionUpsertMetadataManager = null;
-      _validDocIds = null;
       _upsertComparisonColumns = null;
+      _deleteRecordColumn = null;
+      _validDocIds = null;
+      _queryableDocIds = null;
     }
   }
 
@@ -459,21 +466,18 @@ public class MutableSegmentImpl implements MutableSegment {
     boolean canTakeMore;
     int numDocsIndexed = _numDocsIndexed;
 
-    RecordInfo recordInfo = null;
-
-    if (isDedupEnabled() || isUpsertEnabled()) {
-      recordInfo = getRecordInfo(row, numDocsIndexed);
-    }
-
-    if (isDedupEnabled() && _partitionDedupMetadataManager.checkRecordPresentOrUpdate(recordInfo.getPrimaryKey(),
-        this)) {
-      if (_serverMetrics != null) {
-        _serverMetrics.addMeteredTableValue(_realtimeTableName, ServerMeter.REALTIME_DEDUP_DROPPED, 1);
+    if (isDedupEnabled()) {
+      PrimaryKey primaryKey = row.getPrimaryKey(_schema.getPrimaryKeyColumns());
+      if (_partitionDedupMetadataManager.checkRecordPresentOrUpdate(primaryKey, this)) {
+        if (_serverMetrics != null) {
+          _serverMetrics.addMeteredTableValue(_realtimeTableName, ServerMeter.REALTIME_DEDUP_DROPPED, 1);
+        }
+        return true;
       }
-      return true;
     }
 
     if (isUpsertEnabled()) {
+      RecordInfo recordInfo = getRecordInfo(row, numDocsIndexed);
       GenericRow updatedRow = _partitionUpsertMetadataManager.updateRecord(row, recordInfo);
       updateDictionary(updatedRow);
       addNewRow(numDocsIndexed, updatedRow);
@@ -512,7 +516,7 @@ public class MutableSegmentImpl implements MutableSegment {
   }
 
   private boolean isUpsertEnabled() {
-    return _upsertMode != UpsertConfig.Mode.NONE;
+    return _partitionUpsertMetadataManager != null;
   }
 
   private boolean isDedupEnabled() {
@@ -521,22 +525,18 @@ public class MutableSegmentImpl implements MutableSegment {
 
   private RecordInfo getRecordInfo(GenericRow row, int docId) {
     PrimaryKey primaryKey = row.getPrimaryKey(_schema.getPrimaryKeyColumns());
-
-    if (isUpsertEnabled()) {
-      if (_upsertComparisonColumns.size() > 1) {
-        return multiComparisonRecordInfo(primaryKey, docId, row);
-      }
-      Comparable comparisonValue = (Comparable) row.getValue(_upsertComparisonColumns.get(0));
-      return new RecordInfo(primaryKey, docId, comparisonValue);
-    }
-
-    return new RecordInfo(primaryKey, docId, null);
+    Comparable comparisonValue = getComparisonValue(row);
+    boolean deleteRecord = _deleteRecordColumn != null && BooleanUtils.toBoolean(row.getValue(_deleteRecordColumn));
+    return new RecordInfo(primaryKey, docId, comparisonValue, deleteRecord);
   }
 
-  private RecordInfo multiComparisonRecordInfo(PrimaryKey primaryKey, int docId, GenericRow row) {
+  private Comparable getComparisonValue(GenericRow row) {
     int numComparisonColumns = _upsertComparisonColumns.size();
-    Comparable[] comparisonValues = new Comparable[numComparisonColumns];
+    if (numComparisonColumns == 1) {
+      return (Comparable) row.getValue(_upsertComparisonColumns.get(0));
+    }
 
+    Comparable[] comparisonValues = new Comparable[numComparisonColumns];
     int comparableIndex = -1;
     for (int i = 0; i < numComparisonColumns; i++) {
       String columnName = _upsertComparisonColumns.get(i);
@@ -557,9 +557,8 @@ public class MutableSegmentImpl implements MutableSegment {
         comparisonValues[i] = (Comparable) comparisonValue;
       }
     }
-    Preconditions.checkState(comparableIndex != -1,
-        "Documents must have exactly 1 non-null comparison column value");
-    return new RecordInfo(primaryKey, docId, new ComparisonColumns(comparisonValues, comparableIndex));
+    Preconditions.checkState(comparableIndex != -1, "Documents must have exactly 1 non-null comparison column value");
+    return new ComparisonColumns(comparisonValues, comparableIndex);
   }
 
   private void updateDictionary(GenericRow row) {
@@ -863,6 +862,12 @@ public class MutableSegmentImpl implements MutableSegment {
     return _validDocIds;
   }
 
+  @Nullable
+  @Override
+  public ThreadSafeMutableRoaringBitmap getQueryableDocIds() {
+    return _queryableDocIds;
+  }
+
   @Override
   public GenericRow getRecord(int docId, GenericRow reuse) {
     try (PinotSegmentRecordReader recordReader = new PinotSegmentRecordReader()) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
index 0238330c91..f4fd501ac5 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
@@ -61,6 +61,7 @@ public class RealtimeSegmentConfig {
   private final boolean _nullHandlingEnabled;
   private final UpsertConfig.Mode _upsertMode;
   private final List<String> _upsertComparisonColumns;
+  private final String _upsertDeleteRecordColumn;
   private final PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
   private final PartitionDedupMetadataManager _partitionDedupMetadataManager;
   private final String _consumerDir;
@@ -74,7 +75,7 @@ public class RealtimeSegmentConfig {
       PinotDataBufferMemoryManager memoryManager, RealtimeSegmentStatsHistory statsHistory, String partitionColumn,
       PartitionFunction partitionFunction, int partitionId, boolean aggregateMetrics, boolean nullHandlingEnabled,
       String consumerDir, UpsertConfig.Mode upsertMode, List<String> upsertComparisonColumns,
-      PartitionUpsertMetadataManager partitionUpsertMetadataManager,
+      String upsertDeleteRecordColumn, PartitionUpsertMetadataManager partitionUpsertMetadataManager,
       PartitionDedupMetadataManager partitionDedupMetadataManager, List<FieldConfig> fieldConfigList,
       List<AggregationConfig> ingestionAggregationConfigs) {
     _tableNameWithType = tableNameWithType;
@@ -97,6 +98,7 @@ public class RealtimeSegmentConfig {
     _consumerDir = consumerDir;
     _upsertMode = upsertMode != null ? upsertMode : UpsertConfig.Mode.NONE;
     _upsertComparisonColumns = upsertComparisonColumns;
+    _upsertDeleteRecordColumn = upsertDeleteRecordColumn;
     _partitionUpsertMetadataManager = partitionUpsertMetadataManager;
     _partitionDedupMetadataManager = partitionDedupMetadataManager;
     _fieldConfigList = fieldConfigList;
@@ -188,6 +190,10 @@ public class RealtimeSegmentConfig {
     return _upsertComparisonColumns;
   }
 
+  public String getUpsertDeleteRecordColumn() {
+    return _upsertDeleteRecordColumn;
+  }
+
   public PartitionUpsertMetadataManager getPartitionUpsertMetadataManager() {
     return _partitionUpsertMetadataManager;
   }
@@ -225,6 +231,7 @@ public class RealtimeSegmentConfig {
     private String _consumerDir;
     private UpsertConfig.Mode _upsertMode;
     private List<String> _upsertComparisonColumns;
+    private String _upsertDeleteRecordColumn;
     private PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
     private PartitionDedupMetadataManager _partitionDedupMetadataManager;
     private List<FieldConfig> _fieldConfigList;
@@ -360,6 +367,11 @@ public class RealtimeSegmentConfig {
       return this;
     }
 
+    public Builder setUpsertDeleteRecordColumn(String upsertDeleteRecordColumn) {
+      _upsertDeleteRecordColumn = upsertDeleteRecordColumn;
+      return this;
+    }
+
     public Builder setPartitionUpsertMetadataManager(PartitionUpsertMetadataManager partitionUpsertMetadataManager) {
       _partitionUpsertMetadataManager = partitionUpsertMetadataManager;
       return this;
@@ -389,8 +401,9 @@ public class RealtimeSegmentConfig {
       return new RealtimeSegmentConfig(_tableNameWithType, _segmentName, _streamName, _schema, _timeColumnName,
           _capacity, _avgNumMultiValues, Collections.unmodifiableMap(indexConfigByCol), _segmentZKMetadata, _offHeap,
           _memoryManager, _statsHistory, _partitionColumn, _partitionFunction, _partitionId, _aggregateMetrics,
-          _nullHandlingEnabled, _consumerDir, _upsertMode, _upsertComparisonColumns, _partitionUpsertMetadataManager,
-          _partitionDedupMetadataManager, _fieldConfigList, _ingestionAggregationConfigs);
+          _nullHandlingEnabled, _consumerDir, _upsertMode, _upsertComparisonColumns, _upsertDeleteRecordColumn,
+          _partitionUpsertMetadataManager, _partitionDedupMetadataManager, _fieldConfigList,
+          _ingestionAggregationConfigs);
     }
   }
 }
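As an illustration of the wiring (not taken verbatim from this commit), the segment data
manager can thread the delete column through the builder alongside the other upsert settings;
setUpsertMode and setUpsertComparisonColumns are assumed to follow the same Builder pattern
shown above, and upsertConfig/partitionUpsertMetadataManager are placeholder variables:

    RealtimeSegmentConfig segmentConfig = new RealtimeSegmentConfig.Builder()
        .setUpsertMode(upsertConfig.getMode())
        .setUpsertComparisonColumns(upsertConfig.getComparisonColumns())
        // New in this commit: pass the delete-record column down to the mutable segment.
        .setUpsertDeleteRecordColumn(upsertConfig.getDeleteRecordColumn())
        .setPartitionUpsertMetadataManager(partitionUpsertMetadataManager)
        .build();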
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index 1501cd28e6..92c1a18749 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -57,6 +57,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
   protected final int _partitionId;
   protected final List<String> _primaryKeyColumns;
   protected final List<String> _comparisonColumns;
+  protected final String _deleteRecordColumn;
   protected final HashFunction _hashFunction;
   protected final PartialUpsertHandler _partialUpsertHandler;
   protected final boolean _enableSnapshot;
@@ -78,12 +79,14 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
   protected int _numOutOfOrderEvents = 0;
 
   protected BasePartitionUpsertMetadataManager(String tableNameWithType, int partitionId,
-      List<String> primaryKeyColumns, List<String> comparisonColumns, HashFunction hashFunction,
-      @Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot, ServerMetrics serverMetrics) {
+      List<String> primaryKeyColumns, List<String> comparisonColumns, @Nullable String deleteRecordColumn,
+      HashFunction hashFunction, @Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot,
+      ServerMetrics serverMetrics) {
     _tableNameWithType = tableNameWithType;
     _partitionId = partitionId;
     _primaryKeyColumns = primaryKeyColumns;
     _comparisonColumns = comparisonColumns;
+    _deleteRecordColumn = deleteRecordColumn;
     _hashFunction = hashFunction;
     _partialUpsertHandler = partialUpsertHandler;
     _enableSnapshot = enableSnapshot;
@@ -138,7 +141,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
       if (validDocIds != null && validDocIds.isEmpty()) {
         _logger.info("Skip adding segment: {} without valid doc, current primary key count: {}",
             segment.getSegmentName(), getNumPrimaryKeys());
-        segment.enableUpsert(this, new ThreadSafeMutableRoaringBitmap());
+        segment.enableUpsert(this, new ThreadSafeMutableRoaringBitmap(), null);
         return;
       }
     } else {
@@ -146,8 +149,8 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
       segment.deleteValidDocIdsSnapshot();
     }
 
-    try (UpsertUtils.RecordInfoReader recordInfoReader = UpsertUtils.makeRecordReader(segment, _primaryKeyColumns,
-        _comparisonColumns)) {
+    try (UpsertUtils.RecordInfoReader recordInfoReader = new UpsertUtils.RecordInfoReader(segment, _primaryKeyColumns,
+        _comparisonColumns, _deleteRecordColumn)) {
       Iterator<RecordInfo> recordInfoIterator;
       if (validDocIds != null) {
         recordInfoIterator = UpsertUtils.getRecordInfoIterator(recordInfoReader, validDocIds);
@@ -155,7 +158,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
         recordInfoIterator =
             UpsertUtils.getRecordInfoIterator(recordInfoReader, segment.getSegmentMetadata().getTotalDocs());
       }
-      addSegment(segment, null, recordInfoIterator);
+      addSegment(segment, null, null, recordInfoIterator);
     } catch (Exception e) {
       throw new RuntimeException(
           String.format("Caught exception while adding segment: %s, table: %s", segmentName, _tableNameWithType), e);
@@ -171,12 +174,12 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
   }
 
   /**
-   * NOTE: We allow passing in validDocIds here so that the value can be easily accessed from the tests. The passed in
-   *       validDocIds should always be empty.
+   * NOTE: We allow passing in validDocIds and queryableDocIds here so that the values can be easily accessed from the
+   * NOTE: We allow passing in validDocIds and queryableDocIds here so that the values can be easily accessed from the
+   *       tests. The passed in bitmaps should always be empty.
    */
   @VisibleForTesting
   public void addSegment(ImmutableSegmentImpl segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds,
-      Iterator<RecordInfo> recordInfoIterator) {
+      @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator) {
     String segmentName = segment.getSegmentName();
     Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
     segmentLock.lock();
@@ -184,7 +187,10 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
       if (validDocIds == null) {
         validDocIds = new ThreadSafeMutableRoaringBitmap();
       }
-      addOrReplaceSegment(segment, validDocIds, recordInfoIterator, null, null);
+      if (queryableDocIds == null && _deleteRecordColumn != null) {
+        queryableDocIds = new ThreadSafeMutableRoaringBitmap();
+      }
+      addOrReplaceSegment(segment, validDocIds, queryableDocIds, recordInfoIterator, null, null);
     } finally {
       segmentLock.unlock();
     }
@@ -193,8 +199,8 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
   protected abstract long getNumPrimaryKeys();
 
   protected abstract void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutableRoaringBitmap validDocIds,
-      Iterator<RecordInfo> recordInfoIterator, @Nullable IndexSegment oldSegment,
-      @Nullable MutableRoaringBitmap validDocIdsForOldSegment);
+      @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator,
+      @Nullable IndexSegment oldSegment, @Nullable MutableRoaringBitmap validDocIdsForOldSegment);
 
   @Override
   public void addRecord(MutableSegment segment, RecordInfo recordInfo) {
@@ -254,15 +260,15 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
 
     if (segment instanceof EmptyIndexSegment) {
       _logger.info("Skip adding empty segment: {}", segmentName);
-      replaceSegment(segment, null, null, oldSegment);
+      replaceSegment(segment, null, null, null, oldSegment);
       return;
     }
 
-    try (UpsertUtils.RecordInfoReader recordInfoReader = UpsertUtils.makeRecordReader(segment, _primaryKeyColumns,
-        _comparisonColumns)) {
+    try (UpsertUtils.RecordInfoReader recordInfoReader = new UpsertUtils.RecordInfoReader(segment, _primaryKeyColumns,
+        _comparisonColumns, _deleteRecordColumn)) {
       Iterator<RecordInfo> recordInfoIterator =
           UpsertUtils.getRecordInfoIterator(recordInfoReader, segment.getSegmentMetadata().getTotalDocs());
-      replaceSegment(segment, null, recordInfoIterator, oldSegment);
+      replaceSegment(segment, null, null, recordInfoIterator, oldSegment);
     } catch (Exception e) {
       throw new RuntimeException(
           String.format("Caught exception while replacing segment: %s, table: %s", segmentName, _tableNameWithType), e);
@@ -278,12 +284,13 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
   }
 
   /**
-   * NOTE: We allow passing in validDocIds here so that the value can be easily accessed from the tests. The passed in
-   *       validDocIds should always be empty.
+   * NOTE: We allow passing in validDocIds and queryableDocIds here so that the values can be easily accessed from the
+   *       tests. The passed in bitmaps should always be empty.
    */
   @VisibleForTesting
   public void replaceSegment(ImmutableSegment segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds,
-      @Nullable Iterator<RecordInfo> recordInfoIterator, IndexSegment oldSegment) {
+      @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, @Nullable Iterator<RecordInfo> recordInfoIterator,
+      IndexSegment oldSegment) {
     String segmentName = segment.getSegmentName();
     Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
     segmentLock.lock();
@@ -297,8 +304,11 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
         if (validDocIds == null) {
           validDocIds = new ThreadSafeMutableRoaringBitmap();
         }
-        addOrReplaceSegment((ImmutableSegmentImpl) segment, validDocIds, recordInfoIterator, oldSegment,
-            validDocIdsForOldSegment);
+        if (queryableDocIds == null && _deleteRecordColumn != null) {
+          queryableDocIds = new ThreadSafeMutableRoaringBitmap();
+        }
+        addOrReplaceSegment((ImmutableSegmentImpl) segment, validDocIds, queryableDocIds, recordInfoIterator,
+            oldSegment, validDocIdsForOldSegment);
       }
 
       if (validDocIdsForOldSegment != null && !validDocIdsForOldSegment.isEmpty()) {
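The intended relationship between the two bitmaps: validDocIds keeps tracking the latest doc
per primary key (including delete markers), while queryableDocIds, allocated only when a
delete column is configured, additionally excludes docs whose latest version is a delete. A
minimal sketch of that invariant, using plain RoaringBitmap rather than the Pinot classes:

    import org.roaringbitmap.buffer.MutableRoaringBitmap;

    final class DocIdBitmapsSketch {
      final MutableRoaringBitmap validDocIds = new MutableRoaringBitmap();
      final MutableRoaringBitmap queryableDocIds = new MutableRoaringBitmap();

      // Invariant: queryableDocIds is always a subset of validDocIds.
      void markLatest(int docId, boolean deleteRecord) {
        validDocIds.add(docId);          // latest version of the key stays valid
        if (!deleteRecord) {
          queryableDocIds.add(docId);    // only non-deleted versions are served to queries
        }
      }
    }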
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
index 2ceb5ddc72..06e112b1e6 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
@@ -37,6 +37,7 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
   protected String _tableNameWithType;
   protected List<String> _primaryKeyColumns;
   protected List<String> _comparisonColumns;
+  protected String _deleteRecordColumn;
   protected HashFunction _hashFunction;
   protected PartialUpsertHandler _partialUpsertHandler;
   protected boolean _enableSnapshot;
@@ -60,6 +61,7 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
       _comparisonColumns = Collections.singletonList(tableConfig.getValidationConfig().getTimeColumnName());
     }
 
+    _deleteRecordColumn = upsertConfig.getDeleteRecordColumn();
     _hashFunction = upsertConfig.getHashFunction();
 
     if (upsertConfig.getMode() == UpsertConfig.Mode.PARTIAL) {
@@ -72,7 +74,6 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
     }
 
     _enableSnapshot = upsertConfig.isEnableSnapshot();
-
     _serverMetrics = serverMetrics;
   }
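On the config side, the new column is read from UpsertConfig next to the comparison columns
and hash function. An illustrative (hypothetical) table-config excerpt that would exercise
this path, with "deleted" as an assumed column name:

    "upsertConfig": {
      "mode": "FULL",
      "deleteRecordColumn": "deleted",
      "hashFunction": "NONE"
    }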
 
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index 29b293cbaa..6e8b195bbc 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
@@ -57,10 +58,11 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
   private final GenericRow _reuse = new GenericRow();
 
   public ConcurrentMapPartitionUpsertMetadataManager(String tableNameWithType, int partitionId,
-      List<String> primaryKeyColumns, List<String> comparisonColumns, HashFunction hashFunction,
-      @Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot, ServerMetrics serverMetrics) {
-    super(tableNameWithType, partitionId, primaryKeyColumns, comparisonColumns, hashFunction, partialUpsertHandler,
-        enableSnapshot, serverMetrics);
+      List<String> primaryKeyColumns, List<String> comparisonColumns, @Nullable String deleteRecordColumn,
+      HashFunction hashFunction, @Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot,
+      ServerMetrics serverMetrics) {
+    super(tableNameWithType, partitionId, primaryKeyColumns, comparisonColumns, deleteRecordColumn, hashFunction,
+        partialUpsertHandler, enableSnapshot, serverMetrics);
   }
 
   @Override
@@ -70,29 +72,31 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
 
   @Override
   protected void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutableRoaringBitmap validDocIds,
-      Iterator<RecordInfo> recordInfoIterator, @Nullable IndexSegment oldSegment,
-      @Nullable MutableRoaringBitmap validDocIdsForOldSegment) {
+      @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator,
+      @Nullable IndexSegment oldSegment, @Nullable MutableRoaringBitmap validDocIdsForOldSegment) {
     String segmentName = segment.getSegmentName();
-    segment.enableUpsert(this, validDocIds);
+    segment.enableUpsert(this, validDocIds, queryableDocIds);
 
     AtomicInteger numKeysInWrongSegment = new AtomicInteger();
     while (recordInfoIterator.hasNext()) {
       RecordInfo recordInfo = recordInfoIterator.next();
+      int newDocId = recordInfo.getDocId();
+      Comparable newComparisonValue = recordInfo.getComparisonValue();
       _primaryKeyToRecordLocationMap.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction),
           (primaryKey, currentRecordLocation) -> {
             if (currentRecordLocation != null) {
               // Existing primary key
               IndexSegment currentSegment = currentRecordLocation.getSegment();
-              int comparisonResult =
-                  recordInfo.getComparisonValue().compareTo(currentRecordLocation.getComparisonValue());
+              int currentDocId = currentRecordLocation.getDocId();
+              int comparisonResult = newComparisonValue.compareTo(currentRecordLocation.getComparisonValue());
 
               // The current record is in the same segment
               // Update the record location when there is a tie to keep the newer record. Note that the record info
               // iterator will return records with incremental doc ids.
               if (currentSegment == segment) {
                 if (comparisonResult >= 0) {
-                  validDocIds.replace(currentRecordLocation.getDocId(), recordInfo.getDocId());
-                  return new RecordLocation(segment, recordInfo.getDocId(), recordInfo.getComparisonValue());
+                  replaceDocId(validDocIds, queryableDocIds, currentDocId, newDocId, recordInfo);
+                  return new RecordLocation(segment, newDocId, newComparisonValue);
                 } else {
                   return currentRecordLocation;
                 }
@@ -106,11 +110,11 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
               // snapshot for the old segment, which can be updated and used to track the docs not replaced yet.
               if (currentSegment == oldSegment) {
                 if (comparisonResult >= 0) {
-                  validDocIds.add(recordInfo.getDocId());
+                  addDocId(validDocIds, queryableDocIds, newDocId, recordInfo);
                   if (validDocIdsForOldSegment != null) {
-                    validDocIdsForOldSegment.remove(currentRecordLocation.getDocId());
+                    validDocIdsForOldSegment.remove(currentDocId);
                   }
-                  return new RecordLocation(segment, recordInfo.getDocId(), recordInfo.getComparisonValue());
+                  return new RecordLocation(segment, newDocId, newComparisonValue);
                 } else {
                   return currentRecordLocation;
                 }
@@ -122,8 +126,8 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
               if (currentSegmentName.equals(segmentName)) {
                 numKeysInWrongSegment.getAndIncrement();
                 if (comparisonResult >= 0) {
-                  validDocIds.add(recordInfo.getDocId());
-                  return new RecordLocation(segment, recordInfo.getDocId(), recordInfo.getComparisonValue());
+                  addDocId(validDocIds, queryableDocIds, newDocId, recordInfo);
+                  return new RecordLocation(segment, newDocId, newComparisonValue);
                 } else {
                   return currentRecordLocation;
                 }
@@ -137,16 +141,16 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
                   segmentName) && LLCSegmentName.isLowLevelConsumerSegmentName(currentSegmentName)
                   && LLCSegmentName.getSequenceNumber(segmentName) > LLCSegmentName.getSequenceNumber(
                   currentSegmentName))) {
-                Objects.requireNonNull(currentSegment.getValidDocIds()).remove(currentRecordLocation.getDocId());
-                validDocIds.add(recordInfo.getDocId());
-                return new RecordLocation(segment, recordInfo.getDocId(), recordInfo.getComparisonValue());
+                removeDocId(currentSegment, currentDocId);
+                addDocId(validDocIds, queryableDocIds, newDocId, recordInfo);
+                return new RecordLocation(segment, newDocId, newComparisonValue);
               } else {
                 return currentRecordLocation;
               }
             } else {
               // New primary key
-              validDocIds.add(recordInfo.getDocId());
-              return new RecordLocation(segment, recordInfo.getDocId(), recordInfo.getComparisonValue());
+              addDocId(validDocIds, queryableDocIds, newDocId, recordInfo);
+              return new RecordLocation(segment, newDocId, newComparisonValue);
             }
           });
     }
@@ -157,6 +161,34 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
     }
   }
 
+  private static void replaceDocId(ThreadSafeMutableRoaringBitmap validDocIds,
+      @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, int oldDocId, int newDocId, RecordInfo recordInfo) {
+    validDocIds.replace(oldDocId, newDocId);
+    if (queryableDocIds != null) {
+      if (recordInfo.isDeleteRecord()) {
+        queryableDocIds.remove(oldDocId);
+      } else {
+        queryableDocIds.replace(oldDocId, newDocId);
+      }
+    }
+  }
+
+  private static void addDocId(ThreadSafeMutableRoaringBitmap validDocIds,
+      @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, int docId, RecordInfo recordInfo) {
+    validDocIds.add(docId);
+    if (queryableDocIds != null && !recordInfo.isDeleteRecord()) {
+      queryableDocIds.add(docId);
+    }
+  }
+
+  private static void removeDocId(IndexSegment segment, int docId) {
+    Objects.requireNonNull(segment.getValidDocIds()).remove(docId);
+    ThreadSafeMutableRoaringBitmap currentQueryableDocIds = segment.getQueryableDocIds();
+    if (currentQueryableDocIds != null) {
+      currentQueryableDocIds.remove(docId);
+    }
+  }
+
   @Override
   protected void removeSegment(IndexSegment segment, MutableRoaringBitmap validDocIds) {
     assert !validDocIds.isEmpty();
@@ -184,6 +216,9 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
   @Override
   protected void doAddRecord(MutableSegment segment, RecordInfo recordInfo) {
     ThreadSafeMutableRoaringBitmap validDocIds = Objects.requireNonNull(segment.getValidDocIds());
+    ThreadSafeMutableRoaringBitmap queryableDocIds = segment.getQueryableDocIds();
+    int newDocId = recordInfo.getDocId();
+    Comparable newComparisonValue = recordInfo.getComparisonValue();
     _primaryKeyToRecordLocationMap.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction),
         (primaryKey, currentRecordLocation) -> {
           if (currentRecordLocation != null) {
@@ -191,24 +226,24 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
 
             // Update the record location when the new comparison value is greater than or equal to the current value.
             // Update the record location when there is a tie to keep the newer record.
-            if (recordInfo.getComparisonValue().compareTo(currentRecordLocation.getComparisonValue()) >= 0) {
+            if (newComparisonValue.compareTo(currentRecordLocation.getComparisonValue()) >= 0) {
               IndexSegment currentSegment = currentRecordLocation.getSegment();
               int currentDocId = currentRecordLocation.getDocId();
               if (segment == currentSegment) {
-                validDocIds.replace(currentDocId, recordInfo.getDocId());
+                replaceDocId(validDocIds, queryableDocIds, currentDocId, newDocId, recordInfo);
               } else {
-                Objects.requireNonNull(currentSegment.getValidDocIds()).remove(currentDocId);
-                validDocIds.add(recordInfo.getDocId());
+                removeDocId(currentSegment, currentDocId);
+                addDocId(validDocIds, queryableDocIds, newDocId, recordInfo);
               }
-              return new RecordLocation(segment, recordInfo.getDocId(), recordInfo.getComparisonValue());
+              return new RecordLocation(segment, newDocId, newComparisonValue);
             } else {
               handleOutOfOrderEvent(currentRecordLocation.getComparisonValue(), recordInfo.getComparisonValue());
               return currentRecordLocation;
             }
           } else {
             // New primary key
-            validDocIds.add(recordInfo.getDocId());
-            return new RecordLocation(segment, recordInfo.getDocId(), recordInfo.getComparisonValue());
+            addDocId(validDocIds, queryableDocIds, newDocId, recordInfo);
+            return new RecordLocation(segment, newDocId, newComparisonValue);
           }
         });
 
@@ -221,18 +256,32 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
   protected GenericRow doUpdateRecord(GenericRow record, RecordInfo recordInfo) {
     assert _partialUpsertHandler != null;
     AtomicReference<GenericRow> previousRecordReference = new AtomicReference<>();
+    AtomicBoolean outOfOrder = new AtomicBoolean();
     RecordLocation currentRecordLocation = _primaryKeyToRecordLocationMap.computeIfPresent(
         HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction), (pk, recordLocation) -> {
           if (recordInfo.getComparisonValue().compareTo(recordLocation.getComparisonValue()) >= 0) {
-            _reuse.clear();
-            previousRecordReference.set(recordLocation.getSegment().getRecord(recordLocation.getDocId(), _reuse));
+            if (!recordInfo.isDeleteRecord()) {
+              IndexSegment currentSegment = recordLocation.getSegment();
+              int currentDocId = recordLocation.getDocId();
+              ThreadSafeMutableRoaringBitmap currentQueryableDocIds = currentSegment.getQueryableDocIds();
+              if (currentQueryableDocIds == null || currentQueryableDocIds.contains(currentDocId)) {
+                // if delete is not enabled or previous record not marked as deleted
+                _reuse.clear();
+                previousRecordReference.set(currentSegment.getRecord(currentDocId, _reuse));
+              }
+            }
+          } else {
+            outOfOrder.set(true);
           }
           return recordLocation;
         });
     if (currentRecordLocation != null) {
       // Existing primary key
-      GenericRow previousRecord = previousRecordReference.get();
-      if (previousRecord != null) {
+      if (!outOfOrder.get()) {
+        GenericRow previousRecord = previousRecordReference.get();
+        if (previousRecord == null) {
+          return record;
+        }
         return _partialUpsertHandler.merge(previousRecord, record);
       } else {
         handleOutOfOrderEvent(currentRecordLocation.getComparisonValue(), recordInfo.getComparisonValue());
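For partial upsert, the net effect of the doUpdateRecord() change is that merging with the
previous version only happens when the event is in order and the previous version was not a
delete marker; otherwise the incoming record is ingested as-is. A compact sketch of that
decision (not the actual method):

    import javax.annotation.Nullable;
    import org.apache.pinot.segment.local.upsert.PartialUpsertHandler;
    import org.apache.pinot.spi.data.readers.GenericRow;

    final class PartialUpsertMergeSketch {
      static GenericRow resolve(PartialUpsertHandler handler, GenericRow newRecord,
          @Nullable GenericRow previousRecord, boolean outOfOrder) {
        if (outOfOrder || previousRecord == null) {
          // Out-of-order event, unseen key, or previous version was deleted: no merge.
          return newRecord;
        }
        return handler.merge(previousRecord, newRecord);
      }
    }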
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
index cfc6e529a4..b08ea591d1 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
@@ -36,7 +36,8 @@ public class ConcurrentMapTableUpsertMetadataManager extends BaseTableUpsertMeta
   public ConcurrentMapPartitionUpsertMetadataManager getOrCreatePartitionManager(int partitionId) {
     return _partitionMetadataManagerMap.computeIfAbsent(partitionId,
         k -> new ConcurrentMapPartitionUpsertMetadataManager(_tableNameWithType, k, _primaryKeyColumns,
-            _comparisonColumns, _hashFunction, _partialUpsertHandler, _enableSnapshot, _serverMetrics));
+            _comparisonColumns, _deleteRecordColumn, _hashFunction, _partialUpsertHandler, _enableSnapshot,
+            _serverMetrics));
   }
 
   @Override
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/RecordInfo.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/RecordInfo.java
index f4f139f6c3..4df31f27c0 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/RecordInfo.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/RecordInfo.java
@@ -26,11 +26,13 @@ public class RecordInfo {
   private final PrimaryKey _primaryKey;
   private final int _docId;
   private final Comparable _comparisonValue;
+  private final boolean _deleteRecord;
 
-  public RecordInfo(PrimaryKey primaryKey, int docId, Comparable comparisonValue) {
+  public RecordInfo(PrimaryKey primaryKey, int docId, Comparable comparisonValue, boolean deleteRecord) {
     _primaryKey = primaryKey;
     _docId = docId;
     _comparisonValue = comparisonValue;
+    _deleteRecord = deleteRecord;
   }
 
   public PrimaryKey getPrimaryKey() {
@@ -44,4 +46,8 @@ public class RecordInfo {
   public Comparable getComparisonValue() {
     return _comparisonValue;
   }
+
+  public boolean isDeleteRecord() {
+    return _deleteRecord;
+  }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
index 6a7d3ed626..8dd1d8c475 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
@@ -23,9 +23,11 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import javax.annotation.Nullable;
 import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.spi.data.readers.PrimaryKey;
+import org.apache.pinot.spi.utils.BooleanUtils;
 import org.apache.pinot.spi.utils.ByteArray;
 import org.roaringbitmap.PeekableIntIterator;
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
@@ -75,31 +77,32 @@ public class UpsertUtils {
     };
   }
 
-  public static RecordInfoReader makeRecordReader(IndexSegment segment, List<String> primaryKeyColumns,
-      List<String> comparisonColumns) {
-    if (comparisonColumns.size() > 1) {
-      return new RecordInfoReader(segment, primaryKeyColumns, comparisonColumns);
-    }
-    return new RecordInfoReader(segment, primaryKeyColumns, comparisonColumns.get(0));
-  }
-
   public static class RecordInfoReader implements Closeable {
-    public final PrimaryKeyReader _primaryKeyReader;
-    public final ComparisonColumnReader _comparisonColumnReader;
+    private final PrimaryKeyReader _primaryKeyReader;
+    private final ComparisonColumnReader _comparisonColumnReader;
+    private final PinotSegmentColumnReader _deleteRecordColumnReader;
 
-    public RecordInfoReader(IndexSegment segment, List<String> primaryKeyColumns, List<String> comparisonColumns) {
+    public RecordInfoReader(IndexSegment segment, List<String> primaryKeyColumns, List<String> comparisonColumns,
+        @Nullable String deleteRecordColumn) {
       _primaryKeyReader = new PrimaryKeyReader(segment, primaryKeyColumns);
-      _comparisonColumnReader = new MultiComparisonColumnReader(segment, comparisonColumns);
-    }
-
-    public RecordInfoReader(IndexSegment segment, List<String> primaryKeyColumns, String comparisonColumn) {
-      _primaryKeyReader = new PrimaryKeyReader(segment, primaryKeyColumns);
-      _comparisonColumnReader = new SingleComparisonColumnReader(segment, comparisonColumn);
+      if (comparisonColumns.size() == 1) {
+        _comparisonColumnReader = new SingleComparisonColumnReader(segment, comparisonColumns.get(0));
+      } else {
+        _comparisonColumnReader = new MultiComparisonColumnReader(segment, comparisonColumns);
+      }
+      if (deleteRecordColumn != null) {
+        _deleteRecordColumnReader = new PinotSegmentColumnReader(segment, deleteRecordColumn);
+      } else {
+        _deleteRecordColumnReader = null;
+      }
     }
 
     public RecordInfo getRecordInfo(int docId) {
       PrimaryKey primaryKey = _primaryKeyReader.getPrimaryKey(docId);
-      return new RecordInfo(primaryKey, docId, _comparisonColumnReader.getComparisonValue(docId));
+      Comparable comparisonValue = _comparisonColumnReader.getComparisonValue(docId);
+      boolean deleteRecord = _deleteRecordColumnReader != null
+          && BooleanUtils.toBoolean(_deleteRecordColumnReader.getValue(docId));
+      return new RecordInfo(primaryKey, docId, comparisonValue, deleteRecord);
     }
 
     @Override
@@ -151,7 +154,6 @@ public class UpsertUtils {
     return value instanceof byte[] ? new ByteArray((byte[]) value) : value;
   }
 
-
   public interface ComparisonColumnReader extends Closeable {
     Comparable getComparisonValue(int docId);
   }
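The updated reader can be exercised directly during segment (re)load; an illustrative usage,
given an IndexSegment "segment" and assumed column names:

    List<String> primaryKeyColumns = Collections.singletonList("playerId");
    List<String> comparisonColumns = Collections.singletonList("timestampInEpoch");
    try (UpsertUtils.RecordInfoReader reader =
        new UpsertUtils.RecordInfoReader(segment, primaryKeyColumns, comparisonColumns, "deleted")) {
      int totalDocs = segment.getSegmentMetadata().getTotalDocs();
      for (int docId = 0; docId < totalDocs; docId++) {
        RecordInfo recordInfo = reader.getRecordInfo(docId);
        // recordInfo.isDeleteRecord() is true for rows whose delete column evaluates to true
      }
    }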
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 9ad82bc23c..89cac72cac 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -570,20 +570,29 @@ public final class TableConfigUtils {
         "Upsert/Dedup table must use strict replica-group (i.e. strictReplicaGroup) based routing");
 
     // specifically for upsert
-    if (tableConfig.getUpsertMode() != UpsertConfig.Mode.NONE) {
-
+    UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
+    if (upsertConfig != null) {
       // no startree index
       Preconditions.checkState(CollectionUtils.isEmpty(tableConfig.getIndexingConfig().getStarTreeIndexConfigs())
               && !tableConfig.getIndexingConfig().isEnableDefaultStarTree(),
           "The upsert table cannot have star-tree index.");
 
       // comparison column exists
-      if (tableConfig.getUpsertConfig().getComparisonColumns() != null) {
-        List<String> comparisonCols = tableConfig.getUpsertConfig().getComparisonColumns();
-        for (String comparisonCol : comparisonCols) {
-          Preconditions.checkState(schema.hasColumn(comparisonCol), "The comparison column does not exist on schema");
+      List<String> comparisonColumns = upsertConfig.getComparisonColumns();
+      if (comparisonColumns != null) {
+        for (String column : comparisonColumns) {
+          Preconditions.checkState(schema.hasColumn(column), "The comparison column does not exist on schema");
         }
       }
+
+      // Delete record column exists and is a single-valued BOOLEAN field
+      String deleteRecordColumn = upsertConfig.getDeleteRecordColumn();
+      if (deleteRecordColumn != null) {
+        FieldSpec fieldSpec = schema.getFieldSpecFor(deleteRecordColumn);
+        Preconditions.checkState(
+            fieldSpec != null && fieldSpec.isSingleValueField() && fieldSpec.getDataType() == DataType.BOOLEAN,
+            "The delete record column must be a single-valued BOOLEAN column");
+      }
     }
     validateAggregateMetricsForUpsertConfig(tableConfig);
   }
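Schema-wise this means the delete column must be declared as a single-valued BOOLEAN, e.g. a
dimension field along these lines (field name assumed):

    { "name": "deleted", "dataType": "BOOLEAN" }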
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManagerTest.java
index c1c90fae97..0fed3d59dd 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManagerTest.java
@@ -25,7 +25,6 @@ import java.util.Map;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
-import org.apache.pinot.segment.local.upsert.RecordInfo;
 import org.apache.pinot.segment.local.utils.HashUtils;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.spi.config.table.HashFunction;
@@ -122,20 +121,16 @@ public class PartitionDedupMetadataManagerTest {
     metadataManager.addSegment(segment1);
 
     // Same PK exists
-    RecordInfo recordInfo = mock(RecordInfo.class);
-    when(recordInfo.getPrimaryKey()).thenReturn(getPrimaryKey(0));
     ImmutableSegmentImpl segment2 = mockSegment(2);
-    Assert.assertTrue(metadataManager.checkRecordPresentOrUpdate(recordInfo.getPrimaryKey(), segment2));
+    Assert.assertTrue(metadataManager.checkRecordPresentOrUpdate(getPrimaryKey(0), segment2));
     checkRecordLocation(recordLocationMap, 0, segment1, hashFunction);
 
     // New PK
-    when(recordInfo.getPrimaryKey()).thenReturn(getPrimaryKey(3));
-    Assert.assertFalse(metadataManager.checkRecordPresentOrUpdate(recordInfo.getPrimaryKey(), segment2));
+    Assert.assertFalse(metadataManager.checkRecordPresentOrUpdate(getPrimaryKey(3), segment2));
     checkRecordLocation(recordLocationMap, 3, segment2, hashFunction);
 
     // Same PK as the one recently ingested
-    when(recordInfo.getPrimaryKey()).thenReturn(getPrimaryKey(3));
-    Assert.assertTrue(metadataManager.checkRecordPresentOrUpdate(recordInfo.getPrimaryKey(), segment2));
+    Assert.assertTrue(metadataManager.checkRecordPresentOrUpdate(getPrimaryKey(3), segment2));
   }
 
   private static ImmutableSegmentImpl mockSegment(int sequenceNumber) {
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index a89f7e9d64..02fa90abdb 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -25,6 +25,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
@@ -44,6 +45,7 @@ import org.apache.pinot.spi.utils.ByteArray;
 import org.apache.pinot.spi.utils.BytesUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import static org.mockito.ArgumentMatchers.any;
@@ -76,7 +78,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     String comparisonColumn = "timeCol";
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
-            Collections.singletonList(comparisonColumn), hashFunction, null, false, mock(ServerMetrics.class));
+            Collections.singletonList(comparisonColumn), null, hashFunction, null, false, mock(ServerMetrics.class));
     Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
     Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;
 
@@ -86,7 +88,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     int[] timestamps = new int[]{100, 100, 100, 80, 120, 100};
     ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
     List<PrimaryKey> primaryKeys1 = getPrimaryKeyList(numRecords, primaryKeys);
-    ImmutableSegmentImpl segment1 = mockImmutableSegment(1, validDocIds1, primaryKeys1);
+    ImmutableSegmentImpl segment1 = mockImmutableSegment(1, validDocIds1, null, primaryKeys1);
     List<RecordInfo> recordInfoList1;
     if (enableSnapshot) {
       // get recordInfo from validDocIdSnapshot.
@@ -94,12 +96,12 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
       int[] docIds1 = new int[]{2, 4, 5};
       MutableRoaringBitmap validDocIdsSnapshot1 = new MutableRoaringBitmap();
       validDocIdsSnapshot1.add(docIds1);
-      recordInfoList1 = getRecordInfoList(validDocIdsSnapshot1, primaryKeys, timestamps);
+      recordInfoList1 = getRecordInfoList(validDocIdsSnapshot1, primaryKeys, timestamps, null);
     } else {
       // get recordInfo by iterating all records.
-      recordInfoList1 = getRecordInfoList(numRecords, primaryKeys, timestamps);
+      recordInfoList1 = getRecordInfoList(numRecords, primaryKeys, timestamps, null);
     }
-    upsertMetadataManager.addSegment(segment1, validDocIds1, recordInfoList1.iterator());
+    upsertMetadataManager.addSegment(segment1, validDocIds1, null, recordInfoList1.iterator());
     trackedSegments.add(segment1);
     // segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
     assertEquals(recordLocationMap.size(), 3);
@@ -113,7 +115,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     primaryKeys = new int[]{0, 1, 2, 3, 0};
     timestamps = new int[]{100, 100, 120, 80, 80};
     ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
-    ImmutableSegmentImpl segment2 = mockImmutableSegment(2, validDocIds2, getPrimaryKeyList(numRecords, primaryKeys));
+    ImmutableSegmentImpl segment2 =
+        mockImmutableSegment(2, validDocIds2, null, getPrimaryKeyList(numRecords, primaryKeys));
     List<RecordInfo> recordInfoList2;
     if (enableSnapshot) {
       // get recordInfo from validDocIdSnapshot.
@@ -121,12 +124,12 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
       // segment1 snapshot: 1 -> {4, 120}
       MutableRoaringBitmap validDocIdsSnapshot2 = new MutableRoaringBitmap();
       validDocIdsSnapshot2.add(0, 2, 3);
-      recordInfoList2 = getRecordInfoList(validDocIdsSnapshot2, primaryKeys, timestamps);
+      recordInfoList2 = getRecordInfoList(validDocIdsSnapshot2, primaryKeys, timestamps, null);
     } else {
       // get recordInfo by iterating all records.
-      recordInfoList2 = getRecordInfoList(numRecords, primaryKeys, timestamps);
+      recordInfoList2 = getRecordInfoList(numRecords, primaryKeys, timestamps, null);
     }
-    upsertMetadataManager.addSegment(segment2, validDocIds2, recordInfoList2.iterator());
+    upsertMetadataManager.addSegment(segment2, validDocIds2, null, recordInfoList2.iterator());
     trackedSegments.add(segment2);
 
     // segment1: 1 -> {4, 120}
@@ -154,8 +157,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
 
     // Replace (reload) the first segment
     ThreadSafeMutableRoaringBitmap newValidDocIds1 = new ThreadSafeMutableRoaringBitmap();
-    ImmutableSegmentImpl newSegment1 = mockImmutableSegment(1, newValidDocIds1, primaryKeys1);
-    upsertMetadataManager.replaceSegment(newSegment1, newValidDocIds1, recordInfoList1.iterator(), segment1);
+    ImmutableSegmentImpl newSegment1 = mockImmutableSegment(1, newValidDocIds1, null, primaryKeys1);
+    upsertMetadataManager.replaceSegment(newSegment1, newValidDocIds1, null, recordInfoList1.iterator(), segment1);
     trackedSegments.add(newSegment1);
     trackedSegments.remove(segment1);
     // original segment1: 1 -> {4, 120} (not in the map)
@@ -220,10 +223,198 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     upsertMetadataManager.close();
   }
 
-  private List<RecordInfo> getRecordInfoList(int numRecords, int[] primaryKeys, int[] timestamps) {
+  @Test
+  public void testAddReplaceRemoveSegmentWithRecordDelete()
+      throws IOException {
+    verifyAddReplaceRemoveSegmentWithRecordDelete(HashFunction.NONE, false);
+    verifyAddReplaceRemoveSegmentWithRecordDelete(HashFunction.MD5, false);
+    verifyAddReplaceRemoveSegmentWithRecordDelete(HashFunction.MURMUR3, false);
+    verifyAddReplaceRemoveSegmentWithRecordDelete(HashFunction.NONE, true);
+    verifyAddReplaceRemoveSegmentWithRecordDelete(HashFunction.MD5, true);
+    verifyAddReplaceRemoveSegmentWithRecordDelete(HashFunction.MURMUR3, true);
+  }
+
+  private void verifyAddReplaceRemoveSegmentWithRecordDelete(HashFunction hashFunction, boolean enableSnapshot)
+      throws IOException {
+    String comparisonColumn = "timeCol";
+    String deleteRecordColumn = "deleteCol";
+    ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
+            Collections.singletonList(comparisonColumn), deleteRecordColumn, hashFunction, null, false,
+            mock(ServerMetrics.class));
+    Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+    Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;
+
+    // Add the first segment
+    int numRecords = 6;
+    int[] primaryKeys = new int[]{0, 1, 2, 0, 1, 0};
+    int[] timestamps = new int[]{100, 100, 100, 80, 120, 100};
+    boolean[] deleteFlags = new boolean[]{false, false, false, true, true, false};
+    ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
+    ThreadSafeMutableRoaringBitmap queryableDocIds1 = new ThreadSafeMutableRoaringBitmap();
+    List<PrimaryKey> primaryKeys1 = getPrimaryKeyList(numRecords, primaryKeys);
+    ImmutableSegmentImpl segment1 = mockImmutableSegment(1, validDocIds1, queryableDocIds1, primaryKeys1);
+    List<RecordInfo> recordInfoList1;
+    if (enableSnapshot) {
+      // get recordInfo from validDocIdSnapshot.
+      // segment1 snapshot: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
+      int[] docIds1 = new int[]{2, 4, 5};
+      MutableRoaringBitmap validDocIdsSnapshot1 = new MutableRoaringBitmap();
+      validDocIdsSnapshot1.add(docIds1);
+      recordInfoList1 = getRecordInfoList(validDocIdsSnapshot1, primaryKeys, timestamps, deleteFlags);
+    } else {
+      // get recordInfo by iterating all records.
+      recordInfoList1 = getRecordInfoList(numRecords, primaryKeys, timestamps, deleteFlags);
+    }
+    upsertMetadataManager.addSegment(segment1, validDocIds1, queryableDocIds1, recordInfoList1.iterator());
+    trackedSegments.add(segment1);
+    // segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
+    assertEquals(recordLocationMap.size(), 3);
+    checkRecordLocation(recordLocationMap, 0, segment1, 5, 100, hashFunction);
+    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, hashFunction);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{2, 4, 5});
+    assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new int[]{2, 5});
+
+    // Add the second segment
+    numRecords = 5;
+    primaryKeys = new int[]{0, 1, 2, 3, 0};
+    timestamps = new int[]{100, 100, 120, 80, 80};
+    deleteFlags = new boolean[]{false, true, true, false, false};
+    ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
+    ThreadSafeMutableRoaringBitmap queryableDocIds2 = new ThreadSafeMutableRoaringBitmap();
+    ImmutableSegmentImpl segment2 =
+        mockImmutableSegment(2, validDocIds2, queryableDocIds2, getPrimaryKeyList(numRecords, primaryKeys));
+    List<RecordInfo> recordInfoList2;
+    if (enableSnapshot) {
+      // get recordInfo from validDocIdSnapshot.
+      // segment2 snapshot: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
+      // segment1 snapshot: 1 -> {4, 120}
+      MutableRoaringBitmap validDocIdsSnapshot2 = new MutableRoaringBitmap();
+      validDocIdsSnapshot2.add(0, 2, 3);
+      recordInfoList2 = getRecordInfoList(validDocIdsSnapshot2, primaryKeys, timestamps, deleteFlags);
+    } else {
+      // get recordInfo by iterating all records.
+      recordInfoList2 = getRecordInfoList(numRecords, primaryKeys, timestamps, deleteFlags);
+    }
+    upsertMetadataManager.addSegment(segment2, validDocIds2, queryableDocIds2, recordInfoList2.iterator());
+    trackedSegments.add(segment2);
+
+    // segment1: 1 -> {4, 120}
+    // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
+    assertEquals(recordLocationMap.size(), 4);
+    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
+    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
+    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
+    Assert.assertTrue(queryableDocIds1.getMutableRoaringBitmap().isEmpty());
+    assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 3});
+
+    // Add an empty segment
+    EmptyIndexSegment emptySegment = mockEmptySegment(3);
+    upsertMetadataManager.addSegment(emptySegment);
+    // segment1: 1 -> {4, 120}
+    // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
+    assertEquals(recordLocationMap.size(), 4);
+    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
+    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
+    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
+    Assert.assertTrue(queryableDocIds1.getMutableRoaringBitmap().isEmpty());
+    assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 3});
+
+    // Replace (reload) the first segment
+    ThreadSafeMutableRoaringBitmap newValidDocIds1 = new ThreadSafeMutableRoaringBitmap();
+    ThreadSafeMutableRoaringBitmap newQueryableDocIds1 = new ThreadSafeMutableRoaringBitmap();
+    ImmutableSegmentImpl newSegment1 = mockImmutableSegment(1, newValidDocIds1, newQueryableDocIds1, primaryKeys1);
+    upsertMetadataManager.replaceSegment(newSegment1, newValidDocIds1, newQueryableDocIds1, recordInfoList1.iterator(),
+        segment1);
+    trackedSegments.add(newSegment1);
+    trackedSegments.remove(segment1);
+
+    // original segment1: 1 -> {4, 120} (not in the map)
+    // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
+    // new segment1: 1 -> {4, 120}
+    assertEquals(recordLocationMap.size(), 4);
+    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
+    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
+    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
+    assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
+    Assert.assertTrue(queryableDocIds1.getMutableRoaringBitmap().isEmpty());
+    assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 3});
+    Assert.assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());
+
+    // Remove the original segment1
+    upsertMetadataManager.removeSegment(segment1);
+    // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
+    // new segment1: 1 -> {4, 120}
+    assertEquals(recordLocationMap.size(), 4);
+    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
+    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
+    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
+    assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
+    Assert.assertTrue(queryableDocIds1.getMutableRoaringBitmap().isEmpty());
+    assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 3});
+    Assert.assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());
+
+    // Remove the empty segment
+    upsertMetadataManager.removeSegment(emptySegment);
+    // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
+    // new segment1: 1 -> {4, 120}
+    assertEquals(recordLocationMap.size(), 4);
+    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
+    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
+    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
+    assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
+    assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 3});
+    Assert.assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());
+
+    // Remove segment2
+    upsertMetadataManager.removeSegment(segment2);
+    // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80} (not in the map)
+    // new segment1: 1 -> {4, 120}
+    assertEquals(recordLocationMap.size(), 1);
+    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
+    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
+    assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
+    assertEquals(trackedSegments, Collections.singleton(newSegment1));
+    assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 3});
+    Assert.assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());
+
+    // Stop the metadata manager
+    upsertMetadataManager.stop();
+
+    // Remove new segment1, should be no-op
+    upsertMetadataManager.removeSegment(newSegment1);
+    // new segment1: 1 -> {4, 120}
+    assertEquals(recordLocationMap.size(), 1);
+    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
+    assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
+    assertEquals(trackedSegments, Collections.singleton(newSegment1));
+    Assert.assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());
+
+    // Close the metadata manager
+    upsertMetadataManager.close();
+  }
+
+  private List<RecordInfo> getRecordInfoList(int numRecords, int[] primaryKeys, int[] timestamps,
+      @Nullable boolean[] deleteRecordFlags) {
     List<RecordInfo> recordInfoList = new ArrayList<>();
     for (int i = 0; i < numRecords; i++) {
-      recordInfoList.add(new RecordInfo(makePrimaryKey(primaryKeys[i]), i, new IntWrapper(timestamps[i])));
+      recordInfoList.add(new RecordInfo(makePrimaryKey(primaryKeys[i]), i, new IntWrapper(timestamps[i]),
+          deleteRecordFlags != null && deleteRecordFlags[i]));
     }
     return recordInfoList;
   }
@@ -232,11 +423,12 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
    * Get recordInfo from validDocIdsSnapshot (enabledSnapshot = True).
    */
   private List<RecordInfo> getRecordInfoList(MutableRoaringBitmap validDocIdsSnapshot, int[] primaryKeys,
-      int[] timestamps) {
+      int[] timestamps, @Nullable boolean[] deleteRecordFlags) {
     List<RecordInfo> recordInfoList = new ArrayList<>();
     Iterator<Integer> validDocIdsIterator = validDocIdsSnapshot.iterator();
     validDocIdsIterator.forEachRemaining((docId) -> recordInfoList.add(
-        new RecordInfo(makePrimaryKey(primaryKeys[docId]), docId, new IntWrapper(timestamps[docId]))));
+        new RecordInfo(makePrimaryKey(primaryKeys[docId]), docId, new IntWrapper(timestamps[docId]),
+            deleteRecordFlags != null && deleteRecordFlags[docId])));
     return recordInfoList;
   }
 
@@ -249,10 +441,12 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
   }
 
   private static ImmutableSegmentImpl mockImmutableSegment(int sequenceNumber,
-      ThreadSafeMutableRoaringBitmap validDocIds, List<PrimaryKey> primaryKeys) {
+      ThreadSafeMutableRoaringBitmap validDocIds, @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds,
+      List<PrimaryKey> primaryKeys) {
     ImmutableSegmentImpl segment = mock(ImmutableSegmentImpl.class);
     when(segment.getSegmentName()).thenReturn(getSegmentName(sequenceNumber));
     when(segment.getValidDocIds()).thenReturn(validDocIds);
+    when(segment.getQueryableDocIds()).thenReturn(queryableDocIds);
     DataSource dataSource = mock(DataSource.class);
     when(segment.getDataSource(anyString())).thenReturn(dataSource);
     ForwardIndexReader forwardIndex = mock(ForwardIndexReader.class);
@@ -270,9 +464,11 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     return new EmptyIndexSegment(segmentMetadata);
   }
 
-  private static MutableSegment mockMutableSegment(int sequenceNumber, ThreadSafeMutableRoaringBitmap validDocIds) {
+  private static MutableSegment mockMutableSegment(int sequenceNumber, ThreadSafeMutableRoaringBitmap validDocIds,
+      ThreadSafeMutableRoaringBitmap queryableDocIds) {
     MutableSegment segment = mock(MutableSegment.class);
     when(segment.getSegmentName()).thenReturn(getSegmentName(sequenceNumber));
+    when(segment.getQueryableDocIds()).thenReturn(queryableDocIds);
     when(segment.getValidDocIds()).thenReturn(validDocIds);
     return segment;
   }
@@ -308,7 +504,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     String comparisonColumn = "timeCol";
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
-            Collections.singletonList(comparisonColumn), hashFunction, null, false, mock(ServerMetrics.class));
+            Collections.singletonList(comparisonColumn), null, hashFunction, null, false, mock(ServerMetrics.class));
     Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
 
     // Add the first segment
@@ -317,14 +513,15 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     int[] primaryKeys = new int[]{0, 1, 2};
     int[] timestamps = new int[]{100, 120, 100};
     ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
-    ImmutableSegmentImpl segment1 = mockImmutableSegment(1, validDocIds1, getPrimaryKeyList(numRecords, primaryKeys));
-    upsertMetadataManager.addSegment(segment1, validDocIds1,
-        getRecordInfoList(numRecords, primaryKeys, timestamps).iterator());
+    ImmutableSegmentImpl segment1 =
+        mockImmutableSegment(1, validDocIds1, null, getPrimaryKeyList(numRecords, primaryKeys));
+    upsertMetadataManager.addSegment(segment1, validDocIds1, null,
+        getRecordInfoList(numRecords, primaryKeys, timestamps, null).iterator());
 
     // Update records from the second segment
     ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
-    MutableSegment segment2 = mockMutableSegment(1, validDocIds2);
-    upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(3), 0, new IntWrapper(100)));
+    MutableSegment segment2 = mockMutableSegment(1, validDocIds2, null);
+    upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(3), 0, new IntWrapper(100), false));
 
     // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
     // segment2: 3 -> {0, 100}
@@ -335,7 +532,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 2});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0});
 
-    upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(2), 1, new IntWrapper(120)));
+    upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(2), 1, new IntWrapper(120), false));
 
     // segment1: 0 -> {0, 100}, 1 -> {1, 120}
     // segment2: 2 -> {1, 120}, 3 -> {0, 100}
@@ -346,7 +543,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
 
-    upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(1), 2, new IntWrapper(100)));
+    upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(1), 2, new IntWrapper(100), false));
 
     // segment1: 0 -> {0, 100}, 1 -> {1, 120}
     // segment2: 2 -> {1, 120}, 3 -> {0, 100}
@@ -357,7 +554,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
 
-    upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(0), 3, new IntWrapper(100)));
+    upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(0), 3, new IntWrapper(100), false));
 
     // segment1: 1 -> {1, 120}
     // segment2: 0 -> {3, 100}, 2 -> {1, 120}, 3 -> {0, 100}
@@ -372,7 +569,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     upsertMetadataManager.stop();
 
     // Add record should be no-op
-    upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(0), 4, new IntWrapper(120)));
+    upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(0), 4, new IntWrapper(120), false));
     // segment1: 1 -> {1, 120}
     // segment2: 0 -> {3, 100}, 2 -> {1, 120}, 3 -> {0, 100}
     checkRecordLocation(recordLocationMap, 0, segment2, 3, 100, hashFunction);
@@ -386,6 +583,115 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     upsertMetadataManager.close();
   }
 
+  @Test
+  public void testAddRecordWithDeleteColumn()
+      throws IOException {
+    verifyAddRecordWithDeleteColumn(HashFunction.NONE);
+    verifyAddRecordWithDeleteColumn(HashFunction.MD5);
+    verifyAddRecordWithDeleteColumn(HashFunction.MURMUR3);
+  }
+  private void verifyAddRecordWithDeleteColumn(HashFunction hashFunction)
+      throws IOException {
+    String comparisonColumn = "timeCol";
+    String deleteColumn = "deleteCol";
+    ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
+            Collections.singletonList(comparisonColumn), deleteColumn, hashFunction, null,
+            false, mock(ServerMetrics.class));
+    Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+
+    // queryableDocIds is the same as validDocIds in the absence of delete markers
+    // Add the first segment
+    // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
+    int numRecords = 3;
+    int[] primaryKeys = new int[]{0, 1, 2};
+    int[] timestamps = new int[]{100, 120, 100};
+    ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
+    ThreadSafeMutableRoaringBitmap queryableDocIds1 = new ThreadSafeMutableRoaringBitmap();
+    ImmutableSegmentImpl segment1 =
+        mockImmutableSegment(1, validDocIds1, queryableDocIds1, getPrimaryKeyList(numRecords, primaryKeys));
+    upsertMetadataManager.addSegment(segment1, validDocIds1, queryableDocIds1,
+        getRecordInfoList(numRecords, primaryKeys, timestamps, null).iterator());
+
+    // Update records from the second segment
+    ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
+    ThreadSafeMutableRoaringBitmap queryableDocIds2 = new ThreadSafeMutableRoaringBitmap();
+    MutableSegment segment2 = mockMutableSegment(1, validDocIds2, queryableDocIds2);
+    upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(3), 0, new IntWrapper(100), false));
+
+    // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
+    // segment2: 3 -> {0, 100}
+    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
+    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 2});
+    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0});
+    assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 2});
+    assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0});
+
+    // Mark a record whose latest value is in segment1 as deleted
+    upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(2), 1, new IntWrapper(120), true));
+
+    // segment1: 0 -> {0, 100}, 1 -> {1, 120}
+    // segment2: 2 -> {1, 120}, 3 -> {0, 100}
+    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
+    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
+    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
+    assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
+    assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0});
+
+    // Mark a record whose latest value is in segment2 as deleted
+    upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(3), 2, new IntWrapper(150), true));
+
+    // segment1: 0 -> {0, 100}, 1 -> {1, 120}
+    // segment2: 2 -> {1, 120}, 3 -> {2, 150}
+    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
+    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 2, 150, hashFunction);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
+    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{1, 2});
+    assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
+    assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{});
+
+    // Revive a deleted primary key (by providing a larger comparisonValue)
+    upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(3), 3, new IntWrapper(200), false));
+
+    // segment1: 0 -> {0, 100}, 1 -> {1, 120}
+    // segment2: 2 -> {1, 120}, 3 -> {3, 200}
+    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
+    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 3, 200, hashFunction);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
+    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{1, 3});
+    assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
+    assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{3});
+
+    // Stop the metadata manager
+    upsertMetadataManager.stop();
+
+    // Add record should be no-op
+    upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(0), 4, new IntWrapper(120), false));
+    // segment1: 0 -> {0, 100}, 1 -> {1, 120}
+    // segment2: 2 -> {1, 120}, 3 -> {3, 200}
+    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
+    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 3, 200, hashFunction);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
+    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{1, 3});
+    assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
+    assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{3});
+
+    // Close the metadata manager
+    upsertMetadataManager.close();
+  }
+
   @Test
   public void testHashPrimaryKey() {
     PrimaryKey pk = new PrimaryKey(new Object[]{"uuid-1", "uuid-2", "uuid-3"});
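Editor's note: to summarize the bookkeeping that the testAddRecordWithDeleteColumn hunk above exercises, here is a deliberately simplified, non-authoritative sketch (not the manager's actual implementation; class name, doc ids and the helper method are made up): when a newer record arrives for a primary key, the previous doc id leaves both bitmaps, the new doc id always enters validDocIds, and it enters queryableDocIds only when the record is not a delete. A later non-delete record with a larger comparison value revives the key.

    import org.roaringbitmap.buffer.MutableRoaringBitmap;

    public class DeleteAwareUpsertSketch {
      // Simplified stand-in for the per-segment bitmap updates; the real manager also
      // tracks record locations and comparison values per primary key.
      static void upsert(MutableRoaringBitmap validDocIds, MutableRoaringBitmap queryableDocIds,
          int oldDocId, int newDocId, boolean deleteRecord) {
        validDocIds.remove(oldDocId);
        queryableDocIds.remove(oldDocId);
        validDocIds.add(newDocId);
        if (!deleteRecord) {
          queryableDocIds.add(newDocId);
        }
      }

      public static void main(String[] args) {
        MutableRoaringBitmap valid = MutableRoaringBitmap.bitmapOf(0);
        MutableRoaringBitmap queryable = MutableRoaringBitmap.bitmapOf(0);

        upsert(valid, queryable, 0, 1, true);            // delete marker: key stays tracked, no longer queryable
        System.out.println(valid + " / " + queryable);   // {1} / {}

        upsert(valid, queryable, 1, 2, false);           // revive with a newer comparison value
        System.out.println(valid + " / " + queryable);   // {2} / {2}
      }
    }
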
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index 138ae2db1a..a848fd1247 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -1415,7 +1415,8 @@ public class TableConfigUtilsTest {
   @Test
   public void testValidateUpsertConfig() {
     Schema schema =
-        new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
+        new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+            .addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
             .build();
     UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
     TableConfig tableConfig =
@@ -1521,6 +1522,43 @@ public class TableConfigUtilsTest {
       Assert.assertEquals(e.getMessage(),
           "Metrics aggregation cannot be enabled in the Indexing Config and Ingestion Config at the same time");
     }
+
+    // Table upsert with delete column
+    String incorrectTypeDelCol = "incorrectTypeDeleteCol";
+    String delCol = "myDelCol";
+    schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+        .setPrimaryKeyColumns(Lists.newArrayList("myPkCol"))
+        .addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
+        .addSingleValueDimension(incorrectTypeDelCol, FieldSpec.DataType.STRING)
+        .addSingleValueDimension(delCol, FieldSpec.DataType.BOOLEAN)
+        .build();
+    streamConfigs = getStreamConfigs();
+    streamConfigs.put("stream.kafka.consumer.type", "simple");
+
+    upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+    upsertConfig.setDeleteRecordColumn(incorrectTypeDelCol);
+    tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setStreamConfigs(streamConfigs)
+        .setUpsertConfig(upsertConfig)
+        .setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+        .build();
+    try {
+      TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
+      Assert.fail("Invalid delete column type (string) should have failed table creation");
+    } catch (IllegalStateException e) {
+      Assert.assertEquals(e.getMessage(), "The delete record column must be a single-valued BOOLEAN column");
+    }
+
+    upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+    upsertConfig.setDeleteRecordColumn(delCol);
+    tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setStreamConfigs(streamConfigs)
+        .setUpsertConfig(upsertConfig)
+        .setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+        .build();
+    try {
+      TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
+    } catch (IllegalStateException e) {
+      Assert.fail("Shouldn't fail table creation when delete column type is boolean.");
+    }
   }
 
   @Test
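Editor's note: a minimal, self-contained sketch (not part of this commit; schema name, table name and column names are made up) of how the delete column validated above is expected to be declared. It mirrors the shapes used in the test: the delete column must be a single-valued BOOLEAN dimension, and it is wired in through UpsertConfig.setDeleteRecordColumn.

    import java.util.Collections;

    import org.apache.pinot.spi.config.table.UpsertConfig;
    import org.apache.pinot.spi.data.FieldSpec;
    import org.apache.pinot.spi.data.Schema;

    public class DeleteColumnSchemaSketch {
      public static void main(String[] args) {
        // The delete marker must be a single-valued BOOLEAN column in the schema;
        // TableConfigUtils.validateUpsertAndDedupConfig rejects anything else.
        Schema schema = new Schema.SchemaBuilder().setSchemaName("gameScores")
            .setPrimaryKeyColumns(Collections.singletonList("playerId"))
            .addSingleValueDimension("playerId", FieldSpec.DataType.STRING)
            .addSingleValueDimension("deleted", FieldSpec.DataType.BOOLEAN)
            .build();

        UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
        upsertConfig.setDeleteRecordColumn("deleted");

        System.out.println(schema.getSchemaName() + " deletes via: " + upsertConfig.getDeleteRecordColumn());
      }
    }
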
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/IndexSegment.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/IndexSegment.java
index de200e6be9..9c48a00efc 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/IndexSegment.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/IndexSegment.java
@@ -72,10 +72,21 @@ public interface IndexSegment {
    */
   List<StarTreeV2> getStarTrees();
 
+  /**
+   * Returns a bitmap of the valid document ids. For upsert-enabled tables, a valid document is the one that holds
+   * the latest timestamp (or largest comparison value) for its primary key.
+   */
   // TODO(upsert): solve the coordination problems of getting validDoc across segments for result consistency
   @Nullable
   ThreadSafeMutableRoaringBitmap getValidDocIds();
 
+  /**
+   * Returns a bitmap of the queryable document ids. For upsert-enabled tables, a queryable document is the one that
+   * holds the latest timestamp (or largest comparison value) for its primary key and is not marked as deleted.
+   */
+  @Nullable
+  ThreadSafeMutableRoaringBitmap getQueryableDocIds();
+
   /**
    * Returns the record for the given document id. Virtual column values are not returned.
    * <p>NOTE: don't use this method for high performance code. Use PinotSegmentRecordReader when reading multiple
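Editor's note: a hedged illustration of the contract the new getQueryableDocIds() javadoc describes (not code from this commit; the doc ids below are made up): queryableDocIds is always a subset of validDocIds, namely validDocIds minus the documents whose latest record carries the delete marker.

    import org.roaringbitmap.buffer.MutableRoaringBitmap;

    public class QueryableDocIdsSketch {
      public static void main(String[] args) {
        MutableRoaringBitmap validDocIds = MutableRoaringBitmap.bitmapOf(0, 1, 2);  // latest doc per primary key
        MutableRoaringBitmap deletedDocIds = MutableRoaringBitmap.bitmapOf(2);      // latest record is a delete

        // queryableDocIds = validDocIds minus deleted docs
        MutableRoaringBitmap queryableDocIds = MutableRoaringBitmap.andNot(validDocIds, deletedDocIds);

        System.out.println(validDocIds);      // {0,1,2}
        System.out.println(queryableDocIds);  // {0,1}
      }
    }
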
diff --git a/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java b/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
index 94983e6267..bb912dd9e0 100644
--- a/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
+++ b/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
@@ -293,7 +293,7 @@ public class TablesResourceTest extends BaseResourceTest {
     for (int docId: docIds) {
       validDocIds.add(docId);
     }
-    segment.enableUpsert(upsertMetadataManager, validDocIds);
+    segment.enableUpsert(upsertMetadataManager, validDocIds, null);
 
     // Download the snapshot in byte[] format.
     Response response = _webTarget.path(snapshotPath).request().get(Response.class);
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
index 41a5bfeb71..f16d2e67f9 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
@@ -388,6 +388,12 @@ public class TableConfig extends BaseJsonConfig {
     return _upsertConfig == null ? null : _upsertConfig.getComparisonColumns();
   }
 
+  @JsonIgnore
+  @Nullable
+  public String getUpsertDeleteRecordColumn() {
+    return _upsertConfig == null ? null : _upsertConfig.getDeleteRecordColumn();
+  }
+
   @JsonProperty(TUNER_CONFIG_LIST_KEY)
   public List<TunerConfig> getTunerConfigsList() {
     return _tunerConfigList;
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
index f4ce363d00..c589b73a01 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
@@ -54,6 +54,9 @@ public class UpsertConfig extends BaseJsonConfig {
   @JsonPropertyDescription("Columns for upsert comparison, default to time column")
   private List<String> _comparisonColumns;
 
+  @JsonPropertyDescription("Boolean column to indicate whether a record should be deleted")
+  private String _deleteRecordColumn;
+
   @JsonPropertyDescription("Whether to use snapshot for fast upsert metadata recovery")
   private boolean _enableSnapshot;
 
@@ -96,6 +99,11 @@ public class UpsertConfig extends BaseJsonConfig {
     return _comparisonColumns;
   }
 
+  @Nullable
+  public String getDeleteRecordColumn() {
+    return _deleteRecordColumn;
+  }
+
   public boolean isEnableSnapshot() {
     return _enableSnapshot;
   }
@@ -154,6 +162,12 @@ public class UpsertConfig extends BaseJsonConfig {
     }
   }
 
+  public void setDeleteRecordColumn(String deleteRecordColumn) {
+    if (deleteRecordColumn != null) {
+      _deleteRecordColumn = deleteRecordColumn;
+    }
+  }
+
   public void setEnableSnapshot(boolean enableSnapshot) {
     _enableSnapshot = enableSnapshot;
   }
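Editor's note: finally, a small usage sketch (illustrative only; the table name and column name are made up, and a real REALTIME table would also need actual Kafka stream configs) showing how the new setter and the TableConfig.getUpsertDeleteRecordColumn() convenience getter fit together.

    import java.util.HashMap;

    import org.apache.pinot.spi.config.table.TableConfig;
    import org.apache.pinot.spi.config.table.TableType;
    import org.apache.pinot.spi.config.table.UpsertConfig;
    import org.apache.pinot.spi.utils.builder.TableConfigBuilder;

    public class UpsertDeleteColumnWiringSketch {
      public static void main(String[] args) {
        UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
        upsertConfig.setDeleteRecordColumn("deleted");  // hypothetical single-valued BOOLEAN column

        TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME)
            .setTableName("gameScores")
            .setStreamConfigs(new HashMap<>())          // placeholder; real tables need proper stream configs
            .setUpsertConfig(upsertConfig)
            .build();

        // New convenience accessor added by this commit
        System.out.println(tableConfig.getUpsertDeleteRecordColumn());  // deleted
      }
    }
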


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org