You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vb...@apache.org on 2019/05/24 02:32:35 UTC

[incubator-hudi] branch master updated: HUDI-105 : Fix up offsets not available on leader exception (#650)

This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f120427  HUDI-105 : Fix up offsets not available on leader exception (#650)
f120427 is described below

commit f120427607986de82f99f2ba9a2eb0ff136c2bae
Author: leiline <la...@outlook.com>
AuthorDate: Fri May 24 10:32:31 2019 +0800

    HUDI-105 : Fix up offsets not available on leader exception (#650)
    
    * Fix up offsets not available on leader exception
---
 .../utilities/sources/helpers/KafkaOffsetGen.java  | 23 +++++++++++++++++++++-
 1 file changed, 22 insertions(+), 1 deletion(-)

diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/helpers/KafkaOffsetGen.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/helpers/KafkaOffsetGen.java
index 947f3c4..5a4b727 100644
--- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/helpers/KafkaOffsetGen.java
+++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/helpers/KafkaOffsetGen.java
@@ -204,8 +204,9 @@ public class KafkaOffsetGen {
 
     // Determine the offset ranges to read from
     HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> fromOffsets;
+    HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> checkpointOffsets;
     if (lastCheckpointStr.isPresent()) {
-      fromOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get());
+      fromOffsets = checkupValidOffsets(cluster, lastCheckpointStr, topicPartitions);
     } else {
       KafkaResetOffsetStrategies autoResetValue =  KafkaResetOffsetStrategies.valueOf(
               props.getString("auto.offset.reset", Config.DEFAULT_AUTO_RESET_OFFSET.toString()).toUpperCase());
@@ -235,6 +236,26 @@ public class KafkaOffsetGen {
     return offsetRanges;
   }
 
+  // Returns the checkpointed offsets when they are still available on the leaders; if any
+  // checkpointed offset is below the earliest retained offset, returns the earliest offsets.
+  private HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> checkupValidOffsets(
+          KafkaCluster cluster,
+          Optional<String> lastCheckpointStr,
+          Set<TopicAndPartition> topicPartitions) {
+    HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> checkpointOffsets =
+            CheckpointUtils.strToOffsets(lastCheckpointStr.get());
+    HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> earliestOffsets = new HashMap<>(
+            ScalaHelpers.toJavaMap(cluster.getEarliestLeaderOffsets(topicPartitions).right().get()));
+    // Offsets below the earliest one are no longer on the leader, so reading from them would fail.
+    // All partitions fall back together; checkpointed partitions with no earliest offset are skipped.
+    boolean isCheckpointOutOfRange = checkpointOffsets.entrySet().stream().anyMatch(entry -> {
+      KafkaCluster.LeaderOffset earliest = earliestOffsets.get(entry.getKey());
+      return earliest != null && entry.getValue().offset() < earliest.offset();
+    });
+    return isCheckpointOutOfRange ? earliestOffsets : checkpointOffsets;
+  }
+
+
   public String getTopicName() {
     return topicName;
   }