You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/08/24 11:18:38 UTC

[rocketmq-flink] 25/33: [rocketmq-connector-flink] rebalance cause offset rollback to long time ago (#672)

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git

commit da92a64a12f4de3b352f29182a4565cc4ac23ae4
Author: JerryTaoTao <27...@qq.com>
AuthorDate: Thu Jan 21 19:19:19 2021 +0800

    [rocketmq-connector-flink] rebalance cause offset rollback to long time ago (#672)
    
    Co-authored-by: hzyuemeng1 <hz...@corp.netease.com>
---
 .../org/apache/rocketmq/flink/RocketMQSource.java    | 20 ++++++++++++++++----
 1 file changed, 16 insertions(+), 4 deletions(-)

diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
index 35c5122..72783a8 100644
--- a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
+++ b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.MeterView;
 import org.apache.flink.metrics.SimpleCounter;
@@ -120,6 +121,10 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
         if (restoredOffsets == null) {
             restoredOffsets = new ConcurrentHashMap<>();
         }
+
+        //use restoredOffsets to init offset table.
+        initOffsetTableFromRestoredOffsets();
+
         if (pendingOffsetsToCommit == null) {
             pendingOffsetsToCommit = new LinkedMap();
         }
@@ -252,13 +257,10 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
         Long offset = offsetTable.get(mq);
         // restoredOffsets(unionOffsetStates) is the restored global union state;
         // should only snapshot mqs that actually belong to us
-        if (restored && offset == null) {
-            offset = restoredOffsets.get(mq);
-        }
         if (offset == null) {
             // fetchConsumeOffset from broker
             offset = consumer.fetchConsumeOffset(mq, false);
-            if (offset < 0) {
+            if (!restored || offset < 0) {
                 String initialOffset = props.getProperty(RocketMQConfig.CONSUMER_OFFSET_RESET_TO, CONSUMER_OFFSET_LATEST);
                 switch (initialOffset) {
                     case CONSUMER_OFFSET_EARLIEST:
@@ -318,6 +320,16 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
         }
     }
 
+    /**
+     * Seeds offsetTable from restoredOffsets, keeping the larger offset for each queue.
+     */
+    public void initOffsetTableFromRestoredOffsets() {
+        Preconditions.checkNotNull(restoredOffsets, "restoredOffsets can't be null");
+        restoredOffsets.forEach((mq, offset) -> offsetTable.merge(mq, offset, Math::max));
+        // The "{}" placeholder is needed so the offsetTable argument is actually rendered.
+        log.info("init offset table from restoredOffsets successful. offsetTable: {}", offsetTable);
+    }
+
     @Override
     public void snapshotState(FunctionSnapshotContext context) throws Exception {
         // called when a snapshot for a checkpoint is requested