Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/06/10 11:41:45 UTC

[GitHub] [hudi] Litianye commented on a change in pull request #1719: [HUDI-1006]deltastreamer use kafkaSource with offset reset strategy:latest can't consume data

Litianye commented on a change in pull request #1719:
URL: https://github.com/apache/hudi/pull/1719#discussion_r438058262



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
##########
@@ -57,10 +57,10 @@ public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, Spa
   protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {
     OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit);
     long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges);
+    LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
     if (totalNewMsgs <= 0) {
-      return new InputBatch<>(Option.empty(), lastCheckpointStr.isPresent() ? lastCheckpointStr.get() : "");
-    } else {
-      LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
+      return new InputBatch<>(Option.empty(),
+              lastCheckpointStr.isPresent() ? lastCheckpointStr.get() : CheckpointUtils.offsetsToStr(offsetRanges));

Review comment:
       For a Hudi table that already has a `""` checkpoint, `lastCheckpointStr` can be `""` here, so I changed the return value to the new end-offsets checkpoint string in all cases.
   I also added a simple test case for the Kafka `latest` offset reset strategy in [hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java](url).
   Thanks for the review! :handshake:
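
   To illustrate the reasoning above, here is a minimal standalone sketch, not the actual Hudi code. It compares the ternary from the hunk with the "always return end offsets" variant for an empty batch. `Range`, `checkpointFromHunk`, and `checkpointAlways` are hypothetical names; `java.util.Optional` stands in for Hudi's `Option`; and the checkpoint format `topic,partition:untilOffset,...` is an assumption about what `CheckpointUtils.offsetsToStr` produces.

```java
import java.util.Optional;
import java.util.StringJoiner;

public class CheckpointFallbackSketch {

  // Hypothetical stand-in for a Kafka offset range: one partition's [from, until) span.
  static final class Range {
    final String topic;
    final int partition;
    final long fromOffset;
    final long untilOffset;

    Range(String topic, int partition, long fromOffset, long untilOffset) {
      this.topic = topic;
      this.partition = partition;
      this.fromOffset = fromOffset;
      this.untilOffset = untilOffset;
    }
  }

  // Assumed checkpoint format: "topic,partition:untilOffset,partition:untilOffset,..."
  static String offsetsToStr(Range[] ranges) {
    StringJoiner joiner = new StringJoiner(",");
    joiner.add(ranges[0].topic);
    for (Range r : ranges) {
      joiner.add(r.partition + ":" + r.untilOffset);
    }
    return joiner.toString();
  }

  // Number of messages covered by the ranges (zero when from == until everywhere).
  static long totalNewMessages(Range[] ranges) {
    long total = 0;
    for (Range r : ranges) {
      total += r.untilOffset - r.fromOffset;
    }
    return total;
  }

  // Variant shown in the hunk: a present-but-empty checkpoint is passed through unchanged.
  static String checkpointFromHunk(Optional<String> last, Range[] ranges) {
    return last.isPresent() ? last.get() : offsetsToStr(ranges);
  }

  // Variant described in the comment: always persist the end offsets.
  static String checkpointAlways(Range[] ranges) {
    return offsetsToStr(ranges);
  }

  public static void main(String[] args) {
    // With the "latest" reset strategy and no new data, from == until for each partition.
    Range[] ranges = {
        new Range("impressions", 0, 42L, 42L),
        new Range("impressions", 1, 17L, 17L)
    };
    Optional<String> staleCheckpoint = Optional.of("");

    System.out.println("new messages: " + totalNewMessages(ranges));
    System.out.println("hunk variant: '" + checkpointFromHunk(staleCheckpoint, ranges) + "'");
    System.out.println("always variant: '" + checkpointAlways(ranges) + "'");
  }
}
```

   With a stored `""` checkpoint, the hunk variant keeps returning `""`, so the next run has no offsets to resume from. The always variant records the current end offsets even when the batch is empty.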



