You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/04/14 00:57:44 UTC

[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #8255: Added multi column partitioning for offline table

Jackie-Jiang commented on code in PR #8255:
URL: https://github.com/apache/pinot/pull/8255#discussion_r850001269


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java:
##########
@@ -351,30 +350,39 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
                     mergeConfigs, taskConfigs));
           }
         } else {
-          // For partitioned table, schedule separate tasks for each partition
+          // For partitioned table, schedule separate tasks for each partitionId (partitionId is constructed from
+          // partitions of all partition columns. There should be exact match between partition columns of segment and
+          // partition columns of table configuration, and there is only partition per column in segment metadata).
+          // Other segments which do not meet these conditions are considered as outlier segments, and additional tasks
+          // are generated for them.
           Map<String, ColumnPartitionConfig> columnPartitionMap = segmentPartitionConfig.getColumnPartitionMap();
-          Preconditions.checkState(columnPartitionMap.size() == 1, "Cannot partition on multiple columns for table: %s",
-              tableConfig.getTableName());
-          Map.Entry<String, ColumnPartitionConfig> partitionEntry = columnPartitionMap.entrySet().iterator().next();
-          String partitionColumn = partitionEntry.getKey();
-
+          Set<String> partitionColumns = columnPartitionMap.keySet();
           for (List<SegmentZKMetadata> selectedSegmentsPerBucket : selectedSegmentsForAllBuckets) {
-            Map<Integer, List<SegmentZKMetadata>> partitionToSegments = new HashMap<>();
-            // Handle segments that have multiple partitions or no partition info
+            Map<String, List<SegmentZKMetadata>> partitionToSegments = new HashMap<>();
             List<SegmentZKMetadata> outlierSegments = new ArrayList<>();
             for (SegmentZKMetadata selectedSegment : selectedSegmentsPerBucket) {
               SegmentPartitionMetadata segmentPartitionMetadata = selectedSegment.getPartitionMetadata();
-              if (segmentPartitionMetadata == null
-                  || segmentPartitionMetadata.getPartitions(partitionColumn).size() != 1) {
+              List<Integer> partitionsBuffer = new ArrayList<>();
+              if (segmentPartitionMetadata != null && partitionColumns.equals(
+                  segmentPartitionMetadata.getColumnPartitionMap().keySet())) {
+                for (String partitionColumn : partitionColumns) {
+                  if (segmentPartitionMetadata.getPartitions(partitionColumn).size() == 1) {
+                    partitionsBuffer.add(segmentPartitionMetadata.getPartitions(partitionColumn).iterator().next());
+                  } else {
+                    partitionsBuffer.clear();
+                    break;
+                  }
+                }
+              }
+              if (partitionsBuffer.isEmpty()) {
                 outlierSegments.add(selectedSegment);
               } else {
-                int partition = segmentPartitionMetadata.getPartitions(partitionColumn).iterator().next();
-                partitionToSegments.computeIfAbsent(partition, k -> new ArrayList<>()).add(selectedSegment);
+                String partitionId = StringUtils.join(partitionsBuffer, "_");
+                partitionToSegments.computeIfAbsent(partitionId, k -> new ArrayList<>()).add(selectedSegment);
               }
             }
 
-            for (Map.Entry<Integer, List<SegmentZKMetadata>> partitionToSegmentsEntry
-                : partitionToSegments.entrySet()) {
+            for (Map.Entry<String, List<SegmentZKMetadata>> partitionToSegmentsEntry : partitionToSegments.entrySet()) {

Review Comment:
   (minor) The keys are not used, use `valueSet()`



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java:
##########
@@ -351,30 +350,39 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
                     mergeConfigs, taskConfigs));
           }
         } else {
-          // For partitioned table, schedule separate tasks for each partition
+          // For partitioned table, schedule separate tasks for each partitionId (partitionId is constructed from
+          // partitions of all partition columns. There should be exact match between partition columns of segment and
+          // partition columns of table configuration, and there is only partition per column in segment metadata).
+          // Other segments which do not meet these conditions are considered as outlier segments, and additional tasks
+          // are generated for them.
           Map<String, ColumnPartitionConfig> columnPartitionMap = segmentPartitionConfig.getColumnPartitionMap();
-          Preconditions.checkState(columnPartitionMap.size() == 1, "Cannot partition on multiple columns for table: %s",
-              tableConfig.getTableName());
-          Map.Entry<String, ColumnPartitionConfig> partitionEntry = columnPartitionMap.entrySet().iterator().next();
-          String partitionColumn = partitionEntry.getKey();
-
+          Set<String> partitionColumns = columnPartitionMap.keySet();
           for (List<SegmentZKMetadata> selectedSegmentsPerBucket : selectedSegmentsForAllBuckets) {
-            Map<Integer, List<SegmentZKMetadata>> partitionToSegments = new HashMap<>();
-            // Handle segments that have multiple partitions or no partition info
+            Map<String, List<SegmentZKMetadata>> partitionToSegments = new HashMap<>();

Review Comment:
   (minor) You may directly use `List<Integer>` as the key



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java:
##########
@@ -99,8 +103,16 @@ public void testRealtimeToOfflineSegmentsTask()
     List<SegmentZKMetadata> segmentsZKMetadata = _pinotHelixResourceManager.getSegmentsZKMetadata(_offlineTableName);

Review Comment:
   I think the support is not added to realtime table, so how does it work on this test?



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/PartitionSegmentPruner.java:
##########
@@ -54,18 +57,18 @@
  */
 public class PartitionSegmentPruner implements SegmentPruner {
   private static final Logger LOGGER = LoggerFactory.getLogger(PartitionSegmentPruner.class);
-  private static final PartitionInfo INVALID_PARTITION_INFO = new PartitionInfo(null, null);
+  private static final Map<String, PartitionInfo> INVALID_COLUMN_PARTITION_INFO_MAP = Collections.emptyMap();
 
   private final String _tableNameWithType;
-  private final String _partitionColumn;
+  private final Set<String> _partitionColumns;
   private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
   private final String _segmentZKMetadataPathPrefix;
-  private final Map<String, PartitionInfo> _partitionInfoMap = new ConcurrentHashMap<>();
+  private final Map<String, Map<String, PartitionInfo>> _segmentColumnPartitionInfoMap = new ConcurrentHashMap<>();

Review Comment:
   Storing a map for each segment can potentially cause performance regression and extra memory footprint, and we already observe partition pruning becomes a bottleneck when a table contains a lot of segments. Let's add it as a separate implementation so that for single partition column case (probably 99% of the use case), we don't have to pay the extra cost. You may rename the current one to `SinglePartitionColumnSegmentPruner`, then add a `MultiPartitionColumnsSegmentPruner`



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java:
##########
@@ -351,30 +350,39 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
                     mergeConfigs, taskConfigs));
           }
         } else {
-          // For partitioned table, schedule separate tasks for each partition
+          // For partitioned table, schedule separate tasks for each partitionId (partitionId is constructed from
+          // partitions of all partition columns. There should be exact match between partition columns of segment and
+          // partition columns of table configuration, and there is only partition per column in segment metadata).
+          // Other segments which do not meet these conditions are considered as outlier segments, and additional tasks
+          // are generated for them.
           Map<String, ColumnPartitionConfig> columnPartitionMap = segmentPartitionConfig.getColumnPartitionMap();
-          Preconditions.checkState(columnPartitionMap.size() == 1, "Cannot partition on multiple columns for table: %s",
-              tableConfig.getTableName());
-          Map.Entry<String, ColumnPartitionConfig> partitionEntry = columnPartitionMap.entrySet().iterator().next();
-          String partitionColumn = partitionEntry.getKey();
-
+          Set<String> partitionColumns = columnPartitionMap.keySet();
           for (List<SegmentZKMetadata> selectedSegmentsPerBucket : selectedSegmentsForAllBuckets) {
-            Map<Integer, List<SegmentZKMetadata>> partitionToSegments = new HashMap<>();
-            // Handle segments that have multiple partitions or no partition info
+            Map<String, List<SegmentZKMetadata>> partitionToSegments = new HashMap<>();
             List<SegmentZKMetadata> outlierSegments = new ArrayList<>();
             for (SegmentZKMetadata selectedSegment : selectedSegmentsPerBucket) {
               SegmentPartitionMetadata segmentPartitionMetadata = selectedSegment.getPartitionMetadata();
-              if (segmentPartitionMetadata == null
-                  || segmentPartitionMetadata.getPartitions(partitionColumn).size() != 1) {
+              List<Integer> partitionsBuffer = new ArrayList<>();
+              if (segmentPartitionMetadata != null && partitionColumns.equals(
+                  segmentPartitionMetadata.getColumnPartitionMap().keySet())) {
+                for (String partitionColumn : partitionColumns) {

Review Comment:
   We need to make `partitionColumns` a list to have consistent order



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org