Posted to commits@pinot.apache.org by ja...@apache.org on 2023/07/26 17:11:52 UTC

[pinot] branch master updated: Segment compaction for upsert real-time tables (#10463)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d72eb5a58 Segment compaction for upsert real-time tables (#10463)
4d72eb5a58 is described below

commit 4d72eb5a58ed7c9874076d9d488afb1a81eeb54f
Author: Robert Zych <ro...@gmail.com>
AuthorDate: Wed Jul 26 10:11:45 2023 -0700

    Segment compaction for upsert real-time tables (#10463)
---
 .../pinot/controller/BaseControllerStarter.java    |  31 +--
 .../helix/core/minion/ClusterInfoAccessor.java     |  36 ++-
 .../helix/core/minion/PinotTaskManager.java        |  11 +-
 .../apache/pinot/core/common/MinionConstants.java  |  19 ++
 ...sertCompactionMinionClusterIntegrationTest.java | 228 +++++++++++++++++
 .../test/resources/upsert_compaction_test.tar.gz   | Bin 0 -> 9405 bytes
 .../org/apache/pinot/minion/BaseMinionStarter.java |  17 +-
 .../org/apache/pinot/minion/MinionContext.java     |  10 +
 .../tasks/BaseSingleSegmentConversionExecutor.java |   9 +-
 .../UpsertCompactionTaskExecutor.java              | 248 ++++++++++++++++++
 .../UpsertCompactionTaskExecutorFactory.java       |  49 ++++
 .../UpsertCompactionTaskGenerator.java             | 276 +++++++++++++++++++++
 ...psertCompactionTaskProgressObserverFactory.java |  33 +++
 .../UpsertCompactionTaskExecutorTest.java          |  63 +++++
 .../UpsertCompactionTaskGeneratorTest.java         | 273 ++++++++++++++++++++
 .../segment/local/utils/TableConfigUtils.java      |  69 ++++--
 .../segment/local/utils/TableConfigUtilsTest.java  | 136 ++++++----
 .../pinot/server/api/resources/TablesResource.java |  96 +++++--
 .../pinot/server/api/TablesResourceTest.java       |  81 +++---
 19 files changed, 1531 insertions(+), 154 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index cc294a5341..e32a419ecf 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -245,19 +245,19 @@ public abstract class BaseControllerStarter implements ServiceStartable {
     // NOTE: Helix will disconnect the manager and disable the instance if it detects flapping (too frequent disconnect
     // from ZooKeeper). Setting flapping time window to a small value can prevent this from happening. Helix ignores the
     // non-positive value, so set the default value as 1.
-    System.setProperty(SystemPropertyKeys.FLAPPING_TIME_WINDOW, _config
-        .getProperty(CommonConstants.Helix.CONFIG_OF_CONTROLLER_FLAPPING_TIME_WINDOW_MS,
+    System.setProperty(SystemPropertyKeys.FLAPPING_TIME_WINDOW,
+        _config.getProperty(CommonConstants.Helix.CONFIG_OF_CONTROLLER_FLAPPING_TIME_WINDOW_MS,
             CommonConstants.Helix.DEFAULT_FLAPPING_TIME_WINDOW_MS));
   }
 
   private void setupHelixClusterConstraints() {
-    String maxStateTransitions = _config
-        .getProperty(CommonConstants.Helix.CONFIG_OF_HELIX_INSTANCE_MAX_STATE_TRANSITIONS,
+    String maxStateTransitions =
+        _config.getProperty(CommonConstants.Helix.CONFIG_OF_HELIX_INSTANCE_MAX_STATE_TRANSITIONS,
             CommonConstants.Helix.DEFAULT_HELIX_INSTANCE_MAX_STATE_TRANSITIONS);
     Map<ClusterConstraints.ConstraintAttribute, String> constraintAttributes = new HashMap<>();
     constraintAttributes.put(ClusterConstraints.ConstraintAttribute.INSTANCE, ".*");
-    constraintAttributes
-        .put(ClusterConstraints.ConstraintAttribute.MESSAGE_TYPE, Message.MessageType.STATE_TRANSITION.name());
+    constraintAttributes.put(ClusterConstraints.ConstraintAttribute.MESSAGE_TYPE,
+        Message.MessageType.STATE_TRANSITION.name());
     ConstraintItem constraintItem = new ConstraintItem(constraintAttributes, maxStateTransitions);
 
     _helixControllerManager.getClusterManagmentTool()
@@ -371,8 +371,8 @@ public abstract class BaseControllerStarter implements ServiceStartable {
   private void setUpPinotController() {
     // install default SSL context if necessary (even if not force-enabled everywhere)
     TlsConfig tlsDefaults = TlsUtils.extractTlsConfig(_config, ControllerConf.CONTROLLER_TLS_PREFIX);
-    if (StringUtils.isNotBlank(tlsDefaults.getKeyStorePath()) || StringUtils
-        .isNotBlank(tlsDefaults.getTrustStorePath())) {
+    if (StringUtils.isNotBlank(tlsDefaults.getKeyStorePath()) || StringUtils.isNotBlank(
+        tlsDefaults.getTrustStorePath())) {
       LOGGER.info("Installing default SSL context for any client requests");
       TlsUtils.installDefaultSSLSocketFactory(tlsDefaults);
     }
@@ -392,8 +392,9 @@ public abstract class BaseControllerStarter implements ServiceStartable {
         _config.getProperty(CommonConstants.Controller.CONFIG_OF_CONTROLLER_QUERY_REWRITER_CLASS_NAMES));
 
     LOGGER.info("Initializing Helix participant manager");
-    _helixParticipantManager = HelixManagerFactory
-        .getZKHelixManager(_helixClusterName, _helixParticipantInstanceId, InstanceType.PARTICIPANT, _helixZkURL);
+    _helixParticipantManager =
+        HelixManagerFactory.getZKHelixManager(_helixClusterName, _helixParticipantInstanceId, InstanceType.PARTICIPANT,
+            _helixZkURL);
 
     // LeadControllerManager needs to be initialized before registering as Helix participant.
     LOGGER.info("Initializing lead controller manager");
@@ -502,8 +503,7 @@ public abstract class BaseControllerStarter implements ServiceStartable {
     LOGGER.info("Starting controller admin application on: {}", ListenerConfigUtil.toString(_listenerConfigs));
     _adminApp.start(_listenerConfigs);
 
-    _controllerMetrics.addCallbackGauge("dataDir.exists",
-        () -> new File(_config.getDataDir()).exists() ? 1L : 0L);
+    _controllerMetrics.addCallbackGauge("dataDir.exists", () -> new File(_config.getDataDir()).exists() ? 1L : 0L);
     _controllerMetrics.addCallbackGauge("dataDir.fileOpLatencyMs", () -> {
       File dataDir = new File(_config.getDataDir());
       if (dataDir.exists()) {
@@ -673,7 +673,7 @@ public abstract class BaseControllerStarter implements ServiceStartable {
     _taskManagerStatusCache = getTaskManagerStatusCache();
     _taskManager =
         new PinotTaskManager(_helixTaskResourceManager, _helixResourceManager, _leadControllerManager, _config,
-            _controllerMetrics, _taskManagerStatusCache);
+            _controllerMetrics, _taskManagerStatusCache, _executorService, _connectionManager);
     periodicTasks.add(_taskManager);
     _retentionManager =
         new RetentionManager(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics);
@@ -693,8 +693,9 @@ public abstract class BaseControllerStarter implements ServiceStartable {
         new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics,
             _executorService);
     periodicTasks.add(_segmentStatusChecker);
-    _realtimeConsumerMonitor = new RealtimeConsumerMonitor(_config, _helixResourceManager, _leadControllerManager,
-        _controllerMetrics, _executorService);
+    _realtimeConsumerMonitor =
+        new RealtimeConsumerMonitor(_config, _helixResourceManager, _leadControllerManager, _controllerMetrics,
+            _executorService);
     periodicTasks.add(_realtimeConsumerMonitor);
     _segmentRelocator = new SegmentRelocator(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics,
         _executorService, _connectionManager);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
index bcb1491988..5551c04d7c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
@@ -21,7 +21,9 @@ package org.apache.pinot.controller.helix.core.minion;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executor;
 import javax.annotation.Nullable;
+import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
 import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.task.TaskState;
@@ -51,15 +53,20 @@ public class ClusterInfoAccessor {
   private final ControllerConf _controllerConf;
   private final ControllerMetrics _controllerMetrics;
   private final LeadControllerManager _leadControllerManager;
+  private final Executor _executor;
+  private final MultiThreadedHttpConnectionManager _connectionManager;
 
   public ClusterInfoAccessor(PinotHelixResourceManager pinotHelixResourceManager,
       PinotHelixTaskResourceManager pinotHelixTaskResourceManager, ControllerConf controllerConf,
-      ControllerMetrics controllerMetrics, LeadControllerManager leadControllerManager) {
+      ControllerMetrics controllerMetrics, LeadControllerManager leadControllerManager, Executor executor,
+      MultiThreadedHttpConnectionManager connectionManager) {
     _pinotHelixResourceManager = pinotHelixResourceManager;
     _pinotHelixTaskResourceManager = pinotHelixTaskResourceManager;
     _controllerConf = controllerConf;
     _controllerMetrics = controllerMetrics;
     _leadControllerManager = leadControllerManager;
+    _executor = executor;
+    _connectionManager = connectionManager;
   }
 
   /**
@@ -94,6 +101,20 @@ public class ClusterInfoAccessor {
     return ZKMetadataProvider.getSegmentsZKMetadata(_pinotHelixResourceManager.getPropertyStore(), tableNameWithType);
   }
 
+  /**
+   * Get shared executor
+   */
+  public Executor getExecutor() {
+    return _executor;
+  }
+
+  /**
+   * Get shared connection manager
+   */
+  public MultiThreadedHttpConnectionManager getConnectionManager() {
+    return _connectionManager;
+  }
+
   /**
    * Fetches the ZNRecord under MINION_TASK_METADATA/${tableNameWithType}/${taskType} for the given
    * taskType and tableNameWithType
@@ -114,8 +135,8 @@ public class ClusterInfoAccessor {
    */
   @Nullable
   public SegmentLineage getSegmentLineage(String tableNameWithType) {
-    return SegmentLineageAccessHelper
-        .getSegmentLineage(_pinotHelixResourceManager.getPropertyStore(), tableNameWithType);
+    return SegmentLineageAccessHelper.getSegmentLineage(_pinotHelixResourceManager.getPropertyStore(),
+        tableNameWithType);
   }
 
   /**
@@ -127,8 +148,8 @@ public class ClusterInfoAccessor {
    * @param expectedVersion The expected version of data to be overwritten. Set to -1 to override version check.
    */
   public void setMinionTaskMetadata(BaseTaskMetadata taskMetadata, String taskType, int expectedVersion) {
-    MinionTaskMetadataUtils
-        .persistTaskMetadata(_pinotHelixResourceManager.getPropertyStore(), taskType, taskMetadata, expectedVersion);
+    MinionTaskMetadataUtils.persistTaskMetadata(_pinotHelixResourceManager.getPropertyStore(), taskType, taskMetadata,
+        expectedVersion);
   }
 
   /**
@@ -175,8 +196,9 @@ public class ClusterInfoAccessor {
    * @return cluster config
    */
   public String getClusterConfig(String configName) {
-    HelixConfigScope helixConfigScope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER)
-        .forCluster(_pinotHelixResourceManager.getHelixClusterName()).build();
+    HelixConfigScope helixConfigScope =
+        new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(
+            _pinotHelixResourceManager.getHelixClusterName()).build();
     Map<String, String> configMap =
         _pinotHelixResourceManager.getHelixAdmin().getConfig(helixConfigScope, Collections.singletonList(configName));
     return configMap != null ? configMap.get(configName) : null;
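
Note: the two accessors added above exist so that task generators can reuse the controller's shared HTTP machinery instead of creating their own thread pools and connection pools. A minimal sketch of the intended use, mirroring the call site added in UpsertCompactionTaskGenerator later in this commit (serverToEndpoints, validDocIdUrls, and tableNameWithType are assumed to be prepared by the caller):

    // Fan out GET requests to server admin endpoints using the controller's
    // shared executor and connection manager, collecting per-server responses.
    CompletionServiceHelper completionServiceHelper =
        new CompletionServiceHelper(_clusterInfoAccessor.getExecutor(),
            _clusterInfoAccessor.getConnectionManager(), serverToEndpoints.inverse());
    CompletionServiceHelper.CompletionServiceResponse serviceResponse =
        completionServiceHelper.doMultiGetRequest(validDocIdUrls, tableNameWithType, false, 3000);
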
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index e7a7584b76..4f10b6a5c5 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -32,8 +32,10 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
 import javax.annotation.Nullable;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
 import org.apache.helix.AccessOption;
 import org.apache.helix.task.TaskState;
 import org.apache.helix.zookeeper.zkclient.IZkChildListener;
@@ -114,7 +116,8 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
   public PinotTaskManager(PinotHelixTaskResourceManager helixTaskResourceManager,
       PinotHelixResourceManager helixResourceManager, LeadControllerManager leadControllerManager,
       ControllerConf controllerConf, ControllerMetrics controllerMetrics,
-      TaskManagerStatusCache<TaskGeneratorMostRecentRunInfo> taskManagerStatusCache) {
+      TaskManagerStatusCache<TaskGeneratorMostRecentRunInfo> taskManagerStatusCache, Executor executor,
+      MultiThreadedHttpConnectionManager connectionManager) {
     super("PinotTaskManager", controllerConf.getTaskManagerFrequencyInSeconds(),
         controllerConf.getPinotTaskManagerInitialDelaySeconds(), helixResourceManager, leadControllerManager,
         controllerMetrics);
@@ -122,7 +125,7 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
     _taskManagerStatusCache = taskManagerStatusCache;
     _clusterInfoAccessor =
         new ClusterInfoAccessor(helixResourceManager, helixTaskResourceManager, controllerConf, controllerMetrics,
-            leadControllerManager);
+            leadControllerManager, executor, connectionManager);
     _taskGeneratorRegistry = new TaskGeneratorRegistry(_clusterInfoAccessor);
     _skipLateCronSchedule = controllerConf.isSkipLateCronSchedule();
     _maxCronScheduleDelayInSeconds = controllerConf.getMaxCronScheduleDelayInSeconds();
@@ -561,8 +564,8 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
       long successRunTimestamp = System.currentTimeMillis();
       for (TableConfig tableConfig : enabledTableConfigs) {
         _taskManagerStatusCache.saveTaskGeneratorInfo(tableConfig.getTableName(), taskGenerator.getTaskType(),
-            taskGeneratorMostRecentRunInfo -> taskGeneratorMostRecentRunInfo.addErrorRunMessage(
-                successRunTimestamp, errors.toString()));
+            taskGeneratorMostRecentRunInfo -> taskGeneratorMostRecentRunInfo.addErrorRunMessage(successRunTimestamp,
+                errors.toString()));
         // before the first task schedule, the following gauge metric will be empty
         // TODO: find a better way to report task generation information
         _controllerMetrics.setOrUpdateTableGauge(tableConfig.getTableName(), taskGenerator.getTaskType(),
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index 8e4c5cfc1c..25c5427cf6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -139,4 +139,23 @@ public class MinionConstants {
     public static final String CONFIG_NUMBER_CONCURRENT_TASKS_PER_INSTANCE =
         "SegmentGenerationAndPushTask.numConcurrentTasksPerInstance";
   }
+
+  public static class UpsertCompactionTask {
+    public static final String TASK_TYPE = "UpsertCompactionTask";
+    /**
+     * The time period to wait before a completed segment may be picked for this task,
+     * e.g. if set to "2d", segments whose end time falls within the last 2 days are not picked
+     */
+    public static final String BUFFER_TIME_PERIOD_KEY = "bufferTimePeriod";
+    /**
+     * The maximum percent of invalid (outdated) records allowed in a completed segment,
+     * e.g. if set to 30 and the percent surpasses 30, then the segment may be compacted
+     */
+    public static final String INVALID_RECORDS_THRESHOLD_PERCENT = "invalidRecordsThresholdPercent";
+    /**
+     * The maximum count of invalid (outdated) records allowed in a completed segment,
+     * e.g. if set to 100k and the count surpasses 100k, then the segment may be compacted
+     */
+    public static final String INVALID_RECORDS_THRESHOLD_COUNT = "invalidRecordsThresholdCount";
+  }
 }
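
Taken together, these constants are keys in a per-table TableTaskConfig. A minimal sketch of enabling the task programmatically, modeled on the integration test later in this commit (the wrapper class name is ours for illustration, and the "2d"/"30"/"100000" values are illustrative, not defaults):

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.pinot.core.common.MinionConstants;
    import org.apache.pinot.spi.config.table.TableTaskConfig;

    public class UpsertCompactionTaskConfigExample {
      public static TableTaskConfig buildTaskConfig() {
        Map<String, String> taskConfigs = new HashMap<>();
        // Only consider segments that have been completed for at least 2 days.
        taskConfigs.put(MinionConstants.UpsertCompactionTask.BUFFER_TIME_PERIOD_KEY, "2d");
        // Compact a segment only once more than 30% of its records are invalid...
        taskConfigs.put(MinionConstants.UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_PERCENT, "30");
        // ...and more than 100k records are invalid (both thresholds must be crossed).
        taskConfigs.put(MinionConstants.UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_COUNT, "100000");
        return new TableTaskConfig(
            Collections.singletonMap(MinionConstants.UpsertCompactionTask.TASK_TYPE, taskConfigs));
      }
    }
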
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertCompactionMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertCompactionMinionClusterIntegrationTest.java
new file mode 100644
index 0000000000..de645ca749
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertCompactionMinionClusterIntegrationTest.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.task.TaskState;
+import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
+import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+
+
+public class UpsertCompactionMinionClusterIntegrationTest extends BaseClusterIntegrationTest {
+  protected PinotHelixTaskResourceManager _helixTaskResourceManager;
+  protected PinotTaskManager _taskManager;
+  private static final String PRIMARY_KEY_COL = "clientId";
+  private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(DEFAULT_TABLE_NAME);
+  private static List<File> _avroFiles;
+  private TableConfig _tableConfig;
+  private Schema _schema;
+
+  @Override
+  protected String getSchemaFileName() {
+    return "upsert_upload_segment_test.schema";
+  }
+
+  @Override
+  protected String getSchemaName() {
+    return "upsertSchema";
+  }
+
+  @Override
+  protected String getAvroTarFileName() {
+    return "upsert_compaction_test.tar.gz";
+  }
+
+  @Override
+  protected String getPartitionColumn() {
+    return PRIMARY_KEY_COL;
+  }
+
+  private TableTaskConfig getCompactionTaskConfig() {
+    Map<String, String> tableTaskConfigs = new HashMap<>();
+    tableTaskConfigs.put(MinionConstants.UpsertCompactionTask.BUFFER_TIME_PERIOD_KEY, "0d");
+    tableTaskConfigs.put(MinionConstants.UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_PERCENT, "1");
+    tableTaskConfigs.put(MinionConstants.UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_COUNT, "10");
+    return new TableTaskConfig(
+        Collections.singletonMap(MinionConstants.UpsertCompactionTask.TASK_TYPE, tableTaskConfigs));
+  }
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+    // Start the Pinot cluster
+    startZk();
+    startController();
+    startBroker();
+    startServers(1);
+
+    // Unpack the Avro files
+    _avroFiles = unpackAvroData(_tempDir);
+
+    startKafka();
+
+    // Create and upload the schema and table config
+    _schema = createSchema();
+    addSchema(_schema);
+    _tableConfig = createUpsertTableConfig(_avroFiles.get(0), PRIMARY_KEY_COL, null, getNumKafkaPartitions());
+    _tableConfig.setTaskConfig(getCompactionTaskConfig());
+    addTableConfig(_tableConfig);
+
+    ClusterIntegrationTestUtils.buildSegmentsFromAvro(_avroFiles, _tableConfig, _schema, 0, _segmentDir, _tarDir);
+
+    startMinion();
+    _helixTaskResourceManager = _controllerStarter.getHelixTaskResourceManager();
+    _taskManager = _controllerStarter.getTaskManager();
+  }
+
+  @BeforeMethod
+  public void beforeMethod()
+      throws Exception {
+    // Create and upload segments
+    uploadSegments(getTableName(), TableType.REALTIME, _tarDir);
+  }
+
+  protected void waitForAllDocsLoaded(long timeoutMs, long expectedCount)
+      throws Exception {
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        return getCurrentCountStarResultWithoutUpsert() == expectedCount;
+      } catch (Exception e) {
+        return null;
+      }
+    }, 100L, timeoutMs, "Failed to load all documents");
+    assertEquals(getCurrentCountStarResult(), getCountStarResult());
+  }
+
+  private long getCurrentCountStarResultWithoutUpsert() {
+    return getPinotConnection().execute("SELECT COUNT(*) FROM " + getTableName() + " OPTION(skipUpsert=true)")
+        .getResultSet(0).getLong(0);
+  }
+
+  private long getSalary() {
+    return getPinotConnection().execute("SELECT salary FROM " + getTableName() + " WHERE clientId=100001")
+        .getResultSet(0).getLong(0);
+  }
+
+  @Override
+  protected long getCountStarResult() {
+    return 3;
+  }
+
+  @AfterMethod
+  public void afterMethod()
+      throws Exception {
+    String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(getTableName());
+
+    // Test dropping all segments one by one
+    List<String> segments = listSegments(realtimeTableName);
+    assertFalse(segments.isEmpty());
+    for (String segment : segments) {
+      dropSegment(realtimeTableName, segment);
+    }
+    // NOTE: There is a delay to remove the segment from property store
+    TestUtils.waitForCondition((aVoid) -> {
+      try {
+        return listSegments(realtimeTableName).isEmpty();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }, 60_000L, "Failed to drop the segments");
+
+    stopServer();
+    startServers(1);
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws IOException {
+    String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(getTableName());
+    dropRealtimeTable(realtimeTableName);
+    stopMinion();
+    stopServer();
+    stopBroker();
+    stopController();
+    stopKafka();
+    stopZk();
+    FileUtils.deleteDirectory(_tempDir);
+  }
+
+  @Test
+  public void testCompaction()
+      throws Exception {
+    waitForAllDocsLoaded(600_000L, 283);
+    assertEquals(getSalary(), 9747108);
+
+    assertNotNull(_taskManager.scheduleTasks(REALTIME_TABLE_NAME).get(MinionConstants.UpsertCompactionTask.TASK_TYPE));
+    waitForTaskToComplete();
+    waitForAllDocsLoaded(600_000L, 3);
+    assertEquals(getSalary(), 9747108);
+  }
+
+  @Test
+  public void testCompactionDeletesSegments()
+      throws Exception {
+    pushAvroIntoKafka(_avroFiles);
+    // Wait for all documents loaded
+    waitForAllDocsLoaded(600_000L, 566);
+    assertEquals(getSalary(), 9747108);
+
+    assertNull(_taskManager.scheduleTasks(REALTIME_TABLE_NAME).get(MinionConstants.UpsertCompactionTask.TASK_TYPE));
+    waitForTaskToComplete();
+    waitForAllDocsLoaded(600_000L, 283);
+    assertEquals(getSalary(), 9747108);
+  }
+
+  protected void waitForTaskToComplete() {
+    TestUtils.waitForCondition(input -> {
+      // Check task state
+      for (TaskState taskState : _helixTaskResourceManager.getTaskStates(MinionConstants.UpsertCompactionTask.TASK_TYPE)
+          .values()) {
+        if (taskState != TaskState.COMPLETED) {
+          return false;
+        }
+      }
+      return true;
+    }, 600_000L, "Failed to complete task");
+  }
+}
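
A note on how this test observes compaction: COUNT(*) with OPTION(skipUpsert=true) counts every physical record, including rows invalidated by later upserts, whereas the plain COUNT(*) stays at one row per primary key (3 here). Compaction therefore shows up as the skipUpsert count dropping from 283 to 3 while the upsert-aware count and the salary value for clientId 100001 are unchanged.
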
diff --git a/pinot-integration-tests/src/test/resources/upsert_compaction_test.tar.gz b/pinot-integration-tests/src/test/resources/upsert_compaction_test.tar.gz
new file mode 100644
index 0000000000..76b0a50126
Binary files /dev/null and b/pinot-integration-tests/src/test/resources/upsert_compaction_test.tar.gz differ
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java b/pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java
index 845ec95e66..3d3bfcbe0b 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java
@@ -124,8 +124,8 @@ public abstract class BaseMinionStarter implements ServiceStartable {
     // NOTE: Helix will disconnect the manager and disable the instance if it detects flapping (too frequent disconnect
     // from ZooKeeper). Setting flapping time window to a small value can prevent this from happening. Helix ignores the
     // non-positive value, so set the default value as 1.
-    System.setProperty(SystemPropertyKeys.FLAPPING_TIME_WINDOW, _config
-        .getProperty(CommonConstants.Helix.CONFIG_OF_MINION_FLAPPING_TIME_WINDOW_MS,
+    System.setProperty(SystemPropertyKeys.FLAPPING_TIME_WINDOW,
+        _config.getProperty(CommonConstants.Helix.CONFIG_OF_MINION_FLAPPING_TIME_WINDOW_MS,
             CommonConstants.Helix.DEFAULT_FLAPPING_TIME_WINDOW_MS));
   }
 
@@ -174,8 +174,8 @@ public abstract class BaseMinionStarter implements ServiceStartable {
 
     // Initialize data directory
     LOGGER.info("Initializing data directory");
-    File dataDir = new File(_config
-        .getProperty(CommonConstants.Helix.Instance.DATA_DIR_KEY, CommonConstants.Minion.DEFAULT_INSTANCE_DATA_DIR));
+    File dataDir = new File(_config.getProperty(CommonConstants.Helix.Instance.DATA_DIR_KEY,
+        CommonConstants.Minion.DEFAULT_INSTANCE_DATA_DIR));
     if (dataDir.exists()) {
       FileUtils.cleanDirectory(dataDir);
     } else {
@@ -195,8 +195,8 @@ public abstract class BaseMinionStarter implements ServiceStartable {
 
     // Install default SSL context if necessary (even if not force-enabled everywhere)
     TlsConfig tlsDefaults = TlsUtils.extractTlsConfig(_config, CommonConstants.Minion.MINION_TLS_PREFIX);
-    if (StringUtils.isNotBlank(tlsDefaults.getKeyStorePath()) || StringUtils
-        .isNotBlank(tlsDefaults.getTrustStorePath())) {
+    if (StringUtils.isNotBlank(tlsDefaults.getKeyStorePath()) || StringUtils.isNotBlank(
+        tlsDefaults.getTrustStorePath())) {
       LOGGER.info("Installing default SSL context for any client requests");
       TlsUtils.installDefaultSSLSocketFactory(tlsDefaults);
     }
@@ -237,8 +237,7 @@ public abstract class BaseMinionStarter implements ServiceStartable {
     PinotConfiguration segmentUploaderConfig =
         _config.subset(CommonConstants.Minion.PREFIX_OF_CONFIG_OF_SEGMENT_UPLOADER);
     if (segmentUploaderConfig.isEmpty()) {
-      segmentUploaderConfig =
-          _config.subset(CommonConstants.Minion.DEPRECATED_PREFIX_OF_CONFIG_OF_SEGMENT_UPLOADER);
+      segmentUploaderConfig = _config.subset(CommonConstants.Minion.DEPRECATED_PREFIX_OF_CONFIG_OF_SEGMENT_UPLOADER);
     }
     PinotConfiguration httpsConfig = segmentUploaderConfig.subset(CommonConstants.HTTPS_PROTOCOL);
     if (httpsConfig.getProperty(HTTPS_ENABLED, false)) {
@@ -254,7 +253,7 @@ public abstract class BaseMinionStarter implements ServiceStartable {
     _helixManager.connect();
     updateInstanceConfigIfNeeded();
     minionContext.setHelixPropertyStore(_helixManager.getHelixPropertyStore());
-
+    minionContext.setHelixManager(_helixManager);
     LOGGER.info("Starting minion admin application on: {}", ListenerConfigUtil.toString(_listenerConfigs));
     _minionAdminApplication = new MinionAdminApiApplication(_instanceId, _config);
     _minionAdminApplication.start(_listenerConfigs);
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/MinionContext.java b/pinot-minion/src/main/java/org/apache/pinot/minion/MinionContext.java
index 6ccd364677..8b6e0dc4c9 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/MinionContext.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/MinionContext.java
@@ -20,6 +20,7 @@ package org.apache.pinot.minion;
 
 import java.io.File;
 import javax.net.ssl.SSLContext;
+import org.apache.helix.HelixManager;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.common.metrics.MinionMetrics;
@@ -43,6 +44,7 @@ public class MinionContext {
   private File _dataDir;
   private MinionMetrics _minionMetrics;
   private ZkHelixPropertyStore<ZNRecord> _helixPropertyStore;
+  private HelixManager _helixManager;
 
   // For segment upload
   private SSLContext _sslContext;
@@ -107,4 +109,12 @@ public class MinionContext {
   public void setTaskAuthProvider(AuthProvider taskAuthProvider) {
     _taskAuthProvider = taskAuthProvider;
   }
+
+  public void setHelixManager(HelixManager helixManager) {
+    _helixManager = helixManager;
+  }
+
+  public HelixManager getHelixManager() {
+    return _helixManager;
+  }
 }
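
These MinionContext additions (paired with the BaseMinionStarter change above) expose the minion's live Helix connection to task plugins; the new UpsertCompactionTaskExecutor below consumes it through the MINION_CONTEXT handle that BaseTaskExecutor provides:

    // From the executor below: resolve cluster-admin access via the minion's
    // Helix connection (note: getClusterManagmentTool is the Helix API spelling).
    private static HelixManager _helixManager = MINION_CONTEXT.getHelixManager();
    private static HelixAdmin _clusterManagementTool = _helixManager.getClusterManagmentTool();
    private static String _clusterName = _helixManager.getClusterName();
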
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
index 51c7f98543..001ce26d46 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
@@ -131,9 +131,12 @@ public abstract class BaseSingleSegmentConversionExecutor extends BaseTaskExecut
           "Converted segment name: %s does not match original segment name: %s",
           segmentConversionResult.getSegmentName(), segmentName);
 
+      File convertedSegmentDir = segmentConversionResult.getFile();
+      if (convertedSegmentDir == null) {
+        return segmentConversionResult;
+      }
       // Tar the converted segment
       _eventObserver.notifyProgress(_pinotTaskConfig, "Compressing segment: " + segmentName);
-      File convertedSegmentDir = segmentConversionResult.getFile();
       File convertedTarredSegmentFile =
           new File(tempDataDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
       TarGzCompressionUtils.createTarGzFile(convertedSegmentDir, convertedTarredSegmentFile);
@@ -184,8 +187,8 @@ public abstract class BaseSingleSegmentConversionExecutor extends BaseTaskExecut
           TableNameBuilder.extractRawTableName(tableNameWithType));
       NameValuePair tableTypeParameter = new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_TYPE,
           TableNameBuilder.getTableTypeFromTableName(tableNameWithType).toString());
-      List<NameValuePair> parameters = Arrays.asList(enableParallelPushProtectionParameter, tableNameParameter,
-          tableTypeParameter);
+      List<NameValuePair> parameters =
+          Arrays.asList(enableParallelPushProtectionParameter, tableNameParameter, tableTypeParameter);
 
       // Upload the tarred segment
       _eventObserver.notifyProgress(_pinotTaskConfig, "Uploading segment: " + segmentName);
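
The reordering above makes the converted-file null check an explicit early exit: an executor can now return a SegmentConversionResult without a file to signal that there is nothing to tar and upload. The new UpsertCompactionTaskExecutor relies on this when a segment's validDocIds bitmap is empty:

    // From the executor below: no compacted file is set, so the base class
    // skips the compress/upload steps for this segment.
    return new SegmentConversionResult.Builder().setTableNameWithType(tableNameWithType).setSegmentName(segmentName)
        .build();
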
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java
new file mode 100644
index 0000000000..aa37ac871a
--- /dev/null
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks.upsertcompaction;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.core.Response;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixManager;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
+import org.apache.pinot.common.utils.config.InstanceUtils;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.plugin.minion.tasks.BaseSingleSegmentConversionExecutor;
+import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.data.readers.RecordReaderConfig;
+import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
+import org.roaringbitmap.PeekableIntIterator;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class UpsertCompactionTaskExecutor extends BaseSingleSegmentConversionExecutor {
+  private static final Logger LOGGER = LoggerFactory.getLogger(UpsertCompactionTaskExecutor.class);
+  private static HelixManager _helixManager = MINION_CONTEXT.getHelixManager();
+  private static HelixAdmin _clusterManagementTool = _helixManager.getClusterManagmentTool();
+  private static String _clusterName = _helixManager.getClusterName();
+
+  private class CompactedRecordReader implements RecordReader {
+    private final PinotSegmentRecordReader _pinotSegmentRecordReader;
+    private final PeekableIntIterator _validDocIdsIterator;
+    // Reusable generic row to store the next row to return
+    GenericRow _nextRow = new GenericRow();
+    // Flag to mark whether we need to fetch another row
+    boolean _nextRowReturned = true;
+
+    CompactedRecordReader(File indexDir, ImmutableRoaringBitmap validDocIds) {
+      _pinotSegmentRecordReader = new PinotSegmentRecordReader();
+      _pinotSegmentRecordReader.init(indexDir, null, null);
+      _validDocIdsIterator = validDocIds.getIntIterator();
+    }
+
+    @Override
+    public void init(File dataFile, Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig) {
+    }
+
+    @Override
+    public boolean hasNext() {
+      if (!_validDocIdsIterator.hasNext() && _nextRowReturned) {
+        return false;
+      }
+
+      // If next row has not been returned, return true
+      if (!_nextRowReturned) {
+        return true;
+      }
+
+      // Try to get the next row to return
+      if (_validDocIdsIterator.hasNext()) {
+        int docId = _validDocIdsIterator.next();
+        _nextRow.clear();
+        _pinotSegmentRecordReader.getRecord(docId, _nextRow);
+        _nextRowReturned = false;
+        return true;
+      }
+
+      // Cannot find next row to return, return false
+      return false;
+    }
+
+    @Override
+    public GenericRow next() {
+      return next(new GenericRow());
+    }
+
+    @Override
+    public GenericRow next(GenericRow reuse) {
+      Preconditions.checkState(!_nextRowReturned);
+      reuse.init(_nextRow);
+      _nextRowReturned = true;
+      return reuse;
+    }
+
+    @Override
+    public void rewind() {
+      _pinotSegmentRecordReader.rewind();
+      _nextRowReturned = true;
+    }
+
+    @Override
+    public void close()
+        throws IOException {
+      _pinotSegmentRecordReader.close();
+    }
+  }
+
+  @Override
+  protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig, File indexDir, File workingDir)
+      throws Exception {
+    _eventObserver.notifyProgress(pinotTaskConfig, "Compacting segment: " + indexDir);
+    Map<String, String> configs = pinotTaskConfig.getConfigs();
+    String segmentName = configs.get(MinionConstants.SEGMENT_NAME_KEY);
+    String taskType = pinotTaskConfig.getTaskType();
+    LOGGER.info("Starting task: {} with configs: {}", taskType, configs);
+    long startMillis = System.currentTimeMillis();
+
+    String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
+    TableConfig tableConfig = getTableConfig(tableNameWithType);
+    ImmutableRoaringBitmap validDocIds = getValidDocIds(tableNameWithType, configs);
+
+    if (validDocIds.isEmpty()) {
+      // prevents empty segment generation
+      LOGGER.info("validDocIds is empty, skip the task. Table: {}, segment: {}", tableNameWithType, segmentName);
+      if (indexDir.exists() && !FileUtils.deleteQuietly(indexDir)) {
+        LOGGER.warn("Failed to delete input segment: {}", indexDir.getAbsolutePath());
+      }
+      if (!FileUtils.deleteQuietly(workingDir)) {
+        LOGGER.warn("Failed to delete working directory: {}", workingDir.getAbsolutePath());
+      }
+      return new SegmentConversionResult.Builder().setTableNameWithType(tableNameWithType).setSegmentName(segmentName)
+          .build();
+    }
+
+    SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir);
+    try (CompactedRecordReader compactedRecordReader = new CompactedRecordReader(indexDir, validDocIds)) {
+      SegmentGeneratorConfig config = getSegmentGeneratorConfig(workingDir, tableConfig, segmentMetadata, segmentName);
+      SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+      driver.init(config, compactedRecordReader);
+      driver.build();
+    }
+
+    File compactedSegmentFile = new File(workingDir, segmentName);
+    SegmentConversionResult result =
+        new SegmentConversionResult.Builder().setFile(compactedSegmentFile).setTableNameWithType(tableNameWithType)
+            .setSegmentName(segmentName).build();
+
+    long endMillis = System.currentTimeMillis();
+    LOGGER.info("Finished task: {} with configs: {}. Total time: {}ms", taskType, configs, (endMillis - startMillis));
+
+    return result;
+  }
+
+  private static SegmentGeneratorConfig getSegmentGeneratorConfig(File workingDir, TableConfig tableConfig,
+      SegmentMetadataImpl segmentMetadata, String segmentName) {
+    SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, segmentMetadata.getSchema());
+    config.setOutDir(workingDir.getPath());
+    config.setSegmentName(segmentName);
+    // Keep index creation time the same as original segment because both segments use the same raw data.
+    // This way, for REFRESH case, when new segment gets pushed to controller, we can use index creation time to
+    // identify if the new pushed segment has newer data than the existing one.
+    config.setCreationTime(String.valueOf(segmentMetadata.getIndexCreationTime()));
+
+    // The time column type info is not stored in the segment metadata.
+   * Keep segment start/end time to properly handle time column types other than EPOCH (e.g. SIMPLE_FORMAT).
+    if (segmentMetadata.getTimeInterval() != null) {
+      config.setTimeColumnName(tableConfig.getValidationConfig().getTimeColumnName());
+      config.setStartTime(Long.toString(segmentMetadata.getStartTime()));
+      config.setEndTime(Long.toString(segmentMetadata.getEndTime()));
+      config.setSegmentTimeUnit(segmentMetadata.getTimeUnit());
+    }
+    return config;
+  }
+
+  // TODO: Consider moving this method to a more appropriate class (e.g. ServerSegmentMetadataReader)
+  private static ImmutableRoaringBitmap getValidDocIds(String tableNameWithType, Map<String, String> configs)
+      throws URISyntaxException {
+    String segmentName = configs.get(MinionConstants.SEGMENT_NAME_KEY);
+    String server = getServer(segmentName, tableNameWithType);
+
+    // get the validDocIds URL for the server hosting the segment
+    InstanceConfig instanceConfig = _clusterManagementTool.getInstanceConfig(_clusterName, server);
+    String endpoint = InstanceUtils.getServerAdminEndpoint(instanceConfig);
+    String url =
+        new URIBuilder(endpoint).setPath(String.format("/segments/%s/%s/validDocIds", tableNameWithType, segmentName))
+            .toString();
+
+    // get the validDocIds from that server
+    Response response = ClientBuilder.newClient().target(url).request().get(Response.class);
+    Preconditions.checkState(response.getStatus() == Response.Status.OK.getStatusCode(),
+        "Unable to retrieve validDocIds from %s", url);
+    byte[] snapshot = response.readEntity(byte[].class);
+    ImmutableRoaringBitmap validDocIds = new ImmutableRoaringBitmap(ByteBuffer.wrap(snapshot));
+    return validDocIds;
+  }
+
+  @VisibleForTesting
+  public static String getServer(String segmentName, String tableNameWithType) {
+    ExternalView externalView = _clusterManagementTool.getResourceExternalView(_clusterName, tableNameWithType);
+    if (externalView == null) {
+      throw new IllegalStateException("External view does not exist for table: " + tableNameWithType);
+    }
+    Map<String, String> instanceStateMap = externalView.getStateMap(segmentName);
+    if (instanceStateMap == null) {
+      throw new IllegalStateException("Failed to find segment: " + segmentName);
+    }
+    for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) {
+      if (entry.getValue().equals(SegmentStateModel.ONLINE)) {
+        return entry.getKey();
+      }
+    }
+    throw new IllegalStateException("Failed to find ONLINE server for segment: " + segmentName);
+  }
+
+  @Override
+  protected SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifier(PinotTaskConfig pinotTaskConfig,
+      SegmentConversionResult segmentConversionResult) {
+    return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE,
+        Collections.singletonMap(MinionConstants.UpsertCompactionTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX,
+            String.valueOf(System.currentTimeMillis())));
+  }
+}
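
For intuition on how the generator below feeds this executor: given a completed segment with totalDocs = 100,000 and totalInvalidDocs = 40,000, the generator computes invalidRecordPercent = (40,000 / 100,000) * 100 = 40; with invalidRecordsThresholdPercent = 30 and invalidRecordsThresholdCount = 10,000 (illustrative values), both thresholds are exceeded, so the segment is queued for compaction. A segment whose records are all invalid (totalInvalidDocs == totalDocs) is deleted outright instead of compacted.
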
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutorFactory.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutorFactory.java
new file mode 100644
index 0000000000..8989892f77
--- /dev/null
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutorFactory.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks.upsertcompaction;
+
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.minion.MinionConf;
+import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
+import org.apache.pinot.minion.executor.PinotTaskExecutor;
+import org.apache.pinot.minion.executor.PinotTaskExecutorFactory;
+import org.apache.pinot.spi.annotations.minion.TaskExecutorFactory;
+
+
+@TaskExecutorFactory
+public class UpsertCompactionTaskExecutorFactory implements PinotTaskExecutorFactory {
+
+  @Override
+  public void init(MinionTaskZkMetadataManager zkMetadataManager) {
+  }
+
+  @Override
+  public void init(MinionTaskZkMetadataManager zkMetadataManager, MinionConf minionConf) {
+  }
+
+  @Override
+  public String getTaskType() {
+    return MinionConstants.UpsertCompactionTask.TASK_TYPE;
+  }
+
+  @Override
+  public PinotTaskExecutor create() {
+    return new UpsertCompactionTaskExecutor();
+  }
+}
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
new file mode 100644
index 0000000000..aa84ffefa5
--- /dev/null
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
@@ -0,0 +1,276 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks.upsertcompaction;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.BiMap;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.pinot.common.exception.InvalidConfigException;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator;
+import org.apache.pinot.controller.util.CompletionServiceHelper;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.common.MinionConstants.UpsertCompactionTask;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.annotations.minion.TaskGenerator;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@TaskGenerator
+public class UpsertCompactionTaskGenerator extends BaseTaskGenerator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(UpsertCompactionTaskGenerator.class);
+  private static final String DEFAULT_BUFFER_PERIOD = "7d";
+  private static final double DEFAULT_INVALID_RECORDS_THRESHOLD_PERCENT = 0.0;
+  private static final long DEFAULT_INVALID_RECORDS_THRESHOLD_COUNT = 0;
+
+  public static class SegmentSelectionResult {
+
+    private List<SegmentZKMetadata> _segmentsForCompaction;
+
+    private List<String> _segmentsForDeletion;
+
+    SegmentSelectionResult(List<SegmentZKMetadata> segmentsForCompaction, List<String> segmentsForDeletion) {
+      _segmentsForCompaction = segmentsForCompaction;
+      _segmentsForDeletion = segmentsForDeletion;
+    }
+
+    public List<SegmentZKMetadata> getSegmentsForCompaction() {
+      return _segmentsForCompaction;
+    }
+
+    public List<String> getSegmentsForDeletion() {
+      return _segmentsForDeletion;
+    }
+  }
+
+  @Override
+  public String getTaskType() {
+    return MinionConstants.UpsertCompactionTask.TASK_TYPE;
+  }
+
+  @Override
+  public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
+    String taskType = MinionConstants.UpsertCompactionTask.TASK_TYPE;
+    List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+    for (TableConfig tableConfig : tableConfigs) {
+      if (!validate(tableConfig)) {
+        continue;
+      }
+
+      String tableNameWithType = tableConfig.getTableName();
+      LOGGER.info("Start generating task configs for table: {}", tableNameWithType);
+
+      Map<String, String> taskConfigs = tableConfig.getTaskConfig().getConfigsForTaskType(taskType);
+      List<SegmentZKMetadata> completedSegments = getCompletedSegments(tableNameWithType, taskConfigs);
+
+      if (completedSegments.isEmpty()) {
+        LOGGER.info("No completed segments were eligible for compaction for table: {}", tableNameWithType);
+        continue;
+      }
+
+      // get server to segment mappings
+      PinotHelixResourceManager pinotHelixResourceManager = _clusterInfoAccessor.getPinotHelixResourceManager();
+      Map<String, List<String>> serverToSegments = pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
+      BiMap<String, String> serverToEndpoints;
+      try {
+        serverToEndpoints = pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
+      } catch (InvalidConfigException e) {
+        throw new RuntimeException(e);
+      }
+
+      Map<String, SegmentZKMetadata> completedSegmentsMap =
+          completedSegments.stream().collect(Collectors.toMap(SegmentZKMetadata::getSegmentName, Function.identity()));
+
+      List<String> validDocIdUrls;
+      try {
+        validDocIdUrls = getValidDocIdMetadataUrls(serverToSegments, serverToEndpoints, tableNameWithType,
+            completedSegmentsMap.keySet());
+      } catch (URISyntaxException e) {
+        throw new RuntimeException(e);
+      }
+
+      // request the urls from the servers
+      CompletionServiceHelper completionServiceHelper =
+          new CompletionServiceHelper(_clusterInfoAccessor.getExecutor(), _clusterInfoAccessor.getConnectionManager(),
+              serverToEndpoints.inverse());
+
+      CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+          completionServiceHelper.doMultiGetRequest(validDocIdUrls, tableNameWithType, false, 3000);
+
+      SegmentSelectionResult segmentSelectionResult =
+          processValidDocIdMetadata(taskConfigs, completedSegmentsMap, serviceResponse._httpResponses.entrySet());
+
+      if (!segmentSelectionResult.getSegmentsForDeletion().isEmpty()) {
+        pinotHelixResourceManager.deleteSegments(tableNameWithType, segmentSelectionResult.getSegmentsForDeletion(),
+            "0d");
+        LOGGER.info("Deleted segments containing only invalid records for table: {}", tableNameWithType);
+      }
+
+      int numTasks = 0;
+      int maxTasks = getMaxTasks(taskType, tableNameWithType, taskConfigs);
+      for (SegmentZKMetadata segment : segmentSelectionResult.getSegmentsForCompaction()) {
+        if (numTasks == maxTasks) {
+          break;
+        }
+        Map<String, String> configs = new HashMap<>();
+        configs.put(MinionConstants.TABLE_NAME_KEY, tableNameWithType);
+        configs.put(MinionConstants.SEGMENT_NAME_KEY, segment.getSegmentName());
+        configs.put(MinionConstants.DOWNLOAD_URL_KEY, segment.getDownloadUrl());
+        configs.put(MinionConstants.UPLOAD_URL_KEY, _clusterInfoAccessor.getVipUrl() + "/segments");
+        configs.put(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY, String.valueOf(segment.getCrc()));
+        pinotTaskConfigs.add(new PinotTaskConfig(UpsertCompactionTask.TASK_TYPE, configs));
+        numTasks++;
+      }
+      LOGGER.info("Finished generating {} tasks configs for table: {}", numTasks, tableNameWithType);
+    }
+    return pinotTaskConfigs;
+  }
+
+  @VisibleForTesting
+  public static SegmentSelectionResult processValidDocIdMetadata(Map<String, String> taskConfigs,
+      Map<String, SegmentZKMetadata> completedSegmentsMap, Set<Map.Entry<String, String>> responseSet) {
+    double invalidRecordsThresholdPercent = Double.parseDouble(
+        taskConfigs.getOrDefault(UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_PERCENT,
+            String.valueOf(DEFAULT_INVALID_RECORDS_THRESHOLD_PERCENT)));
+    long invalidRecordsThresholdCount = Long.parseLong(
+        taskConfigs.getOrDefault(UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_COUNT,
+            String.valueOf(DEFAULT_INVALID_RECORDS_THRESHOLD_COUNT)));
+    List<SegmentZKMetadata> segmentsForCompaction = new ArrayList<>();
+    List<String> segmentsForDeletion = new ArrayList<>();
+    for (Map.Entry<String, String> streamResponse : responseSet) {
+      JsonNode allValidDocIdMetadata;
+      try {
+        allValidDocIdMetadata = JsonUtils.stringToJsonNode(streamResponse.getValue());
+      } catch (IOException e) {
+        LOGGER.error("Unable to parse validDocIdMetadata response for: {}", streamResponse.getKey());
+        continue;
+      }
+      Iterator<JsonNode> iterator = allValidDocIdMetadata.elements();
+      while (iterator.hasNext()) {
+        JsonNode validDocIdMetadata = iterator.next();
+        long totalInvalidDocs = validDocIdMetadata.get("totalInvalidDocs").asLong();
+        String segmentName = validDocIdMetadata.get("segmentName").asText();
+        SegmentZKMetadata segment = completedSegmentsMap.get(segmentName);
+        long totalDocs = validDocIdMetadata.get("totalDocs").asLong();
+        double invalidRecordPercent = ((double) totalInvalidDocs / totalDocs) * 100;
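+        // A segment with only invalid docs is deleted; otherwise both thresholds must be exceeded to compact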
+        if (totalInvalidDocs == totalDocs) {
+          segmentsForDeletion.add(segment.getSegmentName());
+        } else if (invalidRecordPercent > invalidRecordsThresholdPercent
+            && totalInvalidDocs > invalidRecordsThresholdCount) {
+          segmentsForCompaction.add(segment);
+        }
+      }
+    }
+    return new SegmentSelectionResult(segmentsForCompaction, segmentsForDeletion);
+  }
+
+  @VisibleForTesting
+  public static List<String> getValidDocIdMetadataUrls(Map<String, List<String>> serverToSegments,
+      BiMap<String, String> serverToEndpoints, String tableNameWithType, Set<String> completedSegments)
+      throws URISyntaxException {
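+    // Track segments not yet assigned to a URL so each one is fetched from a single server, even when replicated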
+    Set<String> remainingSegments = new HashSet<>(completedSegments);
+    List<String> urls = new ArrayList<>();
+    for (Map.Entry<String, List<String>> entry : serverToSegments.entrySet()) {
+      if (remainingSegments.isEmpty()) {
+        break;
+      }
+      String server = entry.getKey();
+      List<String> segmentNames = entry.getValue();
+      URIBuilder uriBuilder = new URIBuilder(serverToEndpoints.get(server)).setPath(
+          String.format("/tables/%s/validDocIdMetadata", tableNameWithType));
+      int completedSegmentCountPerServer = 0;
+      for (String segmentName : segmentNames) {
+        if (remainingSegments.remove(segmentName)) {
+          completedSegmentCountPerServer++;
+          uriBuilder.addParameter("segmentNames", segmentName);
+        }
+      }
+      if (completedSegmentCountPerServer > 0) {
+        // Only add the URL if the server hosts at least one of the completed segments
+        urls.add(uriBuilder.toString());
+      }
+    }
+    return urls;
+  }
+
+  private List<SegmentZKMetadata> getCompletedSegments(String tableNameWithType, Map<String, String> taskConfigs) {
+    List<SegmentZKMetadata> completedSegments = new ArrayList<>();
+    String bufferPeriod = taskConfigs.getOrDefault(UpsertCompactionTask.BUFFER_TIME_PERIOD_KEY, DEFAULT_BUFFER_PERIOD);
+    long bufferMs = TimeUtils.convertPeriodToMillis(bufferPeriod);
+    List<SegmentZKMetadata> allSegments = _clusterInfoAccessor.getSegmentsZKMetadata(tableNameWithType);
+    for (SegmentZKMetadata segment : allSegments) {
+      CommonConstants.Segment.Realtime.Status status = segment.getStatus();
+      // Initial selection: the segment must be completed and older than the buffer period
+      if (status.isCompleted() && (segment.getEndTimeMs() <= (System.currentTimeMillis() - bufferMs))) {
+        completedSegments.add(segment);
+      }
+    }
+    return completedSegments;
+  }
+
+  @VisibleForTesting
+  public static int getMaxTasks(String taskType, String tableNameWithType, Map<String, String> taskConfigs) {
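+    // Default to unlimited tasks when tableMaxNumTasks is not configured for the table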
+    int maxTasks = Integer.MAX_VALUE;
+    String tableMaxNumTasksConfig = taskConfigs.get(MinionConstants.TABLE_MAX_NUM_TASKS_KEY);
+    if (tableMaxNumTasksConfig != null) {
+      try {
+        maxTasks = Integer.parseInt(tableMaxNumTasksConfig);
+      } catch (Exception e) {
+        LOGGER.warn("MaxNumTasks have been wrongly set for table : {}, and task {}", tableNameWithType, taskType);
+      }
+    }
+    return maxTasks;
+  }
+
+  @VisibleForTesting
+  static boolean validate(TableConfig tableConfig) {
+    String taskType = MinionConstants.UpsertCompactionTask.TASK_TYPE;
+    String tableNameWithType = tableConfig.getTableName();
+    if (tableConfig.getTableType() == TableType.OFFLINE) {
+      LOGGER.warn("Skip generation task: {} for table: {}, offline table is not supported", taskType,
+          tableNameWithType);
+      return false;
+    }
+    if (!tableConfig.isUpsertEnabled()) {
+      LOGGER.warn("Skip generation task: {} for table: {}, table without upsert enabled is not supported", taskType,
+          tableNameWithType);
+      return false;
+    }
+    return true;
+  }
+}
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskProgressObserverFactory.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskProgressObserverFactory.java
new file mode 100644
index 0000000000..591a21b8ff
--- /dev/null
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskProgressObserverFactory.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks.upsertcompaction;
+
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.minion.event.BaseMinionProgressObserverFactory;
+import org.apache.pinot.spi.annotations.minion.EventObserverFactory;
+
+
+@EventObserverFactory
+public class UpsertCompactionTaskProgressObserverFactory extends BaseMinionProgressObserverFactory {
+
+  @Override
+  public String getTaskType() {
+    return MinionConstants.UpsertCompactionTask.TASK_TYPE;
+  }
+}
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutorTest.java
new file mode 100644
index 0000000000..604c58f6d0
--- /dev/null
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutorTest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks.upsertcompaction;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixManager;
+import org.apache.helix.model.ExternalView;
+import org.apache.pinot.minion.MinionContext;
+import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class UpsertCompactionTaskExecutorTest {
+  private static final String REALTIME_TABLE_NAME = "testTable_REALTIME";
+  private static final String SEGMENT_NAME = "testSegment";
+  private static final String CLUSTER_NAME = "testCluster";
+
+  @Test
+  public void testGetServer() {
+    ExternalView externalView = new ExternalView(REALTIME_TABLE_NAME);
+    Map<String, Map<String, String>> externalViewSegmentAssignment = externalView.getRecord().getMapFields();
+    Map<String, String> map = new HashMap<>();
+    map.put("server1", SegmentStateModel.ONLINE);
+    externalViewSegmentAssignment.put(SEGMENT_NAME, map);
+    HelixAdmin clusterManagementTool = Mockito.mock(HelixAdmin.class);
+    MinionContext minionContext = MinionContext.getInstance();
+    Mockito.when(clusterManagementTool.getResourceExternalView(CLUSTER_NAME, REALTIME_TABLE_NAME))
+        .thenReturn(externalView);
+    HelixManager helixManager = Mockito.mock(HelixManager.class);
+    Mockito.when(helixManager.getClusterName()).thenReturn(CLUSTER_NAME);
+    Mockito.when(helixManager.getClusterManagmentTool()).thenReturn(clusterManagementTool);
+    minionContext.setHelixManager(helixManager);
+
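+    // getServer() should resolve the single ONLINE replica from the mocked external view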
+    String server = UpsertCompactionTaskExecutor.getServer(SEGMENT_NAME, REALTIME_TABLE_NAME);
+
+    Assert.assertEquals(server, "server1");
+
+    // verify exception thrown with OFFLINE server
+    map.put("server1", SegmentStateModel.OFFLINE);
+    Assert.assertThrows(IllegalStateException.class,
+        () -> UpsertCompactionTaskExecutor.getServer(SEGMENT_NAME, REALTIME_TABLE_NAME));
+  }
+}
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
new file mode 100644
index 0000000000..9c93979d9f
--- /dev/null
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks.upsertcompaction;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import java.net.URISyntaxException;
+import java.util.AbstractMap;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.common.MinionConstants.UpsertCompactionTask;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import org.testng.collections.Lists;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+public class UpsertCompactionTaskGeneratorTest {
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String REALTIME_TABLE_NAME = "testTable_REALTIME";
+  private static final String TIME_COLUMN_NAME = "millisSinceEpoch";
+  private UpsertCompactionTaskGenerator _taskGenerator;
+  private TableConfig _tableConfig;
+  private ClusterInfoAccessor _mockClusterInfoAccessor;
+  private SegmentZKMetadata _completedSegment;
+  private SegmentZKMetadata _completedSegment2;
+  private Map<String, SegmentZKMetadata> _completedSegmentsMap;
+
+  @BeforeClass
+  public void setUp() {
+    _taskGenerator = new UpsertCompactionTaskGenerator();
+    Map<String, Map<String, String>> tableTaskConfigs = new HashMap<>();
+    Map<String, String> compactionConfigs = new HashMap<>();
+    tableTaskConfigs.put(UpsertCompactionTask.TASK_TYPE, compactionConfigs);
+    _tableConfig =
+        new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME)
+            .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL))
+            .setTaskConfig(new TableTaskConfig(tableTaskConfigs)).build();
+    _mockClusterInfoAccessor = mock(ClusterInfoAccessor.class);
+
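+    // One completed segment sealed a day ago and another sealed just now, both marked DONE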
+    _completedSegment = new SegmentZKMetadata("testTable__0");
+    _completedSegment.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
+    _completedSegment.setStartTime(System.currentTimeMillis() - TimeUtils.convertPeriodToMillis("2d"));
+    _completedSegment.setEndTime(System.currentTimeMillis() - TimeUtils.convertPeriodToMillis("1d"));
+    _completedSegment.setTimeUnit(TimeUnit.MILLISECONDS);
+    _completedSegment.setTotalDocs(100L);
+
+    _completedSegment2 = new SegmentZKMetadata("testTable__1");
+    _completedSegment2.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
+    _completedSegment2.setStartTime(System.currentTimeMillis() - TimeUtils.convertPeriodToMillis("1d"));
+    _completedSegment2.setEndTime(System.currentTimeMillis());
+    _completedSegment2.setTimeUnit(TimeUnit.MILLISECONDS);
+    _completedSegment2.setTotalDocs(10L);
+
+    _completedSegmentsMap = new HashMap<>();
+    _completedSegmentsMap.put(_completedSegment.getSegmentName(), _completedSegment);
+    _completedSegmentsMap.put(_completedSegment2.getSegmentName(), _completedSegment2);
+  }
+
+  @Test
+  public void testValidate() {
+    TableConfig tableConfig =
+        new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME)
+            .build();
+    assertFalse(UpsertCompactionTaskGenerator.validate(tableConfig));
+
+    TableConfigBuilder tableConfigBuilder =
+        new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME);
+    assertFalse(UpsertCompactionTaskGenerator.validate(tableConfigBuilder.build()));
+
+    tableConfigBuilder = tableConfigBuilder.setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL));
+    assertTrue(UpsertCompactionTaskGenerator.validate(tableConfigBuilder.build()));
+  }
+
+  @Test
+  public void testGenerateTasksValidatesTableConfigs() {
+    UpsertCompactionTaskGenerator taskGenerator = new UpsertCompactionTaskGenerator();
+    TableConfig offlineTableConfig =
+        new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME)
+            .build();
+    List<PinotTaskConfig> pinotTaskConfigs = taskGenerator.generateTasks(Lists.newArrayList(offlineTableConfig));
+    assertTrue(pinotTaskConfigs.isEmpty());
+
+    TableConfig realtimeTableConfig =
+        new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME)
+            .build();
+    pinotTaskConfigs = taskGenerator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+    assertTrue(pinotTaskConfigs.isEmpty());
+  }
+
+  @Test
+  public void testGenerateTasksWithNoSegments() {
+    when(_mockClusterInfoAccessor.getSegmentsZKMetadata(REALTIME_TABLE_NAME)).thenReturn(
+        Lists.newArrayList(Collections.emptyList()));
+    _taskGenerator.init(_mockClusterInfoAccessor);
+
+    List<PinotTaskConfig> pinotTaskConfigs = _taskGenerator.generateTasks(Lists.newArrayList(_tableConfig));
+
+    assertEquals(pinotTaskConfigs.size(), 0);
+  }
+
+  @Test
+  public void testGenerateTasksWithConsumingSegment() {
+    SegmentZKMetadata consumingSegment = new SegmentZKMetadata("testTable__0");
+    consumingSegment.setStatus(CommonConstants.Segment.Realtime.Status.IN_PROGRESS);
+    when(_mockClusterInfoAccessor.getSegmentsZKMetadata(REALTIME_TABLE_NAME)).thenReturn(
+        Lists.newArrayList(consumingSegment));
+    _taskGenerator.init(_mockClusterInfoAccessor);
+
+    List<PinotTaskConfig> pinotTaskConfigs = _taskGenerator.generateTasks(Lists.newArrayList(_tableConfig));
+
+    assertEquals(pinotTaskConfigs.size(), 0);
+  }
+
+  @Test
+  public void testGenerateTasksWithNewlyCompletedSegment() {
+    when(_mockClusterInfoAccessor.getSegmentsZKMetadata(REALTIME_TABLE_NAME)).thenReturn(
+        Lists.newArrayList(_completedSegment));
+    _taskGenerator.init(_mockClusterInfoAccessor);
+
+    List<PinotTaskConfig> pinotTaskConfigs = _taskGenerator.generateTasks(Lists.newArrayList(_tableConfig));
+
+    assertEquals(pinotTaskConfigs.size(), 0);
+  }
+
+  @Test
+  public void testGetValidDocIdMetadataUrls()
+      throws URISyntaxException {
+    Map<String, List<String>> serverToSegments = new HashMap<>();
+    serverToSegments.put("server1",
+        Lists.newArrayList(_completedSegment.getSegmentName(), _completedSegment2.getSegmentName()));
+    serverToSegments.put("server2", Lists.newArrayList("consumingSegment"));
+    BiMap<String, String> serverToEndpoints = HashBiMap.create(1);
+    serverToEndpoints.put("server1", "http://endpoint1");
+    serverToEndpoints.put("server2", "http://endpoint2");
+    Set<String> completedSegments = new HashSet<>();
+    completedSegments.add(_completedSegment.getSegmentName());
+    completedSegments.add(_completedSegment2.getSegmentName());
+
+    List<String> validDocIdUrls =
+        UpsertCompactionTaskGenerator.getValidDocIdMetadataUrls(serverToSegments, serverToEndpoints,
+            REALTIME_TABLE_NAME, completedSegments);
+
+    String expectedUrl =
+        String.format("%s/tables/%s/validDocIdMetadata?segmentNames=%s&segmentNames=%s", "http://endpoint1",
+            REALTIME_TABLE_NAME, _completedSegment.getSegmentName(), _completedSegment2.getSegmentName());
+    assertEquals(validDocIdUrls.get(0), expectedUrl);
+    assertEquals(validDocIdUrls.size(), 1);
+  }
+
+  @Test
+  public void testGetValidDocIdMetadataUrlsWithReplicatedSegments()
+      throws URISyntaxException {
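+    // Both servers host both segments; only the first server encountered should be queried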
+    Map<String, List<String>> serverToSegments = new LinkedHashMap<>();
+    serverToSegments.put("server1",
+        Lists.newArrayList(_completedSegment.getSegmentName(), _completedSegment2.getSegmentName()));
+    serverToSegments.put("server2",
+        Lists.newArrayList(_completedSegment.getSegmentName(), _completedSegment2.getSegmentName()));
+    BiMap<String, String> serverToEndpoints = HashBiMap.create(1);
+    serverToEndpoints.put("server1", "http://endpoint1");
+    serverToEndpoints.put("server2", "http://endpoint2");
+    Set<String> completedSegments = new HashSet<>();
+    completedSegments.add(_completedSegment.getSegmentName());
+    completedSegments.add(_completedSegment2.getSegmentName());
+
+    List<String> validDocIdUrls =
+        UpsertCompactionTaskGenerator.getValidDocIdMetadataUrls(serverToSegments, serverToEndpoints,
+            REALTIME_TABLE_NAME, completedSegments);
+
+    String expectedUrl =
+        String.format("%s/tables/%s/validDocIdMetadata?segmentNames=%s&segmentNames=%s", "http://endpoint1",
+            REALTIME_TABLE_NAME, _completedSegment.getSegmentName(), _completedSegment2.getSegmentName());
+    assertEquals(validDocIdUrls.get(0), expectedUrl);
+    assertEquals(validDocIdUrls.size(), 1);
+  }
+
+  @Test
+  public void testGetMaxTasks() {
+    Map<String, String> taskConfigs = new HashMap<>();
+    taskConfigs.put(MinionConstants.TABLE_MAX_NUM_TASKS_KEY, "10");
+
+    int maxTasks =
+        UpsertCompactionTaskGenerator.getMaxTasks(UpsertCompactionTask.TASK_TYPE, REALTIME_TABLE_NAME, taskConfigs);
+
+    assertEquals(maxTasks, 10);
+  }
+
+  @Test
+  public void testProcessValidDocIdMetadata() {
+    Map<String, String> compactionConfigs = getCompactionConfigs("1", "10");
+    Set<Map.Entry<String, String>> responseSet = new HashSet<>();
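+    // Simulated server response: the first segment is half invalid, the second is fully invalid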
+    String json = "[{" + "\"totalValidDocs\" : 50," + "\"totalInvalidDocs\" : 50," + "\"segmentName\" : \""
+        + _completedSegment.getSegmentName() + "\"," + "\"totalDocs\" : 100" + "}," + "{" + "\"totalValidDocs\" : 0,"
+        + "\"totalInvalidDocs\" : 10," + "\"segmentName\" : \"" + _completedSegment2.getSegmentName() + "\","
+        + "\"totalDocs\" : 10" + "}]";
+    responseSet.add(new AbstractMap.SimpleEntry<>("", json));
+    UpsertCompactionTaskGenerator.SegmentSelectionResult segmentSelectionResult =
+        UpsertCompactionTaskGenerator.processValidDocIdMetadata(compactionConfigs, _completedSegmentsMap, responseSet);
+    assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(0).getSegmentName(),
+        _completedSegment.getSegmentName());
+    assertEquals(segmentSelectionResult.getSegmentsForDeletion().get(0), _completedSegment2.getSegmentName());
+
+    // test with a higher invalidRecordsThresholdPercent
+    compactionConfigs = getCompactionConfigs("60", "10");
+    segmentSelectionResult =
+        UpsertCompactionTaskGenerator.processValidDocIdMetadata(compactionConfigs, _completedSegmentsMap, responseSet);
+    assertTrue(segmentSelectionResult.getSegmentsForCompaction().isEmpty());
+
+    // test without an invalidRecordsThresholdPercent
+    compactionConfigs = getCompactionConfigs("0", "10");
+    segmentSelectionResult =
+        UpsertCompactionTaskGenerator.processValidDocIdMetadata(compactionConfigs, _completedSegmentsMap, responseSet);
+    assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(0).getSegmentName(),
+        _completedSegment.getSegmentName());
+
+    // test without an invalidRecordsThresholdCount
+    compactionConfigs = getCompactionConfigs("30", "0");
+    segmentSelectionResult =
+        UpsertCompactionTaskGenerator.processValidDocIdMetadata(compactionConfigs, _completedSegmentsMap, responseSet);
+    assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(0).getSegmentName(),
+        _completedSegment.getSegmentName());
+  }
+
+  private Map<String, String> getCompactionConfigs(String invalidRecordsThresholdPercent,
+      String invalidRecordsThresholdCount) {
+    Map<String, String> compactionConfigs = new HashMap<>();
+    if (!invalidRecordsThresholdPercent.equals("0")) {
+      compactionConfigs.put(UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_PERCENT, invalidRecordsThresholdPercent);
+    }
+    if (!invalidRecordsThresholdCount.equals("0")) {
+      compactionConfigs.put(UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_COUNT, invalidRecordsThresholdCount);
+    }
+    return compactionConfigs;
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index d5741fa0b7..a64170c60f 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -100,6 +100,7 @@ public final class TableConfigUtils {
 
   // supported TableTaskTypes, must be identical to the one return in the impl of {@link PinotTaskGenerator}.
   private static final String REALTIME_TO_OFFLINE_TASK_TYPE = "RealtimeToOfflineSegmentsTask";
+  private static final String UPSERT_COMPACTION_TASK_TYPE = "UpsertCompactionTask";
 
   // this is duplicate with KinesisConfig.STREAM_TYPE, while instead of use KinesisConfig.STREAM_TYPE directly, we
   // hardcode the value here to avoid pulling the entire pinot-kinesis module as dependency.
@@ -543,6 +544,33 @@ public final class TableConfigUtils {
               }
             }
           }
+        } else if (taskTypeConfigName.equals(UPSERT_COMPACTION_TASK_TYPE)) {
+          // check table is realtime
+          Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME,
+              "UpsertCompactionTask only supports realtime tables!");
+          // check upsert enabled
+          Preconditions.checkState(tableConfig.isUpsertEnabled(), "Upsert must be enabled for UpsertCompactionTask");
+
+          // check that bufferTimePeriod, if set, is a valid period string
+          if (taskTypeConfig.containsKey("bufferTimePeriod")) {
+            TimeUtils.convertPeriodToMillis(taskTypeConfig.get("bufferTimePeriod"));
+          }
+          // check invalidRecordsThresholdPercent
+          if (taskTypeConfig.containsKey("invalidRecordsThresholdPercent")) {
+            Preconditions.checkState(Double.parseDouble(taskTypeConfig.get("invalidRecordsThresholdPercent")) > 0
+                    && Double.parseDouble(taskTypeConfig.get("invalidRecordsThresholdPercent")) <= 100,
+                "invalidRecordsThresholdPercent must be > 0 and <= 100");
+          }
+          // check invalidRecordsThresholdCount
+          if (taskTypeConfig.containsKey("invalidRecordsThresholdCount")) {
+            Preconditions.checkState(Long.parseLong(taskTypeConfig.get("invalidRecordsThresholdCount")) >= 1,
+                "invalidRecordsThresholdCount must be >= 1");
+          }
+          // check that at least one of invalidRecordsThresholdPercent or invalidRecordsThresholdCount was provided
+          Preconditions.checkState(
+              taskTypeConfig.containsKey("invalidRecordsThresholdPercent") || taskTypeConfig.containsKey(
+                  "invalidRecordsThresholdCount"),
+              "At least one of invalidRecordsThresholdPercent or invalidRecordsThresholdCount must be provided");
         }
       }
     }
@@ -581,8 +609,8 @@ public final class TableConfigUtils {
     Preconditions.checkState(streamConfig.hasLowLevelConsumerType() && !streamConfig.hasHighLevelConsumerType(),
         "Upsert/Dedup table must use low-level streaming consumer type");
     // replica group is configured for routing
-    Preconditions.checkState(tableConfig.getRoutingConfig() != null
-            && isRoutingStrategyAllowedForUpsert(tableConfig.getRoutingConfig()),
+    Preconditions.checkState(
+        tableConfig.getRoutingConfig() != null && isRoutingStrategyAllowedForUpsert(tableConfig.getRoutingConfig()),
         "Upsert/Dedup table must use strict replica-group (i.e. strictReplicaGroup) based routing");
 
     // specifically for upsert
@@ -649,8 +677,8 @@ public final class TableConfigUtils {
    */
   @VisibleForTesting
   static void validateInstancePartitionsTypeMapConfig(TableConfig tableConfig) {
-    if (MapUtils.isEmpty(tableConfig.getInstancePartitionsMap())
-        || MapUtils.isEmpty(tableConfig.getInstanceAssignmentConfigMap())) {
+    if (MapUtils.isEmpty(tableConfig.getInstancePartitionsMap()) || MapUtils.isEmpty(
+        tableConfig.getInstanceAssignmentConfigMap())) {
       return;
     }
     for (InstancePartitionsType instancePartitionsType : tableConfig.getInstancePartitionsMap().keySet()) {
@@ -668,11 +696,11 @@ public final class TableConfigUtils {
    */
   @VisibleForTesting
   static void validatePartitionedReplicaGroupInstance(TableConfig tableConfig) {
-    if (tableConfig.getValidationConfig().getReplicaGroupStrategyConfig() == null
-        || MapUtils.isEmpty(tableConfig.getInstanceAssignmentConfigMap())) {
+    if (tableConfig.getValidationConfig().getReplicaGroupStrategyConfig() == null || MapUtils.isEmpty(
+        tableConfig.getInstanceAssignmentConfigMap())) {
       return;
     }
-    for (Map.Entry<String, InstanceAssignmentConfig> entry: tableConfig.getInstanceAssignmentConfigMap().entrySet()) {
+    for (Map.Entry<String, InstanceAssignmentConfig> entry : tableConfig.getInstanceAssignmentConfigMap().entrySet()) {
       boolean isNullReplicaGroupPartitionConfig = entry.getValue().getReplicaGroupPartitionConfig() == null;
       Preconditions.checkState(isNullReplicaGroupPartitionConfig,
           "Both replicaGroupStrategyConfig and replicaGroupPartitionConfig is provided");
@@ -1025,8 +1053,8 @@ public final class TableConfigUtils {
       return;
     }
 
-    boolean forwardIndexDisabled =
-        Boolean.parseBoolean(fieldConfigProperties.getOrDefault(FieldConfig.FORWARD_INDEX_DISABLED,
+    boolean forwardIndexDisabled = Boolean.parseBoolean(
+        fieldConfigProperties.getOrDefault(FieldConfig.FORWARD_INDEX_DISABLED,
             FieldConfig.DEFAULT_FORWARD_INDEX_DISABLED));
     if (!forwardIndexDisabled) {
       return;
@@ -1045,20 +1073,23 @@ public final class TableConfigUtils {
     }
 
     Preconditions.checkState(
-        !indexingConfigs.isOptimizeDictionaryForMetrics() && !indexingConfigs.isOptimizeDictionary(),
-        String.format("Dictionary override optimization options (OptimizeDictionary, optimizeDictionaryForMetrics)"
-            + " not supported with forward index for column: %s, disabled", columnName));
+        !indexingConfigs.isOptimizeDictionaryForMetrics() && !indexingConfigs.isOptimizeDictionary(), String.format(
+            "Dictionary override optimization options (OptimizeDictionary, optimizeDictionaryForMetrics)"
+                + " not supported with forward index for column: %s, disabled", columnName));
 
-    boolean hasDictionary = fieldConfig.getEncodingType() == FieldConfig.EncodingType.DICTIONARY
-        || noDictionaryColumns == null || !noDictionaryColumns.contains(columnName);
-    boolean hasInvertedIndex = indexingConfigs.getInvertedIndexColumns() != null
-        && indexingConfigs.getInvertedIndexColumns().contains(columnName);
+    boolean hasDictionary =
+        fieldConfig.getEncodingType() == FieldConfig.EncodingType.DICTIONARY || noDictionaryColumns == null
+            || !noDictionaryColumns.contains(columnName);
+    boolean hasInvertedIndex =
+        indexingConfigs.getInvertedIndexColumns() != null && indexingConfigs.getInvertedIndexColumns()
+            .contains(columnName);
 
     if (!hasDictionary || !hasInvertedIndex) {
       LOGGER.warn("Forward index has been disabled for column {}. Either dictionary ({}) and / or inverted index ({}) "
               + "has been disabled. If the forward index needs to be regenerated or another index added please refresh "
               + "or back-fill the forward index as it cannot be rebuilt without dictionary and inverted index.",
-          columnName, hasDictionary ? "enabled" : "disabled", hasInvertedIndex ? "enabled" : "disabled");
+          columnName,
+          hasDictionary ? "enabled" : "disabled", hasInvertedIndex ? "enabled" : "disabled");
     }
   }
 
@@ -1271,9 +1302,7 @@ public final class TableConfigUtils {
     if (clone.getFieldConfigList() != null) {
       List<FieldConfig> cleanFieldConfigList = new ArrayList<>();
       for (FieldConfig fieldConfig : clone.getFieldConfigList()) {
-        cleanFieldConfigList.add(new FieldConfig.Builder(fieldConfig)
-            .withIndexTypes(null)
-            .build());
+        cleanFieldConfigList.add(new FieldConfig.Builder(fieldConfig).withIndexTypes(null).build());
       }
       clone.setFieldConfigList(cleanFieldConfigList);
     }
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index 1f6834f62c..9d58b42161 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -924,8 +924,8 @@ public class TableConfigUtilsTest {
       // be rebuilt without a dictionary, the constraint to have a dictionary has been lifted.
       Map<String, String> fieldConfigProperties = new HashMap<>();
       fieldConfigProperties.put(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString());
-      FieldConfig fieldConfig = new FieldConfig("myCol1", FieldConfig.EncodingType.RAW, null, null, null, null,
-          fieldConfigProperties);
+      FieldConfig fieldConfig =
+          new FieldConfig("myCol1", FieldConfig.EncodingType.RAW, null, null, null, null, fieldConfigProperties);
       tableConfig.setFieldConfigList(Arrays.asList(fieldConfig));
       TableConfigUtils.validate(tableConfig, schema);
     } catch (Exception e) {
@@ -938,8 +938,8 @@ public class TableConfigUtilsTest {
       // lifted.
       Map<String, String> fieldConfigProperties = new HashMap<>();
       fieldConfigProperties.put(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString());
-      FieldConfig fieldConfig = new FieldConfig("myCol2", FieldConfig.EncodingType.DICTIONARY, null, null, null, null,
-          fieldConfigProperties);
+      FieldConfig fieldConfig =
+          new FieldConfig("myCol2", FieldConfig.EncodingType.DICTIONARY, null, null, null, null, fieldConfigProperties);
       tableConfig.setFieldConfigList(Arrays.asList(fieldConfig));
       TableConfigUtils.validate(tableConfig, schema);
     } catch (Exception e) {
@@ -964,8 +964,9 @@ public class TableConfigUtilsTest {
       // Enable forward index disabled flag for a column with inverted index
       Map<String, String> fieldConfigProperties = new HashMap<>();
       fieldConfigProperties.put(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString());
-      FieldConfig fieldConfig = new FieldConfig("myCol2", FieldConfig.EncodingType.DICTIONARY,
-          FieldConfig.IndexType.INVERTED, null, null, null, fieldConfigProperties);
+      FieldConfig fieldConfig =
+          new FieldConfig("myCol2", FieldConfig.EncodingType.DICTIONARY, FieldConfig.IndexType.INVERTED, null, null,
+              null, fieldConfigProperties);
       tableConfig.setFieldConfigList(Arrays.asList(fieldConfig));
       TableConfigUtils.validate(tableConfig, schema);
     } catch (Exception e) {
@@ -979,8 +980,9 @@ public class TableConfigUtilsTest {
       // Enable forward index disabled flag for a column with inverted index and is sorted
       Map<String, String> fieldConfigProperties = new HashMap<>();
       fieldConfigProperties.put(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString());
-      FieldConfig fieldConfig = new FieldConfig("myCol2", FieldConfig.EncodingType.DICTIONARY,
-          FieldConfig.IndexType.INVERTED, null, null, null, fieldConfigProperties);
+      FieldConfig fieldConfig =
+          new FieldConfig("myCol2", FieldConfig.EncodingType.DICTIONARY, FieldConfig.IndexType.INVERTED, null, null,
+              null, fieldConfigProperties);
       tableConfig.setFieldConfigList(Arrays.asList(fieldConfig));
       TableConfigUtils.validate(tableConfig, schema);
     } catch (Exception e) {
@@ -994,9 +996,10 @@ public class TableConfigUtilsTest {
       // Enable forward index disabled flag for a multi-value column with inverted index and range index
       Map<String, String> fieldConfigProperties = new HashMap<>();
       fieldConfigProperties.put(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString());
-      FieldConfig fieldConfig = new FieldConfig("myCol2", FieldConfig.EncodingType.DICTIONARY,
-          FieldConfig.IndexType.INVERTED, Arrays.asList(FieldConfig.IndexType.INVERTED, FieldConfig.IndexType.RANGE),
-          null, null, fieldConfigProperties);
+      FieldConfig fieldConfig =
+          new FieldConfig("myCol2", FieldConfig.EncodingType.DICTIONARY, FieldConfig.IndexType.INVERTED,
+              Arrays.asList(FieldConfig.IndexType.INVERTED, FieldConfig.IndexType.RANGE), null, null,
+              fieldConfigProperties);
       tableConfig.setFieldConfigList(Arrays.asList(fieldConfig));
       TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should fail for MV myCol2 with forward index disabled but has range and inverted index");
@@ -1011,9 +1014,10 @@ public class TableConfigUtilsTest {
       // Enable forward index disabled flag for a singe-value column with inverted index and range index v1
       Map<String, String> fieldConfigProperties = new HashMap<>();
       fieldConfigProperties.put(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString());
-      FieldConfig fieldConfig = new FieldConfig("myCol1", FieldConfig.EncodingType.DICTIONARY,
-          FieldConfig.IndexType.INVERTED, Arrays.asList(FieldConfig.IndexType.INVERTED, FieldConfig.IndexType.RANGE),
-          null, null, fieldConfigProperties);
+      FieldConfig fieldConfig =
+          new FieldConfig("myCol1", FieldConfig.EncodingType.DICTIONARY, FieldConfig.IndexType.INVERTED,
+              Arrays.asList(FieldConfig.IndexType.INVERTED, FieldConfig.IndexType.RANGE), null, null,
+              fieldConfigProperties);
       tableConfig.setFieldConfigList(Arrays.asList(fieldConfig));
       tableConfig.getIndexingConfig().setRangeIndexVersion(1);
       TableConfigUtils.validate(tableConfig, schema);
@@ -1030,14 +1034,15 @@ public class TableConfigUtilsTest {
       // Enable forward index disabled flag for a column with inverted index and disable dictionary
       Map<String, String> fieldConfigProperties = new HashMap<>();
       fieldConfigProperties.put(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString());
-      FieldConfig fieldConfig = new FieldConfig("myCol2", FieldConfig.EncodingType.RAW,
-          FieldConfig.IndexType.INVERTED, null, null, null, fieldConfigProperties);
+      FieldConfig fieldConfig =
+          new FieldConfig("myCol2", FieldConfig.EncodingType.RAW, FieldConfig.IndexType.INVERTED, null, null, null,
+              fieldConfigProperties);
       tableConfig.setFieldConfigList(Arrays.asList(fieldConfig));
       TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should not be able to disable dictionary but keep inverted index");
     } catch (Exception e) {
-      Assert.assertEquals(e.getMessage(), "Cannot create an Inverted index on column myCol2 specified in the "
-          + "noDictionaryColumns config");
+      Assert.assertEquals(e.getMessage(),
+          "Cannot create an Inverted index on column myCol2 specified in the " + "noDictionaryColumns config");
     }
 
     tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
@@ -1046,8 +1051,9 @@ public class TableConfigUtilsTest {
       // Enable forward index disabled flag for a column with FST index and disable dictionary
       Map<String, String> fieldConfigProperties = new HashMap<>();
       fieldConfigProperties.put(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString());
-      FieldConfig fieldConfig = new FieldConfig("myCol2", FieldConfig.EncodingType.RAW,
-          FieldConfig.IndexType.FST, null, null, null, fieldConfigProperties);
+      FieldConfig fieldConfig =
+          new FieldConfig("myCol2", FieldConfig.EncodingType.RAW, FieldConfig.IndexType.FST, null, null, null,
+              fieldConfigProperties);
       tableConfig.setFieldConfigList(Arrays.asList(fieldConfig));
       TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should not be able to disable dictionary but keep inverted index");
@@ -1061,8 +1067,9 @@ public class TableConfigUtilsTest {
       // Enable forward index disabled flag for a column with FST index and disable dictionary
       Map<String, String> fieldConfigProperties = new HashMap<>();
       fieldConfigProperties.put(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString());
-      FieldConfig fieldConfig = new FieldConfig("intCol", FieldConfig.EncodingType.RAW,
-          FieldConfig.IndexType.RANGE, null, null, null, fieldConfigProperties);
+      FieldConfig fieldConfig =
+          new FieldConfig("intCol", FieldConfig.EncodingType.RAW, FieldConfig.IndexType.RANGE, null, null, null,
+              fieldConfigProperties);
       tableConfig.setFieldConfigList(Arrays.asList(fieldConfig));
       TableConfigUtils.validate(tableConfig, schema);
     } catch (Exception e) {
@@ -1415,8 +1422,7 @@ public class TableConfigUtilsTest {
   @Test
   public void testValidateUpsertConfig() {
     Schema schema =
-        new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
-            .addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
+        new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
             .build();
     UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
     TableConfig tableConfig =
@@ -1526,12 +1532,10 @@ public class TableConfigUtilsTest {
     // Table upsert with delete column
     String incorrectTypeDelCol = "incorrectTypeDeleteCol";
     String delCol = "myDelCol";
-    schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
-        .setPrimaryKeyColumns(Lists.newArrayList("myPkCol"))
+    schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).setPrimaryKeyColumns(Lists.newArrayList("myPkCol"))
         .addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
         .addSingleValueDimension(incorrectTypeDelCol, FieldSpec.DataType.STRING)
-        .addSingleValueDimension(delCol, FieldSpec.DataType.BOOLEAN)
-        .build();
+        .addSingleValueDimension(delCol, FieldSpec.DataType.BOOLEAN).build();
     streamConfigs = getStreamConfigs();
     streamConfigs.put("stream.kafka.consumer.type", "simple");
 
@@ -1783,16 +1787,14 @@ public class TableConfigUtilsTest {
     InstanceAssignmentConfig instanceAssignmentConfig = Mockito.mock(InstanceAssignmentConfig.class);
 
     TableConfig tableConfigWithoutInstancePartitionsMap =
-        new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
-            .build();
+        new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
 
     // Call validate with a table-config without any instance partitions or instance assignment config
     TableConfigUtils.validateInstancePartitionsTypeMapConfig(tableConfigWithoutInstancePartitionsMap);
 
     TableConfig tableConfigWithInstancePartitionsMap =
         new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
-            .setInstancePartitionsMap(ImmutableMap.of(InstancePartitionsType.OFFLINE, "test_OFFLINE"))
-            .build();
+            .setInstancePartitionsMap(ImmutableMap.of(InstancePartitionsType.OFFLINE, "test_OFFLINE")).build();
 
     // Call validate with a table-config with instance partitions set but not instance assignment config
     TableConfigUtils.validateInstancePartitionsTypeMapConfig(tableConfigWithInstancePartitionsMap);
@@ -1800,8 +1802,7 @@ public class TableConfigUtilsTest {
     TableConfig invalidTableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
         .setInstancePartitionsMap(ImmutableMap.of(InstancePartitionsType.OFFLINE, "test_OFFLINE"))
         .setInstanceAssignmentConfigMap(
-            ImmutableMap.of(InstancePartitionsType.OFFLINE.toString(), instanceAssignmentConfig))
-            .build();
+            ImmutableMap.of(InstancePartitionsType.OFFLINE.toString(), instanceAssignmentConfig)).build();
     try {
       // Call validate with instance partitions and config set for the same type
       TableConfigUtils.validateInstancePartitionsTypeMapConfig(invalidTableConfig);
@@ -1895,15 +1896,64 @@ public class TableConfigUtilsTest {
     }
   }
 
+  @Test
+  public void testUpsertCompactionTaskConfig() {
+    Schema schema =
+        new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
+            .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
+            .setPrimaryKeyColumns(Lists.newArrayList("myCol")).build();
+    Map<String, String> upsertCompactionTaskConfig =
+        ImmutableMap.of("bufferTimePeriod", "5d", "invalidRecordsThresholdPercent", "1", "invalidRecordsThresholdCount",
+            "1");
+    TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL))
+        .setTaskConfig(new TableTaskConfig(ImmutableMap.of("UpsertCompactionTask", upsertCompactionTaskConfig)))
+        .build();
+
+    TableConfigUtils.validateTaskConfigs(tableConfig, schema);
+
+    // test with invalid invalidRecordsThresholdPercent values
+    upsertCompactionTaskConfig = ImmutableMap.of("invalidRecordsThresholdPercent", "0");
+    TableConfig zeroPercentTableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL))
+        .setTaskConfig(new TableTaskConfig(ImmutableMap.of("UpsertCompactionTask", upsertCompactionTaskConfig)))
+        .build();
+    Assert.assertThrows(IllegalStateException.class,
+        () -> TableConfigUtils.validateTaskConfigs(zeroPercentTableConfig, schema));
+    upsertCompactionTaskConfig = ImmutableMap.of("invalidRecordsThresholdPercent", "110");
+    TableConfig hundredTenPercentTableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL))
+        .setTaskConfig(new TableTaskConfig(ImmutableMap.of("UpsertCompactionTask", upsertCompactionTaskConfig)))
+        .build();
+    Assert.assertThrows(IllegalStateException.class,
+        () -> TableConfigUtils.validateTaskConfigs(hundredTenPercentTableConfig, schema));
+
+    // test with invalid invalidRecordsThresholdCount
+    upsertCompactionTaskConfig = ImmutableMap.of("invalidRecordsThresholdCount", "0");
+    TableConfig invalidCountTableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL))
+        .setTaskConfig(new TableTaskConfig(ImmutableMap.of("UpsertCompactionTask", upsertCompactionTaskConfig)))
+        .build();
+    Assert.assertThrows(IllegalStateException.class,
+        () -> TableConfigUtils.validateTaskConfigs(invalidCountTableConfig, schema));
+
+    // test without invalidRecordsThresholdPercent or invalidRecordsThresholdCount
+    upsertCompactionTaskConfig = ImmutableMap.of("bufferTimePeriod", "5d");
+    TableConfig invalidTableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL))
+        .setTaskConfig(new TableTaskConfig(ImmutableMap.of("UpsertCompactionTask", upsertCompactionTaskConfig)))
+        .build();
+    Assert.assertThrows(IllegalStateException.class,
+        () -> TableConfigUtils.validateTaskConfigs(invalidTableConfig, schema));
+  }
+
   @Test
   public void testValidatePartitionedReplicaGroupInstance() {
     String partitionColumn = "testPartitionCol";
-    ReplicaGroupStrategyConfig replicaGroupStrategyConfig =
-        new ReplicaGroupStrategyConfig(partitionColumn, 2);
+    ReplicaGroupStrategyConfig replicaGroupStrategyConfig = new ReplicaGroupStrategyConfig(partitionColumn, 2);
 
     TableConfig tableConfigWithoutReplicaGroupStrategyConfig =
-        new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
-            .build();
+        new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
     // Call validate with a table-config without replicaGroupStrategyConfig or replicaGroupPartitionConfig.
     TableConfigUtils.validatePartitionedReplicaGroupInstance(tableConfigWithoutReplicaGroupStrategyConfig);
 
@@ -1918,12 +1968,12 @@ public class TableConfigUtilsTest {
     InstanceAssignmentConfig instanceAssignmentConfig = Mockito.mock(InstanceAssignmentConfig.class);
     InstanceReplicaGroupPartitionConfig instanceReplicaGroupPartitionConfig =
         new InstanceReplicaGroupPartitionConfig(true, 0, 0, 0, 2, 0, false, partitionColumn);
-    Mockito.doReturn(instanceReplicaGroupPartitionConfig)
-        .when(instanceAssignmentConfig).getReplicaGroupPartitionConfig();
+    Mockito.doReturn(instanceReplicaGroupPartitionConfig).when(instanceAssignmentConfig)
+        .getReplicaGroupPartitionConfig();
 
-    TableConfig invalidTableConfig = new TableConfigBuilder(TableType.OFFLINE)
-        .setTableName(TABLE_NAME).setInstanceAssignmentConfigMap(
-            ImmutableMap.of(TableType.OFFLINE.toString(), instanceAssignmentConfig)).build();
+    TableConfig invalidTableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+        .setInstanceAssignmentConfigMap(ImmutableMap.of(TableType.OFFLINE.toString(), instanceAssignmentConfig))
+        .build();
     invalidTableConfig.getValidationConfig().setReplicaGroupStrategyConfig(replicaGroupStrategyConfig);
 
     try {
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
index 8148956ae6..c03328c2bb 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
@@ -71,6 +71,7 @@ import org.apache.pinot.common.restlet.resources.TablesList;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.common.utils.RoaringBitmapUtils;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.common.utils.URIUtils;
 import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
@@ -164,8 +165,8 @@ public class TablesResource {
   @Encoded
   @Produces(MediaType.APPLICATION_JSON)
   @Path("/tables/{tableName}/metadata")
-  @ApiOperation(value = "List metadata for all segments of a given table",
-      notes = "List segments metadata of table hosted on this server")
+  @ApiOperation(value = "List metadata for all segments of a given table", notes = "List segments metadata of table "
+      + "hosted on this server")
   @ApiResponses(value = {
       @ApiResponse(code = 200, message = "Success"),
       @ApiResponse(code = 500, message = "Internal server error"),
@@ -174,7 +175,7 @@ public class TablesResource {
   public String getSegmentMetadata(
       @ApiParam(value = "Table Name with type", required = true) @PathParam("tableName") String tableName,
       @ApiParam(value = "Column name", allowMultiple = true) @QueryParam("columns") @DefaultValue("")
-          List<String> columns)
+      List<String> columns)
       throws WebApplicationException {
     InstanceDataManager instanceDataManager = _serverInstance.getInstanceDataManager();
 
@@ -299,7 +300,7 @@ public class TablesResource {
       @PathParam("tableName") String tableName,
       @ApiParam(value = "Segment name", required = true) @PathParam("segmentName") String segmentName,
       @ApiParam(value = "Column name", allowMultiple = true) @QueryParam("columns") @DefaultValue("")
-          List<String> columns) {
+      List<String> columns) {
     for (int i = 0; i < columns.size(); i++) {
       try {
         columns.set(i, URLDecoder.decode(columns.get(i), StandardCharsets.UTF_8.name()));
@@ -348,8 +349,8 @@ public class TablesResource {
     try {
       Map<String, String> segmentCrcForTable = new HashMap<>();
       for (SegmentDataManager segmentDataManager : segmentDataManagers) {
-        segmentCrcForTable
-            .put(segmentDataManager.getSegmentName(), segmentDataManager.getSegment().getSegmentMetadata().getCrc());
+        segmentCrcForTable.put(segmentDataManager.getSegmentName(),
+            segmentDataManager.getSegment().getSegmentMetadata().getCrc());
       }
       return ResourceUtils.convertToJsonString(segmentCrcForTable);
     } catch (Exception e) {
@@ -429,6 +430,7 @@ public class TablesResource {
       @PathParam("tableNameWithType") String tableNameWithType,
       @ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") @Encoded String segmentName,
       @Context HttpHeaders httpHeaders) {
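+    // The segment name path param is @Encoded, so decode it before use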
+    segmentName = URIUtils.decode(segmentName);
     LOGGER.info("Received a request to download validDocIds for segment {} table {}", segmentName, tableNameWithType);
     // Validate data access
     ServerResourceUtils.validateDataAccess(_accessControlFactory, tableNameWithType, httpHeaders);
@@ -466,6 +468,60 @@ public class TablesResource {
     }
   }
 
+  @GET
+  @Path("/tables/{tableNameWithType}/validDocIdMetadata")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Provides segment validDocId metadata", notes = "Provides segment validDocId metadata")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"),
+      @ApiResponse(code = 500, message = "Internal server error", response = ErrorInfo.class),
+      @ApiResponse(code = 404, message = "Table or segment not found", response = ErrorInfo.class)
+  })
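+  // Illustrative request: GET /tables/myTable_REALTIME/validDocIdMetadata?segmentNames=seg__0&segmentNames=seg__1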
+  public String getValidDocIdMetadata(
+      @ApiParam(value = "Table name including type", required = true, example = "myTable_REALTIME")
+      @PathParam("tableNameWithType") String tableNameWithType,
+      @ApiParam(value = "Segment name", allowMultiple = true, required = true) @QueryParam("segmentNames")
+      List<String> segmentNames) {
+    TableDataManager tableDataManager =
+        ServerResourceUtils.checkGetTableDataManager(_serverInstance, tableNameWithType);
+    List<String> missingSegments = new ArrayList<>();
+    List<SegmentDataManager> segmentDataManagers = tableDataManager.acquireSegments(segmentNames, missingSegments);
+    if (!missingSegments.isEmpty()) {
+      throw new WebApplicationException(String.format("Table %s has missing segments", tableNameWithType),
+          Response.Status.NOT_FOUND);
+    }
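+    // Compute doc-count stats from each segment's validDocIds bitmap, always releasing the acquired segment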
+    List<Map<String, Object>> allValidDocIdMetadata = new ArrayList<>();
+    for (SegmentDataManager segmentDataManager : segmentDataManagers) {
+      try {
+        IndexSegment indexSegment = segmentDataManager.getSegment();
+        if (!(indexSegment instanceof ImmutableSegmentImpl)) {
+          throw new WebApplicationException(
+              String.format("Table %s segment %s is not a immutable segment", tableNameWithType,
+                  segmentDataManager.getSegmentName()), Response.Status.BAD_REQUEST);
+        }
+        MutableRoaringBitmap validDocIds =
+            indexSegment.getValidDocIds() != null ? indexSegment.getValidDocIds().getMutableRoaringBitmap() : null;
+        if (validDocIds == null) {
+          throw new WebApplicationException(
+              String.format("Missing validDocIds for table %s segment %s does not exist", tableNameWithType,
+                  segmentDataManager.getSegmentName()), Response.Status.NOT_FOUND);
+        }
+        Map<String, Object> validDocIdMetadata = new HashMap<>();
+        int totalDocs = indexSegment.getSegmentMetadata().getTotalDocs();
+        int totalValidDocs = validDocIds.getCardinality();
+        int totalInvalidDocs = totalDocs - totalValidDocs;
+        validDocIdMetadata.put("segmentName", segmentDataManager.getSegmentName());
+        validDocIdMetadata.put("totalDocs", totalDocs);
+        validDocIdMetadata.put("totalValidDocs", totalValidDocs);
+        validDocIdMetadata.put("totalInvalidDocs", totalInvalidDocs);
+        allValidDocIdMetadata.add(validDocIdMetadata);
+      } finally {
+        tableDataManager.releaseSegment(segmentDataManager);
+      }
+    }
+    return ResourceUtils.convertToJsonString(allValidDocIdMetadata);
+  }
+
   /**
    * Upload a low level consumer segment to segment store and return the segment download url. This endpoint is used
    * when segment store copy is unavailable for committed low level consumer segments.
@@ -493,7 +549,7 @@ public class TablesResource {
   })
   public String uploadLLCSegment(
       @ApiParam(value = "Name of the REALTIME table", required = true) @PathParam("realtimeTableName")
-          String realtimeTableName,
+      String realtimeTableName,
       @ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") String segmentName,
       @QueryParam("uploadTimeoutMs") @DefaultValue("-1") int timeoutMs)
       throws Exception {
@@ -561,13 +617,13 @@ public class TablesResource {
   @GET
   @Path("tables/{realtimeTableName}/consumingSegmentsInfo")
   @Produces(MediaType.APPLICATION_JSON)
-  @ApiOperation(value = "Get the info for consumers of this REALTIME table",
-      notes = "Get consumers info from the table data manager. Note that the partitionToOffsetMap has been deprecated "
+  @ApiOperation(value = "Get the info for consumers of this REALTIME table", notes =
+      "Get consumers info from the table data manager. Note that the partitionToOffsetMap has been deprecated "
           + "and will be removed in the next release. The info is now embedded within each partition's state as "
           + "currentOffsetsMap")
   public List<SegmentConsumerInfo> getConsumingSegmentsInfo(
       @ApiParam(value = "Name of the REALTIME table", required = true) @PathParam("realtimeTableName")
-          String realtimeTableName) {
+      String realtimeTableName) {
     TableType tableType = TableNameBuilder.getTableTypeFromTableName(realtimeTableName);
     if (TableType.OFFLINE == tableType) {
       throw new WebApplicationException("Cannot get consuming segment info for OFFLINE table: " + realtimeTableName);
@@ -590,18 +646,14 @@ public class TablesResource {
             recordsLagMap.put(k, v.getRecordsLag());
             availabilityLagMsMap.put(k, v.getAvailabilityLagMs());
           });
-          @Deprecated Map<String, String> partitiionToOffsetMap =
-              realtimeSegmentDataManager.getPartitionToCurrentOffset();
-          segmentConsumerInfoList.add(
-              new SegmentConsumerInfo(segmentDataManager.getSegmentName(),
-                  realtimeSegmentDataManager.getConsumerState().toString(),
-                  realtimeSegmentDataManager.getLastConsumedTimestamp(),
-                  partitiionToOffsetMap,
-                  new SegmentConsumerInfo.PartitionOffsetInfo(
-                      partitiionToOffsetMap,
-                      partitionStateMap.entrySet().stream().collect(
-                          Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getUpstreamLatestOffset().toString())
-                      ), recordsLagMap, availabilityLagMsMap)));
+          @Deprecated
+          Map<String, String> partitionToOffsetMap = realtimeSegmentDataManager.getPartitionToCurrentOffset();
+          segmentConsumerInfoList.add(new SegmentConsumerInfo(segmentDataManager.getSegmentName(),
+              realtimeSegmentDataManager.getConsumerState().toString(),
+              realtimeSegmentDataManager.getLastConsumedTimestamp(), partitionToOffsetMap,
+              new SegmentConsumerInfo.PartitionOffsetInfo(partitionToOffsetMap, partitionStateMap.entrySet().stream()
+                  .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getUpstreamLatestOffset().toString())),
+                  recordsLagMap, availabilityLagMsMap)));
         }
       }
     } catch (Exception e) {
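
For reference, the new validDocIdMetadata endpoint can be exercised with any HTTP client once a
server hosts an upsert table. Below is a minimal sketch using java.net.http; the host, port,
table and segment names are hypothetical placeholders, not values taken from this commit:

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class ValidDocIdMetadataClient {
      public static void main(String[] args) throws Exception {
        // Hypothetical server admin address and segment name; substitute your own.
        String url = "http://localhost:8097/tables/myTable_REALTIME/validDocIdMetadata"
            + "?segmentNames=myTable__0__0__20230726T0000Z";
        HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();
        HttpResponse<String> response =
            HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        // The endpoint returns a JSON array with one entry per requested segment:
        // [{"segmentName": "...", "totalDocs": ..., "totalValidDocs": ..., "totalInvalidDocs": ...}]
        System.out.println(response.body());
      }
    }
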
diff --git a/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java b/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
index bb912dd9e0..7e7c4b7fba 100644
--- a/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
+++ b/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
@@ -113,29 +113,30 @@ public class TablesResourceTest extends BaseResourceTest {
   public void getTableMetadata()
       throws Exception {
     for (TableType tableType : TableType.values()) {
-      String tableMetadataPath = "/tables/" + TableNameBuilder.forType(tableType).tableNameWithType(TABLE_NAME)
-          + "/metadata";
+      String tableMetadataPath =
+          "/tables/" + TableNameBuilder.forType(tableType).tableNameWithType(TABLE_NAME) + "/metadata";
 
       JsonNode jsonResponse =
           JsonUtils.stringToJsonNode(_webTarget.path(tableMetadataPath).request().get(String.class));
       TableMetadataInfo metadataInfo = JsonUtils.jsonNodeToObject(jsonResponse, TableMetadataInfo.class);
       Assert.assertNotNull(metadataInfo);
-      Assert.assertEquals(metadataInfo.getTableName(), TableNameBuilder.forType(tableType)
-          .tableNameWithType(TABLE_NAME));
+      Assert.assertEquals(metadataInfo.getTableName(),
+          TableNameBuilder.forType(tableType).tableNameWithType(TABLE_NAME));
       Assert.assertEquals(metadataInfo.getColumnLengthMap().size(), 0);
       Assert.assertEquals(metadataInfo.getColumnCardinalityMap().size(), 0);
       Assert.assertEquals(metadataInfo.getColumnIndexSizeMap().size(), 0);
 
-      jsonResponse = JsonUtils.stringToJsonNode(_webTarget.path(tableMetadataPath)
-          .queryParam("columns", "column1").queryParam("columns", "column2").request().get(String.class));
+      jsonResponse = JsonUtils.stringToJsonNode(
+          _webTarget.path(tableMetadataPath).queryParam("columns", "column1").queryParam("columns", "column2").request()
+              .get(String.class));
       metadataInfo = JsonUtils.jsonNodeToObject(jsonResponse, TableMetadataInfo.class);
       Assert.assertEquals(metadataInfo.getColumnLengthMap().size(), 2);
       Assert.assertEquals(metadataInfo.getColumnCardinalityMap().size(), 2);
       Assert.assertEquals(metadataInfo.getColumnIndexSizeMap().size(), 2);
-      Assert.assertTrue(metadataInfo.getColumnIndexSizeMap().get("column1")
-          .containsKey(StandardIndexes.dictionary().getId()));
-      Assert.assertTrue(metadataInfo.getColumnIndexSizeMap().get("column2")
-          .containsKey(StandardIndexes.forward().getId()));
+      Assert.assertTrue(
+          metadataInfo.getColumnIndexSizeMap().get("column1").containsKey(StandardIndexes.dictionary().getId()));
+      Assert.assertTrue(
+          metadataInfo.getColumnIndexSizeMap().get("column2").containsKey(StandardIndexes.forward().getId()));
     }
 
     // No such table
@@ -148,9 +149,8 @@ public class TablesResourceTest extends BaseResourceTest {
   public void testSegmentMetadata()
       throws Exception {
     IndexSegment defaultSegment = _realtimeIndexSegments.get(0);
-    String segmentMetadataPath =
-        "/tables/" + TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME) + "/segments/" + defaultSegment
-            .getSegmentName() + "/metadata";
+    String segmentMetadataPath = "/tables/" + TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME) + "/segments/"
+        + defaultSegment.getSegmentName() + "/metadata";
 
     JsonNode jsonResponse =
         JsonUtils.stringToJsonNode(_webTarget.path(segmentMetadataPath).request().get(String.class));
@@ -188,8 +188,8 @@ public class TablesResourceTest extends BaseResourceTest {
         .get(Response.class);
     Assert.assertEquals(response.getStatus(), Response.Status.NOT_FOUND.getStatusCode());
 
-    response = _webTarget
-        .path("/tables/" + TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME) + "/segments/UNKNOWN_SEGMENT")
+    response = _webTarget.path(
+            "/tables/" + TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME) + "/segments/UNKNOWN_SEGMENT")
         .request().get(Response.class);
     Assert.assertEquals(response.getStatus(), Response.Status.NOT_FOUND.getStatusCode());
   }
@@ -229,8 +229,8 @@ public class TablesResourceTest extends BaseResourceTest {
     Response response = _webTarget.path("/tables/UNKNOWN_REALTIME/segments/segmentname").request().get(Response.class);
     Assert.assertEquals(response.getStatus(), Response.Status.NOT_FOUND.getStatusCode());
 
-    response = _webTarget
-        .path("/tables/" + TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME) + "/segments/UNKNOWN_SEGMENT")
+    response = _webTarget.path(
+            "/tables/" + TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME) + "/segments/UNKNOWN_SEGMENT")
         .request().get(Response.class);
     Assert.assertEquals(response.getStatus(), Response.Status.NOT_FOUND.getStatusCode());
   }
@@ -243,8 +243,8 @@ public class TablesResourceTest extends BaseResourceTest {
         (ImmutableSegmentImpl) _realtimeIndexSegments.get(0));
 
     // Verify non-existent table and segment download return NOT_FOUND status.
-    Response response = _webTarget.path("/tables/UNKNOWN_REALTIME/segments/segmentname/validDocIds").request()
-        .get(Response.class);
+    Response response =
+        _webTarget.path("/tables/UNKNOWN_REALTIME/segments/segmentname/validDocIds").request().get(Response.class);
     Assert.assertEquals(response.getStatus(), Response.Status.NOT_FOUND.getStatusCode());
 
     response = _webTarget.path(
@@ -253,6 +253,26 @@ public class TablesResourceTest extends BaseResourceTest {
     Assert.assertEquals(response.getStatus(), Response.Status.NOT_FOUND.getStatusCode());
   }
 
+  @Test
+  public void testValidDocIdMetadata()
+      throws IOException {
+    IndexSegment segment = _realtimeIndexSegments.get(0);
+    // Install the validDocIds snapshot on the segment (and verify the downloaded copy) before fetching metadata.
+    downLoadAndVerifyValidDocIdsSnapshot(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME),
+        (ImmutableSegmentImpl) segment);
+
+    String validDocIdMetadataPath =
+        "/tables/" + TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME) + "/validDocIdMetadata";
+    String metadataResponse =
+        _webTarget.path(validDocIdMetadataPath).queryParam("segmentNames", segment.getSegmentName()).request()
+            .get(String.class);
+    JsonNode validDocIdMetadata = JsonUtils.stringToJsonNode(metadataResponse).get(0);
+
+    Assert.assertEquals(validDocIdMetadata.get("totalDocs").asInt(), 100000);
+    Assert.assertEquals(validDocIdMetadata.get("totalValidDocs").asInt(), 8);
+    Assert.assertEquals(validDocIdMetadata.get("totalInvalidDocs").asInt(), 99992);
+  }
+
   // Verify metadata file from segments.
   private void downLoadAndVerifySegmentContent(String tableNameWithType, IndexSegment segment)
       throws IOException {
@@ -290,7 +310,7 @@ public class TablesResourceTest extends BaseResourceTest {
     PartitionUpsertMetadataManager upsertMetadataManager = mock(PartitionUpsertMetadataManager.class);
     ThreadSafeMutableRoaringBitmap validDocIds = new ThreadSafeMutableRoaringBitmap();
     int[] docIds = new int[]{1, 4, 6, 10, 15, 17, 18, 20};
-    for (int docId: docIds) {
+    for (int docId : docIds) {
       validDocIds.add(docId);
     }
     segment.enableUpsert(upsertMetadataManager, validDocIds, null);
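
The expectations in testValidDocIdMetadata follow directly from this bitmap: 8 docIds are flagged
valid out of 100000 total docs, so totalInvalidDocs = 100000 - 8 = 99992. A self-contained sketch
of the same bookkeeping with org.roaringbitmap (the totals mirror the test; the wrapper class is
illustrative only):

    import org.roaringbitmap.buffer.MutableRoaringBitmap;

    public class ValidDocIdMath {
      public static void main(String[] args) {
        MutableRoaringBitmap validDocIds = new MutableRoaringBitmap();
        for (int docId : new int[]{1, 4, 6, 10, 15, 17, 18, 20}) {
          validDocIds.add(docId);
        }
        int totalDocs = 100000;                             // total docs in the test segment
        int totalValidDocs = validDocIds.getCardinality();  // 8
        int totalInvalidDocs = totalDocs - totalValidDocs;  // 99992
        System.out.printf("valid=%d, invalid=%d%n", totalValidDocs, totalInvalidDocs);
      }
    }
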
@@ -315,21 +335,21 @@ public class TablesResourceTest extends BaseResourceTest {
         _realtimeIndexSegments);
 
     // Verify segment uploading succeed.
-    Response response = _webTarget.path(String
-        .format("/segments/%s/%s/upload", TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME),
+    Response response = _webTarget.path(
+        String.format("/segments/%s/%s/upload", TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME),
             LLC_SEGMENT_NAME_FOR_UPLOAD_SUCCESS)).request().post(null);
     Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());
     Assert.assertEquals(response.readEntity(String.class), SEGMENT_DOWNLOAD_URL);
 
     // Verify bad request: table type is offline
-    response = _webTarget.path(String
-        .format("/segments/%s/%s/upload", TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME),
+    response = _webTarget.path(
+        String.format("/segments/%s/%s/upload", TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME),
             _offlineIndexSegments.get(0).getSegmentName())).request().post(null);
     Assert.assertEquals(response.getStatus(), Response.Status.BAD_REQUEST.getStatusCode());
 
     // Verify bad request: segment is not low level consumer segment
-    response = _webTarget.path(String
-        .format("/segments/%s/%s/upload", TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME),
+    response = _webTarget.path(
+        String.format("/segments/%s/%s/upload", TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME),
             _realtimeIndexSegments.get(0).getSegmentName())).request().post(null);
     Assert.assertEquals(response.getStatus(), Response.Status.BAD_REQUEST.getStatusCode());
 
@@ -349,9 +369,8 @@ public class TablesResourceTest extends BaseResourceTest {
   public void testOfflineTableSegmentMetadata()
       throws Exception {
     IndexSegment defaultSegment = _offlineIndexSegments.get(0);
-    String segmentMetadataPath =
-        "/tables/" + TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME) + "/segments/" + defaultSegment
-            .getSegmentName() + "/metadata";
+    String segmentMetadataPath = "/tables/" + TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME) + "/segments/"
+        + defaultSegment.getSegmentName() + "/metadata";
 
     JsonNode jsonResponse =
         JsonUtils.stringToJsonNode(_webTarget.path(segmentMetadataPath).request().get(String.class));
@@ -383,8 +402,8 @@ public class TablesResourceTest extends BaseResourceTest {
         .get(Response.class);
     Assert.assertEquals(response.getStatus(), Response.Status.NOT_FOUND.getStatusCode());
 
-    response = _webTarget
-        .path("/tables/" + TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME) + "/segments/UNKNOWN_SEGMENT")
+    response = _webTarget.path(
+            "/tables/" + TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME) + "/segments/UNKNOWN_SEGMENT")
         .request().get(Response.class);
     Assert.assertEquals(response.getStatus(), Response.Status.NOT_FOUND.getStatusCode());
   }

