You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by le...@apache.org on 2020/06/08 12:47:00 UTC

[hudi] branch master updated: [HUDI-918] Fix kafkaOffsetGen can not read kafka data bug (#1652)

This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 97ab97b  [HUDI-918] Fix kafkaOffsetGen can not read kafka data bug (#1652)
97ab97b is described below

commit 97ab97b72635164db5ac2a4f93e72e224603ffe0
Author: liujinhui <96...@qq.com>
AuthorDate: Mon Jun 8 20:46:47 2020 +0800

    [HUDI-918] Fix kafkaOffsetGen can not read kafka data bug (#1652)
---
 .../org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java   | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
index 39c47a2..9331274 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
@@ -21,6 +21,7 @@ package org.apache.hudi.utilities.sources.helpers;
 import org.apache.hudi.DataSourceUtils;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -207,6 +208,11 @@ public class KafkaOffsetGen {
     maxEventsToReadFromKafka = (maxEventsToReadFromKafka == Long.MAX_VALUE || maxEventsToReadFromKafka == Integer.MAX_VALUE)
         ? Config.maxEventsFromKafkaSource : maxEventsToReadFromKafka;
     long numEvents = sourceLimit == Long.MAX_VALUE ? maxEventsToReadFromKafka : sourceLimit;
+
+    if (numEvents < toOffsets.size()) {
+      throw new HoodieException("sourceLimit should not be less than the number of kafka partitions");
+    }
+
     return CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets, numEvents);
   }