Posted to commits@hudi.apache.org by "bvaradar (via GitHub)" <gi...@apache.org> on 2023/04/08 20:25:15 UTC

[GitHub] [hudi] bvaradar commented on a diff in pull request #8376: [HUDI-6019] support split kafka source by count

bvaradar commented on code in PR #8376:
URL: https://github.com/apache/hudi/pull/8376#discussion_r1161153689


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java:
##########
@@ -151,6 +166,40 @@ public static OffsetRange[] computeOffsetRanges(Map<TopicPartition, Long> fromOf
       return ranges;
     }
 
+    public static OffsetRange[] splitRangesByCount(OffsetRange[] oldRanges, long maxEvents) {
+      List<OffsetRange> newRanges = new ArrayList<>();
+      for (OffsetRange range : oldRanges) {
+        newRanges.addAll(splitSingleRange(range, maxEvents));
+      }
+      return newRanges.toArray(new OffsetRange[0]);
+    }
+
+    public static OffsetRange[] mergeRangesByTp(OffsetRange[] oldRanges) {

Review Comment:
   Can you add a comment explaining why this is needed? We need to maintain the checkpoint with one offset range per topic partition.
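
For illustration, the merge step could look roughly like the sketch below. `Range` is a hypothetical stand-in for Spark's `OffsetRange`, and `mergeByTopicPartition` is an assumed name, not the PR's `mergeRangesByTp`. The sketch assumes that the split ranges of a topic partition are contiguous, so taking the minimum start and maximum end is enough.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class MergeRangesSketch {
  // Hypothetical stand-in for Spark's OffsetRange: [from, until) within one topic partition.
  static final class Range {
    final String topic;
    final int partition;
    final long from;
    final long until;

    Range(String topic, int partition, long from, long until) {
      this.topic = topic;
      this.partition = partition;
      this.from = from;
      this.until = until;
    }
  }

  // Collapse all split ranges of a topic partition into a single range,
  // so the checkpoint keeps exactly one offset range per topic partition.
  static List<Range> mergeByTopicPartition(List<Range> ranges) {
    Map<String, Range> merged = new LinkedHashMap<>();
    for (Range r : ranges) {
      String key = r.topic + ":" + r.partition;
      merged.merge(key, r, (a, b) -> new Range(
          a.topic, a.partition, Math.min(a.from, b.from), Math.max(a.until, b.until)));
    }
    return new ArrayList<>(merged.values());
  }
}
```

With input ranges `t/0 [0, 10)`, `t/0 [10, 25)` and `t/1 [5, 8)`, this returns `t/0 [0, 25)` and `t/1 [5, 8)`.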



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KafkaSourceConfig.java:
##########
@@ -63,6 +63,14 @@ public class KafkaSourceConfig extends HoodieConfig {
       .defaultValue(5000000L)
       .withDocumentation("Maximum number of records obtained in each batch.");
 
+  public static final ConfigProperty<Long> MAX_EVENTS_PER_KAFKA_PARTITION = ConfigProperty

Review Comment:
   Yes. Can we just have a parallelism config for the Kafka read, with a default of 0 indicating that the number of Kafka partitions is used as the parallelism "P"? Instead of the current approach, where one Kafka partition translates to one Spark partition, we can bin-pack the offsets into the "P" buckets with the constraint that one Spark partition reads from only one Kafka partition. Let me know if this makes sense.
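
One possible reading of this suggestion is sketched below. Everything here is an assumption for illustration: `Range` stands in for Spark's `OffsetRange`, `binPack` and its greedy strategy are hypothetical, and a parallelism below the number of Kafka partitions is treated as one bucket per partition. Each bucket is carved out of a single Kafka partition, which keeps the "one Spark partition reads from only one Kafka partition" constraint.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class KafkaReadParallelismSketch {
  // Hypothetical stand-in for Spark's OffsetRange: [from, until) within one topic partition.
  static final class Range {
    final String topic;
    final int partition;
    final long from;
    final long until;

    Range(String topic, int partition, long from, long until) {
      this.topic = topic;
      this.partition = partition;
      this.from = from;
      this.until = until;
    }

    long count() {
      return until - from;
    }
  }

  // Split the per-partition ranges into roughly `parallelism` buckets.
  // parallelism <= 0 means "use the number of Kafka partitions".
  static List<Range> binPack(List<Range> ranges, int parallelism) {
    int n = ranges.size();
    int target = parallelism <= 0 ? n : Math.max(parallelism, n);
    int[] splits = new int[n];
    Arrays.fill(splits, 1); // every Kafka partition gets at least one bucket

    // Greedily give each extra bucket to the partition with the most events per bucket.
    for (int extra = target - n; extra > 0; extra--) {
      int best = -1;
      double bestLoad = 1.0; // never split a bucket that holds a single event
      for (int i = 0; i < n; i++) {
        double load = (double) ranges.get(i).count() / splits[i];
        if (load > bestLoad) {
          best = i;
          bestLoad = load;
        }
      }
      if (best < 0) {
        break; // nothing left that is worth splitting
      }
      splits[best]++;
    }

    // Cut each partition's range into its allotted number of near-equal pieces.
    List<Range> buckets = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      Range r = ranges.get(i);
      long base = r.count() / splits[i];
      long remainder = r.count() % splits[i];
      long start = r.from;
      for (int s = 0; s < splits[i]; s++) {
        long size = base + (s < remainder ? 1 : 0);
        buckets.add(new Range(r.topic, r.partition, start, start + size));
        start += size;
      }
    }
    return buckets;
  }
}
```

With `t/0 [0, 90)`, `t/1 [0, 10)` and a parallelism of 4, the skewed partition 0 is read by three Spark partitions of 30 events each, while partition 1 stays in a single bucket.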



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java:
##########
@@ -242,7 +291,15 @@ public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long
       throw new HoodieException("sourceLimit should not be less than the number of kafka partitions");
     }
 
-    return CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets, numEvents);
+    OffsetRange[] ranges = CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets, numEvents);
+    LOG.info("before split by count: " + CheckpointUtils.offsetsStringfy(ranges));
+    long maxEventsPerPartition = props.getLong(KafkaSourceConfig.MAX_EVENTS_PER_KAFKA_PARTITION.key(),
+            KafkaSourceConfig.MAX_EVENTS_PER_KAFKA_PARTITION.defaultValue());
+    LOG.info("getNextOffsetRanges set config " + KafkaSourceConfig.MAX_EVENTS_PER_KAFKA_PARTITION.key() + " to " + maxEventsPerPartition);
+
+    ranges = CheckpointUtils.splitRangesByCount(ranges, maxEventsPerPartition);

Review Comment:
   Can we move the split-ranges logic into CheckpointUtils.computeOffsetRanges, since we already do a merge there as part of the allocation logic?
   
   See the section below this comment in CheckpointUtils:
   ```
   // Allocate the remaining events to non-exhausted partitions, in round robin fashion
   ```
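
For context, the split-by-count step being discussed could be sketched as below. This is a minimal illustration, not the PR's `splitSingleRange`: `Range` is a hypothetical stand-in for Spark's `OffsetRange`, and treating a non-positive `maxEvents` as "do not split" is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

class SplitRangeSketch {
  // Hypothetical stand-in for Spark's OffsetRange: [from, until) within one topic partition.
  static final class Range {
    final String topic;
    final int partition;
    final long from;
    final long until;

    Range(String topic, int partition, long from, long until) {
      this.topic = topic;
      this.partition = partition;
      this.from = from;
      this.until = until;
    }
  }

  // Cut one range into consecutive pieces of at most maxEvents offsets each.
  static List<Range> splitByCount(Range r, long maxEvents) {
    List<Range> pieces = new ArrayList<>();
    if (maxEvents <= 0 || r.until - r.from <= maxEvents) {
      pieces.add(r); // assumption: non-positive limit disables splitting
      return pieces;
    }
    for (long start = r.from; start < r.until; start += maxEvents) {
      long end = Math.min(start + maxEvents, r.until);
      pieces.add(new Range(r.topic, r.partition, start, end));
    }
    return pieces;
  }
}
```

Splitting `t/0 [0, 25)` with `maxEvents = 10` yields `[0, 10)`, `[10, 20)` and `[20, 25)`.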



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
