You are viewing a plain-text version of this content. (The hyperlink to the canonical version was lost in the plain-text conversion.)
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/08/24 11:18:23 UTC

[rocketmq-flink] 10/33: fix concurrent checkpoint bug

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git

commit 8531921af25bd94a32f6fa149f3147fe8ad370be
Author: Jennifer-sarah <42...@users.noreply.github.com>
AuthorDate: Fri Mar 22 08:23:08 2019 +0800

    fix concurrent checkpoint bug
    
    fix concurrent checkpoint bug
---
 .../org/apache/rocketmq/flink/RocketMQSource.java  | 39 ++++++++++++++++++++--
 1 file changed, 36 insertions(+), 3 deletions(-)

diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
index 5b76e54..b6e68f8 100644
--- a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
+++ b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
@@ -19,11 +19,13 @@
 package org.apache.rocketmq.flink;
 
 import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.commons.collections.map.LinkedMap;
 import org.apache.commons.lang.Validate;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -43,6 +45,7 @@ import org.apache.rocketmq.client.consumer.MQPullConsumerScheduleService;
 import org.apache.rocketmq.client.consumer.PullResult;
 import org.apache.rocketmq.client.consumer.PullTaskCallback;
 import org.apache.rocketmq.client.consumer.PullTaskContext;
+import org.apache.rocketmq.client.consumer.store.OffsetStore;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
@@ -78,6 +81,8 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
     private transient ListState<Tuple2<MessageQueue, Long>> unionOffsetStates;
     private Map<MessageQueue, Long> offsetTable;
     private Map<MessageQueue, Long> restoredOffsets;
+    /** Data for pending but uncommitted offsets. */
+    private LinkedMap pendingOffsetsToCommit;
 
     private Properties props;
     private String topic;
@@ -113,6 +118,9 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
         if (restoredOffsets == null) {
             restoredOffsets = new ConcurrentHashMap<>();
         }
+        if (pendingOffsetsToCommit == null) {
+            pendingOffsetsToCommit = new LinkedMap();
+        }
 
         runningChecker = new RunningChecker();
 
@@ -263,6 +271,7 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
 
         offsetTable.clear();
         restoredOffsets.clear();
+        pendingOffsetsToCommit.clear();
     }
 
     @Override
@@ -291,13 +300,18 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
 
         unionOffsetStates.clear();
 
+        HashMap<MessageQueue, Long> currentOffsets = new HashMap<>(offsetTable.size());
+
         for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet()) {
             unionOffsetStates.add(Tuple2.of(entry.getKey(), entry.getValue()));
+            currentOffsets.put(entry.getKey(), entry.getValue());
         }
 
+        pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
+
         if (LOG.isDebugEnabled()) {
             LOG.debug("Snapshotted state, last processed offsets: {}, checkpoint id: {}, timestamp: {}",
-                    offsetTable, context.getCheckpointId(), context.getCheckpointTimestamp());
+                offsetTable, context.getCheckpointId(), context.getCheckpointTimestamp());
         }
     }
 
@@ -337,14 +351,33 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
 
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
-        // callback when checkpoint complete 
+        // callback when checkpoint complete
         if (!runningChecker.isRunning()) {
             LOG.debug("notifyCheckpointComplete() called on closed source; returning null.");
             return;
         }
 
-        for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet()) {
+        final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
+        if (posInMap == -1) {
+            LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
+            return;
+        }
+
+        Map<MessageQueue, Long> offsets = (Map<MessageQueue, Long>)pendingOffsetsToCommit.remove(posInMap);
+
+        // remove older checkpoints in map
+        for (int i = 0; i < posInMap; i++) {
+            pendingOffsetsToCommit.remove(0);
+        }
+
+        if (offsets == null || offsets.size() == 0) {
+            LOG.debug("Checkpoint state was empty.");
+            return;
+        }
+
+        for (Map.Entry<MessageQueue, Long> entry : offsets.entrySet()) {
             consumer.updateConsumeOffset(entry.getKey(), entry.getValue());
         }
+
     }
 }