You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2023/07/03 23:04:05 UTC
[pinot] branch master updated: Allow custom segment grouping in MergeRollupTask based on lineage metadata (#10964)
This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new eefa8fbfe8 Allow custom segment grouping in MergeRollupTask based on lineage metadata (#10964)
eefa8fbfe8 is described below
commit eefa8fbfe86bc5b1412f9b9085364fb8a773476f
Author: Jiapeng Tao <ji...@linkedin.com>
AuthorDate: Mon Jul 3 16:03:58 2023 -0700
Allow custom segment grouping in MergeRollupTask based on lineage metadata (#10964)
---
.../apache/pinot/core/common/MinionConstants.java | 3 +
.../mergerollup/MergeRollupTaskGenerator.java | 130 +++++++++++----------
.../DefaultMergeRollupTaskSegmentGroupManager.java | 35 ++++++
.../MergeRollupTaskSegmentGroupManager.java | 36 ++++++
...MergeRollupTaskSegmentGroupManagerProvider.java | 46 ++++++++
5 files changed, 188 insertions(+), 62 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index 530f757048..8e4c5cfc1c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -118,6 +118,9 @@ public class MinionConstants {
public static final String SEGMENT_ZK_METADATA_TIME_KEY = TASK_TYPE + TASK_TIME_SUFFIX;
public static final String MERGED_SEGMENT_NAME_PREFIX = "merged_";
+
+ // Custom segment group manager class name
+ public static final String SEGMENT_GROUP_MANAGER_CLASS_NAME_KEY = "segment.group.manager.class.name";
}
/**
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
index a19634d525..7d87abc96d 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
@@ -49,6 +49,7 @@ import org.apache.pinot.core.common.MinionConstants.MergeTask;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.plugin.minion.tasks.MergeTaskUtils;
import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils;
+import org.apache.pinot.plugin.minion.tasks.mergerollup.segmentgroupmananger.MergeRollupTaskSegmentGroupManagerProvider;
import org.apache.pinot.spi.annotations.minion.TaskGenerator;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
@@ -409,7 +410,7 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
if (segmentPartitionConfig == null) {
for (List<SegmentZKMetadata> selectedSegmentsPerBucket : selectedSegmentsForAllBuckets) {
pinotTaskConfigsForTable.addAll(
- createPinotTaskConfigs(selectedSegmentsPerBucket, tableNameWithType, maxNumRecordsPerTask, mergeLevel,
+ createPinotTaskConfigs(selectedSegmentsPerBucket, tableConfig, maxNumRecordsPerTask, mergeLevel,
null, mergeConfigs, taskConfigs));
}
} else {
@@ -448,13 +449,13 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
List<Integer> partition = entry.getKey();
List<SegmentZKMetadata> partitionedSegments = entry.getValue();
pinotTaskConfigsForTable.addAll(
- createPinotTaskConfigs(partitionedSegments, tableNameWithType, maxNumRecordsPerTask, mergeLevel,
+ createPinotTaskConfigs(partitionedSegments, tableConfig, maxNumRecordsPerTask, mergeLevel,
partition, mergeConfigs, taskConfigs));
}
if (!outlierSegments.isEmpty()) {
pinotTaskConfigsForTable.addAll(
- createPinotTaskConfigs(outlierSegments, tableNameWithType, maxNumRecordsPerTask, mergeLevel,
+ createPinotTaskConfigs(outlierSegments, tableConfig, maxNumRecordsPerTask, mergeLevel,
null, mergeConfigs, taskConfigs));
}
}
@@ -651,72 +652,77 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
* Create pinot task configs with selected segments and configs
*/
private List<PinotTaskConfig> createPinotTaskConfigs(List<SegmentZKMetadata> selectedSegments,
- String tableNameWithType, int maxNumRecordsPerTask, String mergeLevel, List<Integer> partition,
+ TableConfig tableConfig, int maxNumRecordsPerTask, String mergeLevel, List<Integer> partition,
Map<String, String> mergeConfigs, Map<String, String> taskConfigs) {
- int numRecordsPerTask = 0;
- List<List<String>> segmentNamesList = new ArrayList<>();
- List<List<String>> downloadURLsList = new ArrayList<>();
- List<String> segmentNames = new ArrayList<>();
- List<String> downloadURLs = new ArrayList<>();
-
- for (int i = 0; i < selectedSegments.size(); i++) {
- SegmentZKMetadata targetSegment = selectedSegments.get(i);
- segmentNames.add(targetSegment.getSegmentName());
- downloadURLs.add(targetSegment.getDownloadUrl());
- numRecordsPerTask += targetSegment.getTotalDocs();
- if (numRecordsPerTask >= maxNumRecordsPerTask || i == selectedSegments.size() - 1) {
- segmentNamesList.add(segmentNames);
- downloadURLsList.add(downloadURLs);
- numRecordsPerTask = 0;
- segmentNames = new ArrayList<>();
- downloadURLs = new ArrayList<>();
- }
- }
-
+ String tableNameWithType = tableConfig.getTableName();
+ List<List<SegmentZKMetadata>> segmentGroups = MergeRollupTaskSegmentGroupManagerProvider.create(taskConfigs)
+ .getSegmentGroups(tableConfig, _clusterInfoAccessor, selectedSegments);
List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
- StringBuilder partitionSuffixBuilder = new StringBuilder();
- if (partition != null && !partition.isEmpty()) {
- for (int columnPartition : partition) {
- partitionSuffixBuilder.append(DELIMITER_IN_SEGMENT_NAME).append(columnPartition);
+ for (List<SegmentZKMetadata> segments : segmentGroups) {
+ int numRecordsPerTask = 0;
+ List<List<String>> segmentNamesList = new ArrayList<>();
+ List<List<String>> downloadURLsList = new ArrayList<>();
+ List<String> segmentNames = new ArrayList<>();
+ List<String> downloadURLs = new ArrayList<>();
+
+ for (int i = 0; i < segments.size(); i++) {
+ SegmentZKMetadata targetSegment = segments.get(i);
+ segmentNames.add(targetSegment.getSegmentName());
+ downloadURLs.add(targetSegment.getDownloadUrl());
+ numRecordsPerTask += targetSegment.getTotalDocs();
+ if (numRecordsPerTask >= maxNumRecordsPerTask || i == segments.size() - 1) {
+ segmentNamesList.add(segmentNames);
+ downloadURLsList.add(downloadURLs);
+ numRecordsPerTask = 0;
+ segmentNames = new ArrayList<>();
+ downloadURLs = new ArrayList<>();
+ }
}
- }
- String partitionSuffix = partitionSuffixBuilder.toString();
-
- for (int i = 0; i < segmentNamesList.size(); i++) {
- String downloadURL = StringUtils.join(downloadURLsList.get(i), MinionConstants.URL_SEPARATOR);
- Map<String, String> configs = MinionTaskUtils.getPushTaskConfig(tableNameWithType, taskConfigs,
- _clusterInfoAccessor);
- configs.put(MinionConstants.TABLE_NAME_KEY, tableNameWithType);
- configs.put(MinionConstants.SEGMENT_NAME_KEY,
- StringUtils.join(segmentNamesList.get(i), MinionConstants.SEGMENT_NAME_SEPARATOR));
- configs.put(MinionConstants.DOWNLOAD_URL_KEY, downloadURL);
- configs.put(MinionConstants.UPLOAD_URL_KEY, _clusterInfoAccessor.getVipUrl() + "/segments");
- configs.put(MinionConstants.ENABLE_REPLACE_SEGMENTS_KEY, "true");
-
- for (Map.Entry<String, String> taskConfig : taskConfigs.entrySet()) {
- if (taskConfig.getKey().endsWith(MinionConstants.MergeRollupTask.AGGREGATION_TYPE_KEY_SUFFIX)) {
- configs.put(taskConfig.getKey(), taskConfig.getValue());
+
+ StringBuilder partitionSuffixBuilder = new StringBuilder();
+ if (partition != null && !partition.isEmpty()) {
+ for (int columnPartition : partition) {
+ partitionSuffixBuilder.append(DELIMITER_IN_SEGMENT_NAME).append(columnPartition);
}
}
+ String partitionSuffix = partitionSuffixBuilder.toString();
+
+ for (int i = 0; i < segmentNamesList.size(); i++) {
+ String downloadURL = StringUtils.join(downloadURLsList.get(i), MinionConstants.URL_SEPARATOR);
+ Map<String, String> configs = MinionTaskUtils.getPushTaskConfig(tableNameWithType, taskConfigs,
+ _clusterInfoAccessor);
+ configs.put(MinionConstants.TABLE_NAME_KEY, tableNameWithType);
+ configs.put(MinionConstants.SEGMENT_NAME_KEY,
+ StringUtils.join(segmentNamesList.get(i), MinionConstants.SEGMENT_NAME_SEPARATOR));
+ configs.put(MinionConstants.DOWNLOAD_URL_KEY, downloadURL);
+ configs.put(MinionConstants.UPLOAD_URL_KEY, _clusterInfoAccessor.getVipUrl() + "/segments");
+ configs.put(MinionConstants.ENABLE_REPLACE_SEGMENTS_KEY, "true");
+
+ for (Map.Entry<String, String> taskConfig : taskConfigs.entrySet()) {
+ if (taskConfig.getKey().endsWith(MinionConstants.MergeRollupTask.AGGREGATION_TYPE_KEY_SUFFIX)) {
+ configs.put(taskConfig.getKey(), taskConfig.getValue());
+ }
+ }
- configs.put(BatchConfigProperties.OVERWRITE_OUTPUT,
- taskConfigs.getOrDefault(BatchConfigProperties.OVERWRITE_OUTPUT, "false"));
- configs.put(MergeRollupTask.MERGE_TYPE_KEY, mergeConfigs.get(MergeTask.MERGE_TYPE_KEY));
- configs.put(MergeRollupTask.MERGE_LEVEL_KEY, mergeLevel);
- configs.put(MergeTask.PARTITION_BUCKET_TIME_PERIOD_KEY, mergeConfigs.get(MergeTask.BUCKET_TIME_PERIOD_KEY));
- configs.put(MergeTask.ROUND_BUCKET_TIME_PERIOD_KEY, mergeConfigs.get(MergeTask.ROUND_BUCKET_TIME_PERIOD_KEY));
- configs.put(MergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY,
- mergeConfigs.get(MergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY));
-
- // Segment name conflict happens when the current method "createPinotTaskConfigs" is invoked more than once within
- // the same epoch millisecond, which may happen when there are multiple partitions.
- // To prevent such name conflict, we include a partitionSeqSuffix to the segment name.
- configs.put(MergeRollupTask.SEGMENT_NAME_PREFIX_KEY,
- MergeRollupTask.MERGED_SEGMENT_NAME_PREFIX + mergeLevel + DELIMITER_IN_SEGMENT_NAME
- + System.currentTimeMillis() + partitionSuffix + DELIMITER_IN_SEGMENT_NAME + i
- + DELIMITER_IN_SEGMENT_NAME + TableNameBuilder.extractRawTableName(tableNameWithType));
- pinotTaskConfigs.add(new PinotTaskConfig(MergeRollupTask.TASK_TYPE, configs));
+ configs.put(BatchConfigProperties.OVERWRITE_OUTPUT,
+ taskConfigs.getOrDefault(BatchConfigProperties.OVERWRITE_OUTPUT, "false"));
+ configs.put(MergeRollupTask.MERGE_TYPE_KEY, mergeConfigs.get(MergeTask.MERGE_TYPE_KEY));
+ configs.put(MergeRollupTask.MERGE_LEVEL_KEY, mergeLevel);
+ configs.put(MergeTask.PARTITION_BUCKET_TIME_PERIOD_KEY, mergeConfigs.get(MergeTask.BUCKET_TIME_PERIOD_KEY));
+ configs.put(MergeTask.ROUND_BUCKET_TIME_PERIOD_KEY, mergeConfigs.get(MergeTask.ROUND_BUCKET_TIME_PERIOD_KEY));
+ configs.put(MergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY,
+ mergeConfigs.get(MergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY));
+
+ // Segment name conflicts can happen when the current method "createPinotTaskConfigs" is invoked more than once
+ // within the same epoch millisecond, which may happen when there are multiple partitions.
+ // To prevent such conflicts, we include the partition suffix and the task sequence index in the segment name.
+ configs.put(MergeRollupTask.SEGMENT_NAME_PREFIX_KEY,
+ MergeRollupTask.MERGED_SEGMENT_NAME_PREFIX + mergeLevel + DELIMITER_IN_SEGMENT_NAME
+ + System.currentTimeMillis() + partitionSuffix + DELIMITER_IN_SEGMENT_NAME + i
+ + DELIMITER_IN_SEGMENT_NAME + TableNameBuilder.extractRawTableName(tableNameWithType));
+ pinotTaskConfigs.add(new PinotTaskConfig(MergeRollupTask.TASK_TYPE, configs));
+ }
}
return pinotTaskConfigs;
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/segmentgroupmananger/DefaultMergeRollupTaskSegmentGroupManager.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/segmentgroupmananger/DefaultMergeRollupTaskSegmentGroupManager.java
new file mode 100644
index 0000000000..9aca72ac35
--- /dev/null
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/segmentgroupmananger/DefaultMergeRollupTaskSegmentGroupManager.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks.mergerollup.segmentgroupmananger;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
+import org.apache.pinot.spi.config.table.TableConfig;
+
+
+public class DefaultMergeRollupTaskSegmentGroupManager implements MergeRollupTaskSegmentGroupManager {
+
+ @Override
+ public List<List<SegmentZKMetadata>> getSegmentGroups(TableConfig tableConfig,
+ ClusterInfoAccessor clusterInfoAccessor, List<SegmentZKMetadata> segments) {
+ return Collections.singletonList(segments);
+ }
+}
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/segmentgroupmananger/MergeRollupTaskSegmentGroupManager.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/segmentgroupmananger/MergeRollupTaskSegmentGroupManager.java
new file mode 100644
index 0000000000..1bf74140ce
--- /dev/null
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/segmentgroupmananger/MergeRollupTaskSegmentGroupManager.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks.mergerollup.segmentgroupmananger;
+
+import java.util.List;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
+import org.apache.pinot.spi.config.table.TableConfig;
+
+
+/**
+ * The interface <code>MergeRollupTaskSegmentGroupManager</code> defines the APIs to group segments for task generation.
+ */
+public interface MergeRollupTaskSegmentGroupManager {
+ /**
+ * Returns a list of segment groups which are scheduled in separate tasks
+ */
+ List<List<SegmentZKMetadata>> getSegmentGroups(TableConfig tableConfig, ClusterInfoAccessor clusterInfoAccessor,
+ List<SegmentZKMetadata> segments);
+}
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/segmentgroupmananger/MergeRollupTaskSegmentGroupManagerProvider.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/segmentgroupmananger/MergeRollupTaskSegmentGroupManagerProvider.java
new file mode 100644
index 0000000000..ca163f20f2
--- /dev/null
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/segmentgroupmananger/MergeRollupTaskSegmentGroupManagerProvider.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks.mergerollup.segmentgroupmananger;
+
+import java.util.Map;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.spi.plugin.PluginManager;
+
+
+/**
+ * Provider class for {@link MergeRollupTaskSegmentGroupManager}
+ */
+public abstract class MergeRollupTaskSegmentGroupManagerProvider {
+ /**
+ * Constructs the {@link MergeRollupTaskSegmentGroupManager} using MergeRollup task configs
+ */
+ public static MergeRollupTaskSegmentGroupManager create(Map<String, String> taskConfigs) {
+ String segmentGroupManagerClassName =
+ taskConfigs.get(MinionConstants.MergeRollupTask.SEGMENT_GROUP_MANAGER_CLASS_NAME_KEY);
+ if (segmentGroupManagerClassName != null) {
+ try {
+ return PluginManager.get().createInstance(segmentGroupManagerClassName);
+ } catch (Exception e) {
+ throw new RuntimeException("Fail to create segment group manager", e);
+ }
+ } else {
+ return new DefaultMergeRollupTaskSegmentGroupManager();
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org