You are viewing a plain-text version of this content; the link to its canonical version is not included in this copy.
Posted to commits@pinot.apache.org by sn...@apache.org on 2023/07/03 23:04:05 UTC

[pinot] branch master updated: Allow custom segment grouping in MergeRollupTask based on lineage metadata (#10964)

This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new eefa8fbfe8 Allow custom segment grouping in MergeRollupTask based on lineage metadata (#10964)
eefa8fbfe8 is described below

commit eefa8fbfe86bc5b1412f9b9085364fb8a773476f
Author: Jiapeng Tao <ji...@linkedin.com>
AuthorDate: Mon Jul 3 16:03:58 2023 -0700

    Allow custom segment grouping in MergeRollupTask based on lineage metadata (#10964)
---
 .../apache/pinot/core/common/MinionConstants.java  |   3 +
 .../mergerollup/MergeRollupTaskGenerator.java      | 130 +++++++++++----------
 .../DefaultMergeRollupTaskSegmentGroupManager.java |  35 ++++++
 .../MergeRollupTaskSegmentGroupManager.java        |  36 ++++++
 ...MergeRollupTaskSegmentGroupManagerProvider.java |  46 ++++++++
 5 files changed, 188 insertions(+), 62 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index 530f757048..8e4c5cfc1c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -118,6 +118,9 @@ public class MinionConstants {
     public static final String SEGMENT_ZK_METADATA_TIME_KEY = TASK_TYPE + TASK_TIME_SUFFIX;
 
     public static final String MERGED_SEGMENT_NAME_PREFIX = "merged_";
+
+    // Custom segment group manager class name
+    public static final String SEGMENT_GROUP_MANAGER_CLASS_NAME_KEY = "segment.group.manager.class.name";
   }
 
   /**
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
index a19634d525..7d87abc96d 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
@@ -49,6 +49,7 @@ import org.apache.pinot.core.common.MinionConstants.MergeTask;
 import org.apache.pinot.core.minion.PinotTaskConfig;
 import org.apache.pinot.plugin.minion.tasks.MergeTaskUtils;
 import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils;
+import org.apache.pinot.plugin.minion.tasks.mergerollup.segmentgroupmananger.MergeRollupTaskSegmentGroupManagerProvider;
 import org.apache.pinot.spi.annotations.minion.TaskGenerator;
 import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
 import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
@@ -409,7 +410,7 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
         if (segmentPartitionConfig == null) {
           for (List<SegmentZKMetadata> selectedSegmentsPerBucket : selectedSegmentsForAllBuckets) {
             pinotTaskConfigsForTable.addAll(
-                createPinotTaskConfigs(selectedSegmentsPerBucket, tableNameWithType, maxNumRecordsPerTask, mergeLevel,
+                createPinotTaskConfigs(selectedSegmentsPerBucket, tableConfig, maxNumRecordsPerTask, mergeLevel,
                     null, mergeConfigs, taskConfigs));
           }
         } else {
@@ -448,13 +449,13 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
               List<Integer> partition = entry.getKey();
               List<SegmentZKMetadata> partitionedSegments = entry.getValue();
               pinotTaskConfigsForTable.addAll(
-                  createPinotTaskConfigs(partitionedSegments, tableNameWithType, maxNumRecordsPerTask, mergeLevel,
+                  createPinotTaskConfigs(partitionedSegments, tableConfig, maxNumRecordsPerTask, mergeLevel,
                       partition, mergeConfigs, taskConfigs));
             }
 
             if (!outlierSegments.isEmpty()) {
               pinotTaskConfigsForTable.addAll(
-                  createPinotTaskConfigs(outlierSegments, tableNameWithType, maxNumRecordsPerTask, mergeLevel,
+                  createPinotTaskConfigs(outlierSegments, tableConfig, maxNumRecordsPerTask, mergeLevel,
                       null, mergeConfigs, taskConfigs));
             }
           }
@@ -651,72 +652,77 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
    * Create pinot task configs with selected segments and configs
    */
   private List<PinotTaskConfig> createPinotTaskConfigs(List<SegmentZKMetadata> selectedSegments,
-      String tableNameWithType, int maxNumRecordsPerTask, String mergeLevel, List<Integer> partition,
+      TableConfig tableConfig, int maxNumRecordsPerTask, String mergeLevel, List<Integer> partition,
       Map<String, String> mergeConfigs, Map<String, String> taskConfigs) {
-    int numRecordsPerTask = 0;
-    List<List<String>> segmentNamesList = new ArrayList<>();
-    List<List<String>> downloadURLsList = new ArrayList<>();
-    List<String> segmentNames = new ArrayList<>();
-    List<String> downloadURLs = new ArrayList<>();
-
-    for (int i = 0; i < selectedSegments.size(); i++) {
-      SegmentZKMetadata targetSegment = selectedSegments.get(i);
-      segmentNames.add(targetSegment.getSegmentName());
-      downloadURLs.add(targetSegment.getDownloadUrl());
-      numRecordsPerTask += targetSegment.getTotalDocs();
-      if (numRecordsPerTask >= maxNumRecordsPerTask || i == selectedSegments.size() - 1) {
-        segmentNamesList.add(segmentNames);
-        downloadURLsList.add(downloadURLs);
-        numRecordsPerTask = 0;
-        segmentNames = new ArrayList<>();
-        downloadURLs = new ArrayList<>();
-      }
-    }
-
+    String tableNameWithType = tableConfig.getTableName();
+    List<List<SegmentZKMetadata>> segmentGroups = MergeRollupTaskSegmentGroupManagerProvider.create(taskConfigs)
+        .getSegmentGroups(tableConfig, _clusterInfoAccessor, selectedSegments);
     List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
 
-    StringBuilder partitionSuffixBuilder = new StringBuilder();
-    if (partition != null && !partition.isEmpty()) {
-      for (int columnPartition : partition) {
-        partitionSuffixBuilder.append(DELIMITER_IN_SEGMENT_NAME).append(columnPartition);
+    for (List<SegmentZKMetadata> segments : segmentGroups) {
+      int numRecordsPerTask = 0;
+      List<List<String>> segmentNamesList = new ArrayList<>();
+      List<List<String>> downloadURLsList = new ArrayList<>();
+      List<String> segmentNames = new ArrayList<>();
+      List<String> downloadURLs = new ArrayList<>();
+
+      for (int i = 0; i < segments.size(); i++) {
+        SegmentZKMetadata targetSegment = segments.get(i);
+        segmentNames.add(targetSegment.getSegmentName());
+        downloadURLs.add(targetSegment.getDownloadUrl());
+        numRecordsPerTask += targetSegment.getTotalDocs();
+        if (numRecordsPerTask >= maxNumRecordsPerTask || i == segments.size() - 1) {
+          segmentNamesList.add(segmentNames);
+          downloadURLsList.add(downloadURLs);
+          numRecordsPerTask = 0;
+          segmentNames = new ArrayList<>();
+          downloadURLs = new ArrayList<>();
+        }
       }
-    }
-    String partitionSuffix = partitionSuffixBuilder.toString();
-
-    for (int i = 0; i < segmentNamesList.size(); i++) {
-      String downloadURL = StringUtils.join(downloadURLsList.get(i), MinionConstants.URL_SEPARATOR);
-      Map<String, String> configs = MinionTaskUtils.getPushTaskConfig(tableNameWithType, taskConfigs,
-          _clusterInfoAccessor);
-      configs.put(MinionConstants.TABLE_NAME_KEY, tableNameWithType);
-      configs.put(MinionConstants.SEGMENT_NAME_KEY,
-          StringUtils.join(segmentNamesList.get(i), MinionConstants.SEGMENT_NAME_SEPARATOR));
-      configs.put(MinionConstants.DOWNLOAD_URL_KEY, downloadURL);
-      configs.put(MinionConstants.UPLOAD_URL_KEY, _clusterInfoAccessor.getVipUrl() + "/segments");
-      configs.put(MinionConstants.ENABLE_REPLACE_SEGMENTS_KEY, "true");
-
-      for (Map.Entry<String, String> taskConfig : taskConfigs.entrySet()) {
-        if (taskConfig.getKey().endsWith(MinionConstants.MergeRollupTask.AGGREGATION_TYPE_KEY_SUFFIX)) {
-          configs.put(taskConfig.getKey(), taskConfig.getValue());
+
+      StringBuilder partitionSuffixBuilder = new StringBuilder();
+      if (partition != null && !partition.isEmpty()) {
+        for (int columnPartition : partition) {
+          partitionSuffixBuilder.append(DELIMITER_IN_SEGMENT_NAME).append(columnPartition);
         }
       }
+      String partitionSuffix = partitionSuffixBuilder.toString();
+
+      for (int i = 0; i < segmentNamesList.size(); i++) {
+        String downloadURL = StringUtils.join(downloadURLsList.get(i), MinionConstants.URL_SEPARATOR);
+        Map<String, String> configs = MinionTaskUtils.getPushTaskConfig(tableNameWithType, taskConfigs,
+            _clusterInfoAccessor);
+        configs.put(MinionConstants.TABLE_NAME_KEY, tableNameWithType);
+        configs.put(MinionConstants.SEGMENT_NAME_KEY,
+            StringUtils.join(segmentNamesList.get(i), MinionConstants.SEGMENT_NAME_SEPARATOR));
+        configs.put(MinionConstants.DOWNLOAD_URL_KEY, downloadURL);
+        configs.put(MinionConstants.UPLOAD_URL_KEY, _clusterInfoAccessor.getVipUrl() + "/segments");
+        configs.put(MinionConstants.ENABLE_REPLACE_SEGMENTS_KEY, "true");
+
+        for (Map.Entry<String, String> taskConfig : taskConfigs.entrySet()) {
+          if (taskConfig.getKey().endsWith(MinionConstants.MergeRollupTask.AGGREGATION_TYPE_KEY_SUFFIX)) {
+            configs.put(taskConfig.getKey(), taskConfig.getValue());
+          }
+        }
 
-      configs.put(BatchConfigProperties.OVERWRITE_OUTPUT,
-          taskConfigs.getOrDefault(BatchConfigProperties.OVERWRITE_OUTPUT, "false"));
-      configs.put(MergeRollupTask.MERGE_TYPE_KEY, mergeConfigs.get(MergeTask.MERGE_TYPE_KEY));
-      configs.put(MergeRollupTask.MERGE_LEVEL_KEY, mergeLevel);
-      configs.put(MergeTask.PARTITION_BUCKET_TIME_PERIOD_KEY, mergeConfigs.get(MergeTask.BUCKET_TIME_PERIOD_KEY));
-      configs.put(MergeTask.ROUND_BUCKET_TIME_PERIOD_KEY, mergeConfigs.get(MergeTask.ROUND_BUCKET_TIME_PERIOD_KEY));
-      configs.put(MergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY,
-          mergeConfigs.get(MergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY));
-
-      // Segment name conflict happens when the current method "createPinotTaskConfigs" is invoked more than once within
-      // the same epoch millisecond, which may happen when there are multiple partitions.
-      // To prevent such name conflict, we include a partitionSeqSuffix to the segment name.
-      configs.put(MergeRollupTask.SEGMENT_NAME_PREFIX_KEY,
-          MergeRollupTask.MERGED_SEGMENT_NAME_PREFIX + mergeLevel + DELIMITER_IN_SEGMENT_NAME
-              + System.currentTimeMillis() + partitionSuffix + DELIMITER_IN_SEGMENT_NAME + i
-              + DELIMITER_IN_SEGMENT_NAME + TableNameBuilder.extractRawTableName(tableNameWithType));
-      pinotTaskConfigs.add(new PinotTaskConfig(MergeRollupTask.TASK_TYPE, configs));
+        configs.put(BatchConfigProperties.OVERWRITE_OUTPUT,
+            taskConfigs.getOrDefault(BatchConfigProperties.OVERWRITE_OUTPUT, "false"));
+        configs.put(MergeRollupTask.MERGE_TYPE_KEY, mergeConfigs.get(MergeTask.MERGE_TYPE_KEY));
+        configs.put(MergeRollupTask.MERGE_LEVEL_KEY, mergeLevel);
+        configs.put(MergeTask.PARTITION_BUCKET_TIME_PERIOD_KEY, mergeConfigs.get(MergeTask.BUCKET_TIME_PERIOD_KEY));
+        configs.put(MergeTask.ROUND_BUCKET_TIME_PERIOD_KEY, mergeConfigs.get(MergeTask.ROUND_BUCKET_TIME_PERIOD_KEY));
+        configs.put(MergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY,
+            mergeConfigs.get(MergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY));
+
+        // Segment name conflicts can happen when "createPinotTaskConfigs" is invoked more than once within the same
+        // epoch millisecond (e.g. with multiple partitions), or when one invocation covers several segment groups.
+        // To prevent them, include the partition suffix and a sequence number unique within this invocation.
+        configs.put(MergeRollupTask.SEGMENT_NAME_PREFIX_KEY,
+            MergeRollupTask.MERGED_SEGMENT_NAME_PREFIX + mergeLevel + DELIMITER_IN_SEGMENT_NAME
+                + System.currentTimeMillis() + partitionSuffix + DELIMITER_IN_SEGMENT_NAME + pinotTaskConfigs.size()
+                + DELIMITER_IN_SEGMENT_NAME + TableNameBuilder.extractRawTableName(tableNameWithType));
+        pinotTaskConfigs.add(new PinotTaskConfig(MergeRollupTask.TASK_TYPE, configs));
+      }
     }
 
     return pinotTaskConfigs;
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/segmentgroupmananger/DefaultMergeRollupTaskSegmentGroupManager.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/segmentgroupmananger/DefaultMergeRollupTaskSegmentGroupManager.java
new file mode 100644
index 0000000000..9aca72ac35
--- /dev/null
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/segmentgroupmananger/DefaultMergeRollupTaskSegmentGroupManager.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks.mergerollup.segmentgroupmananger;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
+import org.apache.pinot.spi.config.table.TableConfig;
+
+/** Default {@link MergeRollupTaskSegmentGroupManager}: returns all given segments as a single group. */
+public class DefaultMergeRollupTaskSegmentGroupManager implements MergeRollupTaskSegmentGroupManager {
+
+  @Override
+  public List<List<SegmentZKMetadata>> getSegmentGroups(TableConfig tableConfig,
+      ClusterInfoAccessor clusterInfoAccessor, List<SegmentZKMetadata> segments) {
+    return Collections.singletonList(segments); // wraps the caller's list itself; no copy is made
+  }
+}
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/segmentgroupmananger/MergeRollupTaskSegmentGroupManager.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/segmentgroupmananger/MergeRollupTaskSegmentGroupManager.java
new file mode 100644
index 0000000000..1bf74140ce
--- /dev/null
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/segmentgroupmananger/MergeRollupTaskSegmentGroupManager.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks.mergerollup.segmentgroupmananger;
+
+import java.util.List;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
+import org.apache.pinot.spi.config.table.TableConfig;
+
+
+/**
+ * Groups the segments selected for a MergeRollup task so that each group is scheduled in separate tasks.
+ */
+public interface MergeRollupTaskSegmentGroupManager {
+  /**
+   * Returns a list of segment groups, derived from {@code segments}, which are scheduled in separate tasks
+   */
+  List<List<SegmentZKMetadata>> getSegmentGroups(TableConfig tableConfig, ClusterInfoAccessor clusterInfoAccessor,
+      List<SegmentZKMetadata> segments);
+}
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/segmentgroupmananger/MergeRollupTaskSegmentGroupManagerProvider.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/segmentgroupmananger/MergeRollupTaskSegmentGroupManagerProvider.java
new file mode 100644
index 0000000000..ca163f20f2
--- /dev/null
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/segmentgroupmananger/MergeRollupTaskSegmentGroupManagerProvider.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks.mergerollup.segmentgroupmananger;
+
+import java.util.Map;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.spi.plugin.PluginManager;
+
+
+/**
+ * Creates the {@link MergeRollupTaskSegmentGroupManager} to use for a MergeRollup task, based on its task configs.
+ */
+public abstract class MergeRollupTaskSegmentGroupManagerProvider {
+  /**
+   * Instantiates the class configured under {@code SEGMENT_GROUP_MANAGER_CLASS_NAME_KEY}, else the default manager.
+   */
+  public static MergeRollupTaskSegmentGroupManager create(Map<String, String> taskConfigs) {
+    String segmentGroupManagerClassName =
+        taskConfigs.get(MinionConstants.MergeRollupTask.SEGMENT_GROUP_MANAGER_CLASS_NAME_KEY);
+    if (segmentGroupManagerClassName == null) {
+      return new DefaultMergeRollupTaskSegmentGroupManager();
+    }
+    try {
+      return PluginManager.get().createInstance(segmentGroupManagerClassName);
+    } catch (Exception e) {
+      throw new RuntimeException(
+          "Failed to create segment group manager: " + segmentGroupManagerClassName, e);
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org