You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2022/02/10 05:04:26 UTC

[GitHub] [druid] kfaraz commented on a change in pull request #12236: Assign partitionIds in the same order as bucketIds

kfaraz commented on a change in pull request #12236:
URL: https://github.com/apache/druid/pull/12236#discussion_r803304037



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
##########
@@ -923,58 +924,66 @@ private PartitionBoundaries determineRangePartition(Collection<StringDistributio
     return partitions;
   }
 
-  private static Map<Pair<Interval, Integer>, List<PartitionLocation>> groupGenericPartitionLocationsPerPartition(
+  /**
+   * Creates a map from partition (interval + bucketId) to the corresponding
+   * PartitionLocations. Note that the bucketId may be different from the final
+   * partitionId (refer to {@link BuildingShardSpec} for more details).
+   */
+  static Map<Partition, List<PartitionLocation>> getPartitionToLocations(
       Map<String, GeneratedPartitionsReport> subTaskIdToReport
   )
   {
-    final Map<Pair<Interval, Integer>, BuildingShardSpec<?>> intervalAndIntegerToShardSpec = new HashMap<>();
-    final Object2IntMap<Interval> intervalToNextPartitionId = new Object2IntOpenHashMap<>();
-    final BiFunction<String, PartitionStat, PartitionLocation> createPartitionLocationFunction =
-        (subtaskId, partitionStat) -> {
-          final BuildingShardSpec<?> shardSpec = intervalAndIntegerToShardSpec.computeIfAbsent(
-              Pair.of(partitionStat.getInterval(), partitionStat.getBucketId()),
-              key -> {
-                // Lazily determine the partitionId to create packed partitionIds for the core partitions.
-                // See the Javadoc of BucketNumberedShardSpec for details.
-                final int partitionId = intervalToNextPartitionId.computeInt(
-                    partitionStat.getInterval(),
-                    ((interval, nextPartitionId) -> nextPartitionId == null ? 0 : nextPartitionId + 1)
-                );
-                return partitionStat.getSecondaryPartition().convert(partitionId);
-              }
-          );
-          return partitionStat.toPartitionLocation(subtaskId, shardSpec);
-        };
+    // Create a map from partition to list of reports (PartitionStat and subTaskId)
+    final Map<Partition, List<PartitionReport>> partitionToReports = new TreeMap<>(
+        // Sort by (interval, bucketId) to maintain order of partitionIds within interval
+        Comparator
+            .comparingLong((Partition partition) -> partition.getInterval().getStartMillis())
+            .thenComparingLong(partition -> partition.getInterval().getEndMillis())
+            .thenComparingInt(Partition::getBucketId)
+    );
+    subTaskIdToReport.forEach(
+        (subTaskId, report) -> report.getPartitionStats().forEach(
+            partitionStat -> partitionToReports
+                .computeIfAbsent(Partition.fromStat(partitionStat), p -> new ArrayList<>())
+                .add(new PartitionReport(subTaskId, partitionStat))
+        )
+    );
 
-    return groupPartitionLocationsPerPartition(subTaskIdToReport, createPartitionLocationFunction);
-  }
+    final Map<Partition, List<PartitionLocation>> partitionToLocations = new HashMap<>();
 
-  private static <L extends PartitionLocation>
-      Map<Pair<Interval, Integer>, List<L>> groupPartitionLocationsPerPartition(
-      Map<String, ? extends GeneratedPartitionsReport> subTaskIdToReport,
-      BiFunction<String, PartitionStat, L> createPartitionLocationFunction
-  )
-  {
-    // partition (interval, partitionId) -> partition locations
-    final Map<Pair<Interval, Integer>, List<L>> partitionToLocations = new HashMap<>();
-    for (Entry<String, ? extends GeneratedPartitionsReport> entry : subTaskIdToReport.entrySet()) {
-      final String subTaskId = entry.getKey();
-      final GeneratedPartitionsReport report = entry.getValue();
-      for (PartitionStat partitionStat : report.getPartitionStats()) {
-        final List<L> locationsOfSamePartition = partitionToLocations.computeIfAbsent(
-            Pair.of(partitionStat.getInterval(), partitionStat.getBucketId()),
-            k -> new ArrayList<>()
-        );
-        locationsOfSamePartition.add(createPartitionLocationFunction.apply(subTaskId, partitionStat));
+    Interval prevInterval = null;
+    final AtomicInteger partitionId = new AtomicInteger(0);
+    for (Entry<Partition, List<PartitionReport>> entry : partitionToReports.entrySet()) {
+      final Partition partition = entry.getKey();
+
+      // Reset the partitionId if this is a new interval
+      Interval interval = partition.getInterval();
+      if (!interval.equals(prevInterval)) {
+        partitionId.set(0);
+        prevInterval = interval;
       }
+
+      // Use any PartitionStat of this partition to create a shard spec
+      final List<PartitionReport> reportsOfPartition = entry.getValue();
+      final BuildingShardSpec<?> shardSpec = reportsOfPartition
+          .get(0).getPartitionStat().getSecondaryPartition()
+          .convert(partitionId.getAndIncrement());

Review comment:
       Here, `getSecondaryPartition()` returns a `BucketNumberedShardSpec`,
   and `BucketNumberedShardSpec.convert()` accepts a `partitionId`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org