Posted to commits@pinot.apache.org by sn...@apache.org on 2022/12/08 22:09:06 UTC

[pinot] branch master updated: enable MergeRollupTask on realtime tables (#9890)

This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d3ea8dc162 enable MergeRollupTask on realtime tables (#9890)
d3ea8dc162 is described below

commit d3ea8dc1628a885f4faa8e8cf7b7c69c24782f79
Author: Haitao Zhang <ha...@startree.ai>
AuthorDate: Thu Dec 8 14:09:00 2022 -0800

    enable MergeRollupTask on realtime tables (#9890)
    
    * enable MergeRollupTask on realtime tables
    
    * disallow when upsert or dedup is enabled
    
    * address comments
    
    * add integration test
    
    * address comments
    
    * add license
    
    * add comments to explain the logic
    
    * address comments
    
    * fix r2o task
    
    * allow more waiting time on conditions
    
    * add output for test failure reason check
    
    * use longer condition check interval
    
    * add wait before condition check
    
    * make it work in github integration test
    
    * add a comment to the test case
    
    * update comments
---
 .../MergeRollupMinionClusterIntegrationTest.java   | 138 ++++++++++++++++++-
 .../BaseMultipleSegmentsConversionExecutor.java    |  13 +-
 .../mergerollup/MergeRollupTaskGenerator.java      | 148 +++++++++++++++------
 .../mergerollup/MergeRollupTaskGeneratorTest.java  |  95 ++++++++++---
 .../apache/pinot/spi/config/table/TableConfig.java |  10 ++
 5 files changed, 341 insertions(+), 63 deletions(-)

diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
index 9602275068..87e6eb008d 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
@@ -72,6 +72,7 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat
   private static final String SINGLE_LEVEL_ROLLUP_TEST_TABLE = "myTable2";
   private static final String MULTI_LEVEL_CONCAT_TEST_TABLE = "myTable3";
   private static final String SINGLE_LEVEL_CONCAT_METADATA_TEST_TABLE = "myTable4";
+  private static final String SINGLE_LEVEL_CONCAT_TEST_REALTIME_TABLE = "myTable5";
   private static final long TIMEOUT_IN_MS = 10_000L;
 
   protected PinotHelixTaskResourceManager _helixTaskResourceManager;
@@ -98,6 +99,8 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat
     startController();
     startBrokers(1);
     startServers(1);
+    // Start Kafka
+    startKafka();
 
     // Create and upload the schema and table config
     Schema schema = createSchema();
@@ -133,6 +136,13 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat
     uploadSegments(MULTI_LEVEL_CONCAT_TEST_TABLE, _tarDir3);
     uploadSegments(SINGLE_LEVEL_CONCAT_METADATA_TEST_TABLE, _tarDir4);
 
+    // create the realtime table
+    TableConfig tableConfig = createRealtimeTableConfig(avroFiles.get(0));
+    addTableConfig(tableConfig);
+    // Push data into Kafka
+    pushAvroIntoKafka(avroFiles);
+    // Wait for all documents loaded
+    waitForAllDocsLoaded(600_000L);
 
     // Set up the H2 connection
     setUpH2Connection(avroFiles);
@@ -146,6 +156,26 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat
     _pinotHelixResourceManager = _controllerStarter.getHelixResourceManager();
   }
 
+  // this override is used by createRealtimeTableConfig
+  @Override
+  protected String getTableName() {
+    return SINGLE_LEVEL_CONCAT_TEST_REALTIME_TABLE;
+  }
+
+  // this override is used by createRealtimeTableConfig
+  @Override
+  protected TableTaskConfig getTaskConfig() {
+    Map<String, String> tableTaskConfigs = new HashMap<>();
+    tableTaskConfigs.put("100days.mergeType", "concat");
+    tableTaskConfigs.put("100days.bufferTimePeriod", "1d");
+    tableTaskConfigs.put("100days.bucketTimePeriod", "100d");
+    tableTaskConfigs.put("100days.maxNumRecordsPerSegment", "15000");
+    tableTaskConfigs.put("100days.maxNumRecordsPerTask", "15000");
+    tableTaskConfigs.put("ActualElapsedTime.aggregationType", "min");
+    tableTaskConfigs.put("WeatherDelay.aggregationType", "sum");
+    return new TableTaskConfig(Collections.singletonMap(MinionConstants.MergeRollupTask.TASK_TYPE, tableTaskConfigs));
+  }
+
   private TableConfig createOfflineTableConfig(String tableName, TableTaskConfig taskConfig) {
     return createOfflineTableConfig(tableName, taskConfig, null);
   }
@@ -265,7 +295,7 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat
    * Test single level concat task with maxNumRecordPerTask, maxNumRecordPerSegment constraints
    */
   @Test
-  public void testSingleLevelConcat()
+  public void testOfflineTableSingleLevelConcat()
       throws Exception {
     // The original segments are time partitioned by month:
     // segmentName (totalDocs)
@@ -378,7 +408,7 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat
    * Push type is set to Metadata
    */
   @Test
-  public void testSingleLevelConcatWithMetadataPush()
+  public void testOfflineTableSingleLevelConcatWithMetadataPush()
       throws Exception {
     // The original segments are time partitioned by month:
     // segmentName (totalDocs)
@@ -490,7 +520,7 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat
    * Test single level rollup task with duplicate data (original segments * 2)
    */
   @Test
-  public void testSingleLevelRollup()
+  public void testOfflineTableSingleLevelRollup()
       throws Exception {
     // The original segments are time partitioned by month:
     // segmentName (totalDocs)
@@ -602,7 +632,7 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat
    * Test multi level concat task
    */
   @Test
-  public void testMultiLevelConcat()
+  public void testOfflineTableMultiLevelConcat()
       throws Exception {
     // The original segments are time partitioned by month:
     // segmentName (totalDocs)
@@ -769,6 +799,105 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat
     }, 600_000L, "Failed to complete task");
   }
 
+  // The use case is similar to the one defined for the offline table
+  @Test
+  public void testRealtimeTableSingleLevelConcat()
+      throws Exception {
+    // The original segments:
+    // mytable__0__0__{ts00} ... mytable__0__23__{ts023}
+    // mytable__1__0__{ts10} ... mytable__1__22__{ts122}
+    //
+    // Expected result segments:
+    // merged_100days_{ts1}_0_mytable_16071_16099_0
+    // merged_100days_{ts2}_0_mytable_16100_16154_0
+    // merged_100days_{ts2}_0_mytable_16101_16146_1
+    // merged_100days_{ts2}_1_mytable_16147_16199_0
+    // merged_100days_{ts2}_2_mytable_16196_16199_0
+    // merged_100days_{ts3}_0_mytable_16200_16252_1
+    // merged_100days_{ts3}_0_mytable_16200_16252_0
+    // merged_100days_{ts3}_1_mytable_16245_16295_0
+    // merged_100days_{ts3}_2_mytable_16290_16299_0
+    // merged_100days_{ts4}_0_mytable_16300_16359_0
+    // merged_100days_{ts4}_0_mytable_16323_16345_1
+    // merged_100days_{ts4}_1_mytable_16358_16399_0
+    // merged_100days_{ts5}_0_mytable_16400_16435_0
+    // mytable__0__23__{ts023} (in progress)
+    // mytable__1__22__{ts122} (in progress)
+    PinotHelixTaskResourceManager helixTaskResourceManager = _controllerStarter.getHelixTaskResourceManager();
+    PinotTaskManager taskManager = _controllerStarter.getTaskManager();
+    PinotHelixResourceManager pinotHelixResourceManager = _controllerStarter.getHelixResourceManager();
+    String tableName = getTableName();
+
+    String sqlQuery = "SELECT count(*) FROM " + tableName; // 115545 rows for the test table
+    JsonNode expectedJson = postQuery(sqlQuery, _brokerBaseApiUrl);
+    int[] expectedNumSubTasks = {1, 3, 3, 2, 1};
+    int[] expectedNumSegmentsQueried = {44, 37, 26, 18, 15};
+    long expectedWatermark = 16000 * 86_400_000L;
+    String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName);
+    int numTasks = 0;
+    for (String tasks = taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE);
+        tasks != null; tasks =
+        taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE), numTasks++) {
+      assertEquals(helixTaskResourceManager.getSubtaskConfigs(tasks).size(), expectedNumSubTasks[numTasks]);
+      assertTrue(helixTaskResourceManager.getTaskQueues()
+          .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
+
+      // Will not schedule a new task if there's an incomplete task
+      assertNull(
+          taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
+      waitForTaskToComplete();
+
+      // Check watermark
+      MergeRollupTaskMetadata minionTaskMetadata = MergeRollupTaskMetadata.fromZNRecord(
+          taskManager.getClusterInfoAccessor()
+              .getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, realtimeTableName));
+      assertNotNull(minionTaskMetadata);
+      assertEquals((long) minionTaskMetadata.getWatermarkMap().get("100days"), expectedWatermark);
+      expectedWatermark += 100 * 86_400_000L;
+
+      // Check metadata of merged segments
+      for (SegmentZKMetadata metadata : pinotHelixResourceManager.getSegmentsZKMetadata(realtimeTableName)) {
+        if (metadata.getSegmentName().startsWith("merged")) {
+          // Check merged segment zk metadata
+          assertNotNull(metadata.getCustomMap());
+          assertEquals("100days",
+              metadata.getCustomMap().get(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY));
+          // Check merged segments are time partitioned
+          assertEquals(metadata.getEndTimeMs() / (86_400_000L * 100), metadata.getStartTimeMs() / (86_400_000L * 100));
+        }
+      }
+
+      final int finalNumTasks = numTasks;
+      TestUtils.waitForCondition(aVoid -> {
+        try {
+          // Check that the total doc count of the merged segments matches the original segments
+          JsonNode actualJson = postQuery(sqlQuery, _brokerBaseApiUrl);
+          if (!SqlResultComparator.areEqual(actualJson, expectedJson, sqlQuery)) {
+            return false;
+          }
+          // Check query routing
+          int numSegmentsQueried = actualJson.get("numSegmentsQueried").asInt();
+          return numSegmentsQueried == expectedNumSegmentsQueried[finalNumTasks]
+              // when running in GitHub integration tests, the consumer sometimes queries one more segment
+              || numSegmentsQueried == expectedNumSegmentsQueried[finalNumTasks] + 1;
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }, TIMEOUT_IN_MS, "Timeout while validating segments");
+    }
+    // Check total tasks
+    assertEquals(numTasks, 5);
+
+    assertTrue(_controllerStarter.getControllerMetrics()
+        .containsGauge("mergeRollupTaskDelayInNumBuckets.myTable5_REALTIME.100days"));
+
+    // Drop the table
+    dropRealtimeTable(tableName);
+
+    // Check if the task metadata is cleaned up on table deletion
+    verifyTableDelete(realtimeTableName);
+  }
+
   @AfterClass
   public void tearDown()
       throws Exception {
@@ -776,6 +905,7 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat
     stopServer();
     stopBroker();
     stopController();
+    stopKafka();
     stopZk();
     FileUtils.deleteDirectory(_tempDir);
   }
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
index c022c82ca2..c5b5f2b27a 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
@@ -49,6 +49,7 @@ import org.apache.pinot.minion.event.MinionEventObservers;
 import org.apache.pinot.minion.exception.TaskCancelledException;
 import org.apache.pinot.segment.local.utils.SegmentPushUtils;
 import org.apache.pinot.spi.auth.AuthProvider;
+import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
 import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
@@ -275,7 +276,17 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
             new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION, "true");
         NameValuePair tableNameParameter = new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME,
             TableNameBuilder.extractRawTableName(tableNameWithType));
-        List<NameValuePair> parameters = Arrays.asList(enableParallelPushProtectionParameter, tableNameParameter);
+        NameValuePair tableTypeParameter = new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_TYPE,
+            TableNameBuilder.getTableTypeFromTableName(tableNameWithType).toString());
+        // RealtimeToOfflineSegmentsTask pushes segments to the corresponding offline table
+        // TODO: Putting the override here is not clean; think harder about the proper way to
+        //  override it.
+        if (MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE.equals(taskType)) {
+          tableTypeParameter = new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_TYPE,
+              TableType.OFFLINE.toString());
+        }
+        List<NameValuePair> parameters = Arrays.asList(enableParallelPushProtectionParameter, tableNameParameter,
+            tableTypeParameter);
 
         pushSegment(tableNameParameter.getValue(), configs, outputSegmentTarURI, httpHeaders, parameters,
             segmentConversionResult);
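
To make the override above concrete, here is a standalone sketch (simplified; the query-parameter names are written as string literals rather than the FileUploadDownloadClient.QueryParameters constants): RealtimeToOfflineSegmentsTask reads from a REALTIME table but pushes its converted segments to the corresponding OFFLINE table, so its tableType parameter is forced to OFFLINE, while every other task keeps the type derived from the table name.

import java.util.Arrays;
import java.util.List;
import org.apache.http.NameValuePair;
import org.apache.http.message.BasicNameValuePair;

public class PushParametersSketch {
  static List<NameValuePair> buildPushParameters(String tableNameWithType, String taskType) {
    String rawTableName = tableNameWithType.replaceAll("_(OFFLINE|REALTIME)$", "");
    String tableType = tableNameWithType.endsWith("_REALTIME") ? "REALTIME" : "OFFLINE";
    if ("RealtimeToOfflineSegmentsTask".equals(taskType)) {
      // The converted segments belong to the offline table, not the realtime source table
      tableType = "OFFLINE";
    }
    return Arrays.asList(
        new BasicNameValuePair("enableParallelPushProtection", "true"),
        new BasicNameValuePair("tableName", rawTableName),
        new BasicNameValuePair("tableType", tableType));
  }

  public static void main(String[] args) {
    // MergeRollupTask on a realtime table pushes back to the realtime table
    System.out.println(buildPushParameters("myTable5_REALTIME", "MergeRollupTask"));
    // RealtimeToOfflineSegmentsTask pushes to the offline counterpart
    System.out.println(buildPushParameters("myTable5_REALTIME", "RealtimeToOfflineSegmentsTask"));
  }
}
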
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
index 6043320495..9da496348f 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.plugin.minion.tasks.mergerollup;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -27,6 +28,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.I0Itec.zkclient.exception.ZkException;
 import org.apache.commons.lang3.StringUtils;
@@ -63,13 +65,26 @@ import org.slf4j.LoggerFactory;
 /**
  * A {@link PinotTaskGenerator} implementation for generating tasks of type {@link MergeRollupTask}
  *
- * TODO: Add the support for realtime table
+ * Assumptions:
+ *  - When the MergeRollupTask starts for the first time, records older than min(now ms, max end time ms of all
+ *    ready-to-process segments) - bufferTimeMs have already been ingested. If not, newly ingested records older than
+ *    that time may not be properly merged (the latest watermarks may have advanced too far before ingestion).
+ *  - If needed, there are backfill protocols to ingest and replace records older than the latest watermarks.
+ *    Those protocols can handle time alignment (according to the merge-level configurations) correctly.
+ *  - If needed, there are reconcile protocols to merge & rollup newly ingested segments that are (1) older than the
+ *    latest watermarks, and (2) not time aligned according to the merge-level configurations
+ *    - For realtime tables, those protocols are needed if streaming records arrive late (older than the latest
+ *      watermarks)
+ *    - For offline tables, those protocols are needed if non-time-aligned segments are ingested accidentally.
  *
- * Steps:
  *
+ * Steps:
  *  - Pre-select segments:
  *    - Fetch all segments, select segments based on segment lineage (removing segmentsFrom for COMPLETED lineage
  *      entry and segmentsTo for IN_PROGRESS lineage entry)
+ *    - For realtime tables, remove
+ *      - in-progress segments (Segment.Realtime.Status.IN_PROGRESS), and
+ *      - sealed segments whose start time is no earlier than the earliest start time of all in-progress segments
  *    - Remove empty segments
  *    - Sort segments based on startTime and endTime in ascending order
  *
@@ -133,22 +148,25 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
       if (!validate(tableConfig, taskType)) {
         continue;
       }
-      String offlineTableName = tableConfig.getTableName();
-      LOGGER.info("Start generating task configs for table: {} for task: {}", offlineTableName, taskType);
+      String tableNameWithType = tableConfig.getTableName();
+      LOGGER.info("Start generating task configs for table: {} for task: {}", tableNameWithType, taskType);
 
       // Get all segment metadata
-      List<SegmentZKMetadata> allSegments = _clusterInfoAccessor.getSegmentsZKMetadata(offlineTableName);
+      List<SegmentZKMetadata> allSegments = _clusterInfoAccessor.getSegmentsZKMetadata(tableNameWithType);
+      // Filter segments based on status
+      List<SegmentZKMetadata> preSelectedSegmentsBasedOnStatus
+          = filterSegmentsBasedOnStatus(tableConfig.getTableType(), allSegments);
 
       // Select current segment snapshot based on lineage, filter out empty segments
-      SegmentLineage segmentLineage = _clusterInfoAccessor.getSegmentLineage(offlineTableName);
+      SegmentLineage segmentLineage = _clusterInfoAccessor.getSegmentLineage(tableNameWithType);
       Set<String> preSelectedSegmentsBasedOnLineage = new HashSet<>();
-      for (SegmentZKMetadata segment : allSegments) {
+      for (SegmentZKMetadata segment : preSelectedSegmentsBasedOnStatus) {
         preSelectedSegmentsBasedOnLineage.add(segment.getSegmentName());
       }
       SegmentLineageUtils.filterSegmentsBasedOnLineageInPlace(preSelectedSegmentsBasedOnLineage, segmentLineage);
 
       List<SegmentZKMetadata> preSelectedSegments = new ArrayList<>();
-      for (SegmentZKMetadata segment : allSegments) {
+      for (SegmentZKMetadata segment : preSelectedSegmentsBasedOnStatus) {
         if (preSelectedSegmentsBasedOnLineage.contains(segment.getSegmentName()) && segment.getTotalDocs() > 0
             && MergeTaskUtils.allowMerge(segment)) {
           preSelectedSegments.add(segment);
@@ -158,8 +176,8 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
       if (preSelectedSegments.isEmpty()) {
         // Reset the watermark time if no segment found. This covers the case where the table is newly created or
         // all segments for the existing table got deleted.
-        resetDelayMetrics(offlineTableName);
-        LOGGER.info("Skip generating task: {} for table: {}, no segment is found.", taskType, offlineTableName);
+        resetDelayMetrics(tableNameWithType);
+        LOGGER.info("Skip generating task: {} for table: {}, no segment is found.", taskType, tableNameWithType);
         continue;
       }
 
@@ -186,7 +204,7 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
 
       // Get incomplete merge levels
       Set<String> inCompleteMergeLevels = new HashSet<>();
-      for (Map.Entry<String, TaskState> entry : TaskGeneratorUtils.getIncompleteTasks(taskType, offlineTableName,
+      for (Map.Entry<String, TaskState> entry : TaskGeneratorUtils.getIncompleteTasks(taskType, tableNameWithType,
           _clusterInfoAccessor).entrySet()) {
         for (PinotTaskConfig taskConfig : _clusterInfoAccessor.getTaskConfigs(entry.getKey())) {
           inCompleteMergeLevels.add(taskConfig.getConfigs().get(MergeRollupTask.MERGE_LEVEL_KEY));
@@ -194,11 +212,11 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
       }
 
       ZNRecord mergeRollupTaskZNRecord = _clusterInfoAccessor
-          .getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, offlineTableName);
+          .getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, tableNameWithType);
       int expectedVersion = mergeRollupTaskZNRecord != null ? mergeRollupTaskZNRecord.getVersion() : -1;
       MergeRollupTaskMetadata mergeRollupTaskMetadata =
           mergeRollupTaskZNRecord != null ? MergeRollupTaskMetadata.fromZNRecord(mergeRollupTaskZNRecord)
-              : new MergeRollupTaskMetadata(offlineTableName, new TreeMap<>());
+              : new MergeRollupTaskMetadata(tableNameWithType, new TreeMap<>());
       List<PinotTaskConfig> pinotTaskConfigsForTable = new ArrayList<>();
 
       // Schedule tasks from lowest to highest merge level (e.g. Hourly -> Daily -> Monthly -> Yearly)
@@ -211,7 +229,7 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
         // Skip scheduling if there's incomplete task for current mergeLevel
         if (inCompleteMergeLevels.contains(mergeLevel)) {
           LOGGER.info("Found incomplete task of merge level: {} for the same table: {}, Skipping task generation: {}",
-              mergeLevel, offlineTableName, taskType);
+              mergeLevel, tableNameWithType, taskType);
           continue;
         }
 
@@ -220,14 +238,14 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
         long bucketMs = TimeUtils.convertPeriodToMillis(bucketPeriod);
         if (bucketMs <= 0) {
           LOGGER.error("Bucket time period: {} (table : {}, mergeLevel : {}) must be larger than 0", bucketPeriod,
-              offlineTableName, mergeLevel);
+              tableNameWithType, mergeLevel);
           continue;
         }
         String bufferPeriod = mergeConfigs.get(MergeTask.BUFFER_TIME_PERIOD_KEY);
         long bufferMs = TimeUtils.convertPeriodToMillis(bufferPeriod);
         if (bufferMs < 0) {
           LOGGER.error("Buffer time period: {} (table : {}, mergeLevel : {}) must be larger or equal to 0",
-              bufferPeriod, offlineTableName, mergeLevel);
+              bufferPeriod, tableNameWithType, mergeLevel);
           continue;
         }
         String maxNumParallelBucketsStr = mergeConfigs.get(MergeTask.MAX_NUM_PARALLEL_BUCKETS);
@@ -235,7 +253,7 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
             : DEFAULT_NUM_PARALLEL_BUCKETS;
         if (maxNumParallelBuckets <= 0) {
           LOGGER.error("Maximum number of parallel buckets: {} (table : {}, mergeLevel : {}) must be larger than 0",
-              maxNumParallelBuckets, offlineTableName, mergeLevel);
+              maxNumParallelBuckets, tableNameWithType, mergeLevel);
           continue;
         }
 
@@ -255,14 +273,14 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
             lowestLevelMaxValidBucketEndTimeMs =
                 Math.max(lowestLevelMaxValidBucketEndTimeMs, currentValidBucketEndTimeMs);
           }
-          _tableLowestLevelMaxValidBucketEndTimeMs.put(offlineTableName, lowestLevelMaxValidBucketEndTimeMs);
+          _tableLowestLevelMaxValidBucketEndTimeMs.put(tableNameWithType, lowestLevelMaxValidBucketEndTimeMs);
         }
         // Create delay metrics even if there's no task scheduled, this helps the case that the controller is restarted
         // but the metrics are not available until the controller schedules a valid task
-        createOrUpdateDelayMetrics(offlineTableName, mergeLevel, null, watermarkMs, bufferMs, bucketMs);
+        createOrUpdateDelayMetrics(tableNameWithType, mergeLevel, null, watermarkMs, bufferMs, bucketMs);
         if (!isValidBucketEndTime(bucketEndMs, bufferMs, lowerMergeLevel, mergeRollupTaskMetadata)) {
           LOGGER.info("Bucket with start: {} and end: {} (table : {}, mergeLevel : {}) cannot be merged yet",
-              bucketStartMs, bucketEndMs, offlineTableName, mergeLevel);
+              bucketStartMs, bucketEndMs, tableNameWithType, mergeLevel);
           continue;
         }
 
@@ -336,18 +354,18 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
         }
 
         if (selectedSegmentsForAllBuckets.isEmpty()) {
-          LOGGER.info("No unmerged segment found for table: {}, mergeLevel: {}", offlineTableName, mergeLevel);
+          LOGGER.info("No unmerged segment found for table: {}, mergeLevel: {}", tableNameWithType, mergeLevel);
           continue;
         }
 
         // Bump up watermark to the earliest start time of selected segments truncated to the closest bucket boundary
         long newWatermarkMs = selectedSegmentsForAllBuckets.get(0).get(0).getStartTimeMs() / bucketMs * bucketMs;
         mergeRollupTaskMetadata.getWatermarkMap().put(mergeLevel, newWatermarkMs);
-        LOGGER.info("Update watermark for table: {}, mergeLevel: {} from: {} to: {}", offlineTableName, mergeLevel,
+        LOGGER.info("Update watermark for table: {}, mergeLevel: {} from: {} to: {}", tableNameWithType, mergeLevel,
             watermarkMs, newWatermarkMs);
 
         // Update the delay metrics
-        createOrUpdateDelayMetrics(offlineTableName, mergeLevel, lowerMergeLevel, newWatermarkMs, bufferMs, bucketMs);
+        createOrUpdateDelayMetrics(tableNameWithType, mergeLevel, lowerMergeLevel, newWatermarkMs, bufferMs, bucketMs);
 
         // Create task configs
         int maxNumRecordsPerTask =
@@ -357,7 +375,7 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
         if (segmentPartitionConfig == null) {
           for (List<SegmentZKMetadata> selectedSegmentsPerBucket : selectedSegmentsForAllBuckets) {
             pinotTaskConfigsForTable.addAll(
-                createPinotTaskConfigs(selectedSegmentsPerBucket, offlineTableName, maxNumRecordsPerTask, mergeLevel,
+                createPinotTaskConfigs(selectedSegmentsPerBucket, tableNameWithType, maxNumRecordsPerTask, mergeLevel,
                     null, mergeConfigs, taskConfigs));
           }
         } else {
@@ -396,13 +414,13 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
               List<Integer> partition = entry.getKey();
               List<SegmentZKMetadata> partitionedSegments = entry.getValue();
               pinotTaskConfigsForTable.addAll(
-                  createPinotTaskConfigs(partitionedSegments, offlineTableName, maxNumRecordsPerTask, mergeLevel,
+                  createPinotTaskConfigs(partitionedSegments, tableNameWithType, maxNumRecordsPerTask, mergeLevel,
                       partition, mergeConfigs, taskConfigs));
             }
 
             if (!outlierSegments.isEmpty()) {
               pinotTaskConfigsForTable.addAll(
-                  createPinotTaskConfigs(outlierSegments, offlineTableName, maxNumRecordsPerTask, mergeLevel,
+                  createPinotTaskConfigs(outlierSegments, tableNameWithType, maxNumRecordsPerTask, mergeLevel,
                       null, mergeConfigs, taskConfigs));
             }
           }
@@ -416,11 +434,11 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
       } catch (ZkException e) {
         LOGGER.error(
             "Version changed while updating merge/rollup task metadata for table: {}, skip scheduling. There are "
-                + "multiple task schedulers for the same table, need to investigate!", offlineTableName);
+                + "multiple task schedulers for the same table, need to investigate!", tableNameWithType);
         continue;
       }
       pinotTaskConfigs.addAll(pinotTaskConfigsForTable);
-      LOGGER.info("Finished generating task configs for table: {} for task: {}, numTasks: {}", offlineTableName,
+      LOGGER.info("Finished generating task configs for table: {} for task: {}, numTasks: {}", tableNameWithType,
           taskType, pinotTaskConfigsForTable.size());
     }
 
@@ -430,22 +448,70 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
     return pinotTaskConfigs;
   }
 
+  @VisibleForTesting
+  static List<SegmentZKMetadata> filterSegmentsBasedOnStatus(TableType tableType, List<SegmentZKMetadata> allSegments) {
+    if (tableType == TableType.REALTIME) {
+      // For realtime tables, don't process
+      // 1. in-progress segments (Segment.Realtime.Status.IN_PROGRESS)
+      // 2. sealed segments whose start time is no earlier than the earliest start time of all in-progress segments
+      // This ensures those in-progress segments can still be merged once they are sealed.
+      //
+      // Note that we make the following two assumptions here:
+      // 1. streaming data consumer lags are negligible
+      // 2. streaming data records are ingested mostly in chronological order (no records are ingested with delay larger
+      //    than bufferTimeMs)
+      //
+      // We intentionally don't handle the following cases because doing so would be either overkill or too complex
+      // 1. New partitions added. If new partitions are not picked up in time, the MergeRollupTask will move watermarks
+      //    forward, and may not be able to merge some late-created segments for those new partitions -- users should
+      //    configure Pinot properly to discover new partitions in time, or restart Pinot servers manually so that
+      //    new partitions are picked up
+      // 2. (1) no new in-progress segments are created for some partitions, or (2) new in-progress segments are
+      //    created for some partitions but no records are consumed (i.e., empty in-progress segments). In those two
+      //    cases, if new records are consumed later, the MergeRollupTask may have already moved watermarks forward,
+      //    and may not be able to merge those late-created segments -- we assume that users will have a way to
+      //    backfill those records correctly.
+      long earliestStartTimeMsOfInProgressSegments = Long.MAX_VALUE;
+      for (SegmentZKMetadata segmentZKMetadata : allSegments) {
+        if (!segmentZKMetadata.getStatus().isCompleted()
+            && segmentZKMetadata.getTotalDocs() > 0
+            && segmentZKMetadata.getStartTimeMs() < earliestStartTimeMsOfInProgressSegments) {
+          earliestStartTimeMsOfInProgressSegments = segmentZKMetadata.getStartTimeMs();
+        }
+      }
+      final long finalEarliestStartTimeMsOfInProgressSegments = earliestStartTimeMsOfInProgressSegments;
+      return allSegments.stream()
+          .filter(segmentZKMetadata -> segmentZKMetadata.getStatus().isCompleted()
+              && segmentZKMetadata.getStartTimeMs() < finalEarliestStartTimeMsOfInProgressSegments)
+          .collect(Collectors.toList());
+    } else {
+      return allSegments;
+    }
+  }
+
   /**
    * Validate table config for merge/rollup task
    */
-  private boolean validate(TableConfig tableConfig, String taskType) {
-    String offlineTableName = tableConfig.getTableName();
-    if (tableConfig.getTableType() != TableType.OFFLINE) {
-      LOGGER.warn("Skip generating task: {} for non-OFFLINE table: {}, REALTIME table is not supported yet", taskType,
-          offlineTableName);
-      return false;
-    }
-
+  @VisibleForTesting
+  static boolean validate(TableConfig tableConfig, String taskType) {
+    String tableNameWithType = tableConfig.getTableName();
     if (REFRESH.equalsIgnoreCase(IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig))) {
       LOGGER.warn("Skip generating task: {} for non-APPEND table: {}, REFRESH table is not supported", taskType,
-          offlineTableName);
+          tableNameWithType);
       return false;
     }
+    if (tableConfig.getTableType() == TableType.REALTIME) {
+      if (tableConfig.isUpsertEnabled()) {
+        LOGGER.warn("Skip generating task: {} for table: {}, table with upsert enabled is not supported", taskType,
+            tableNameWithType);
+        return false;
+      }
+      if (tableConfig.isDedupEnabled()) {
+        LOGGER.warn("Skip generating task: {} for table: {}, table with dedup enabled is not supported", taskType,
+            tableNameWithType);
+        return false;
+      }
+    }
     return true;
   }
 
@@ -527,7 +593,7 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
    * Create pinot task configs with selected segments and configs
    */
   private List<PinotTaskConfig> createPinotTaskConfigs(List<SegmentZKMetadata> selectedSegments,
-      String offlineTableName, int maxNumRecordsPerTask, String mergeLevel, List<Integer> partition,
+      String tableNameWithType, int maxNumRecordsPerTask, String mergeLevel, List<Integer> partition,
       Map<String, String> mergeConfigs, Map<String, String> taskConfigs) {
     int numRecordsPerTask = 0;
     List<List<String>> segmentNamesList = new ArrayList<>();
@@ -561,9 +627,9 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
 
     for (int i = 0; i < segmentNamesList.size(); i++) {
       String downloadURL = StringUtils.join(downloadURLsList.get(i), MinionConstants.URL_SEPARATOR);
-      Map<String, String> configs = MinionTaskUtils.getPushTaskConfig(offlineTableName, taskConfigs,
+      Map<String, String> configs = MinionTaskUtils.getPushTaskConfig(tableNameWithType, taskConfigs,
           _clusterInfoAccessor);
-      configs.put(MinionConstants.TABLE_NAME_KEY, offlineTableName);
+      configs.put(MinionConstants.TABLE_NAME_KEY, tableNameWithType);
       configs.put(MinionConstants.SEGMENT_NAME_KEY,
           StringUtils.join(segmentNamesList.get(i), MinionConstants.SEGMENT_NAME_SEPARATOR));
       configs.put(MinionConstants.DOWNLOAD_URL_KEY, downloadURL);
@@ -591,7 +657,7 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
       configs.put(MergeRollupTask.SEGMENT_NAME_PREFIX_KEY,
           MergeRollupTask.MERGED_SEGMENT_NAME_PREFIX + mergeLevel + DELIMITER_IN_SEGMENT_NAME
               + System.currentTimeMillis() + partitionSuffix + DELIMITER_IN_SEGMENT_NAME + i
-              + DELIMITER_IN_SEGMENT_NAME + TableNameBuilder.extractRawTableName(offlineTableName));
+              + DELIMITER_IN_SEGMENT_NAME + TableNameBuilder.extractRawTableName(tableNameWithType));
       pinotTaskConfigs.add(new PinotTaskConfig(MergeRollupTask.TASK_TYPE, configs));
     }
 
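For readers skimming the diff, the pre-selection rule in filterSegmentsBasedOnStatus boils down to a single cutoff. A self-contained sketch with simplified types and hypothetical segment values (not the production code, which operates on SegmentZKMetadata):

import java.util.List;
import java.util.stream.Collectors;

public class RealtimeSegmentFilterSketch {
  record Seg(String name, boolean completed, long startTimeMs, long totalDocs) { }

  // Drop in-progress segments, then drop sealed segments that start at or after
  // the earliest start time of any non-empty in-progress segment
  static List<Seg> filter(List<Seg> allSegments) {
    long earliestInProgressStartMs = allSegments.stream()
        .filter(s -> !s.completed() && s.totalDocs() > 0)
        .mapToLong(Seg::startTimeMs)
        .min().orElse(Long.MAX_VALUE);
    return allSegments.stream()
        .filter(s -> s.completed() && s.startTimeMs() < earliestInProgressStartMs)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<Seg> segments = List.of(
        new Seg("t__0__0__x", true, 1_000L, 10),   // sealed, starts early -> kept
        new Seg("t__0__1__x", false, 2_000L, 5),   // in progress          -> dropped
        new Seg("t__1__0__x", true, 3_000L, 10));  // sealed but too late  -> dropped
    System.out.println(filter(segments));          // only t__0__0__x survives
  }
}
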
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
index 5830f76e86..c367bda735 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
@@ -40,12 +40,16 @@ import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.core.minion.PinotTaskConfig;
 import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
 import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.DedupConfig;
+import org.apache.pinot.spi.config.table.HashFunction;
 import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableTaskConfig;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
 import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.annotations.Test;
 
@@ -57,6 +61,7 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 
@@ -67,15 +72,55 @@ import static org.testng.Assert.assertTrue;
 public class MergeRollupTaskGeneratorTest {
   private static final String RAW_TABLE_NAME = "testTable";
   private static final String OFFLINE_TABLE_NAME = "testTable_OFFLINE";
+  private static final String REALTIME_TABLE_NAME = "testTable_REALTIME";
   private static final String TIME_COLUMN_NAME = "millisSinceEpoch";
   private static final String DAILY = "daily";
   private static final String MONTHLY = "monthly";
 
-  private TableConfig getOfflineTableConfig(Map<String, Map<String, String>> taskConfigsMap) {
-    return new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME)
+  private TableConfig getTableConfig(TableType tableType, Map<String, Map<String, String>> taskConfigsMap) {
+    return new TableConfigBuilder(tableType).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME)
         .setTaskConfig(new TableTaskConfig(taskConfigsMap)).build();
   }
 
+  @Test
+  public void testValidateIfMergeRollupCanBeEnabledOrNot() {
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
+      .setTableName(RAW_TABLE_NAME)
+      .setTimeColumnName(TIME_COLUMN_NAME)
+      .build();
+    assertTrue(MergeRollupTaskGenerator.validate(tableConfig, MinionConstants.MergeRollupTask.TASK_TYPE));
+
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setBatchIngestionConfig(
+            new BatchIngestionConfig(Collections.emptyList(), "REFRESH", "daily"));
+    tableConfig = new TableConfigBuilder(TableType.OFFLINE)
+        .setTableName(RAW_TABLE_NAME)
+        .setTimeColumnName(TIME_COLUMN_NAME)
+        .setIngestionConfig(ingestionConfig)
+        .build();
+    assertFalse(MergeRollupTaskGenerator.validate(tableConfig, MinionConstants.MergeRollupTask.TASK_TYPE));
+
+    tableConfig = new TableConfigBuilder(TableType.REALTIME)
+        .setTableName(RAW_TABLE_NAME)
+        .setTimeColumnName(TIME_COLUMN_NAME)
+        .build();
+    assertTrue(MergeRollupTaskGenerator.validate(tableConfig, MinionConstants.MergeRollupTask.TASK_TYPE));
+
+    tableConfig = new TableConfigBuilder(TableType.REALTIME)
+        .setTableName(RAW_TABLE_NAME)
+        .setTimeColumnName(TIME_COLUMN_NAME)
+        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL))
+        .build();
+    assertFalse(MergeRollupTaskGenerator.validate(tableConfig, MinionConstants.MergeRollupTask.TASK_TYPE));
+
+    tableConfig = new TableConfigBuilder(TableType.REALTIME)
+        .setTableName(RAW_TABLE_NAME)
+        .setTimeColumnName(TIME_COLUMN_NAME)
+        .setDedupConfig(new DedupConfig(true, HashFunction.MD5))
+        .build();
+    assertFalse(MergeRollupTaskGenerator.validate(tableConfig, MinionConstants.MergeRollupTask.TASK_TYPE));
+  }
+
   /**
    * Tests for some config checks
    */
@@ -84,21 +129,37 @@ public class MergeRollupTaskGeneratorTest {
     ClusterInfoAccessor mockClusterInfoProvide = mock(ClusterInfoAccessor.class);
 
     when(mockClusterInfoProvide.getTaskStates(MinionConstants.MergeRollupTask.TASK_TYPE)).thenReturn(new HashMap<>());
-    SegmentZKMetadata metadata1 = getSegmentZKMetadata("testTable__0", 5000, 50_000, TimeUnit.MILLISECONDS, null);
-    when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn(Lists.newArrayList(metadata1));
+    // the two following segments will be skipped when generating tasks
+    SegmentZKMetadata realtimeTableSegmentMetadata1 =
+        getSegmentZKMetadata("testTable__0__0__0", 5000, 50_000, TimeUnit.MILLISECONDS, null);
+    realtimeTableSegmentMetadata1.setStatus(CommonConstants.Segment.Realtime.Status.IN_PROGRESS);
+    SegmentZKMetadata realtimeTableSegmentMetadata2 =
+        getSegmentZKMetadata("testTable__1__0__0", 5000, 50_000, TimeUnit.MILLISECONDS, null);
+    when(mockClusterInfoProvide.getSegmentsZKMetadata(REALTIME_TABLE_NAME))
+        .thenReturn(Lists.newArrayList(realtimeTableSegmentMetadata1, realtimeTableSegmentMetadata2));
+
+    SegmentZKMetadata offlineTableSegmentMetadata =
+        getSegmentZKMetadata("testTable__0", 5000, 50_000, TimeUnit.MILLISECONDS, null);
+    when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
+        .thenReturn(Lists.newArrayList(offlineTableSegmentMetadata));
 
     MergeRollupTaskGenerator generator = new MergeRollupTaskGenerator();
     generator.init(mockClusterInfoProvide);
 
-    // Skip task generation, if realtime table
-    TableConfig offlineTableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).build();
-    List<PinotTaskConfig> pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
+    // Skip task generation if the table is a realtime table and all segments are filtered out
+    // We don't test a realtime REFRESH table because that combination does not make sense
+    assertTrue(MergeRollupTaskGenerator.filterSegmentsBasedOnStatus(TableType.REALTIME,
+        Lists.newArrayList(realtimeTableSegmentMetadata1, realtimeTableSegmentMetadata2)).isEmpty());
+    TableConfig realtimeTableConfig = getTableConfig(TableType.REALTIME, new HashMap<>());
+    List<PinotTaskConfig> pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
     assertTrue(pinotTaskConfigs.isEmpty());
 
-    // Skip task generation, if REFRESH table
+    // Skip task generation if the table is an offline REFRESH table
+    assertFalse(MergeRollupTaskGenerator.filterSegmentsBasedOnStatus(TableType.OFFLINE,
+        Lists.newArrayList(offlineTableSegmentMetadata)).isEmpty());
     IngestionConfig ingestionConfig = new IngestionConfig();
     ingestionConfig.setBatchIngestionConfig(new BatchIngestionConfig(null, "REFRESH", null));
-    offlineTableConfig = getOfflineTableConfig(new HashMap<>());
+    TableConfig offlineTableConfig = getTableConfig(TableType.OFFLINE, new HashMap<>());
     offlineTableConfig.setIngestionConfig(ingestionConfig);
     pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
     assertTrue(pinotTaskConfigs.isEmpty());
@@ -165,7 +226,7 @@ public class MergeRollupTaskGeneratorTest {
     tableTaskConfigs.put("daily.bucketTimePeriod", "1d");
     tableTaskConfigs.put("daily.maxNumRecordsPerSegment", "1000000");
     taskConfigsMap.put(MinionConstants.MergeRollupTask.TASK_TYPE, tableTaskConfigs);
-    TableConfig offlineTableConfig = getOfflineTableConfig(taskConfigsMap);
+    TableConfig offlineTableConfig = getTableConfig(TableType.OFFLINE, taskConfigsMap);
     ClusterInfoAccessor mockClusterInfoProvide = mock(ClusterInfoAccessor.class);
     when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
         .thenReturn(Lists.newArrayList(Collections.emptyList()));
@@ -191,7 +252,7 @@ public class MergeRollupTaskGeneratorTest {
     tableTaskConfigs.put("daily.bucketTimePeriod", "1d");
     tableTaskConfigs.put("daily.maxNumRecordsPerSegment", "1000000");
     taskConfigsMap.put(MinionConstants.MergeRollupTask.TASK_TYPE, tableTaskConfigs);
-    TableConfig offlineTableConfig = getOfflineTableConfig(taskConfigsMap);
+    TableConfig offlineTableConfig = getTableConfig(TableType.OFFLINE, taskConfigsMap);
     ClusterInfoAccessor mockClusterInfoProvide = mock(ClusterInfoAccessor.class);
 
     String segmentName1 = "testTable__1";
@@ -221,7 +282,7 @@ public class MergeRollupTaskGeneratorTest {
     tableTaskConfigs.put("daily.bucketTimePeriod", "1d");
     tableTaskConfigs.put("daily.maxNumRecordsPerSegment", "1000000");
     taskConfigsMap.put(MinionConstants.MergeRollupTask.TASK_TYPE, tableTaskConfigs);
-    TableConfig offlineTableConfig = getOfflineTableConfig(taskConfigsMap);
+    TableConfig offlineTableConfig = getTableConfig(TableType.OFFLINE, taskConfigsMap);
     ClusterInfoAccessor mockClusterInfoProvide = mock(ClusterInfoAccessor.class);
 
     String segmentName1 = "testTable__1";
@@ -249,7 +310,7 @@ public class MergeRollupTaskGeneratorTest {
     tableTaskConfigs.put("daily.maxNumRecordsPerSegment", "1000000");
     tableTaskConfigs.put("daily.maxNumRecordsPerTask", "5000000");
     taskConfigsMap.put(MinionConstants.MergeRollupTask.TASK_TYPE, tableTaskConfigs);
-    TableConfig offlineTableConfig = getOfflineTableConfig(taskConfigsMap);
+    TableConfig offlineTableConfig = getTableConfig(TableType.OFFLINE, taskConfigsMap);
     ClusterInfoAccessor mockClusterInfoProvide = mock(ClusterInfoAccessor.class);
 
     String segmentName1 = "testTable__1";
@@ -299,7 +360,7 @@ public class MergeRollupTaskGeneratorTest {
     tableTaskConfigs.put("daily.maxNumRecordsPerSegment", "1000000");
     tableTaskConfigs.put("daily.maxNumParallelBuckets", "3");
     taskConfigsMap.put(MinionConstants.MergeRollupTask.TASK_TYPE, tableTaskConfigs);
-    TableConfig offlineTableConfig = getOfflineTableConfig(taskConfigsMap);
+    TableConfig offlineTableConfig = getTableConfig(TableType.OFFLINE, taskConfigsMap);
     ClusterInfoAccessor mockClusterInfoProvide = mock(ClusterInfoAccessor.class);
 
     String segmentName1 = "testTable__1";
@@ -504,7 +565,7 @@ public class MergeRollupTaskGeneratorTest {
     tableTaskConfigs.put("daily.maxNumRecordsPerSegment", "1000000");
     tableTaskConfigs.put("daily.maxNumRecordsPerTask", "5000000");
     taskConfigsMap.put(MinionConstants.MergeRollupTask.TASK_TYPE, tableTaskConfigs);
-    TableConfig offlineTableConfig = getOfflineTableConfig(taskConfigsMap);
+    TableConfig offlineTableConfig = getTableConfig(TableType.OFFLINE, taskConfigsMap);
 
     String segmentName1 = "testTable__1";
     String segmentName2 = "testTable__2";
@@ -565,7 +626,7 @@ public class MergeRollupTaskGeneratorTest {
     tableTaskConfigs.put("daily.maxNumRecordsPerSegment", "1000000");
     tableTaskConfigs.put("daily.maxNumRecordsPerTask", "5000000");
     taskConfigsMap.put(MinionConstants.MergeRollupTask.TASK_TYPE, tableTaskConfigs);
-    TableConfig offlineTableConfig = getOfflineTableConfig(taskConfigsMap);
+    TableConfig offlineTableConfig = getTableConfig(TableType.OFFLINE, taskConfigsMap);
 
     String segmentName1 = "testTable__1";
     String segmentName2 = "testTable__2";
@@ -649,7 +710,7 @@ public class MergeRollupTaskGeneratorTest {
     tableTaskConfigs.put("monthly.maxNumRecordsPerTask", "5000000");
 
     taskConfigsMap.put(MinionConstants.MergeRollupTask.TASK_TYPE, tableTaskConfigs);
-    TableConfig offlineTableConfig = getOfflineTableConfig(taskConfigsMap);
+    TableConfig offlineTableConfig = getTableConfig(TableType.OFFLINE, taskConfigsMap);
 
     String segmentName1 = "testTable__1";
     String segmentName2 = "testTable__2";
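
A side note on the segment names used throughout these tests: realtime segment names such as testTable__1__0__0 follow Pinot's low-level consumer naming convention, {tableName}__{partitionGroupId}__{sequenceNumber}__{creationTime}. A minimal parse for illustration (a sketch; Pinot ships a proper parser for this format):

public class LlcSegmentNameSketch {
  public static void main(String[] args) {
    String segmentName = "testTable__1__0__0";
    String[] parts = segmentName.split("__");
    String tableName = parts[0];                       // "testTable"
    int partitionGroupId = Integer.parseInt(parts[1]); // 1
    int sequenceNumber = Integer.parseInt(parts[2]);   // 0
    String creationTime = parts[3];                    // "0" in these unit tests
    System.out.println(tableName + " partition=" + partitionGroupId
        + " seq=" + sequenceNumber + " created=" + creationTime);
  }
}
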
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
index 44fbc7f5c3..a71ddb47f6 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
@@ -313,6 +313,11 @@ public class TableConfig extends BaseJsonConfig {
     _dedupConfig = dedupConfig;
   }
 
+  @JsonIgnore
+  public boolean isDedupEnabled() {
+    return _dedupConfig != null && _dedupConfig.isDedupEnabled();
+  }
+
   @Nullable
   public DimensionTableConfig getDimensionTableConfig() {
     return _dimensionTableConfig;
@@ -347,6 +352,11 @@ public class TableConfig extends BaseJsonConfig {
     return _upsertConfig == null ? UpsertConfig.Mode.NONE : _upsertConfig.getMode();
   }
 
+  @JsonIgnore
+  public boolean isUpsertEnabled() {
+    return _upsertConfig != null && _upsertConfig.getMode() != UpsertConfig.Mode.NONE;
+  }
+
   @JsonIgnore
   @Nullable
   public String getUpsertComparisonColumn() {


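Taken together with the validate() change in MergeRollupTaskGenerator, these two helpers make the realtime guard a one-liner each. A minimal usage sketch (hypothetical table name; the cases mirror testValidateIfMergeRollupCanBeEnabledOrNot above):

import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;

public class TableConfigHelpersSketch {
  public static void main(String[] args) {
    TableConfig plain = new TableConfigBuilder(TableType.REALTIME)
        .setTableName("myTable").setTimeColumnName("ts").build();
    System.out.println(plain.isUpsertEnabled()); // false
    System.out.println(plain.isDedupEnabled());  // false

    TableConfig upsert = new TableConfigBuilder(TableType.REALTIME)
        .setTableName("myTable").setTimeColumnName("ts")
        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL)).build();
    System.out.println(upsert.isUpsertEnabled()); // true: validate() will skip this table
  }
}
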
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org