You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/12/07 02:37:27 UTC

[rocketmq-flink] branch main updated: [ISSUE #76] Fix bug when the job restore from ck (#77)

This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git


The following commit(s) were added to refs/heads/main by this push:
     new 422909f  [ISSUE #76] Fix bug when the job restore from ck (#77)
422909f is described below

commit 422909f352808b15f9eef1e699af0bd0fff17a88
Author: 高思伟 <48...@users.noreply.github.com>
AuthorDate: Wed Dec 7 10:37:22 2022 +0800

    [ISSUE #76] Fix bug when the job restore from ck (#77)
    
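    Fix a bug that occurs when the job restores from a checkpoint (ck): restored offsets are now loaded into the offset table only for the message queues allocated to the current subtask.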
    
    Co-authored-by: 高思伟 <si...@amh-group.com>
---
 .../apache/rocketmq/flink/legacy/RocketMQSourceFunction.java | 12 +++++-------
 .../legacy/sourceFunction/RocketMQSourceFunctionTest.java    |  3 ++-
 2 files changed, 7 insertions(+), 8 deletions(-)

diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
index 97037a1..fa57839 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
@@ -176,10 +176,6 @@ public class RocketMQSourceFunction<OUT> extends RichParallelSourceFunction<OUT>
         if (restoredOffsets == null) {
             restoredOffsets = new ConcurrentHashMap<>();
         }
-
-        // use restoredOffsets to init offset table.
-        initOffsetTableFromRestoredOffsets();
-
         if (pendingOffsetsToCommit == null) {
             pendingOffsetsToCommit = new LinkedMap();
         }
@@ -250,7 +246,9 @@ public class RocketMQSourceFunction<OUT> extends RichParallelSourceFunction<OUT>
                 RocketMQUtils.allocate(totalQueues, taskNumber, ctx.getIndexOfThisSubtask());
         // If the job recovers from the state, the state has already contained the offsets of last
         // commit.
-        if (!restored) {
+        if (restored) {
+            initOffsetTableFromRestoredOffsets(messageQueues);
+        } else {
             initOffsets(messageQueues);
         }
     }
@@ -539,11 +537,11 @@ public class RocketMQSourceFunction<OUT> extends RichParallelSourceFunction<OUT>
         }
     }
 
-    public void initOffsetTableFromRestoredOffsets() {
+    public void initOffsetTableFromRestoredOffsets(List<MessageQueue> messageQueues) {
         Preconditions.checkNotNull(restoredOffsets, "restoredOffsets can't be null");
         restoredOffsets.forEach(
                 (mq, offset) -> {
-                    if (!offsetTable.containsKey(mq) || offsetTable.get(mq) < offset) {
+                    if (messageQueues.contains(mq)) {
                         offsetTable.put(mq, offset);
                     }
                 });
diff --git a/src/test/java/org/apache/rocketmq/flink/legacy/sourceFunction/RocketMQSourceFunctionTest.java b/src/test/java/org/apache/rocketmq/flink/legacy/sourceFunction/RocketMQSourceFunctionTest.java
index 8b7b44f..6ef73e6 100644
--- a/src/test/java/org/apache/rocketmq/flink/legacy/sourceFunction/RocketMQSourceFunctionTest.java
+++ b/src/test/java/org/apache/rocketmq/flink/legacy/sourceFunction/RocketMQSourceFunctionTest.java
@@ -27,6 +27,7 @@ import org.apache.rocketmq.flink.legacy.common.serialization.SimpleStringDeseria
 
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
@@ -78,7 +79,7 @@ public class RocketMQSourceFunctionTest {
         map.put(new MessageQueue("tpc", "broker-1", 1), 31L);
         setFieldValue(source, "restoredOffsets", map);
         setFieldValue(source, "offsetTable", new ConcurrentHashMap<>());
-        source.initOffsetTableFromRestoredOffsets();
+        source.initOffsetTableFromRestoredOffsets(new ArrayList<>(map.keySet()));
         Map<MessageQueue, Long> offsetTable = (Map) getFieldValue(source, "offsetTable");
         for (Map.Entry<MessageQueue, Long> entry : map.entrySet()) {
             assertEquals(offsetTable.containsKey(entry.getKey()), true);