You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/08/24 11:18:30 UTC
[rocketmq-flink] 17/33: Merge pull request #229 from Jennifer-sarah/master
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
commit c6cb79d2536991367456086228a83a5518c19e02
Merge: 0be1ed7 8531921
Author: Xin Wang <xi...@apache.org>
AuthorDate: Sun Jul 7 19:37:50 2019 +0800
Merge pull request #229 from Jennifer-sarah/master
update consumer offset after checkpoint completed
.../org/apache/rocketmq/flink/RocketMQSource.java | 65 ++++++++++++++++++++--
1 file changed, 59 insertions(+), 6 deletions(-)
diff --cc src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
index 18277e0,b6e68f8..06eecfb
--- a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
+++ b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
@@@ -22,10 -23,9 +23,11 @@@ import java.util.HashMap
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+ import org.apache.commons.collections.map.LinkedMap;
import org.apache.commons.lang.Validate;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
@@@ -286,17 -300,18 +302,22 @@@ public class RocketMQSource<OUT> extend
unionOffsetStates.clear();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Snapshotted state, last processed offsets: {}, checkpoint id: {}, timestamp: {}",
- offsetTable, context.getCheckpointId(), context.getCheckpointTimestamp());
- }
+ HashMap<MessageQueue, Long> currentOffsets = new HashMap<>(offsetTable.size());
+ // remove the unassigned queues to avoid reading the wrong offsets when the source restarts
+ Set<MessageQueue> assignedQueues = consumer.fetchMessageQueuesInBalance(topic);
+ offsetTable.entrySet().removeIf(item -> !assignedQueues.contains(item.getKey()));
+
for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet()) {
unionOffsetStates.add(Tuple2.of(entry.getKey(), entry.getValue()));
+ currentOffsets.put(entry.getKey(), entry.getValue());
+ }
+
+ pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Snapshotted state, last processed offsets: {}, checkpoint id: {}, timestamp: {}",
+ offsetTable, context.getCheckpointId(), context.getCheckpointTimestamp());
}
}