You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/08/24 11:18:26 UTC
[rocketmq-flink] 13/33: fix(module): fix load wrong offset from savepoint (#288)
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
commit c9564d6d2b122345889d33b02e2d4ae5098a675c
Author: MeYJ <Me...@users.noreply.github.com>
AuthorDate: Mon Jun 3 11:33:38 2019 +0800
fix(module): fix load wrong offset from savepoint (#288)
---
src/main/java/org/apache/rocketmq/flink/RocketMQSource.java | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
index 9940e8e..ccd6bb4 100644
--- a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
+++ b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
@@ -320,7 +320,9 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
for (Tuple2<MessageQueue, Long> mqOffsets : unionOffsetStates.get()) {
// unionOffsetStates is the restored global union state;
// should only snapshot mqs that actually belong to us
- restoredOffsets.put(mqOffsets.f0, mqOffsets.f1);
+ if (!restoredOffsets.containsKey(mqOffsets.f0) || restoredOffsets.get(mqOffsets.f0) < mqOffsets.f1) {
+ restoredOffsets.put(mqOffsets.f0, mqOffsets.f1);
+ }
}
LOG.info("Setting restore state in the consumer. Using the following offsets: {}", restoredOffsets);
} else {