You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/08/24 11:18:25 UTC

[rocketmq-flink] 12/33: Fix getting wrong offset bug when the source restart (#190)

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git

commit 58d07fbe52968e43e7eb041045bbcc194bd904b7
Author: tangyoupeng <to...@juicedata.io>
AuthorDate: Mon Jun 3 11:10:17 2019 +0800

    Fix getting wrong offset bug when the source restart (#190)
---
 src/main/java/org/apache/rocketmq/flink/RocketMQSource.java | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
index 8e8e57b..9940e8e 100644
--- a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
+++ b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
@@ -22,6 +22,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.lang.Validate;
@@ -289,6 +290,10 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
                 offsetTable, context.getCheckpointId(), context.getCheckpointTimestamp());
         }
 
+        // remove the unassigned queues to avoid reading the wrong offset when the source restarts
+        Set<MessageQueue> assignedQueues = consumer.fetchMessageQueuesInBalance(topic);
+        offsetTable.entrySet().removeIf(item -> !assignedQueues.contains(item.getKey()));
+
         for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet()) {
             unionOffsetStates.add(Tuple2.of(entry.getKey(), entry.getValue()));
         }