Posted to commits@pinot.apache.org by ja...@apache.org on 2021/07/21 03:26:27 UTC

[incubator-pinot] branch master updated: Integrate enhanced SegmentProcessorFramework into MergeRollupTaskExecutor (#7180)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new e0163f4  Integrate enhanced SegmentProcessorFramework into MergeRollupTaskExecutor (#7180)
e0163f4 is described below

commit e0163f43a44b921faf62ca2066b3e6269dd8afa0
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Tue Jul 20 20:26:11 2021 -0700

    Integrate enhanced SegmentProcessorFramework into MergeRollupTaskExecutor (#7180)
    
    Wire the enhanced `SegmentProcessorFramework` into the merge/rollup task executor, adding support for:
    - Concat/rollup/dedup
    - Null values
    - Custom segment name prefix
    
    Extract the logic shared by `RealtimeToOfflineSegmentsTaskExecutor` and `MergeRollupTaskExecutor` into the new `MergeTaskUtils`.
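    
    For illustration, a per-level task config using the new keys could look like
    the following (the key names come from this change; the level names and
    values are hypothetical, and aggregation keys are keyed by column name):
    
        daily.bucketTimePeriod=1d
        daily.bufferTimePeriod=3d
        daily.mergeType=concat
        daily.maxNumRecordsPerSegment=1000000
        monthly.bucketTimePeriod=30d
        monthly.mergeType=rollup
        colA.aggregationType=sum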
---
 .../apache/pinot/core/common/MinionConstants.java  |  52 +++----
 .../processing/framework/SegmentConfig.java        |   2 +-
 .../pinot/plugin/minion/tasks/MergeTaskUtils.java  | 147 ++++++++++++++++++
 .../minion/tasks/merge_rollup/MergeProperties.java |  50 ------
 .../merge_rollup/MergeRollupTaskExecutor.java      |  57 ++++---
 .../tasks/merge_rollup/MergeRollupTaskUtils.java   | 108 +++----------
 .../RealtimeToOfflineSegmentsTaskExecutor.java     |  95 +++---------
 .../plugin/minion/tasks/MergeTaskUtilsTest.java    | 172 +++++++++++++++++++++
 .../merge_rollup/MergeRollupTaskExecutorTest.java  |   3 +-
 .../merge_rollup/MergeRollupTaskUtilsTest.java     |  86 ++++-------
 10 files changed, 449 insertions(+), 323 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index dba45e8..b328b13 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -23,7 +23,6 @@ public class MinionConstants {
   }
 
   public static final String TASK_TIME_SUFFIX = ".time";
-  public static final String TASK_BUCKET_GRANULARITY_SUFFIX = ".bucketGranularity";
 
   public static final String TABLE_NAME_KEY = "tableName";
   public static final String SEGMENT_NAME_KEY = "segmentName";
@@ -60,36 +59,15 @@ public class MinionConstants {
     public static final String TASK_TYPE = "PurgeTask";
   }
 
-  public static class MergeRollupTask {
-    public static final String TASK_TYPE = "MergeRollupTask";
-
-    public static final String MERGE_TYPE_KEY = "mergeType";
-    public static final String GRANULARITY_KEY = "granularity";
-
-    // Rollup aggregate function related configs
-    public static final String AGGREGATE_KEY_PREFIX = "aggregate";
-
-    // Merge properties related configs
-    public static final String MERGE_KEY_PREFIX = "merge";
-    public static final String BUFFER_TIME = "bufferTime";
-    public static final String MAX_NUM_RECORDS_PER_SEGMENT = "maxNumRecordsPerSegment";
-    public static final String MAX_NUM_RECORDS_PER_TASK = "maxNumRecordsPerTask";
-
-    // Segment name generator related configs
-    public static final String MERGED_SEGMENT_NAME_KEY = "mergedSegmentNameKey";
-  }
-
-  /**
-   * Creates segments for the OFFLINE table, using completed segments from the corresponding REALTIME table
-   */
-  public static class RealtimeToOfflineSegmentsTask {
-    public static final String TASK_TYPE = "RealtimeToOfflineSegmentsTask";
+  // Common config keys for segment merge tasks.
+  public static abstract class MergeTask {
 
     /**
      * The time window size for the task.
      * e.g. if set to "1d", then task is scheduled to run for a 1 day window
      */
     public static final String BUCKET_TIME_PERIOD_KEY = "bucketTimePeriod";
+
     /**
      * The time period to wait before picking segments for this task
      * e.g. if set to "2d", no task will be scheduled for a time window younger than 2 days
@@ -100,15 +78,35 @@ public class MinionConstants {
     public static final String WINDOW_START_MS_KEY = "windowStartMs";
     public static final String WINDOW_END_MS_KEY = "windowEndMs";
     public static final String ROUND_BUCKET_TIME_PERIOD_KEY = "roundBucketTimePeriod";
+    public static final String PARTITION_BUCKET_TIME_PERIOD_KEY = "partitionBucketTimePeriod";
 
     // Merge config
     public static final String MERGE_TYPE_KEY = "mergeType";
-    @Deprecated // Replaced by MERGE_TYPE_KEY
-    public static final String COLLECTOR_TYPE_KEY = "collectorType";
     public static final String AGGREGATION_TYPE_KEY_SUFFIX = ".aggregationType";
 
     // Segment config
+    public static final String MAX_NUM_RECORDS_PER_TASK_KEY = "maxNumRecordsPerTask";
     public static final String MAX_NUM_RECORDS_PER_SEGMENT_KEY = "maxNumRecordsPerSegment";
+    public static final String SEGMENT_NAME_PREFIX_KEY = "segmentNamePrefix";
+  }
+
+  public static class MergeRollupTask extends MergeTask {
+    public static final String TASK_TYPE = "MergeRollupTask";
+
+    public static final String MERGE_LEVEL_KEY = "mergeLevel";
+
+    public static final String SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY = TASK_TYPE + "." + MERGE_LEVEL_KEY;
+    public static final String SEGMENT_ZK_METADATA_TIME_KEY = TASK_TYPE + TASK_TIME_SUFFIX;
+  }
+
+  /**
+   * Creates segments for the OFFLINE table, using completed segments from the corresponding REALTIME table
+   */
+  public static class RealtimeToOfflineSegmentsTask extends MergeTask {
+    public static final String TASK_TYPE = "RealtimeToOfflineSegmentsTask";
+
+    @Deprecated // Replaced by MERGE_TYPE_KEY
+    public static final String COLLECTOR_TYPE_KEY = "collectorType";
   }
 
   // Generate segment and push to controller based on batch ingestion configs
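
For reference, SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY above resolves to "MergeRollupTask.mergeLevel", and SEGMENT_ZK_METADATA_TIME_KEY to "MergeRollupTask.time" (TASK_TYPE plus the ".time" TASK_TIME_SUFFIX).
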
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentConfig.java
index 40964c3..33aa896 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentConfig.java
@@ -30,7 +30,7 @@ import javax.annotation.Nullable;
  */
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class SegmentConfig {
-  private static final int DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT = 5_000_000;
+  public static final int DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT = 5_000_000;
 
   private final int _maxNumRecordsPerSegment;
   private final String _segmentNamePrefix;
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtils.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtils.java
new file mode 100644
index 0000000..f6b3aec
--- /dev/null
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtils.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks;
+
+import com.google.common.base.Preconditions;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.core.common.MinionConstants.MergeTask;
+import org.apache.pinot.core.segment.processing.framework.MergeType;
+import org.apache.pinot.core.segment.processing.framework.SegmentConfig;
+import org.apache.pinot.core.segment.processing.partitioner.PartitionerConfig;
+import org.apache.pinot.core.segment.processing.partitioner.PartitionerFactory;
+import org.apache.pinot.core.segment.processing.timehandler.TimeHandler;
+import org.apache.pinot.core.segment.processing.timehandler.TimeHandlerConfig;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.TimeUtils;
+
+
+/**
+ * Common utils for segment merge tasks.
+ */
+public class MergeTaskUtils {
+  private MergeTaskUtils() {
+  }
+
+  private static final int AGGREGATION_TYPE_KEY_SUFFIX_LENGTH = MergeTask.AGGREGATION_TYPE_KEY_SUFFIX.length();
+
+  /**
+   * Creates the time handler config based on the given table config, schema and task config. Returns {@code null} if
+   * the table does not have a time column.
+   */
+  @Nullable
+  public static TimeHandlerConfig getTimeHandlerConfig(TableConfig tableConfig, Schema schema,
+      Map<String, String> taskConfig) {
+    String timeColumn = tableConfig.getValidationConfig().getTimeColumnName();
+    if (timeColumn == null) {
+      return null;
+    }
+    DateTimeFieldSpec fieldSpec = schema.getSpecForTimeColumn(timeColumn);
+    Preconditions
+        .checkState(fieldSpec != null, "No valid spec found for time column: %s in schema for table: %s", timeColumn,
+            tableConfig.getTableName());
+
+    TimeHandlerConfig.Builder timeHandlerConfigBuilder = new TimeHandlerConfig.Builder(TimeHandler.Type.EPOCH);
+
+    String windowStartMs = taskConfig.get(MergeTask.WINDOW_START_MS_KEY);
+    String windowEndMs = taskConfig.get(MergeTask.WINDOW_END_MS_KEY);
+    if (windowStartMs != null && windowEndMs != null) {
+      timeHandlerConfigBuilder.setTimeRange(Long.parseLong(windowStartMs), Long.parseLong(windowEndMs));
+    }
+
+    String roundBucketTimePeriod = taskConfig.get(MergeTask.ROUND_BUCKET_TIME_PERIOD_KEY);
+    if (roundBucketTimePeriod != null) {
+      timeHandlerConfigBuilder.setRoundBucketMs(TimeUtils.convertPeriodToMillis(roundBucketTimePeriod));
+    }
+
+    String partitionBucketTimePeriod = taskConfig.get(MergeTask.PARTITION_BUCKET_TIME_PERIOD_KEY);
+    if (partitionBucketTimePeriod != null) {
+      timeHandlerConfigBuilder.setPartitionBucketMs(TimeUtils.convertPeriodToMillis(partitionBucketTimePeriod));
+    }
+
+    return timeHandlerConfigBuilder.build();
+  }
+
+  /**
+   * Creates the partitioner configs based on the given table config, schema and task config.
+   */
+  public static List<PartitionerConfig> getPartitionerConfigs(TableConfig tableConfig, Schema schema,
+      Map<String, String> taskConfig) {
+    SegmentPartitionConfig segmentPartitionConfig = tableConfig.getIndexingConfig().getSegmentPartitionConfig();
+    if (segmentPartitionConfig == null) {
+      return Collections.emptyList();
+    }
+    Map<String, ColumnPartitionConfig> columnPartitionMap = segmentPartitionConfig.getColumnPartitionMap();
+    Preconditions.checkState(columnPartitionMap.size() == 1, "Cannot partition on multiple columns for table: %s",
+        tableConfig.getTableName());
+    Map.Entry<String, ColumnPartitionConfig> entry = columnPartitionMap.entrySet().iterator().next();
+    String partitionColumn = entry.getKey();
+    Preconditions.checkState(schema.hasColumn(partitionColumn),
+        "Partition column: %s does not exist in the schema for table: %s", partitionColumn, tableConfig.getTableName());
+    PartitionerConfig partitionerConfig =
+        new PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.TABLE_PARTITION_CONFIG)
+            .setColumnName(partitionColumn).setColumnPartitionConfig(entry.getValue()).build();
+    return Collections.singletonList(partitionerConfig);
+  }
+
+  /**
+   * Returns the merge type based on the task config. Returns {@code null} if it is not configured.
+   */
+  @Nullable
+  public static MergeType getMergeType(Map<String, String> taskConfig) {
+    String mergeType = taskConfig.get(MergeTask.MERGE_TYPE_KEY);
+    return mergeType != null ? MergeType.valueOf(mergeType.toUpperCase()) : null;
+  }
+
+  /**
+   * Returns the map from column name to the aggregation type associated with it based on the task config.
+   */
+  public static Map<String, AggregationFunctionType> getAggregationTypes(Map<String, String> taskConfig) {
+    Map<String, AggregationFunctionType> aggregationTypes = new HashMap<>();
+    for (Map.Entry<String, String> entry : taskConfig.entrySet()) {
+      String key = entry.getKey();
+      if (key.endsWith(MergeTask.AGGREGATION_TYPE_KEY_SUFFIX)) {
+        String column = key.substring(0, key.length() - AGGREGATION_TYPE_KEY_SUFFIX_LENGTH);
+        aggregationTypes.put(column, AggregationFunctionType.getAggregationFunctionType(entry.getValue()));
+      }
+    }
+    return aggregationTypes;
+  }
+
+  /**
+   * Returns the segment config based on the task config.
+   */
+  public static SegmentConfig getSegmentConfig(Map<String, String> taskConfig) {
+    SegmentConfig.Builder segmentConfigBuilder = new SegmentConfig.Builder();
+    String maxNumRecordsPerSegment = taskConfig.get(MergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY);
+    if (maxNumRecordsPerSegment != null) {
+      segmentConfigBuilder.setMaxNumRecordsPerSegment(Integer.parseInt(maxNumRecordsPerSegment));
+    }
+    segmentConfigBuilder.setSegmentNamePrefix(taskConfig.get(MergeTask.SEGMENT_NAME_PREFIX_KEY));
+    return segmentConfigBuilder.build();
+  }
+}
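
To show how these helpers fit together, here is a minimal usage sketch (an editorial illustration, not code from this commit) of the wiring that both executors below now perform; tableConfig, schema, and taskConfig are assumed to be supplied by the caller:

    import java.util.Map;
    import org.apache.pinot.core.segment.processing.framework.SegmentProcessorConfig;
    import org.apache.pinot.plugin.minion.tasks.MergeTaskUtils;
    import org.apache.pinot.spi.config.table.TableConfig;
    import org.apache.pinot.spi.data.Schema;

    public class MergeTaskUtilsUsageSketch {
      public static SegmentProcessorConfig buildConfig(TableConfig tableConfig, Schema schema,
          Map<String, String> taskConfig) {
        SegmentProcessorConfig.Builder builder =
            new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema);
        // Null when the table has no time column
        builder.setTimeHandlerConfig(MergeTaskUtils.getTimeHandlerConfig(tableConfig, schema, taskConfig));
        // Empty list when the table has no segment partition config
        builder.setPartitionerConfigs(MergeTaskUtils.getPartitionerConfigs(tableConfig, schema, taskConfig));
        // Null when "mergeType" is not set in the task config
        builder.setMergeType(MergeTaskUtils.getMergeType(taskConfig));
        builder.setAggregationTypes(MergeTaskUtils.getAggregationTypes(taskConfig));
        // Applies "maxNumRecordsPerSegment" and "segmentNamePrefix" when present
        builder.setSegmentConfig(MergeTaskUtils.getSegmentConfig(taskConfig));
        return builder.build();
      }
    }
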
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeProperties.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeProperties.java
deleted file mode 100644
index 34048e5..0000000
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeProperties.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.plugin.minion.tasks.merge_rollup;
-
-public class MergeProperties {
-  private final String _mergeType;
-  private final long _bufferTimeMs;
-  private final long _maxNumRecordsPerSegment;
-  private final long _maxNumRecordsPerTask;
-
-  public MergeProperties(String mergeType, long bufferTimeMs, long maxNumRecordsPerSegment,
-      long maxNumRecordsPerTask) {
-    _mergeType = mergeType;
-    _bufferTimeMs = bufferTimeMs;
-    _maxNumRecordsPerSegment = maxNumRecordsPerSegment;
-    _maxNumRecordsPerTask = maxNumRecordsPerTask;
-  }
-
-  public String getMergeType() {
-    return _mergeType;
-  }
-
-  public long getBufferTimeMs() {
-    return _bufferTimeMs;
-  }
-
-  public long getMaxNumRecordsPerSegment() {
-    return _maxNumRecordsPerSegment;
-  }
-
-  public long getMaxNumRecordsPerTask() {
-    return _maxNumRecordsPerTask;
-  }
-}
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutor.java
index f607990..c1e7587 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutor.java
@@ -21,20 +21,19 @@ package org.apache.pinot.plugin.minion.tasks.merge_rollup;
 import com.google.common.base.Preconditions;
 import java.io.File;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
 import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.common.MinionConstants.MergeRollupTask;
 import org.apache.pinot.core.minion.PinotTaskConfig;
-import org.apache.pinot.core.segment.processing.framework.MergeType;
-import org.apache.pinot.core.segment.processing.framework.SegmentConfig;
 import org.apache.pinot.core.segment.processing.framework.SegmentProcessorConfig;
 import org.apache.pinot.core.segment.processing.framework.SegmentProcessorFramework;
 import org.apache.pinot.plugin.minion.tasks.BaseMultipleSegmentsConversionExecutor;
+import org.apache.pinot.plugin.minion.tasks.MergeTaskUtils;
 import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
-import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.slf4j.Logger;
@@ -42,13 +41,7 @@ import org.slf4j.LoggerFactory;
 
 
 /**
- * Task executor that provides merge and rollup service
- *
- * TODO:
- *   1. Add the support for roll-up
- *   2. Add the support for time split to provide backfill support for merged segments
- *   3. Add merge/rollup name prefixes for generated segments
- *   4. Add the support for realtime table
+ * Task executor that provides merge and rollup service.
  */
 public class MergeRollupTaskExecutor extends BaseMultipleSegmentsConversionExecutor {
   private static final Logger LOGGER = LoggerFactory.getLogger(MergeRollupTaskExecutor.class);
@@ -64,27 +57,29 @@ public class MergeRollupTaskExecutor extends BaseMultipleSegmentsConversionExecu
     LOGGER.info("Starting task: {} with configs: {}", taskType, configs);
     long startMillis = System.currentTimeMillis();
 
-    Preconditions.checkState(
-        MergeType.CONCAT.name().equalsIgnoreCase(configs.get(MinionConstants.MergeRollupTask.MERGE_TYPE_KEY)),
-        "Only 'CONCAT' mode is currently supported.");
-
     String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
     TableConfig tableConfig = getTableConfig(tableNameWithType);
     Schema schema = getSchema(tableNameWithType);
 
-    Map<String, AggregationFunctionType> aggregationTypes = MergeRollupTaskUtils.getRollupAggregationTypes(configs);
-    String numRecordsPerSegmentString = configs.get(MinionConstants.MergeRollupTask.MAX_NUM_RECORDS_PER_SEGMENT);
-
     SegmentProcessorConfig.Builder segmentProcessorConfigBuilder =
-        new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema)
-            .setMergeType(MergeType.CONCAT);
-    if (!aggregationTypes.isEmpty()) {
-      segmentProcessorConfigBuilder.setAggregationTypes(aggregationTypes);
-    }
-    if (numRecordsPerSegmentString != null) {
-      segmentProcessorConfigBuilder.setSegmentConfig(
-          new SegmentConfig.Builder().setMaxNumRecordsPerSegment(Integer.parseInt(numRecordsPerSegmentString)).build());
-    }
+        new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema);
+
+    // Time handler config
+    segmentProcessorConfigBuilder
+        .setTimeHandlerConfig(MergeTaskUtils.getTimeHandlerConfig(tableConfig, schema, configs));
+
+    // Partitioner config
+    segmentProcessorConfigBuilder
+        .setPartitionerConfigs(MergeTaskUtils.getPartitionerConfigs(tableConfig, schema, configs));
+
+    // Merge type
+    segmentProcessorConfigBuilder.setMergeType(MergeTaskUtils.getMergeType(configs));
+
+    // Aggregation types
+    segmentProcessorConfigBuilder.setAggregationTypes(MergeTaskUtils.getAggregationTypes(configs));
+
+    // Segment config
+    segmentProcessorConfigBuilder.setSegmentConfig(MergeTaskUtils.getSegmentConfig(configs));
 
     SegmentProcessorConfig segmentProcessorConfig = segmentProcessorConfigBuilder.build();
 
@@ -120,8 +115,10 @@ public class MergeRollupTaskExecutor extends BaseMultipleSegmentsConversionExecu
   @Override
   protected SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifier(PinotTaskConfig pinotTaskConfig,
       SegmentConversionResult segmentConversionResult) {
-    return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE, Collections
-        .singletonMap(MinionConstants.MergeRollupTask.TASK_TYPE + MinionConstants.TASK_BUCKET_GRANULARITY_SUFFIX,
-            pinotTaskConfig.getConfigs().get(MinionConstants.MergeRollupTask.GRANULARITY_KEY).toUpperCase()));
+    Map<String, String> updateMap = new TreeMap<>();
+    updateMap.put(MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY,
+        pinotTaskConfig.getConfigs().get(MergeRollupTask.MERGE_LEVEL_KEY));
+    updateMap.put(MergeRollupTask.SEGMENT_ZK_METADATA_TIME_KEY, String.valueOf(System.currentTimeMillis()));
+    return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE, updateMap);
   }
 }
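
With this change, the modifier stamps each merged segment's ZK metadata custom map with the merge level and the task execution time, e.g. {"MergeRollupTask.mergeLevel": "daily", "MergeRollupTask.time": "1626840000000"} (illustrative values), replacing the previous granularity entry.
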
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskUtils.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskUtils.java
index 85b72c3..79733a2 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskUtils.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskUtils.java
@@ -18,100 +18,38 @@
  */
 package org.apache.pinot.plugin.minion.tasks.merge_rollup;
 
-import com.google.common.base.Preconditions;
-import java.util.HashMap;
 import java.util.Map;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.pinot.common.minion.Granularity;
-import org.apache.pinot.core.common.MinionConstants;
-import org.apache.pinot.core.segment.processing.framework.MergeType;
-import org.apache.pinot.pql.parsers.utils.Pair;
-import org.apache.pinot.segment.spi.AggregationFunctionType;
-import org.apache.pinot.spi.utils.TimeUtils;
+import java.util.TreeMap;
+import org.apache.pinot.core.common.MinionConstants.MergeTask;
 
 
 public class MergeRollupTaskUtils {
   //@formatter:off
-  private static final String[] validMergeProperties = {
-      MinionConstants.MergeRollupTask.MERGE_TYPE_KEY,
-      MinionConstants.MergeRollupTask.BUFFER_TIME,
-      MinionConstants.MergeRollupTask.MAX_NUM_RECORDS_PER_SEGMENT,
-      MinionConstants.MergeRollupTask.MAX_NUM_RECORDS_PER_TASK
-  };
-
-  private static final String[] validMergeType = {
-      MergeType.CONCAT.name(),
-      MergeType.ROLLUP.name()
+  private static final String[] VALID_CONFIG_KEYS = {
+      MergeTask.BUCKET_TIME_PERIOD_KEY,
+      MergeTask.BUFFER_TIME_PERIOD_KEY,
+      MergeTask.ROUND_BUCKET_TIME_PERIOD_KEY,
+      MergeTask.MERGE_TYPE_KEY,
+      MergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY,
+      MergeTask.MAX_NUM_RECORDS_PER_TASK_KEY
   };
   //@formatter:on
 
-  public static Map<String, AggregationFunctionType> getRollupAggregationTypes(Map<String, String> mergeRollupConfig) {
-    Map<String, AggregationFunctionType> aggregationTypes = new HashMap<>();
-    for (Map.Entry<String, String> entry : mergeRollupConfig.entrySet()) {
-      if (entry.getKey().startsWith(MinionConstants.MergeRollupTask.AGGREGATE_KEY_PREFIX)) {
-        aggregationTypes.put(getAggregateColumn(entry.getKey()),
-            AggregationFunctionType.getAggregationFunctionType(entry.getValue()));
-      }
-    }
-    return aggregationTypes;
-  }
-
-  public static Map<Granularity, MergeProperties> getAllMergeProperties(Map<String, String> mergeRollupConfig) {
-    Map<Granularity, Map<String, String>> mergePropertiesMap = new HashMap<>();
-    for (Map.Entry<String, String> entry : mergeRollupConfig.entrySet()) {
-      if (entry.getKey().startsWith(MinionConstants.MergeRollupTask.MERGE_KEY_PREFIX)) {
-        Pair<Granularity, String> pair = getGranularityAndPropertyPair(entry.getKey(), entry.getValue());
-        Granularity granularity = pair.getFirst();
-        String mergeProperty = pair.getSecond();
-        mergePropertiesMap.putIfAbsent(granularity, new HashMap<>());
-        mergePropertiesMap.get(granularity).put(mergeProperty, entry.getValue());
-      }
-    }
-
-    Map<Granularity, MergeProperties> allMergeProperties = new HashMap<>();
-    for (Map.Entry<Granularity, Map<String, String>> entry : mergePropertiesMap.entrySet()) {
-      Map<String, String> properties = entry.getValue();
-      MergeProperties mergeProperties =
-          new MergeProperties(properties.get(MinionConstants.MergeRollupTask.MERGE_TYPE_KEY).toUpperCase(),
-              TimeUtils.convertPeriodToMillis(properties.get(MinionConstants.MergeRollupTask.BUFFER_TIME)),
-              Long.parseLong(properties.get(MinionConstants.MergeRollupTask.MAX_NUM_RECORDS_PER_SEGMENT)),
-              Long.parseLong(properties.get(MinionConstants.MergeRollupTask.MAX_NUM_RECORDS_PER_TASK)));
-      allMergeProperties.put(entry.getKey(), mergeProperties);
-    }
-    return allMergeProperties;
-  }
-
-  private static String getAggregateColumn(String rollupAggregateConfigKey) {
-    return rollupAggregateConfigKey
-        .split(MinionConstants.MergeRollupTask.AGGREGATE_KEY_PREFIX + MinionConstants.DOT_SEPARATOR)[1];
-  }
-
-  private static Pair<Granularity, String> getGranularityAndPropertyPair(String mergePropertyConfigKey,
-      String mergePropertyConfigValue) {
-    String[] components = StringUtils.split(mergePropertyConfigKey, MinionConstants.DOT_SEPARATOR);
-    Preconditions.checkState(components.length == 3);
-    Preconditions.checkState(isValidMergeProperties(components[2]));
-    if (components[2].equals(MinionConstants.MergeRollupTask.MERGE_TYPE_KEY)) {
-      Preconditions.checkState(isValidMergeType(mergePropertyConfigValue));
-    }
-    return new Pair<>((Granularity.valueOf(components[1].toUpperCase())), components[2]);
-  }
-
-  private static boolean isValidMergeProperties(String property) {
-    for (String validProperty : validMergeProperties) {
-      if (property.equals(validProperty)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private static boolean isValidMergeType(String mergeType) {
-    for (String validMergeType : validMergeType) {
-      if (mergeType.toUpperCase().equals(validMergeType)) {
-        return true;
+  /**
+   * Extracts a map from merge level to config from the task config.
+   * <p>The config for a specific level should have key of format "{level}.{configKey}" within the task config.
+   */
+  public static Map<String, Map<String, String>> getLevelToConfigMap(Map<String, String> taskConfig) {
+    Map<String, Map<String, String>> levelToConfigMap = new TreeMap<>();
+    for (Map.Entry<String, String> entry : taskConfig.entrySet()) {
+      String key = entry.getKey();
+      for (String configKey : VALID_CONFIG_KEYS) {
+        if (key.endsWith(configKey)) {
+          String level = key.substring(0, key.length() - configKey.length() - 1);
+          levelToConfigMap.computeIfAbsent(level, k -> new TreeMap<>()).put(configKey, entry.getValue());
+        }
       }
     }
-    return false;
+    return levelToConfigMap;
   }
 }
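
As a worked example (mirroring MergeRollupTaskUtilsTest below): a task config containing {"daily.bucketTimePeriod": "1d", "monthly.mergeType": "rollup"} yields {"daily": {"bucketTimePeriod": "1d"}, "monthly": {"mergeType": "rollup"}}; the merge level is whatever precedes the trailing ".{configKey}".
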
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtime_to_offline_segments/RealtimeToOfflineSegmentsTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtime_to_offline_segments/RealtimeToOfflineSegmentsTaskExecutor.java
index 13802d0..2a24937 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtime_to_offline_segments/RealtimeToOfflineSegmentsTaskExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtime_to_offline_segments/RealtimeToOfflineSegmentsTaskExecutor.java
@@ -19,11 +19,9 @@
 package org.apache.pinot.plugin.minion.tasks.realtime_to_offline_segments;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.commons.io.FileUtils;
@@ -34,23 +32,14 @@ import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask;
 import org.apache.pinot.core.minion.PinotTaskConfig;
 import org.apache.pinot.core.segment.processing.framework.MergeType;
-import org.apache.pinot.core.segment.processing.framework.SegmentConfig;
 import org.apache.pinot.core.segment.processing.framework.SegmentProcessorConfig;
 import org.apache.pinot.core.segment.processing.framework.SegmentProcessorFramework;
-import org.apache.pinot.core.segment.processing.partitioner.PartitionerConfig;
-import org.apache.pinot.core.segment.processing.partitioner.PartitionerFactory;
-import org.apache.pinot.core.segment.processing.timehandler.TimeHandler;
-import org.apache.pinot.core.segment.processing.timehandler.TimeHandlerConfig;
 import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
 import org.apache.pinot.plugin.minion.tasks.BaseMultipleSegmentsConversionExecutor;
+import org.apache.pinot.plugin.minion.tasks.MergeTaskUtils;
 import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
-import org.apache.pinot.segment.spi.AggregationFunctionType;
-import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
-import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.utils.TimeUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -83,7 +72,6 @@ public class RealtimeToOfflineSegmentsTaskExecutor extends BaseMultipleSegmentsC
 
   private final MinionTaskZkMetadataManager _minionTaskZkMetadataManager;
   private int _expectedVersion = Integer.MIN_VALUE;
-  private long _nextWatermark;
 
   public RealtimeToOfflineSegmentsTaskExecutor(MinionTaskZkMetadataManager minionTaskZkMetadataManager) {
     _minionTaskZkMetadataManager = minionTaskZkMetadataManager;
@@ -130,63 +118,34 @@ public class RealtimeToOfflineSegmentsTaskExecutor extends BaseMultipleSegmentsC
     String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
     TableConfig tableConfig = getTableConfig(offlineTableName);
     Schema schema = getSchema(offlineTableName);
-    String timeColumn = tableConfig.getValidationConfig().getTimeColumnName();
-    DateTimeFieldSpec dateTimeFieldSpec = schema.getSpecForTimeColumn(timeColumn);
-    Preconditions
-        .checkState(dateTimeFieldSpec != null, "No valid spec found for time column: %s in schema for table: %s",
-            timeColumn, offlineTableName);
+
     SegmentProcessorConfig.Builder segmentProcessorConfigBuilder =
         new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema);
 
-    long windowStartMs = Long.parseLong(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY));
-    long windowEndMs = Long.parseLong(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY));
-    _nextWatermark = windowEndMs;
-
     // Time handler config
-    TimeHandlerConfig.Builder timeHandlerConfigBuilder =
-        new TimeHandlerConfig.Builder(TimeHandler.Type.EPOCH).setTimeRange(windowStartMs, windowEndMs);
-    String roundBucketTimePeriod = configs.get(RealtimeToOfflineSegmentsTask.ROUND_BUCKET_TIME_PERIOD_KEY);
-    if (roundBucketTimePeriod != null) {
-      timeHandlerConfigBuilder.setRoundBucketMs(TimeUtils.convertPeriodToMillis(roundBucketTimePeriod));
-    }
-    segmentProcessorConfigBuilder.setTimeHandlerConfig(timeHandlerConfigBuilder.build());
+    segmentProcessorConfigBuilder
+        .setTimeHandlerConfig(MergeTaskUtils.getTimeHandlerConfig(tableConfig, schema, configs));
 
-    // Partitioner config from tableConfig
-    SegmentPartitionConfig segmentPartitionConfig = tableConfig.getIndexingConfig().getSegmentPartitionConfig();
-    if (segmentPartitionConfig != null) {
-      PartitionerConfig partitionerConfig = getPartitionerConfig(segmentPartitionConfig, offlineTableName, schema);
-      segmentProcessorConfigBuilder.setPartitionerConfigs(Lists.newArrayList(partitionerConfig));
-    }
+    // Partitioner config
+    segmentProcessorConfigBuilder
+        .setPartitionerConfigs(MergeTaskUtils.getPartitionerConfigs(tableConfig, schema, configs));
 
     // Merge type
-    String mergeType = configs.get(RealtimeToOfflineSegmentsTask.MERGE_TYPE_KEY);
-    // Handle deprecated key
+    MergeType mergeType = MergeTaskUtils.getMergeType(configs);
+    // Handle legacy key
     if (mergeType == null) {
-      mergeType = configs.get(RealtimeToOfflineSegmentsTask.COLLECTOR_TYPE_KEY);
-    }
-    if (mergeType != null) {
-      segmentProcessorConfigBuilder.setMergeType(MergeType.valueOf(mergeType.toUpperCase()));
+      String legacyMergeTypeStr = configs.get(RealtimeToOfflineSegmentsTask.COLLECTOR_TYPE_KEY);
+      if (legacyMergeTypeStr != null) {
+        mergeType = MergeType.valueOf(legacyMergeTypeStr.toUpperCase());
+      }
     }
+    segmentProcessorConfigBuilder.setMergeType(mergeType);
 
     // Aggregation types
-    Map<String, AggregationFunctionType> aggregationTypes = new HashMap<>();
-    for (Map.Entry<String, String> entry : configs.entrySet()) {
-      String key = entry.getKey();
-      if (key.endsWith(RealtimeToOfflineSegmentsTask.AGGREGATION_TYPE_KEY_SUFFIX)) {
-        String column = key.split(RealtimeToOfflineSegmentsTask.AGGREGATION_TYPE_KEY_SUFFIX)[0];
-        aggregationTypes.put(column, AggregationFunctionType.getAggregationFunctionType(entry.getValue()));
-      }
-    }
-    if (!aggregationTypes.isEmpty()) {
-      segmentProcessorConfigBuilder.setAggregationTypes(aggregationTypes);
-    }
+    segmentProcessorConfigBuilder.setAggregationTypes(MergeTaskUtils.getAggregationTypes(configs));
 
     // Segment config
-    String maxNumRecordsPerSegment = configs.get(RealtimeToOfflineSegmentsTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY);
-    if (maxNumRecordsPerSegment != null) {
-      segmentProcessorConfigBuilder.setSegmentConfig(
-          new SegmentConfig.Builder().setMaxNumRecordsPerSegment(Integer.parseInt(maxNumRecordsPerSegment)).build());
-    }
+    segmentProcessorConfigBuilder.setSegmentConfig(MergeTaskUtils.getSegmentConfig(configs));
 
     SegmentProcessorConfig segmentProcessorConfig = segmentProcessorConfigBuilder.build();
 
@@ -226,9 +185,11 @@ public class RealtimeToOfflineSegmentsTaskExecutor extends BaseMultipleSegmentsC
    */
   @Override
   public void postProcess(PinotTaskConfig pinotTaskConfig) {
-    String realtimeTableName = pinotTaskConfig.getConfigs().get(MinionConstants.TABLE_NAME_KEY);
+    Map<String, String> configs = pinotTaskConfig.getConfigs();
+    String realtimeTableName = configs.get(MinionConstants.TABLE_NAME_KEY);
+    long waterMarkMs = Long.parseLong(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY));
     RealtimeToOfflineSegmentsTaskMetadata newMinionMetadata =
-        new RealtimeToOfflineSegmentsTaskMetadata(realtimeTableName, _nextWatermark);
+        new RealtimeToOfflineSegmentsTaskMetadata(realtimeTableName, waterMarkMs);
     _minionTaskZkMetadataManager.setRealtimeToOfflineSegmentsTaskMetadata(newMinionMetadata, _expectedVersion);
   }
 
@@ -238,20 +199,4 @@ public class RealtimeToOfflineSegmentsTaskExecutor extends BaseMultipleSegmentsC
     return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE,
         Collections.emptyMap());
   }
-
-  /**
-   * Construct a {@link PartitionerConfig} using {@link SegmentPartitionConfig} from the table config
-   */
-  private PartitionerConfig getPartitionerConfig(SegmentPartitionConfig partitionConfig, String tableNameWithType,
-      Schema schema) {
-    Map<String, ColumnPartitionConfig> columnPartitionMap = partitionConfig.getColumnPartitionMap();
-    Preconditions.checkState(columnPartitionMap.size() == 1,
-        "Cannot partition using more than 1 ColumnPartitionConfig for table: %s", tableNameWithType);
-    Map.Entry<String, ColumnPartitionConfig> entry = columnPartitionMap.entrySet().iterator().next();
-    String partitionColumn = entry.getKey();
-    Preconditions.checkState(schema.hasColumn(partitionColumn),
-        "Partition column: %s does not exist in the schema for table: %s", partitionColumn, tableNameWithType);
-    return new PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.TABLE_PARTITION_CONFIG)
-        .setColumnName(partitionColumn).setColumnPartitionConfig(entry.getValue()).build();
-  }
 }
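
Note that the executor no longer carries a _nextWatermark field across phases: postProcess() now derives the new watermark directly from the task's "windowEndMs" config, so convert() does not need to record any state for it.
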
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtilsTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtilsTest.java
new file mode 100644
index 0000000..0139392
--- /dev/null
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtilsTest.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.core.common.MinionConstants.MergeTask;
+import org.apache.pinot.core.segment.processing.framework.MergeType;
+import org.apache.pinot.core.segment.processing.framework.SegmentConfig;
+import org.apache.pinot.core.segment.processing.partitioner.PartitionerConfig;
+import org.apache.pinot.core.segment.processing.partitioner.PartitionerFactory;
+import org.apache.pinot.core.segment.processing.timehandler.TimeHandler;
+import org.apache.pinot.core.segment.processing.timehandler.TimeHandlerConfig;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class MergeTaskUtilsTest {
+
+  @Test
+  public void testGetTimeHandlerConfig() {
+    TableConfig tableConfig =
+        new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").setTimeColumnName("dateTime").build();
+    Schema schema = new Schema.SchemaBuilder()
+        .addDateTime("dateTime", DataType.LONG, "1:SECONDS:SIMPLE_DATE_FORMAT:yyyyMMddHHmmss", "1:SECONDS").build();
+    Map<String, String> taskConfig = new HashMap<>();
+    long expectedWindowStartMs = 1625097600000L;
+    long expectedWindowEndMs = 1625184000000L;
+    taskConfig.put(MergeTask.WINDOW_START_MS_KEY, Long.toString(expectedWindowStartMs));
+    taskConfig.put(MergeTask.WINDOW_END_MS_KEY, Long.toString(expectedWindowEndMs));
+    long expectedRoundBucketMs = 6 * 3600 * 1000;
+    taskConfig.put(MergeTask.ROUND_BUCKET_TIME_PERIOD_KEY, "6h");
+    long expectedPartitionBucketMs = 24 * 3600 * 1000;
+    taskConfig.put(MergeTask.PARTITION_BUCKET_TIME_PERIOD_KEY, "1d");
+
+    TimeHandlerConfig timeHandlerConfig = MergeTaskUtils.getTimeHandlerConfig(tableConfig, schema, taskConfig);
+    assertNotNull(timeHandlerConfig);
+    assertEquals(timeHandlerConfig.getType(), TimeHandler.Type.EPOCH);
+    assertEquals(timeHandlerConfig.getStartTimeMs(), expectedWindowStartMs);
+    assertEquals(timeHandlerConfig.getEndTimeMs(), expectedWindowEndMs);
+    assertEquals(timeHandlerConfig.getRoundBucketMs(), expectedRoundBucketMs);
+    assertEquals(timeHandlerConfig.getPartitionBucketMs(), expectedPartitionBucketMs);
+
+    // No time column in table config
+    TableConfig tableConfigWithoutTimeColumn =
+        new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").build();
+    assertNull(MergeTaskUtils.getTimeHandlerConfig(tableConfigWithoutTimeColumn, schema, taskConfig));
+
+    // Time column does not exist in schema
+    Schema schemaWithoutTimeColumn = new Schema.SchemaBuilder().build();
+    try {
+      MergeTaskUtils.getTimeHandlerConfig(tableConfig, schemaWithoutTimeColumn, taskConfig);
+      fail();
+    } catch (IllegalStateException e) {
+      // Expected
+    }
+  }
+
+  @Test
+  public void testGetPartitionerConfigs() {
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable")
+        .setSegmentPartitionConfig(
+            new SegmentPartitionConfig(Collections.singletonMap("memberId", new ColumnPartitionConfig("murmur", 10))))
+        .build();
+    Schema schema = new Schema.SchemaBuilder().addSingleValueDimension("memberId", DataType.LONG).build();
+    Map<String, String> taskConfig = Collections.emptyMap();
+
+    List<PartitionerConfig> partitionerConfigs = MergeTaskUtils.getPartitionerConfigs(tableConfig, schema, taskConfig);
+    assertEquals(partitionerConfigs.size(), 1);
+    PartitionerConfig partitionerConfig = partitionerConfigs.get(0);
+    assertEquals(partitionerConfig.getPartitionerType(), PartitionerFactory.PartitionerType.TABLE_PARTITION_CONFIG);
+    assertEquals(partitionerConfig.getColumnName(), "memberId");
+    ColumnPartitionConfig columnPartitionConfig = partitionerConfig.getColumnPartitionConfig();
+    assertEquals(columnPartitionConfig.getFunctionName(), "murmur");
+    assertEquals(columnPartitionConfig.getNumPartitions(), 10);
+
+    // No partition column in table config
+    TableConfig tableConfigWithoutPartitionColumn =
+        new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").build();
+    assertTrue(MergeTaskUtils.getPartitionerConfigs(tableConfigWithoutPartitionColumn, schema, taskConfig).isEmpty());
+
+    // Partition column does not exist in schema
+    Schema schemaWithoutPartitionColumn = new Schema.SchemaBuilder().build();
+    try {
+      MergeTaskUtils.getPartitionerConfigs(tableConfig, schemaWithoutPartitionColumn, taskConfig);
+      fail();
+    } catch (IllegalStateException e) {
+      // Expected
+    }
+  }
+
+  @Test
+  public void testGetMergeType() {
+    assertEquals(MergeTaskUtils.getMergeType(Collections.singletonMap(MergeTask.MERGE_TYPE_KEY, "concat")),
+        MergeType.CONCAT);
+    assertEquals(MergeTaskUtils.getMergeType(Collections.singletonMap(MergeTask.MERGE_TYPE_KEY, "Rollup")),
+        MergeType.ROLLUP);
+    assertEquals(MergeTaskUtils.getMergeType(Collections.singletonMap(MergeTask.MERGE_TYPE_KEY, "DeDuP")),
+        MergeType.DEDUP);
+    assertNull(MergeTaskUtils.getMergeType(Collections.emptyMap()));
+
+    try {
+      MergeTaskUtils.getMergeType(Collections.singletonMap(MergeTask.MERGE_TYPE_KEY, "unsupported"));
+      fail();
+    } catch (IllegalArgumentException e) {
+      // Expected
+    }
+  }
+
+  @Test
+  public void testGetAggregationTypes() {
+    Map<String, String> taskConfig = new HashMap<>();
+    taskConfig.put("colA.aggregationType", "sum");
+    taskConfig.put("colB.aggregationType", "Min");
+    taskConfig.put("colC.aggregationType", "MaX");
+
+    Map<String, AggregationFunctionType> aggregationTypes = MergeTaskUtils.getAggregationTypes(taskConfig);
+    assertEquals(aggregationTypes.size(), 3);
+    assertEquals(aggregationTypes.get("colA"), AggregationFunctionType.SUM);
+    assertEquals(aggregationTypes.get("colB"), AggregationFunctionType.MIN);
+    assertEquals(aggregationTypes.get("colC"), AggregationFunctionType.MAX);
+
+    taskConfig.put("colD.aggregationType", "unsupported");
+    try {
+      MergeTaskUtils.getAggregationTypes(taskConfig);
+      fail();
+    } catch (IllegalArgumentException e) {
+      // Expected
+    }
+  }
+
+  @Test
+  public void testGetSegmentConfig() {
+    Map<String, String> taskConfig = new HashMap<>();
+    taskConfig.put(MergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY, "10000");
+    taskConfig.put(MergeTask.SEGMENT_NAME_PREFIX_KEY, "myPrefix");
+    SegmentConfig segmentConfig = MergeTaskUtils.getSegmentConfig(taskConfig);
+    assertEquals(segmentConfig.getMaxNumRecordsPerSegment(), 10000);
+    assertEquals(segmentConfig.getSegmentNamePrefix(), "myPrefix");
+
+    segmentConfig = MergeTaskUtils.getSegmentConfig(Collections.emptyMap());
+    assertEquals(segmentConfig.getMaxNumRecordsPerSegment(), SegmentConfig.DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT);
+    assertNull(segmentConfig.getSegmentNamePrefix());
+  }
+}
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutorTest.java
index 27a89e3..5cf3afc 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutorTest.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutorTest.java
@@ -106,9 +106,8 @@ public class MergeRollupTaskExecutorTest {
       throws Exception {
     MergeRollupTaskExecutor mergeRollupTaskExecutor = new MergeRollupTaskExecutor();
     Map<String, String> configs = new HashMap<>();
-    configs.put(MinionConstants.MergeRollupTask.MERGE_TYPE_KEY, "CONCAT");
-    configs.put(MinionConstants.MergeRollupTask.GRANULARITY_KEY, "Daily");
     configs.put(MinionConstants.TABLE_NAME_KEY, "testTable_OFFLINE");
+    configs.put(MinionConstants.MergeRollupTask.MERGE_LEVEL_KEY, "daily");
 
     PinotTaskConfig pinotTaskConfig = new PinotTaskConfig(MinionConstants.MergeRollupTask.TASK_TYPE, configs);
     List<SegmentConversionResult> conversionResults =
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskUtilsTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskUtilsTest.java
index 6058bc0..864e57d 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskUtilsTest.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskUtilsTest.java
@@ -20,64 +20,44 @@ package org.apache.pinot.plugin.minion.tasks.merge_rollup;
 
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.pinot.common.minion.Granularity;
-import org.apache.pinot.core.segment.processing.framework.MergeType;
-import org.apache.pinot.segment.spi.AggregationFunctionType;
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
+import org.apache.pinot.core.common.MinionConstants.MergeTask;
 import org.testng.annotations.Test;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
 
-public class MergeRollupTaskUtilsTest {
-  private final String METRIC_COLUMN_A = "metricColA";
-  private final String METRIC_COLUMN_B = "metricColB";
-  private Map<String, String> _mergeRollupTaskConfig;
-
-  @BeforeClass
-  public void setUp() {
-    Map<String, String> mergeRollupTaskConfig = new HashMap<>();
-    mergeRollupTaskConfig.put("aggregate.metricColA", "sum");
-    mergeRollupTaskConfig.put("aggregate.metricColB", "max");
-    mergeRollupTaskConfig.put("merge.daily.mergeType", "concat");
-    mergeRollupTaskConfig.put("merge.daily.bufferTime", "2d");
-    mergeRollupTaskConfig.put("merge.daily.maxNumRecordsPerSegment", "1000000");
-    mergeRollupTaskConfig.put("merge.daily.maxNumRecordsPerTask", "5000000");
-    mergeRollupTaskConfig.put("merge.monthly.mergeType", "rollup");
-    mergeRollupTaskConfig.put("merge.monthly.bufferTime", "30d");
-    mergeRollupTaskConfig.put("merge.monthly.maxNumRecordsPerSegment", "2000000");
-    mergeRollupTaskConfig.put("merge.monthly.maxNumRecordsPerTask", "5000000");
-    _mergeRollupTaskConfig = mergeRollupTaskConfig;
-  }
 
-  @Test
-  public void testGetRollupAggregationTypeMap() {
-    Map<String, AggregationFunctionType> rollupAggregationTypeMap =
-        MergeRollupTaskUtils.getRollupAggregationTypes(_mergeRollupTaskConfig);
-    Assert.assertEquals(rollupAggregationTypeMap.size(), 2);
-    Assert.assertTrue(rollupAggregationTypeMap.containsKey(METRIC_COLUMN_A));
-    Assert.assertTrue(rollupAggregationTypeMap.containsKey(METRIC_COLUMN_B));
-    Assert.assertEquals(rollupAggregationTypeMap.get(METRIC_COLUMN_A), AggregationFunctionType.SUM);
-    Assert.assertEquals(rollupAggregationTypeMap.get(METRIC_COLUMN_B), AggregationFunctionType.MAX);
-  }
+public class MergeRollupTaskUtilsTest {
 
   @Test
-  public void testGetAllMergeProperties() {
-    Map<Granularity, MergeProperties> allMergeProperties =
-        MergeRollupTaskUtils.getAllMergeProperties(_mergeRollupTaskConfig);
-    Assert.assertEquals(allMergeProperties.size(), 2);
-    Assert.assertTrue(allMergeProperties.containsKey(Granularity.DAILY));
-    Assert.assertTrue(allMergeProperties.containsKey(Granularity.MONTHLY));
-
-    MergeProperties dailyProperty = allMergeProperties.get(Granularity.DAILY);
-    Assert.assertEquals(dailyProperty.getMergeType(), MergeType.CONCAT.name());
-    Assert.assertEquals(dailyProperty.getBufferTimeMs(), 172800000L);
-    Assert.assertEquals(dailyProperty.getMaxNumRecordsPerSegment(), 1000000L);
-    Assert.assertEquals(dailyProperty.getMaxNumRecordsPerTask(), 5000000L);
-
-    MergeProperties monthlyProperty = allMergeProperties.get(Granularity.MONTHLY);
-    Assert.assertEquals(monthlyProperty.getMergeType(), MergeType.ROLLUP.name());
-    Assert.assertEquals(monthlyProperty.getBufferTimeMs(), 2592000000L);
-    Assert.assertEquals(monthlyProperty.getMaxNumRecordsPerSegment(), 2000000L);
-    Assert.assertEquals(monthlyProperty.getMaxNumRecordsPerTask(), 5000000L);
+  public void testGetLevelToConfigMap() {
+    Map<String, String> taskConfig = new HashMap<>();
+    taskConfig.put("daily.bucketTimePeriod", "1d");
+    taskConfig.put("daily.bufferTimePeriod", "3d");
+    taskConfig.put("daily.maxNumRecordsPerSegment", "1000000");
+    taskConfig.put("monthly.bucketTimePeriod", "30d");
+    taskConfig.put("monthly.bufferTimePeriod", "10d");
+    taskConfig.put("monthly.roundBucketTimePeriod", "7d");
+    taskConfig.put("monthly.mergeType", "rollup");
+    taskConfig.put("monthly.maxNumRecordsPerTask", "5000000");
+
+    Map<String, Map<String, String>> levelToConfigMap = MergeRollupTaskUtils.getLevelToConfigMap(taskConfig);
+    assertEquals(levelToConfigMap.size(), 2);
+
+    Map<String, String> dailyConfig = levelToConfigMap.get("daily");
+    assertNotNull(dailyConfig);
+    assertEquals(dailyConfig.size(), 3);
+    assertEquals(dailyConfig.get(MergeTask.BUCKET_TIME_PERIOD_KEY), "1d");
+    assertEquals(dailyConfig.get(MergeTask.BUFFER_TIME_PERIOD_KEY), "3d");
+    assertEquals(dailyConfig.get(MergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY), "1000000");
+
+    Map<String, String> monthlyConfig = levelToConfigMap.get("monthly");
+    assertNotNull(monthlyConfig);
+    assertEquals(monthlyConfig.size(), 5);
+    assertEquals(monthlyConfig.get(MergeTask.BUCKET_TIME_PERIOD_KEY), "30d");
+    assertEquals(monthlyConfig.get(MergeTask.BUFFER_TIME_PERIOD_KEY), "10d");
+    assertEquals(monthlyConfig.get(MergeTask.ROUND_BUCKET_TIME_PERIOD_KEY), "7d");
+    assertEquals(monthlyConfig.get(MergeTask.MERGE_TYPE_KEY), "rollup");
+    assertEquals(monthlyConfig.get(MergeTask.MAX_NUM_RECORDS_PER_TASK_KEY), "5000000");
   }
 }
