You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/06/10 04:25:46 UTC

[GitHub] [hudi] garyli1019 commented on a change in pull request #1719: [HUDI-1006]deltastreamer use kafkaSource with offset reset strategy:latest can't consume data

garyli1019 commented on a change in pull request #1719:
URL: https://github.com/apache/hudi/pull/1719#discussion_r437841744



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
##########
@@ -57,10 +57,10 @@ public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, Spa
   protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {
     OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit);
     long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges);
+    LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
     if (totalNewMsgs <= 0) {
-      return new InputBatch<>(Option.empty(), lastCheckpointStr.isPresent() ? lastCheckpointStr.get() : "");

Review comment:
       hmmm interesting... so right now if we use `LATEST` as reset key, then we will fall into a dead loop unless we are lucky enough to have message fall in between two `consumer.endOffsets(topicPartitions)` call.

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
##########
@@ -57,10 +57,10 @@ public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, Spa
   protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {
     OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit);
     long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges);
+    LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
     if (totalNewMsgs <= 0) {
-      return new InputBatch<>(Option.empty(), lastCheckpointStr.isPresent() ? lastCheckpointStr.get() : "");
-    } else {
-      LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
+      return new InputBatch<>(Option.empty(),
+              lastCheckpointStr.isPresent() ? lastCheckpointStr.get() : CheckpointUtils.offsetsToStr(offsetRanges));

Review comment:
       could `lastCheckpointStr` be `""` here? 
   Also, can we add a test for this case?
   https://github.com/apache/hudi/blob/master/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java#L107




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org