Posted to commits@pinot.apache.org by ja...@apache.org on 2021/07/21 03:26:27 UTC
[incubator-pinot] branch master updated: Integrate enhanced SegmentProcessorFramework into MergeRollupTaskExecutor (#7180)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new e0163f4 Integrate enhanced SegmentProcessorFramework into MergeRollupTaskExecutor (#7180)
e0163f4 is described below
commit e0163f43a44b921faf62ca2066b3e6269dd8afa0
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Tue Jul 20 20:26:11 2021 -0700
Integrate enhanced SegmentProcessorFramework into MergeRollupTaskExecutor (#7180)
Wire the enhanced `SegmentProcessorFramework` into the merge/rollup task executor, with support for:
- Concat/rollup/dedup merge types
- Null values
- A custom segment name prefix
Also extract the logic common to `RealtimeToOfflineSegmentsTaskExecutor` and `MergeRollupTaskExecutor` into the new shared `MergeTaskUtils`.
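
[Editor's note] For readers skimming the diff below, here is a minimal sketch of a task config that exercises the new wiring. The key constants are defined in the MinionConstants changes in this commit; the table name and values are illustrative, not taken from the commit:

import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.common.MinionConstants.MergeTask;

// Illustrative values only; the key constants come from this commit.
Map<String, String> configs = new HashMap<>();
configs.put(MinionConstants.TABLE_NAME_KEY, "myTable_OFFLINE");
configs.put(MinionConstants.MergeRollupTask.MERGE_LEVEL_KEY, "daily");
configs.put(MergeTask.MERGE_TYPE_KEY, "rollup");                         // concat | rollup | dedup
configs.put("metricCol" + MergeTask.AGGREGATION_TYPE_KEY_SUFFIX, "sum"); // i.e. "metricCol.aggregationType"
configs.put(MergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY, "1000000");
configs.put(MergeTask.SEGMENT_NAME_PREFIX_KEY, "merged_daily");          // custom segment name prefix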
---
.../apache/pinot/core/common/MinionConstants.java | 52 +++----
.../processing/framework/SegmentConfig.java | 2 +-
.../pinot/plugin/minion/tasks/MergeTaskUtils.java | 147 ++++++++++++++++++
.../minion/tasks/merge_rollup/MergeProperties.java | 50 ------
.../merge_rollup/MergeRollupTaskExecutor.java | 57 ++++---
.../tasks/merge_rollup/MergeRollupTaskUtils.java | 108 +++----------
.../RealtimeToOfflineSegmentsTaskExecutor.java | 95 +++---------
.../plugin/minion/tasks/MergeTaskUtilsTest.java | 172 +++++++++++++++++++++
.../merge_rollup/MergeRollupTaskExecutorTest.java | 3 +-
.../merge_rollup/MergeRollupTaskUtilsTest.java | 86 ++++-------
10 files changed, 449 insertions(+), 323 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index dba45e8..b328b13 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -23,7 +23,6 @@ public class MinionConstants {
}
public static final String TASK_TIME_SUFFIX = ".time";
- public static final String TASK_BUCKET_GRANULARITY_SUFFIX = ".bucketGranularity";
public static final String TABLE_NAME_KEY = "tableName";
public static final String SEGMENT_NAME_KEY = "segmentName";
@@ -60,36 +59,15 @@ public class MinionConstants {
public static final String TASK_TYPE = "PurgeTask";
}
- public static class MergeRollupTask {
- public static final String TASK_TYPE = "MergeRollupTask";
-
- public static final String MERGE_TYPE_KEY = "mergeType";
- public static final String GRANULARITY_KEY = "granularity";
-
- // Rollup aggregate function related configs
- public static final String AGGREGATE_KEY_PREFIX = "aggregate";
-
- // Merge properties related configs
- public static final String MERGE_KEY_PREFIX = "merge";
- public static final String BUFFER_TIME = "bufferTime";
- public static final String MAX_NUM_RECORDS_PER_SEGMENT = "maxNumRecordsPerSegment";
- public static final String MAX_NUM_RECORDS_PER_TASK = "maxNumRecordsPerTask";
-
- // Segment name generator related configs
- public static final String MERGED_SEGMENT_NAME_KEY = "mergedSegmentNameKey";
- }
-
- /**
- * Creates segments for the OFFLINE table, using completed segments from the corresponding REALTIME table
- */
- public static class RealtimeToOfflineSegmentsTask {
- public static final String TASK_TYPE = "RealtimeToOfflineSegmentsTask";
+ // Common config keys for segment merge tasks.
+ public static abstract class MergeTask {
/**
* The time window size for the task.
* e.g. if set to "1d", the task is scheduled to run for a 1-day window
*/
public static final String BUCKET_TIME_PERIOD_KEY = "bucketTimePeriod";
+
/**
* The time period to wait before picking segments for this task
* e.g. if set to "2d", no task will be scheduled for a time window younger than 2 days
@@ -100,15 +78,35 @@ public class MinionConstants {
public static final String WINDOW_START_MS_KEY = "windowStartMs";
public static final String WINDOW_END_MS_KEY = "windowEndMs";
public static final String ROUND_BUCKET_TIME_PERIOD_KEY = "roundBucketTimePeriod";
+ public static final String PARTITION_BUCKET_TIME_PERIOD_KEY = "partitionBucketTimePeriod";
// Merge config
public static final String MERGE_TYPE_KEY = "mergeType";
- @Deprecated // Replaced by MERGE_TYPE_KEY
- public static final String COLLECTOR_TYPE_KEY = "collectorType";
public static final String AGGREGATION_TYPE_KEY_SUFFIX = ".aggregationType";
// Segment config
+ public static final String MAX_NUM_RECORDS_PER_TASK_KEY = "maxNumRecordsPerTask";
public static final String MAX_NUM_RECORDS_PER_SEGMENT_KEY = "maxNumRecordsPerSegment";
+ public static final String SEGMENT_NAME_PREFIX_KEY = "segmentNamePrefix";
+ }
+
+ public static class MergeRollupTask extends MergeTask {
+ public static final String TASK_TYPE = "MergeRollupTask";
+
+ public static final String MERGE_LEVEL_KEY = "mergeLevel";
+
+ public static final String SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY = TASK_TYPE + "." + MERGE_LEVEL_KEY;
+ public static final String SEGMENT_ZK_METADATA_TIME_KEY = TASK_TYPE + TASK_TIME_SUFFIX;
+ }
+
+ /**
+ * Creates segments for the OFFLINE table, using completed segments from the corresponding REALTIME table
+ */
+ public static class RealtimeToOfflineSegmentsTask extends MergeTask {
+ public static final String TASK_TYPE = "RealtimeToOfflineSegmentsTask";
+
+ @Deprecated // Replaced by MERGE_TYPE_KEY
+ public static final String COLLECTOR_TYPE_KEY = "collectorType";
}
// Generate segment and push to controller based on batch ingestion configs
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentConfig.java
index 40964c3..33aa896 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentConfig.java
@@ -30,7 +30,7 @@ import javax.annotation.Nullable;
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public class SegmentConfig {
- private static final int DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT = 5_000_000;
+ public static final int DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT = 5_000_000;
private final int _maxNumRecordsPerSegment;
private final String _segmentNamePrefix;
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtils.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtils.java
new file mode 100644
index 0000000..f6b3aec
--- /dev/null
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtils.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks;
+
+import com.google.common.base.Preconditions;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.core.common.MinionConstants.MergeTask;
+import org.apache.pinot.core.segment.processing.framework.MergeType;
+import org.apache.pinot.core.segment.processing.framework.SegmentConfig;
+import org.apache.pinot.core.segment.processing.partitioner.PartitionerConfig;
+import org.apache.pinot.core.segment.processing.partitioner.PartitionerFactory;
+import org.apache.pinot.core.segment.processing.timehandler.TimeHandler;
+import org.apache.pinot.core.segment.processing.timehandler.TimeHandlerConfig;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.TimeUtils;
+
+
+/**
+ * Common utils for segment merge tasks.
+ */
+public class MergeTaskUtils {
+ private MergeTaskUtils() {
+ }
+
+ private static final int AGGREGATION_TYPE_KEY_SUFFIX_LENGTH = MergeTask.AGGREGATION_TYPE_KEY_SUFFIX.length();
+
+ /**
+ * Creates the time handler config based on the given table config, schema and task config. Returns {@code null} if
+ * the table does not have a time column.
+ */
+ @Nullable
+ public static TimeHandlerConfig getTimeHandlerConfig(TableConfig tableConfig, Schema schema,
+ Map<String, String> taskConfig) {
+ String timeColumn = tableConfig.getValidationConfig().getTimeColumnName();
+ if (timeColumn == null) {
+ return null;
+ }
+ DateTimeFieldSpec fieldSpec = schema.getSpecForTimeColumn(timeColumn);
+ Preconditions
+ .checkState(fieldSpec != null, "No valid spec found for time column: %s in schema for table: %s", timeColumn,
+ tableConfig.getTableName());
+
+ TimeHandlerConfig.Builder timeHandlerConfigBuilder = new TimeHandlerConfig.Builder(TimeHandler.Type.EPOCH);
+
+ String windowStartMs = taskConfig.get(MergeTask.WINDOW_START_MS_KEY);
+ String windowEndMs = taskConfig.get(MergeTask.WINDOW_END_MS_KEY);
+ if (windowStartMs != null && windowEndMs != null) {
+ timeHandlerConfigBuilder.setTimeRange(Long.parseLong(windowStartMs), Long.parseLong(windowEndMs));
+ }
+
+ String roundBucketTimePeriod = taskConfig.get(MergeTask.ROUND_BUCKET_TIME_PERIOD_KEY);
+ if (roundBucketTimePeriod != null) {
+ timeHandlerConfigBuilder.setRoundBucketMs(TimeUtils.convertPeriodToMillis(roundBucketTimePeriod));
+ }
+
+ String partitionBucketTimePeriod = taskConfig.get(MergeTask.PARTITION_BUCKET_TIME_PERIOD_KEY);
+ if (partitionBucketTimePeriod != null) {
+ timeHandlerConfigBuilder.setPartitionBucketMs(TimeUtils.convertPeriodToMillis(partitionBucketTimePeriod));
+ }
+
+ return timeHandlerConfigBuilder.build();
+ }
+
+ /**
+ * Creates the partitioner configs based on the given table config, schema and task config.
+ */
+ public static List<PartitionerConfig> getPartitionerConfigs(TableConfig tableConfig, Schema schema,
+ Map<String, String> taskConfig) {
+ SegmentPartitionConfig segmentPartitionConfig = tableConfig.getIndexingConfig().getSegmentPartitionConfig();
+ if (segmentPartitionConfig == null) {
+ return Collections.emptyList();
+ }
+ Map<String, ColumnPartitionConfig> columnPartitionMap = segmentPartitionConfig.getColumnPartitionMap();
+ Preconditions.checkState(columnPartitionMap.size() == 1, "Cannot partition on multiple columns for table: %s",
+ tableConfig.getTableName());
+ Map.Entry<String, ColumnPartitionConfig> entry = columnPartitionMap.entrySet().iterator().next();
+ String partitionColumn = entry.getKey();
+ Preconditions.checkState(schema.hasColumn(partitionColumn),
+ "Partition column: %s does not exist in the schema for table: %s", partitionColumn, tableConfig.getTableName());
+ PartitionerConfig partitionerConfig =
+ new PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.TABLE_PARTITION_CONFIG)
+ .setColumnName(partitionColumn).setColumnPartitionConfig(entry.getValue()).build();
+ return Collections.singletonList(partitionerConfig);
+ }
+
+ /**
+ * Returns the merge type based on the task config. Returns {@code null} if it is not configured.
+ */
+ @Nullable
+ public static MergeType getMergeType(Map<String, String> taskConfig) {
+ String mergeType = taskConfig.get(MergeTask.MERGE_TYPE_KEY);
+ return mergeType != null ? MergeType.valueOf(mergeType.toUpperCase()) : null;
+ }
+
+ /**
+ * Returns the map from column name to the aggregation type associated with it based on the task config.
+ */
+ public static Map<String, AggregationFunctionType> getAggregationTypes(Map<String, String> taskConfig) {
+ Map<String, AggregationFunctionType> aggregationTypes = new HashMap<>();
+ for (Map.Entry<String, String> entry : taskConfig.entrySet()) {
+ String key = entry.getKey();
+ if (key.endsWith(MergeTask.AGGREGATION_TYPE_KEY_SUFFIX)) {
+ String column = key.substring(0, key.length() - AGGREGATION_TYPE_KEY_SUFFIX_LENGTH);
+ aggregationTypes.put(column, AggregationFunctionType.getAggregationFunctionType(entry.getValue()));
+ }
+ }
+ return aggregationTypes;
+ }
+
+ /**
+ * Returns the segment config based on the task config.
+ */
+ public static SegmentConfig getSegmentConfig(Map<String, String> taskConfig) {
+ SegmentConfig.Builder segmentConfigBuilder = new SegmentConfig.Builder();
+ String maxNumRecordsPerSegment = taskConfig.get(MergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY);
+ if (maxNumRecordsPerSegment != null) {
+ segmentConfigBuilder.setMaxNumRecordsPerSegment(Integer.parseInt(maxNumRecordsPerSegment));
+ }
+ segmentConfigBuilder.setSegmentNamePrefix(taskConfig.get(MergeTask.SEGMENT_NAME_PREFIX_KEY));
+ return segmentConfigBuilder.build();
+ }
+}
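
[Editor's note] Taken together, these helpers let both executors assemble a SegmentProcessorConfig in a handful of calls. A sketch of the shared pattern as it would sit inside an executor class (every builder setter used here appears in this commit; the method name is hypothetical):

import java.util.Map;
import org.apache.pinot.core.segment.processing.framework.SegmentProcessorConfig;
import org.apache.pinot.plugin.minion.tasks.MergeTaskUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;

// Sketch of the pattern both executors now follow; mirrors the diffs below.
private SegmentProcessorConfig buildSegmentProcessorConfig(TableConfig tableConfig, Schema schema,
    Map<String, String> taskConfig) {
  return new SegmentProcessorConfig.Builder()
      .setTableConfig(tableConfig)
      .setSchema(schema)
      .setTimeHandlerConfig(MergeTaskUtils.getTimeHandlerConfig(tableConfig, schema, taskConfig))
      .setPartitionerConfigs(MergeTaskUtils.getPartitionerConfigs(tableConfig, schema, taskConfig))
      .setMergeType(MergeTaskUtils.getMergeType(taskConfig))  // null when the key is unset
      .setAggregationTypes(MergeTaskUtils.getAggregationTypes(taskConfig))
      .setSegmentConfig(MergeTaskUtils.getSegmentConfig(taskConfig))
      .build();
}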
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeProperties.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeProperties.java
deleted file mode 100644
index 34048e5..0000000
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeProperties.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.plugin.minion.tasks.merge_rollup;
-
-public class MergeProperties {
- private final String _mergeType;
- private final long _bufferTimeMs;
- private final long _maxNumRecordsPerSegment;
- private final long _maxNumRecordsPerTask;
-
- public MergeProperties(String mergeType, long bufferTimeMs, long maxNumRecordsPerSegment,
- long maxNumRecordsPerTask) {
- _mergeType = mergeType;
- _bufferTimeMs = bufferTimeMs;
- _maxNumRecordsPerSegment = maxNumRecordsPerSegment;
- _maxNumRecordsPerTask = maxNumRecordsPerTask;
- }
-
- public String getMergeType() {
- return _mergeType;
- }
-
- public long getBufferTimeMs() {
- return _bufferTimeMs;
- }
-
- public long getMaxNumRecordsPerSegment() {
- return _maxNumRecordsPerSegment;
- }
-
- public long getMaxNumRecordsPerTask() {
- return _maxNumRecordsPerTask;
- }
-}
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutor.java
index f607990..c1e7587 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutor.java
@@ -21,20 +21,19 @@ package org.apache.pinot.plugin.minion.tasks.merge_rollup;
import com.google.common.base.Preconditions;
import java.io.File;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.TreeMap;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.common.MinionConstants.MergeRollupTask;
import org.apache.pinot.core.minion.PinotTaskConfig;
-import org.apache.pinot.core.segment.processing.framework.MergeType;
-import org.apache.pinot.core.segment.processing.framework.SegmentConfig;
import org.apache.pinot.core.segment.processing.framework.SegmentProcessorConfig;
import org.apache.pinot.core.segment.processing.framework.SegmentProcessorFramework;
import org.apache.pinot.plugin.minion.tasks.BaseMultipleSegmentsConversionExecutor;
+import org.apache.pinot.plugin.minion.tasks.MergeTaskUtils;
import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
-import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.slf4j.Logger;
@@ -42,13 +41,7 @@ import org.slf4j.LoggerFactory;
/**
- * Task executor that provides merge and rollup service
- *
- * TODO:
- * 1. Add the support for roll-up
- * 2. Add the support for time split to provide backfill support for merged segments
- * 3. Add merge/rollup name prefixes for generated segments
- * 4. Add the support for realtime table
+ * Task executor that provides merge and rollup service.
*/
public class MergeRollupTaskExecutor extends BaseMultipleSegmentsConversionExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(MergeRollupTaskExecutor.class);
@@ -64,27 +57,29 @@ public class MergeRollupTaskExecutor extends BaseMultipleSegmentsConversionExecu
LOGGER.info("Starting task: {} with configs: {}", taskType, configs);
long startMillis = System.currentTimeMillis();
- Preconditions.checkState(
- MergeType.CONCAT.name().equalsIgnoreCase(configs.get(MinionConstants.MergeRollupTask.MERGE_TYPE_KEY)),
- "Only 'CONCAT' mode is currently supported.");
-
String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
TableConfig tableConfig = getTableConfig(tableNameWithType);
Schema schema = getSchema(tableNameWithType);
- Map<String, AggregationFunctionType> aggregationTypes = MergeRollupTaskUtils.getRollupAggregationTypes(configs);
- String numRecordsPerSegmentString = configs.get(MinionConstants.MergeRollupTask.MAX_NUM_RECORDS_PER_SEGMENT);
-
SegmentProcessorConfig.Builder segmentProcessorConfigBuilder =
- new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema)
- .setMergeType(MergeType.CONCAT);
- if (!aggregationTypes.isEmpty()) {
- segmentProcessorConfigBuilder.setAggregationTypes(aggregationTypes);
- }
- if (numRecordsPerSegmentString != null) {
- segmentProcessorConfigBuilder.setSegmentConfig(
- new SegmentConfig.Builder().setMaxNumRecordsPerSegment(Integer.parseInt(numRecordsPerSegmentString)).build());
- }
+ new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema);
+
+ // Time handler config
+ segmentProcessorConfigBuilder
+ .setTimeHandlerConfig(MergeTaskUtils.getTimeHandlerConfig(tableConfig, schema, configs));
+
+ // Partitioner config
+ segmentProcessorConfigBuilder
+ .setPartitionerConfigs(MergeTaskUtils.getPartitionerConfigs(tableConfig, schema, configs));
+
+ // Merge type
+ segmentProcessorConfigBuilder.setMergeType(MergeTaskUtils.getMergeType(configs));
+
+ // Aggregation types
+ segmentProcessorConfigBuilder.setAggregationTypes(MergeTaskUtils.getAggregationTypes(configs));
+
+ // Segment config
+ segmentProcessorConfigBuilder.setSegmentConfig(MergeTaskUtils.getSegmentConfig(configs));
SegmentProcessorConfig segmentProcessorConfig = segmentProcessorConfigBuilder.build();
@@ -120,8 +115,10 @@ public class MergeRollupTaskExecutor extends BaseMultipleSegmentsConversionExecu
@Override
protected SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifier(PinotTaskConfig pinotTaskConfig,
SegmentConversionResult segmentConversionResult) {
- return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE, Collections
- .singletonMap(MinionConstants.MergeRollupTask.TASK_TYPE + MinionConstants.TASK_BUCKET_GRANULARITY_SUFFIX,
- pinotTaskConfig.getConfigs().get(MinionConstants.MergeRollupTask.GRANULARITY_KEY).toUpperCase()));
+ Map<String, String> updateMap = new TreeMap<>();
+ updateMap.put(MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY,
+ pinotTaskConfig.getConfigs().get(MergeRollupTask.MERGE_LEVEL_KEY));
+ updateMap.put(MergeRollupTask.SEGMENT_ZK_METADATA_TIME_KEY, String.valueOf(System.currentTimeMillis()));
+ return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE, updateMap);
}
}
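
[Editor's note] Net effect of the modifier change above: instead of stamping an upper-cased granularity, each merged segment's ZK metadata custom map now records the merge level and the merge time. Expanding the constants from MinionConstants, the resulting entries look like this (timestamp illustrative):

// SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY: "MergeRollupTask.mergeLevel" -> "daily"
// SEGMENT_ZK_METADATA_TIME_KEY:        "MergeRollupTask.time"       -> "1626841587000"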
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskUtils.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskUtils.java
index 85b72c3..79733a2 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskUtils.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskUtils.java
@@ -18,100 +18,38 @@
*/
package org.apache.pinot.plugin.minion.tasks.merge_rollup;
-import com.google.common.base.Preconditions;
-import java.util.HashMap;
import java.util.Map;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.pinot.common.minion.Granularity;
-import org.apache.pinot.core.common.MinionConstants;
-import org.apache.pinot.core.segment.processing.framework.MergeType;
-import org.apache.pinot.pql.parsers.utils.Pair;
-import org.apache.pinot.segment.spi.AggregationFunctionType;
-import org.apache.pinot.spi.utils.TimeUtils;
+import java.util.TreeMap;
+import org.apache.pinot.core.common.MinionConstants.MergeTask;
public class MergeRollupTaskUtils {
//@formatter:off
- private static final String[] validMergeProperties = {
- MinionConstants.MergeRollupTask.MERGE_TYPE_KEY,
- MinionConstants.MergeRollupTask.BUFFER_TIME,
- MinionConstants.MergeRollupTask.MAX_NUM_RECORDS_PER_SEGMENT,
- MinionConstants.MergeRollupTask.MAX_NUM_RECORDS_PER_TASK
- };
-
- private static final String[] validMergeType = {
- MergeType.CONCAT.name(),
- MergeType.ROLLUP.name()
+ private static final String[] VALID_CONFIG_KEYS = {
+ MergeTask.BUCKET_TIME_PERIOD_KEY,
+ MergeTask.BUFFER_TIME_PERIOD_KEY,
+ MergeTask.ROUND_BUCKET_TIME_PERIOD_KEY,
+ MergeTask.MERGE_TYPE_KEY,
+ MergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY,
+ MergeTask.MAX_NUM_RECORDS_PER_TASK_KEY
};
//@formatter:on
- public static Map<String, AggregationFunctionType> getRollupAggregationTypes(Map<String, String> mergeRollupConfig) {
- Map<String, AggregationFunctionType> aggregationTypes = new HashMap<>();
- for (Map.Entry<String, String> entry : mergeRollupConfig.entrySet()) {
- if (entry.getKey().startsWith(MinionConstants.MergeRollupTask.AGGREGATE_KEY_PREFIX)) {
- aggregationTypes.put(getAggregateColumn(entry.getKey()),
- AggregationFunctionType.getAggregationFunctionType(entry.getValue()));
- }
- }
- return aggregationTypes;
- }
-
- public static Map<Granularity, MergeProperties> getAllMergeProperties(Map<String, String> mergeRollupConfig) {
- Map<Granularity, Map<String, String>> mergePropertiesMap = new HashMap<>();
- for (Map.Entry<String, String> entry : mergeRollupConfig.entrySet()) {
- if (entry.getKey().startsWith(MinionConstants.MergeRollupTask.MERGE_KEY_PREFIX)) {
- Pair<Granularity, String> pair = getGranularityAndPropertyPair(entry.getKey(), entry.getValue());
- Granularity granularity = pair.getFirst();
- String mergeProperty = pair.getSecond();
- mergePropertiesMap.putIfAbsent(granularity, new HashMap<>());
- mergePropertiesMap.get(granularity).put(mergeProperty, entry.getValue());
- }
- }
-
- Map<Granularity, MergeProperties> allMergeProperties = new HashMap<>();
- for (Map.Entry<Granularity, Map<String, String>> entry : mergePropertiesMap.entrySet()) {
- Map<String, String> properties = entry.getValue();
- MergeProperties mergeProperties =
- new MergeProperties(properties.get(MinionConstants.MergeRollupTask.MERGE_TYPE_KEY).toUpperCase(),
- TimeUtils.convertPeriodToMillis(properties.get(MinionConstants.MergeRollupTask.BUFFER_TIME)),
- Long.parseLong(properties.get(MinionConstants.MergeRollupTask.MAX_NUM_RECORDS_PER_SEGMENT)),
- Long.parseLong(properties.get(MinionConstants.MergeRollupTask.MAX_NUM_RECORDS_PER_TASK)));
- allMergeProperties.put(entry.getKey(), mergeProperties);
- }
- return allMergeProperties;
- }
-
- private static String getAggregateColumn(String rollupAggregateConfigKey) {
- return rollupAggregateConfigKey
- .split(MinionConstants.MergeRollupTask.AGGREGATE_KEY_PREFIX + MinionConstants.DOT_SEPARATOR)[1];
- }
-
- private static Pair<Granularity, String> getGranularityAndPropertyPair(String mergePropertyConfigKey,
- String mergePropertyConfigValue) {
- String[] components = StringUtils.split(mergePropertyConfigKey, MinionConstants.DOT_SEPARATOR);
- Preconditions.checkState(components.length == 3);
- Preconditions.checkState(isValidMergeProperties(components[2]));
- if (components[2].equals(MinionConstants.MergeRollupTask.MERGE_TYPE_KEY)) {
- Preconditions.checkState(isValidMergeType(mergePropertyConfigValue));
- }
- return new Pair<>((Granularity.valueOf(components[1].toUpperCase())), components[2]);
- }
-
- private static boolean isValidMergeProperties(String property) {
- for (String validProperty : validMergeProperties) {
- if (property.equals(validProperty)) {
- return true;
- }
- }
- return false;
- }
-
- private static boolean isValidMergeType(String mergeType) {
- for (String validMergeType : validMergeType) {
- if (mergeType.toUpperCase().equals(validMergeType)) {
- return true;
+ /**
+ * Extracts a map from merge level to config from the task config.
+ * <p>The config for a specific level should have a key of the format "{level}.{configKey}" within the task config.
+ */
+ public static Map<String, Map<String, String>> getLevelToConfigMap(Map<String, String> taskConfig) {
+ Map<String, Map<String, String>> levelToConfigMap = new TreeMap<>();
+ for (Map.Entry<String, String> entry : taskConfig.entrySet()) {
+ String key = entry.getKey();
+ for (String configKey : VALID_CONFIG_KEYS) {
+ if (key.endsWith(configKey)) {
+ String level = key.substring(0, key.length() - configKey.length() - 1);
+ levelToConfigMap.computeIfAbsent(level, k -> new TreeMap<>()).put(configKey, entry.getValue());
+ }
}
}
- return false;
+ return levelToConfigMap;
}
}
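
[Editor's note] To make the "{level}.{configKey}" format concrete, a small example of getLevelToConfigMap in action (the keys and values mirror the updated test at the end of this mail):

import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.plugin.minion.tasks.merge_rollup.MergeRollupTaskUtils;

Map<String, String> taskConfig = new HashMap<>();
taskConfig.put("daily.bucketTimePeriod", "1d");
taskConfig.put("daily.bufferTimePeriod", "3d");
taskConfig.put("monthly.mergeType", "rollup");
Map<String, Map<String, String>> levelToConfigMap = MergeRollupTaskUtils.getLevelToConfigMap(taskConfig);
// levelToConfigMap: {daily={bucketTimePeriod=1d, bufferTimePeriod=3d}, monthly={mergeType=rollup}}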
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtime_to_offline_segments/RealtimeToOfflineSegmentsTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtime_to_offline_segments/RealtimeToOfflineSegmentsTaskExecutor.java
index 13802d0..2a24937 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtime_to_offline_segments/RealtimeToOfflineSegmentsTaskExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtime_to_offline_segments/RealtimeToOfflineSegmentsTaskExecutor.java
@@ -19,11 +19,9 @@
package org.apache.pinot.plugin.minion.tasks.realtime_to_offline_segments;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
@@ -34,23 +32,14 @@ import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.core.segment.processing.framework.MergeType;
-import org.apache.pinot.core.segment.processing.framework.SegmentConfig;
import org.apache.pinot.core.segment.processing.framework.SegmentProcessorConfig;
import org.apache.pinot.core.segment.processing.framework.SegmentProcessorFramework;
-import org.apache.pinot.core.segment.processing.partitioner.PartitionerConfig;
-import org.apache.pinot.core.segment.processing.partitioner.PartitionerFactory;
-import org.apache.pinot.core.segment.processing.timehandler.TimeHandler;
-import org.apache.pinot.core.segment.processing.timehandler.TimeHandlerConfig;
import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
import org.apache.pinot.plugin.minion.tasks.BaseMultipleSegmentsConversionExecutor;
+import org.apache.pinot.plugin.minion.tasks.MergeTaskUtils;
import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
-import org.apache.pinot.segment.spi.AggregationFunctionType;
-import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
-import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.utils.TimeUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -83,7 +72,6 @@ public class RealtimeToOfflineSegmentsTaskExecutor extends BaseMultipleSegmentsC
private final MinionTaskZkMetadataManager _minionTaskZkMetadataManager;
private int _expectedVersion = Integer.MIN_VALUE;
- private long _nextWatermark;
public RealtimeToOfflineSegmentsTaskExecutor(MinionTaskZkMetadataManager minionTaskZkMetadataManager) {
_minionTaskZkMetadataManager = minionTaskZkMetadataManager;
@@ -130,63 +118,34 @@ public class RealtimeToOfflineSegmentsTaskExecutor extends BaseMultipleSegmentsC
String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
TableConfig tableConfig = getTableConfig(offlineTableName);
Schema schema = getSchema(offlineTableName);
- String timeColumn = tableConfig.getValidationConfig().getTimeColumnName();
- DateTimeFieldSpec dateTimeFieldSpec = schema.getSpecForTimeColumn(timeColumn);
- Preconditions
- .checkState(dateTimeFieldSpec != null, "No valid spec found for time column: %s in schema for table: %s",
- timeColumn, offlineTableName);
+
SegmentProcessorConfig.Builder segmentProcessorConfigBuilder =
new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema);
- long windowStartMs = Long.parseLong(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY));
- long windowEndMs = Long.parseLong(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY));
- _nextWatermark = windowEndMs;
-
// Time handler config
- TimeHandlerConfig.Builder timeHandlerConfigBuilder =
- new TimeHandlerConfig.Builder(TimeHandler.Type.EPOCH).setTimeRange(windowStartMs, windowEndMs);
- String roundBucketTimePeriod = configs.get(RealtimeToOfflineSegmentsTask.ROUND_BUCKET_TIME_PERIOD_KEY);
- if (roundBucketTimePeriod != null) {
- timeHandlerConfigBuilder.setRoundBucketMs(TimeUtils.convertPeriodToMillis(roundBucketTimePeriod));
- }
- segmentProcessorConfigBuilder.setTimeHandlerConfig(timeHandlerConfigBuilder.build());
+ segmentProcessorConfigBuilder
+ .setTimeHandlerConfig(MergeTaskUtils.getTimeHandlerConfig(tableConfig, schema, configs));
- // Partitioner config from tableConfig
- SegmentPartitionConfig segmentPartitionConfig = tableConfig.getIndexingConfig().getSegmentPartitionConfig();
- if (segmentPartitionConfig != null) {
- PartitionerConfig partitionerConfig = getPartitionerConfig(segmentPartitionConfig, offlineTableName, schema);
- segmentProcessorConfigBuilder.setPartitionerConfigs(Lists.newArrayList(partitionerConfig));
- }
+ // Partitioner config
+ segmentProcessorConfigBuilder
+ .setPartitionerConfigs(MergeTaskUtils.getPartitionerConfigs(tableConfig, schema, configs));
// Merge type
- String mergeType = configs.get(RealtimeToOfflineSegmentsTask.MERGE_TYPE_KEY);
- // Handle deprecated key
+ MergeType mergeType = MergeTaskUtils.getMergeType(configs);
+ // Handle legacy key
if (mergeType == null) {
- mergeType = configs.get(RealtimeToOfflineSegmentsTask.COLLECTOR_TYPE_KEY);
- }
- if (mergeType != null) {
- segmentProcessorConfigBuilder.setMergeType(MergeType.valueOf(mergeType.toUpperCase()));
+ String legacyMergeTypeStr = configs.get(RealtimeToOfflineSegmentsTask.COLLECTOR_TYPE_KEY);
+ if (legacyMergeTypeStr != null) {
+ mergeType = MergeType.valueOf(legacyMergeTypeStr.toUpperCase());
+ }
}
+ segmentProcessorConfigBuilder.setMergeType(mergeType);
// Aggregation types
- Map<String, AggregationFunctionType> aggregationTypes = new HashMap<>();
- for (Map.Entry<String, String> entry : configs.entrySet()) {
- String key = entry.getKey();
- if (key.endsWith(RealtimeToOfflineSegmentsTask.AGGREGATION_TYPE_KEY_SUFFIX)) {
- String column = key.split(RealtimeToOfflineSegmentsTask.AGGREGATION_TYPE_KEY_SUFFIX)[0];
- aggregationTypes.put(column, AggregationFunctionType.getAggregationFunctionType(entry.getValue()));
- }
- }
- if (!aggregationTypes.isEmpty()) {
- segmentProcessorConfigBuilder.setAggregationTypes(aggregationTypes);
- }
+ segmentProcessorConfigBuilder.setAggregationTypes(MergeTaskUtils.getAggregationTypes(configs));
// Segment config
- String maxNumRecordsPerSegment = configs.get(RealtimeToOfflineSegmentsTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY);
- if (maxNumRecordsPerSegment != null) {
- segmentProcessorConfigBuilder.setSegmentConfig(
- new SegmentConfig.Builder().setMaxNumRecordsPerSegment(Integer.parseInt(maxNumRecordsPerSegment)).build());
- }
+ segmentProcessorConfigBuilder.setSegmentConfig(MergeTaskUtils.getSegmentConfig(configs));
SegmentProcessorConfig segmentProcessorConfig = segmentProcessorConfigBuilder.build();
@@ -226,9 +185,11 @@ public class RealtimeToOfflineSegmentsTaskExecutor extends BaseMultipleSegmentsC
*/
@Override
public void postProcess(PinotTaskConfig pinotTaskConfig) {
- String realtimeTableName = pinotTaskConfig.getConfigs().get(MinionConstants.TABLE_NAME_KEY);
+ Map<String, String> configs = pinotTaskConfig.getConfigs();
+ String realtimeTableName = configs.get(MinionConstants.TABLE_NAME_KEY);
+ long waterMarkMs = Long.parseLong(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY));
RealtimeToOfflineSegmentsTaskMetadata newMinionMetadata =
- new RealtimeToOfflineSegmentsTaskMetadata(realtimeTableName, _nextWatermark);
+ new RealtimeToOfflineSegmentsTaskMetadata(realtimeTableName, waterMarkMs);
_minionTaskZkMetadataManager.setRealtimeToOfflineSegmentsTaskMetadata(newMinionMetadata, _expectedVersion);
}
@@ -238,20 +199,4 @@ public class RealtimeToOfflineSegmentsTaskExecutor extends BaseMultipleSegmentsC
return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE,
Collections.emptyMap());
}
-
- /**
- * Construct a {@link PartitionerConfig} using {@link SegmentPartitionConfig} from the table config
- */
- private PartitionerConfig getPartitionerConfig(SegmentPartitionConfig partitionConfig, String tableNameWithType,
- Schema schema) {
- Map<String, ColumnPartitionConfig> columnPartitionMap = partitionConfig.getColumnPartitionMap();
- Preconditions.checkState(columnPartitionMap.size() == 1,
- "Cannot partition using more than 1 ColumnPartitionConfig for table: %s", tableNameWithType);
- Map.Entry<String, ColumnPartitionConfig> entry = columnPartitionMap.entrySet().iterator().next();
- String partitionColumn = entry.getKey();
- Preconditions.checkState(schema.hasColumn(partitionColumn),
- "Partition column: %s does not exist in the schema for table: %s", partitionColumn, tableNameWithType);
- return new PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.TABLE_PARTITION_CONFIG)
- .setColumnName(partitionColumn).setColumnPartitionConfig(entry.getValue()).build();
- }
}
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtilsTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtilsTest.java
new file mode 100644
index 0000000..0139392
--- /dev/null
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtilsTest.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.core.common.MinionConstants.MergeTask;
+import org.apache.pinot.core.segment.processing.framework.MergeType;
+import org.apache.pinot.core.segment.processing.framework.SegmentConfig;
+import org.apache.pinot.core.segment.processing.partitioner.PartitionerConfig;
+import org.apache.pinot.core.segment.processing.partitioner.PartitionerFactory;
+import org.apache.pinot.core.segment.processing.timehandler.TimeHandler;
+import org.apache.pinot.core.segment.processing.timehandler.TimeHandlerConfig;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class MergeTaskUtilsTest {
+
+ @Test
+ public void testGetTimeHandlerConfig() {
+ TableConfig tableConfig =
+ new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").setTimeColumnName("dateTime").build();
+ Schema schema = new Schema.SchemaBuilder()
+ .addDateTime("dateTime", DataType.LONG, "1:SECONDS:SIMPLE_DATE_FORMAT:yyyyMMddHHmmss", "1:SECONDS").build();
+ Map<String, String> taskConfig = new HashMap<>();
+ long expectedWindowStartMs = 1625097600000L;
+ long expectedWindowEndMs = 1625184000000L;
+ taskConfig.put(MergeTask.WINDOW_START_MS_KEY, Long.toString(expectedWindowStartMs));
+ taskConfig.put(MergeTask.WINDOW_END_MS_KEY, Long.toString(expectedWindowEndMs));
+ long expectedRoundBucketMs = 6 * 3600 * 1000;
+ taskConfig.put(MergeTask.ROUND_BUCKET_TIME_PERIOD_KEY, "6h");
+ long expectedPartitionBucketMs = 24 * 3600 * 1000;
+ taskConfig.put(MergeTask.PARTITION_BUCKET_TIME_PERIOD_KEY, "1d");
+
+ TimeHandlerConfig timeHandlerConfig = MergeTaskUtils.getTimeHandlerConfig(tableConfig, schema, taskConfig);
+ assertNotNull(timeHandlerConfig);
+ assertEquals(timeHandlerConfig.getType(), TimeHandler.Type.EPOCH);
+ assertEquals(timeHandlerConfig.getStartTimeMs(), expectedWindowStartMs);
+ assertEquals(timeHandlerConfig.getEndTimeMs(), expectedWindowEndMs);
+ assertEquals(timeHandlerConfig.getRoundBucketMs(), expectedRoundBucketMs);
+ assertEquals(timeHandlerConfig.getPartitionBucketMs(), expectedPartitionBucketMs);
+
+ // No time column in table config
+ TableConfig tableConfigWithoutTimeColumn =
+ new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").build();
+ assertNull(MergeTaskUtils.getTimeHandlerConfig(tableConfigWithoutTimeColumn, schema, taskConfig));
+
+ // Time column does not exist in schema
+ Schema schemaWithoutTimeColumn = new Schema.SchemaBuilder().build();
+ try {
+ MergeTaskUtils.getTimeHandlerConfig(tableConfig, schemaWithoutTimeColumn, taskConfig);
+ fail();
+ } catch (IllegalStateException e) {
+ // Expected
+ }
+ }
+
+ @Test
+ public void testGetPartitionerConfigs() {
+ TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable")
+ .setSegmentPartitionConfig(
+ new SegmentPartitionConfig(Collections.singletonMap("memberId", new ColumnPartitionConfig("murmur", 10))))
+ .build();
+ Schema schema = new Schema.SchemaBuilder().addSingleValueDimension("memberId", DataType.LONG).build();
+ Map<String, String> taskConfig = Collections.emptyMap();
+
+ List<PartitionerConfig> partitionerConfigs = MergeTaskUtils.getPartitionerConfigs(tableConfig, schema, taskConfig);
+ assertEquals(partitionerConfigs.size(), 1);
+ PartitionerConfig partitionerConfig = partitionerConfigs.get(0);
+ assertEquals(partitionerConfig.getPartitionerType(), PartitionerFactory.PartitionerType.TABLE_PARTITION_CONFIG);
+ assertEquals(partitionerConfig.getColumnName(), "memberId");
+ ColumnPartitionConfig columnPartitionConfig = partitionerConfig.getColumnPartitionConfig();
+ assertEquals(columnPartitionConfig.getFunctionName(), "murmur");
+ assertEquals(columnPartitionConfig.getNumPartitions(), 10);
+
+ // No partition column in table config
+ TableConfig tableConfigWithoutPartitionColumn =
+ new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").build();
+ assertTrue(MergeTaskUtils.getPartitionerConfigs(tableConfigWithoutPartitionColumn, schema, taskConfig).isEmpty());
+
+ // Partition column does not exist in schema
+ Schema schemaWithoutPartitionColumn = new Schema.SchemaBuilder().build();
+ try {
+ MergeTaskUtils.getPartitionerConfigs(tableConfig, schemaWithoutPartitionColumn, taskConfig);
+ fail();
+ } catch (IllegalStateException e) {
+ // Expected
+ }
+ }
+
+ @Test
+ public void testGetMergeType() {
+ assertEquals(MergeTaskUtils.getMergeType(Collections.singletonMap(MergeTask.MERGE_TYPE_KEY, "concat")),
+ MergeType.CONCAT);
+ assertEquals(MergeTaskUtils.getMergeType(Collections.singletonMap(MergeTask.MERGE_TYPE_KEY, "Rollup")),
+ MergeType.ROLLUP);
+ assertEquals(MergeTaskUtils.getMergeType(Collections.singletonMap(MergeTask.MERGE_TYPE_KEY, "DeDuP")),
+ MergeType.DEDUP);
+ assertNull(MergeTaskUtils.getMergeType(Collections.emptyMap()));
+
+ try {
+ MergeTaskUtils.getMergeType(Collections.singletonMap(MergeTask.MERGE_TYPE_KEY, "unsupported"));
+ fail();
+ } catch (IllegalArgumentException e) {
+ // Expected
+ }
+ }
+
+ @Test
+ public void testGetAggregationTypes() {
+ Map<String, String> taskConfig = new HashMap<>();
+ taskConfig.put("colA.aggregationType", "sum");
+ taskConfig.put("colB.aggregationType", "Min");
+ taskConfig.put("colC.aggregationType", "MaX");
+
+ Map<String, AggregationFunctionType> aggregationTypes = MergeTaskUtils.getAggregationTypes(taskConfig);
+ assertEquals(aggregationTypes.size(), 3);
+ assertEquals(aggregationTypes.get("colA"), AggregationFunctionType.SUM);
+ assertEquals(aggregationTypes.get("colB"), AggregationFunctionType.MIN);
+ assertEquals(aggregationTypes.get("colC"), AggregationFunctionType.MAX);
+
+ taskConfig.put("colD.aggregationType", "unsupported");
+ try {
+ MergeTaskUtils.getAggregationTypes(taskConfig);
+ fail();
+ } catch (IllegalArgumentException e) {
+ // Expected
+ }
+ }
+
+ @Test
+ public void testGetSegmentConfig() {
+ Map<String, String> taskConfig = new HashMap<>();
+ taskConfig.put(MergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY, "10000");
+ taskConfig.put(MergeTask.SEGMENT_NAME_PREFIX_KEY, "myPrefix");
+ SegmentConfig segmentConfig = MergeTaskUtils.getSegmentConfig(taskConfig);
+ assertEquals(segmentConfig.getMaxNumRecordsPerSegment(), 10000);
+ assertEquals(segmentConfig.getSegmentNamePrefix(), "myPrefix");
+
+ segmentConfig = MergeTaskUtils.getSegmentConfig(Collections.emptyMap());
+ assertEquals(segmentConfig.getMaxNumRecordsPerSegment(), SegmentConfig.DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT);
+ assertNull(segmentConfig.getSegmentNamePrefix());
+ }
+}
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutorTest.java
index 27a89e3..5cf3afc 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutorTest.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutorTest.java
@@ -106,9 +106,8 @@ public class MergeRollupTaskExecutorTest {
throws Exception {
MergeRollupTaskExecutor mergeRollupTaskExecutor = new MergeRollupTaskExecutor();
Map<String, String> configs = new HashMap<>();
- configs.put(MinionConstants.MergeRollupTask.MERGE_TYPE_KEY, "CONCAT");
- configs.put(MinionConstants.MergeRollupTask.GRANULARITY_KEY, "Daily");
configs.put(MinionConstants.TABLE_NAME_KEY, "testTable_OFFLINE");
+ configs.put(MinionConstants.MergeRollupTask.MERGE_LEVEL_KEY, "daily");
PinotTaskConfig pinotTaskConfig = new PinotTaskConfig(MinionConstants.MergeRollupTask.TASK_TYPE, configs);
List<SegmentConversionResult> conversionResults =
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskUtilsTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskUtilsTest.java
index 6058bc0..864e57d 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskUtilsTest.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskUtilsTest.java
@@ -20,64 +20,44 @@ package org.apache.pinot.plugin.minion.tasks.merge_rollup;
import java.util.HashMap;
import java.util.Map;
-import org.apache.pinot.common.minion.Granularity;
-import org.apache.pinot.core.segment.processing.framework.MergeType;
-import org.apache.pinot.segment.spi.AggregationFunctionType;
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
+import org.apache.pinot.core.common.MinionConstants.MergeTask;
import org.testng.annotations.Test;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
-public class MergeRollupTaskUtilsTest {
- private final String METRIC_COLUMN_A = "metricColA";
- private final String METRIC_COLUMN_B = "metricColB";
- private Map<String, String> _mergeRollupTaskConfig;
-
- @BeforeClass
- public void setUp() {
- Map<String, String> mergeRollupTaskConfig = new HashMap<>();
- mergeRollupTaskConfig.put("aggregate.metricColA", "sum");
- mergeRollupTaskConfig.put("aggregate.metricColB", "max");
- mergeRollupTaskConfig.put("merge.daily.mergeType", "concat");
- mergeRollupTaskConfig.put("merge.daily.bufferTime", "2d");
- mergeRollupTaskConfig.put("merge.daily.maxNumRecordsPerSegment", "1000000");
- mergeRollupTaskConfig.put("merge.daily.maxNumRecordsPerTask", "5000000");
- mergeRollupTaskConfig.put("merge.monthly.mergeType", "rollup");
- mergeRollupTaskConfig.put("merge.monthly.bufferTime", "30d");
- mergeRollupTaskConfig.put("merge.monthly.maxNumRecordsPerSegment", "2000000");
- mergeRollupTaskConfig.put("merge.monthly.maxNumRecordsPerTask", "5000000");
- _mergeRollupTaskConfig = mergeRollupTaskConfig;
- }
- @Test
- public void testGetRollupAggregationTypeMap() {
- Map<String, AggregationFunctionType> rollupAggregationTypeMap =
- MergeRollupTaskUtils.getRollupAggregationTypes(_mergeRollupTaskConfig);
- Assert.assertEquals(rollupAggregationTypeMap.size(), 2);
- Assert.assertTrue(rollupAggregationTypeMap.containsKey(METRIC_COLUMN_A));
- Assert.assertTrue(rollupAggregationTypeMap.containsKey(METRIC_COLUMN_B));
- Assert.assertEquals(rollupAggregationTypeMap.get(METRIC_COLUMN_A), AggregationFunctionType.SUM);
- Assert.assertEquals(rollupAggregationTypeMap.get(METRIC_COLUMN_B), AggregationFunctionType.MAX);
- }
+public class MergeRollupTaskUtilsTest {
@Test
- public void testGetAllMergeProperties() {
- Map<Granularity, MergeProperties> allMergeProperties =
- MergeRollupTaskUtils.getAllMergeProperties(_mergeRollupTaskConfig);
- Assert.assertEquals(allMergeProperties.size(), 2);
- Assert.assertTrue(allMergeProperties.containsKey(Granularity.DAILY));
- Assert.assertTrue(allMergeProperties.containsKey(Granularity.MONTHLY));
-
- MergeProperties dailyProperty = allMergeProperties.get(Granularity.DAILY);
- Assert.assertEquals(dailyProperty.getMergeType(), MergeType.CONCAT.name());
- Assert.assertEquals(dailyProperty.getBufferTimeMs(), 172800000L);
- Assert.assertEquals(dailyProperty.getMaxNumRecordsPerSegment(), 1000000L);
- Assert.assertEquals(dailyProperty.getMaxNumRecordsPerTask(), 5000000L);
-
- MergeProperties monthlyProperty = allMergeProperties.get(Granularity.MONTHLY);
- Assert.assertEquals(monthlyProperty.getMergeType(), MergeType.ROLLUP.name());
- Assert.assertEquals(monthlyProperty.getBufferTimeMs(), 2592000000L);
- Assert.assertEquals(monthlyProperty.getMaxNumRecordsPerSegment(), 2000000L);
- Assert.assertEquals(monthlyProperty.getMaxNumRecordsPerTask(), 5000000L);
+ public void testGetLevelToConfigMap() {
+ Map<String, String> taskConfig = new HashMap<>();
+ taskConfig.put("daily.bucketTimePeriod", "1d");
+ taskConfig.put("daily.bufferTimePeriod", "3d");
+ taskConfig.put("daily.maxNumRecordsPerSegment", "1000000");
+ taskConfig.put("monthly.bucketTimePeriod", "30d");
+ taskConfig.put("monthly.bufferTimePeriod", "10d");
+ taskConfig.put("monthly.roundBucketTimePeriod", "7d");
+ taskConfig.put("monthly.mergeType", "rollup");
+ taskConfig.put("monthly.maxNumRecordsPerTask", "5000000");
+
+ Map<String, Map<String, String>> levelToConfigMap = MergeRollupTaskUtils.getLevelToConfigMap(taskConfig);
+ assertEquals(levelToConfigMap.size(), 2);
+
+ Map<String, String> dailyConfig = levelToConfigMap.get("daily");
+ assertNotNull(dailyConfig);
+ assertEquals(dailyConfig.size(), 3);
+ assertEquals(dailyConfig.get(MergeTask.BUCKET_TIME_PERIOD_KEY), "1d");
+ assertEquals(dailyConfig.get(MergeTask.BUFFER_TIME_PERIOD_KEY), "3d");
+ assertEquals(dailyConfig.get(MergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY), "1000000");
+
+ Map<String, String> monthlyConfig = levelToConfigMap.get("monthly");
+ assertNotNull(monthlyConfig);
+ assertEquals(monthlyConfig.size(), 5);
+ assertEquals(monthlyConfig.get(MergeTask.BUCKET_TIME_PERIOD_KEY), "30d");
+ assertEquals(monthlyConfig.get(MergeTask.BUFFER_TIME_PERIOD_KEY), "10d");
+ assertEquals(monthlyConfig.get(MergeTask.ROUND_BUCKET_TIME_PERIOD_KEY), "7d");
+ assertEquals(monthlyConfig.get(MergeTask.MERGE_TYPE_KEY), "rollup");
+ assertEquals(monthlyConfig.get(MergeTask.MAX_NUM_RECORDS_PER_TASK_KEY), "5000000");
}
}