You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/08/24 11:18:26 UTC

[rocketmq-flink] 13/33: fix(module): fix load wrong offset from savepoint (#288)

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git

commit c9564d6d2b122345889d33b02e2d4ae5098a675c
Author: MeYJ <Me...@users.noreply.github.com>
AuthorDate: Mon Jun 3 11:33:38 2019 +0800

    fix(module): fix load wrong offset from savepoint (#288)
---
 src/main/java/org/apache/rocketmq/flink/RocketMQSource.java | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
index 9940e8e..ccd6bb4 100644
--- a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
+++ b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
@@ -320,7 +320,9 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
             for (Tuple2<MessageQueue, Long> mqOffsets : unionOffsetStates.get()) {
                 // unionOffsetStates is the restored global union state;
                 // should only snapshot mqs that actually belong to us
-                restoredOffsets.put(mqOffsets.f0, mqOffsets.f1);
+                if (!restoredOffsets.containsKey(mqOffsets.f0) || restoredOffsets.get(mqOffsets.f0) < mqOffsets.f1) {
+                    restoredOffsets.put(mqOffsets.f0, mqOffsets.f1);
+                }
             }
             LOG.info("Setting restore state in the consumer. Using the following offsets: {}", restoredOffsets);
         } else {