You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2019/10/11 12:28:50 UTC

[incubator-hudi] branch master updated: [HUDI-292] Avoid consuming more entries from kafka than specified sourceLimit. (#947)

This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new e10e069  [HUDI-292] Avoid consuming more entries from kafka than specified sourceLimit. (#947)
e10e069 is described below

commit e10e06918e4758917513c55f9bc02c35dad99128
Author: leesf <49...@qq.com>
AuthorDate: Fri Oct 11 20:28:45 2019 +0800

    [HUDI-292] Avoid consuming more entries from kafka than specified sourceLimit. (#947)
    
     - Special handling when allocedEvents > numEvents
     - Added unit tests
---
 .../apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java  |  5 +++++
 .../org/apache/hudi/utilities/sources/TestKafkaSource.java     | 10 ++++++++++
 2 files changed, 15 insertions(+)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
index 558e757..9dd232d 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
@@ -125,6 +125,11 @@ public class KafkaOffsetGen {
               exhaustedPartitions.add(range.partition());
             }
             allocedEvents += toOffset - range.untilOffset();
+            // If allocedEvents now exceeds numEvents, pull toOffset back: offsetsToAdd is negative here.
+            if (allocedEvents > numEvents) {
+              long offsetsToAdd = Math.min(eventsPerPartition, (numEvents - allocedEvents));
+              toOffset = Math.min(toOffsetMax, toOffset + offsetsToAdd);
+            }
             ranges[i] = OffsetRange.create(range.topicAndPartition(), range.fromOffset(), toOffset);
           }
         }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
index 9825ae6..9ac4bf4 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
@@ -183,5 +183,15 @@ public class TestKafkaSource extends UtilitiesTestBase {
     assertEquals(10, ranges[0].count());
     assertEquals(100000, ranges[1].count());
     assertEquals(10000, ranges[2].count());
+
+    // Not all partitions consume the same number of entries.
+    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1, 2, 3, 4}, new long[] {0, 0, 0, 0, 0}),
+        makeOffsetMap(new int[] {0, 1, 2, 3, 4}, new long[] {100, 1000, 1000, 1000, 1000}), 1001);
+    assertEquals(1001, CheckpointUtils.totalNewMessages(ranges));
+    assertEquals(100, ranges[0].count());
+    assertEquals(226, ranges[1].count());
+    assertEquals(226, ranges[2].count());
+    assertEquals(226, ranges[3].count());
+    assertEquals(223, ranges[4].count());
   }
 }