You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this text copy.
Posted to commits@hudi.apache.org by vi...@apache.org on 2019/10/11 12:28:50 UTC
[incubator-hudi] branch master updated: [HUDI-292] Avoid consuming more entries from kafka than specified sourceLimit. (#947)
This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e10e069 [HUDI-292] Avoid consuming more entries from kafka than specified sourceLimit. (#947)
e10e069 is described below
commit e10e06918e4758917513c55f9bc02c35dad99128
Author: leesf <49...@qq.com>
AuthorDate: Fri Oct 11 20:28:45 2019 +0800
[HUDI-292] Avoid consuming more entries from kafka than specified sourceLimit. (#947)
- Special handling when allocedEvents > numEvents
- Added unit tests
---
.../apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java | 5 +++++
.../org/apache/hudi/utilities/sources/TestKafkaSource.java | 10 ++++++++++
2 files changed, 15 insertions(+)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
index 558e757..9dd232d 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
@@ -125,6 +125,11 @@ public class KafkaOffsetGen {
exhaustedPartitions.add(range.partition());
}
allocedEvents += toOffset - range.untilOffset();
+ // We need recompute toOffset if allocedEvents larger than numEvents.
+ if (allocedEvents > numEvents) {
+ long offsetsToAdd = Math.min(eventsPerPartition, (numEvents - allocedEvents));
+ toOffset = Math.min(toOffsetMax, toOffset + offsetsToAdd);
+ }
ranges[i] = OffsetRange.create(range.topicAndPartition(), range.fromOffset(), toOffset);
}
}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
index 9825ae6..9ac4bf4 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
@@ -183,5 +183,15 @@ public class TestKafkaSource extends UtilitiesTestBase {
assertEquals(10, ranges[0].count());
assertEquals(100000, ranges[1].count());
assertEquals(10000, ranges[2].count());
+
+ // not all partitions consume same entries.
+ ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1, 2, 3, 4}, new long[] {0, 0, 0, 0, 0}),
+ makeOffsetMap(new int[] {0, 1, 2, 3, 4}, new long[] {100, 1000, 1000, 1000, 1000}), 1001);
+ assertEquals(1001, CheckpointUtils.totalNewMessages(ranges));
+ assertEquals(100, ranges[0].count());
+ assertEquals(226, ranges[1].count());
+ assertEquals(226, ranges[2].count());
+ assertEquals(226, ranges[3].count());
+ assertEquals(223, ranges[4].count());
}
}