You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2021/05/28 06:21:13 UTC

[incubator-pinot] branch master updated: Add constants and metadata properties for MergeRollupTask: (#6932)

This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4fb234a  Add constants and metadata properties for MergeRollupTask: (#6932)
4fb234a is described below

commit 4fb234ae89173bfda3e4ffd6686cb4df5d2a00b4
Author: Jiapeng Tao <ji...@linkedin.com>
AuthorDate: Thu May 27 23:20:59 2021 -0700

    Add constants and metadata properties for MergeRollupTask: (#6932)
    
    1. Add setter/getter method for merge/rollup watermarks of each granularity.
    2. Add task config related constants in MinionConstants, create until functions to get rollup aggregation configs and merge constraints.
    3. Add MergeRollupTask.bucketGranularity constant which will be used in customMap of SegmentMetadata.
---
 .../apache/pinot/common/minion/Granularity.java    |  27 +++++
 .../common/minion/MergeRollupTaskMetadata.java     |  87 +++++++++++++++
 .../common/minion/MinionTaskMetadataUtils.java     |  28 +++++
 .../helix/core/minion/ClusterInfoAccessor.java     |  19 ++++
 .../apache/pinot/core/common/MinionConstants.java  |  19 +++-
 .../minion/rollup/MergeRollupSegmentConverter.java |   4 +
 .../minion/tasks/merge_rollup/MergeProperties.java |  50 +++++++++
 .../tasks/merge_rollup/MergeRollupTaskUtils.java   | 117 +++++++++++++++++++++
 .../merge_rollup/MergeRollupTaskUtilsTest.java     |  83 +++++++++++++++
 9 files changed, 432 insertions(+), 2 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/minion/Granularity.java b/pinot-common/src/main/java/org/apache/pinot/common/minion/Granularity.java
new file mode 100644
index 0000000..d6308e1
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/minion/Granularity.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.minion;
+
+public enum Granularity {
+  HOURLY,
+  DAILY,
+  WEEKLY,
+  MONTHLY,
+  YEARLY
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/minion/MergeRollupTaskMetadata.java b/pinot-common/src/main/java/org/apache/pinot/common/minion/MergeRollupTaskMetadata.java
new file mode 100644
index 0000000..583ebb4
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/minion/MergeRollupTaskMetadata.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.minion;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.helix.ZNRecord;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+
+/**
+ * Metadata for the minion task of type <code>MergeRollupTask</code>.
+ * The <code>watermarkMap</code> denotes the time (exclusive) upto which tasks have been executed for the bucket granularity.
+ *
+ * This gets serialized and stored in zookeeper under the path MINION_TASK_METADATA/MergeRollupTask/tableNameWithType
+ */
+public class MergeRollupTaskMetadata {
+
+  private static final String WATERMARK_KEY_PREFIX = "watermarkMs_";
+
+  private final String _tableNameWithType;
+  // Map from bucket granularity to its watermark
+  private final Map<Granularity, Long> _watermarkMap;
+
+  public MergeRollupTaskMetadata(String tableNameWithType, Map<Granularity, Long> watermarkMap) {
+    _tableNameWithType = tableNameWithType;
+    _watermarkMap = watermarkMap;
+  }
+
+  public String getTableNameWithType() {
+    return _tableNameWithType;
+  }
+
+  /**
+   * Get the watermarkMap in millis
+   */
+  public Map<Granularity, Long> getWatermarkMap() {
+    return _watermarkMap;
+  }
+
+  public static MergeRollupTaskMetadata fromZNRecord(ZNRecord znRecord) {
+    Map<Granularity, Long> watermarkMap = new HashMap<>();
+    Map<String, String> fields = znRecord.getSimpleFields();
+    for (Map.Entry<String, String> entry : fields.entrySet()) {
+      watermarkMap.put(Granularity.valueOf(entry.getKey().split(WATERMARK_KEY_PREFIX)[1]), Long.parseLong(entry.getValue()));
+    }
+    return new MergeRollupTaskMetadata(znRecord.getId(), watermarkMap);
+  }
+
+  public ZNRecord toZNRecord() {
+    ZNRecord znRecord = new ZNRecord(_tableNameWithType);
+    for (Map.Entry<Granularity, Long> entry : _watermarkMap.entrySet()) {
+      znRecord.setLongField(WATERMARK_KEY_PREFIX + entry.getKey().name(), entry.getValue());
+    }
+    return znRecord;
+  }
+
+  public String toJsonString() {
+    try {
+      return JsonUtils.objectToString(this);
+    } catch (JsonProcessingException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return toJsonString();
+  }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionTaskMetadataUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionTaskMetadataUtils.java
index 43ac82c..a61636c 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionTaskMetadataUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionTaskMetadataUtils.java
@@ -52,6 +52,34 @@ public final class MinionTaskMetadataUtils {
   }
 
   /**
+   * Fetches the ZNRecord for MergeRollupTask for given tableNameWithType from MINION_TASK_METADATA/MergeRollupTask/tableNameWthType
+   * and converts it to a {@link MergeRollupTaskMetadata} object
+   */
+  @Nullable
+  public static MergeRollupTaskMetadata getMergeRollupTaskMetadata(
+      HelixPropertyStore<ZNRecord> propertyStore, String taskType, String tableNameWithType) {
+    ZNRecord znRecord = fetchMinionTaskMetadataZNRecord(propertyStore, taskType, tableNameWithType);
+    return znRecord != null ? MergeRollupTaskMetadata.fromZNRecord(znRecord) : null;
+  }
+
+  /**
+   * Persists the provided {@link MergeRollupTaskMetadata} to MINION_TASK_METADATA/MergeRollupTask/tableNameWthType.
+   * Will fail if expectedVersion does not match.
+   * Set expectedVersion -1 to override version check.
+   */
+  public static void persistMergeRollupTaskMetadata(HelixPropertyStore<ZNRecord> propertyStore,
+      String taskType, MergeRollupTaskMetadata mergeRollupTaskMetadata,
+      int expectedVersion) {
+    String path = ZKMetadataProvider.constructPropertyStorePathForMinionTaskMetadata(taskType,
+        mergeRollupTaskMetadata.getTableNameWithType());
+    if (!propertyStore
+        .set(path, mergeRollupTaskMetadata.toZNRecord(), expectedVersion, AccessOption.PERSISTENT)) {
+      throw new ZkException(
+          "Failed to persist minion MergeRollupTask metadata: " + mergeRollupTaskMetadata);
+    }
+  }
+
+  /**
    * Fetches the ZNRecord for RealtimeToOfflineSegmentsTask for given tableNameWithType from MINION_TASK_METADATA/RealtimeToOfflineSegmentsTask/tableNameWthType
    * and converts it to a {@link RealtimeToOfflineSegmentsTaskMetadata} object
    */
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
index d707cd9..40d2d95 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
@@ -29,6 +29,7 @@ import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.minion.MergeRollupTaskMetadata;
 import org.apache.pinot.common.minion.MinionTaskMetadataUtils;
 import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
 import org.apache.pinot.controller.ControllerConf;
@@ -111,6 +112,24 @@ public class ClusterInfoAccessor {
   }
 
   /**
+   * Fetches the {@link MergeRollupTaskMetadata} from MINION_TASK_METADATA for given table
+   * @param tableNameWithType table name with type
+   */
+  public MergeRollupTaskMetadata getMinionMergeRollupTaskMetadata(String tableNameWithType) {
+    return MinionTaskMetadataUtils.getMergeRollupTaskMetadata(_pinotHelixResourceManager.getPropertyStore(),
+            MinionConstants.MergeRollupTask.TASK_TYPE, tableNameWithType);
+  }
+
+  /**
+   * Sets the {@link MergeRollupTaskMetadata} into MINION_TASK_METADATA
+   * This call will override any previous metadata node
+   */
+  public void setMergeRollupTaskMetadata(MergeRollupTaskMetadata mergeRollupTaskMetadata) {
+    MinionTaskMetadataUtils.persistMergeRollupTaskMetadata(_pinotHelixResourceManager.getPropertyStore(),
+        MinionConstants.MergeRollupTask.TASK_TYPE, mergeRollupTaskMetadata, -1);
+  }
+
+  /**
    * Fetches the {@link RealtimeToOfflineSegmentsTaskMetadata} from MINION_TASK_METADATA for given realtime table
    * @param tableNameWithType realtime table name
    */
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index c2ce030..2de938d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -23,11 +23,13 @@ public class MinionConstants {
   }
 
   public static final String TASK_TIME_SUFFIX = ".time";
+  public static final String TASK_BUCKET_GRANULARITY_SUFFIX = ".bucketGranularity";
 
   public static final String TABLE_NAME_KEY = "tableName";
   public static final String SEGMENT_NAME_KEY = "segmentName";
   public static final String DOWNLOAD_URL_KEY = "downloadURL";
   public static final String UPLOAD_URL_KEY = "uploadURL";
+  public static final String DOT_SEPARATOR = ".";
   public static final String URL_SEPARATOR = ",";
   public static final String AUTH_TOKEN = "authToken";
 
@@ -58,8 +60,21 @@ public class MinionConstants {
   }
 
   public static class MergeRollupTask {
-    public static final String TASK_TYPE = "mergeRollupTask";
-    public static final String MERGE_TYPE_KEY = "mergeTypeKey";
+    public static final String TASK_TYPE = "MergeRollupTask";
+
+    public static final String MERGE_TYPE_KEY = "mergeType";
+    public static final String GRANULARITY_KEY = "granularity";
+
+    // Rollup aggregate function related configs
+    public static final String AGGREGATE_KEY_PREFIX = "aggregate";
+
+    // Merge properties related configs
+    public static final String MERGE_KEY_PREFIX = "merge";
+    public static final String BUFFER_TIME = "bufferTime";
+    public static final String MAX_NUM_RECORDS_PER_SEGMENT = "maxNumRecordsPerSegment";
+    public static final String MAX_NUM_RECORDS_PER_TASK = "maxNumRecordsPerTask";
+
+    // Segment name generator related configs
     public static final String MERGED_SEGMENT_NAME_KEY = "mergedSegmentNameKey";
   }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/rollup/MergeRollupSegmentConverter.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/rollup/MergeRollupSegmentConverter.java
index 518813e..06d41fb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/minion/rollup/MergeRollupSegmentConverter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/rollup/MergeRollupSegmentConverter.java
@@ -35,6 +35,10 @@ import org.apache.pinot.spi.data.Schema;
 
 
 /**
+ * This class is deprecated.
+ *
+ * TODO: Update current MergeRollupExecutor to use SegmentProcessorFramework instead
+ *
  * Rollup segment converter takes a list of segments and concatenates/rolls up segments based on the configuration.
  *
  * TODO: Add support for roll-up with time granularity change
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeProperties.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeProperties.java
new file mode 100644
index 0000000..34048e5
--- /dev/null
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeProperties.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks.merge_rollup;
+
+public class MergeProperties {
+  private final String _mergeType;
+  private final long _bufferTimeMs;
+  private final long _maxNumRecordsPerSegment;
+  private final long _maxNumRecordsPerTask;
+
+  public MergeProperties(String mergeType, long bufferTimeMs, long maxNumRecordsPerSegment,
+      long maxNumRecordsPerTask) {
+    _mergeType = mergeType;
+    _bufferTimeMs = bufferTimeMs;
+    _maxNumRecordsPerSegment = maxNumRecordsPerSegment;
+    _maxNumRecordsPerTask = maxNumRecordsPerTask;
+  }
+
+  public String getMergeType() {
+    return _mergeType;
+  }
+
+  public long getBufferTimeMs() {
+    return _bufferTimeMs;
+  }
+
+  public long getMaxNumRecordsPerSegment() {
+    return _maxNumRecordsPerSegment;
+  }
+
+  public long getMaxNumRecordsPerTask() {
+    return _maxNumRecordsPerTask;
+  }
+}
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskUtils.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskUtils.java
new file mode 100644
index 0000000..6a7c424
--- /dev/null
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskUtils.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks.merge_rollup;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.minion.Granularity;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.segment.processing.collector.CollectorFactory;
+import org.apache.pinot.core.segment.processing.collector.ValueAggregatorFactory;
+import org.apache.pinot.pql.parsers.utils.Pair;
+import org.apache.pinot.spi.utils.TimeUtils;
+
+
+public class MergeRollupTaskUtils {
+
+  private static final String[] validMergeProperties = {
+      MinionConstants.MergeRollupTask.MERGE_TYPE_KEY,
+      MinionConstants.MergeRollupTask.BUFFER_TIME,
+      MinionConstants.MergeRollupTask.MAX_NUM_RECORDS_PER_SEGMENT,
+      MinionConstants.MergeRollupTask.MAX_NUM_RECORDS_PER_TASK
+  };
+
+  private static final String[] validMergeType = {
+      CollectorFactory.CollectorType.CONCAT.name(),
+      CollectorFactory.CollectorType.ROLLUP.name()
+  };
+
+  public static Map<String, ValueAggregatorFactory.ValueAggregatorType> getRollupAggregationTypeMap(
+      Map<String, String> mergeRollupConfig) {
+    Map<String, ValueAggregatorFactory.ValueAggregatorType> rollupAggregationTypeMap = new HashMap<>();
+    for (Map.Entry<String, String> entry : mergeRollupConfig.entrySet()) {
+      if (entry.getKey().startsWith(MinionConstants.MergeRollupTask.AGGREGATE_KEY_PREFIX)) {
+        rollupAggregationTypeMap.put(getAggregateColumn(entry.getKey()),
+            ValueAggregatorFactory.ValueAggregatorType.valueOf(entry.getValue().toUpperCase()));
+      }
+    }
+    return rollupAggregationTypeMap;
+  }
+
+  public static Map<Granularity, MergeProperties> getAllMergeProperties(Map<String, String> mergeRollupConfig) {
+    Map<Granularity, Map<String, String>> mergePropertiesMap = new HashMap<>();
+    for (Map.Entry<String, String> entry : mergeRollupConfig.entrySet()) {
+      if (entry.getKey().startsWith(MinionConstants.MergeRollupTask.MERGE_KEY_PREFIX)) {
+        Pair<Granularity, String> pair = getGranularityAndPropertyPair(entry.getKey(), entry.getValue());
+        Granularity granularity = pair.getFirst();
+        String mergeProperty = pair.getSecond();
+        mergePropertiesMap.putIfAbsent(granularity, new HashMap<>());
+        mergePropertiesMap.get(granularity).put(mergeProperty, entry.getValue());
+      }
+    }
+
+    Map<Granularity, MergeProperties> allMergeProperties = new HashMap<>();
+    for (Map.Entry<Granularity, Map<String, String>> entry : mergePropertiesMap.entrySet()) {
+      Map<String, String> properties = entry.getValue();
+      MergeProperties mergeProperties = new MergeProperties(
+          properties.get(MinionConstants.MergeRollupTask.MERGE_TYPE_KEY).toUpperCase(),
+          TimeUtils.convertPeriodToMillis(properties.get(MinionConstants.MergeRollupTask.BUFFER_TIME)),
+          Long.parseLong(properties.get(MinionConstants.MergeRollupTask.MAX_NUM_RECORDS_PER_SEGMENT)),
+          Long.parseLong(properties.get(MinionConstants.MergeRollupTask.MAX_NUM_RECORDS_PER_TASK)));
+      allMergeProperties.put(entry.getKey(), mergeProperties);
+    }
+    return allMergeProperties;
+  }
+
+  private static String getAggregateColumn(String rollupAggregateConfigKey) {
+    return rollupAggregateConfigKey.split(
+        MinionConstants.MergeRollupTask.AGGREGATE_KEY_PREFIX + MinionConstants.DOT_SEPARATOR)[1];
+  }
+
+  private static Pair<Granularity, String> getGranularityAndPropertyPair(
+      String mergePropertyConfigKey, String mergePropertyConfigValue) {
+    String[] components = StringUtils.split(mergePropertyConfigKey, MinionConstants.DOT_SEPARATOR);
+    Preconditions.checkState(components.length == 3);
+    Preconditions.checkState(isValidMergeProperties(components[2]));
+    if (components[2].equals(MinionConstants.MergeRollupTask.MERGE_TYPE_KEY)) {
+      Preconditions.checkState(isValidMergeType(mergePropertyConfigValue));
+    }
+    return new Pair<>((Granularity.valueOf(components[1].toUpperCase())), components[2]);
+  }
+
+  private static boolean isValidMergeProperties(String property) {
+    for (String validProperty : validMergeProperties) {
+      if (property.equals(validProperty)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private static boolean isValidMergeType(String mergeType) {
+    for (String validMergeType : validMergeType) {
+      if (mergeType.toUpperCase().equals(validMergeType)) {
+        return true;
+      }
+    }
+    return false;
+  }
+}
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskUtilsTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskUtilsTest.java
new file mode 100644
index 0000000..6350415
--- /dev/null
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskUtilsTest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks.merge_rollup;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.common.minion.Granularity;
+import org.apache.pinot.core.segment.processing.collector.CollectorFactory;
+import org.apache.pinot.core.segment.processing.collector.ValueAggregatorFactory;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class MergeRollupTaskUtilsTest {
+  private final String METRIC_COLUMN_A = "metricColA";
+  private final String METRIC_COLUMN_B = "metricColB";
+  private Map<String, String> _mergeRollupTaskConfig;
+
+  @BeforeClass
+  public void setUp() {
+    Map<String, String> mergeRollupTaskConfig = new HashMap<>();
+    mergeRollupTaskConfig.put("aggregate.metricColA", "sum");
+    mergeRollupTaskConfig.put("aggregate.metricColB", "max");
+    mergeRollupTaskConfig.put("merge.daily.mergeType", "concat");
+    mergeRollupTaskConfig.put("merge.daily.bufferTime", "2d");
+    mergeRollupTaskConfig.put("merge.daily.maxNumRecordsPerSegment", "1000000");
+    mergeRollupTaskConfig.put("merge.daily.maxNumRecordsPerTask", "5000000");
+    mergeRollupTaskConfig.put("merge.monthly.mergeType", "rollup");
+    mergeRollupTaskConfig.put("merge.monthly.bufferTime", "30d");
+    mergeRollupTaskConfig.put("merge.monthly.maxNumRecordsPerSegment", "2000000");
+    mergeRollupTaskConfig.put("merge.monthly.maxNumRecordsPerTask", "5000000");
+    _mergeRollupTaskConfig = mergeRollupTaskConfig;
+  }
+
+  @Test
+  public void testGetRollupAggregationTypeMap() {
+    Map<String, ValueAggregatorFactory.ValueAggregatorType> rollupAggregationTypeMap =
+        MergeRollupTaskUtils.getRollupAggregationTypeMap(_mergeRollupTaskConfig);
+    Assert.assertEquals(rollupAggregationTypeMap.size(), 2);
+    Assert.assertTrue(rollupAggregationTypeMap.containsKey(METRIC_COLUMN_A));
+    Assert.assertTrue(rollupAggregationTypeMap.containsKey(METRIC_COLUMN_B));
+    Assert.assertEquals(rollupAggregationTypeMap.get(METRIC_COLUMN_A), ValueAggregatorFactory.ValueAggregatorType.SUM);
+    Assert.assertEquals(rollupAggregationTypeMap.get(METRIC_COLUMN_B), ValueAggregatorFactory.ValueAggregatorType.MAX);
+  }
+
+  @Test
+  public void testGetAllMergeProperties() {
+    Map<Granularity, MergeProperties> allMergeProperties =
+        MergeRollupTaskUtils.getAllMergeProperties(_mergeRollupTaskConfig);
+    Assert.assertEquals(allMergeProperties.size(), 2);
+    Assert.assertTrue(allMergeProperties.containsKey(Granularity.DAILY));
+    Assert.assertTrue(allMergeProperties.containsKey(Granularity.MONTHLY));
+
+    MergeProperties dailyProperty = allMergeProperties.get(Granularity.DAILY);
+    Assert.assertEquals(dailyProperty.getMergeType(), CollectorFactory.CollectorType.CONCAT.name());
+    Assert.assertEquals(dailyProperty.getBufferTimeMs(), 172800000L);
+    Assert.assertEquals(dailyProperty.getMaxNumRecordsPerSegment(), 1000000L);
+    Assert.assertEquals(dailyProperty.getMaxNumRecordsPerTask(), 5000000L);
+
+    MergeProperties monthlyProperty = allMergeProperties.get(Granularity.MONTHLY);
+    Assert.assertEquals(monthlyProperty.getMergeType(), CollectorFactory.CollectorType.ROLLUP.name());
+    Assert.assertEquals(monthlyProperty.getBufferTimeMs(), 2592000000L);
+    Assert.assertEquals(monthlyProperty.getMaxNumRecordsPerSegment(), 2000000L);
+    Assert.assertEquals(monthlyProperty.getMaxNumRecordsPerTask(), 5000000L);
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org