You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/08/24 11:18:23 UTC
[rocketmq-flink] 10/33: fix concurrent checkpoint bug
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
commit 8531921af25bd94a32f6fa149f3147fe8ad370be
Author: Jennifer-sarah <42...@users.noreply.github.com>
AuthorDate: Fri Mar 22 08:23:08 2019 +0800
fix concurrent checkpoint bug
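Fix concurrent checkpoint bug: when a snapshot is taken, a copy of the
current offset table is stored in pendingOffsetsToCommit under the checkpoint
id. notifyCheckpointComplete() now commits the offsets recorded for the
confirmed checkpoint id instead of whatever offsetTable holds at that moment,
and drops the pending entries of older checkpoints.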
---
.../org/apache/rocketmq/flink/RocketMQSource.java | 39 ++++++++++++++++++++--
1 file changed, 36 insertions(+), 3 deletions(-)
diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
index 5b76e54..b6e68f8 100644
--- a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
+++ b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
@@ -19,11 +19,13 @@
package org.apache.rocketmq.flink;
import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.collections.map.LinkedMap;
import org.apache.commons.lang.Validate;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -43,6 +45,7 @@ import org.apache.rocketmq.client.consumer.MQPullConsumerScheduleService;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullTaskCallback;
import org.apache.rocketmq.client.consumer.PullTaskContext;
+import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
@@ -78,6 +81,8 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
private transient ListState<Tuple2<MessageQueue, Long>> unionOffsetStates;
private Map<MessageQueue, Long> offsetTable;
private Map<MessageQueue, Long> restoredOffsets;
+ /** Data for pending but uncommitted offsets. */
+ private LinkedMap pendingOffsetsToCommit;
private Properties props;
private String topic;
@@ -113,6 +118,9 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
if (restoredOffsets == null) {
restoredOffsets = new ConcurrentHashMap<>();
}
+ if (pendingOffsetsToCommit == null) {
+ pendingOffsetsToCommit = new LinkedMap();
+ }
runningChecker = new RunningChecker();
@@ -263,6 +271,7 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
offsetTable.clear();
restoredOffsets.clear();
+ pendingOffsetsToCommit.clear();
}
@Override
@@ -291,13 +300,18 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
unionOffsetStates.clear();
+ HashMap<MessageQueue, Long> currentOffsets = new HashMap<>(offsetTable.size());
+
for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet()) {
unionOffsetStates.add(Tuple2.of(entry.getKey(), entry.getValue()));
+ currentOffsets.put(entry.getKey(), entry.getValue());
}
+ pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
+
if (LOG.isDebugEnabled()) {
LOG.debug("Snapshotted state, last processed offsets: {}, checkpoint id: {}, timestamp: {}",
- offsetTable, context.getCheckpointId(), context.getCheckpointTimestamp());
+ offsetTable, context.getCheckpointId(), context.getCheckpointTimestamp());
}
}
@@ -337,14 +351,33 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
- // callback when checkpoint complete
+ // callback when checkpoint complete
if (!runningChecker.isRunning()) {
LOG.debug("notifyCheckpointComplete() called on closed source; returning null.");
return;
}
- for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet()) {
+ final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
+ if (posInMap == -1) {
+ LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
+ return;
+ }
+
+ Map<MessageQueue, Long> offsets = (Map<MessageQueue, Long>)pendingOffsetsToCommit.remove(posInMap);
+
+ // remove older checkpoints in map
+ for (int i = 0; i < posInMap; i++) {
+ pendingOffsetsToCommit.remove(0);
+ }
+
+ if (offsets == null || offsets.size() == 0) {
+ LOG.debug("Checkpoint state was empty.");
+ return;
+ }
+
+ for (Map.Entry<MessageQueue, Long> entry : offsets.entrySet()) {
consumer.updateConsumeOffset(entry.getKey(), entry.getValue());
}
+
}
}