You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vb...@apache.org on 2019/05/24 02:32:35 UTC
[incubator-hudi] branch master updated: HUDI-105 : Fix up offsets
not available on leader exception (#650)
This is an automated email from the ASF dual-hosted git repository.
vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f120427 HUDI-105 : Fix up offsets not available on leader exception (#650)
f120427 is described below
commit f120427607986de82f99f2ba9a2eb0ff136c2bae
Author: leiline <la...@outlook.com>
AuthorDate: Fri May 24 10:32:31 2019 +0800
HUDI-105 : Fix up offsets not available on leader exception (#650)
* Fix up offsets not available on leader exception
---
.../utilities/sources/helpers/KafkaOffsetGen.java | 23 +++++++++++++++++++++-
1 file changed, 22 insertions(+), 1 deletion(-)
diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/helpers/KafkaOffsetGen.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/helpers/KafkaOffsetGen.java
index 947f3c4..5a4b727 100644
--- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/helpers/KafkaOffsetGen.java
+++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/helpers/KafkaOffsetGen.java
@@ -204,8 +204,9 @@ public class KafkaOffsetGen {
// Determine the offset ranges to read from
HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> fromOffsets;
+ HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> checkpointOffsets;
if (lastCheckpointStr.isPresent()) {
- fromOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get());
+ fromOffsets = checkupValidOffsets(cluster, lastCheckpointStr, topicPartitions);
} else {
KafkaResetOffsetStrategies autoResetValue = KafkaResetOffsetStrategies.valueOf(
props.getString("auto.offset.reset", Config.DEFAULT_AUTO_RESET_OFFSET.toString()).toUpperCase());
@@ -235,6 +236,26 @@ public class KafkaOffsetGen {
return offsetRanges;
}
+ /**
+  * Chooses the offsets to resume from when a checkpoint string is available.
+  *
+  * <p>The checkpoint is parsed into per-partition offsets and compared against the earliest
+  * offsets the cluster reports for {@code topicPartitions}. The checkpoint is considered stale
+  * when any checkpointed offset is lower than the earliest offset of its partition, or when the
+  * cluster reports no earliest offset at all for a checkpointed partition. In that case the
+  * earliest offsets are returned instead of the checkpoint offsets.
+  *
+  * @param cluster Kafka cluster used to look up the earliest leader offsets
+  * @param lastCheckpointStr last checkpoint; must be present, since {@code get()} is called on it
+  *                          unconditionally
+  * @param topicPartitions partitions whose earliest offsets are fetched
+  * @return the checkpoint offsets if every one of them is still valid, otherwise the earliest
+  *         offsets
+  */
+ private HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> checkupValidOffsets(
+     KafkaCluster cluster,
+     Optional<String> lastCheckpointStr,
+     Set<TopicAndPartition> topicPartitions) {
+   HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> checkpointOffsets =
+       CheckpointUtils.strToOffsets(lastCheckpointStr.get());
+   HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> earliestOffsets =
+       new HashMap(ScalaHelpers.toJavaMap(
+           cluster.getEarliestLeaderOffsets(topicPartitions).right().get()));
+
+   boolean resetToEarliest = checkpointOffsets.entrySet()
+       .stream()
+       .anyMatch(offset -> {
+         KafkaCluster.LeaderOffset earliest = earliestOffsets.get(offset.getKey());
+         // A checkpointed partition with no earliest offset cannot be validated; treat it as
+         // stale instead of dereferencing null.
+         return earliest == null || offset.getValue().offset() < earliest.offset();
+       });
+   return resetToEarliest ? earliestOffsets : checkpointOffsets;
+ }
+
+
/** Returns the value of the {@code topicName} field. */
public String getTopicName() {
return topicName;
}