You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/08/24 11:18:30 UTC

[rocketmq-flink] 17/33: Merge pull request #229 from Jennifer-sarah/master

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git

commit c6cb79d2536991367456086228a83a5518c19e02
Merge: 0be1ed7 8531921
Author: Xin Wang <xi...@apache.org>
AuthorDate: Sun Jul 7 19:37:50 2019 +0800

    Merge pull request #229 from Jennifer-sarah/master
    
    update consumer offset after checkpoint completed

 .../org/apache/rocketmq/flink/RocketMQSource.java  | 65 ++++++++++++++++++++--
 1 file changed, 59 insertions(+), 6 deletions(-)

diff --cc src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
index 18277e0,b6e68f8..06eecfb
--- a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
+++ b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
@@@ -22,10 -23,9 +23,11 @@@ import java.util.HashMap
  import java.util.List;
  import java.util.Map;
  import java.util.Properties;
 +import java.util.Set;
 +import java.util.UUID;
  import java.util.concurrent.ConcurrentHashMap;
  
+ import org.apache.commons.collections.map.LinkedMap;
  import org.apache.commons.lang.Validate;
  import org.apache.flink.api.common.state.ListState;
  import org.apache.flink.api.common.state.ListStateDescriptor;
@@@ -286,17 -300,18 +302,22 @@@ public class RocketMQSource<OUT> extend
  
          unionOffsetStates.clear();
  
-         if (LOG.isDebugEnabled()) {
-             LOG.debug("Snapshotted state, last processed offsets: {}, checkpoint id: {}, timestamp: {}",
-                 offsetTable, context.getCheckpointId(), context.getCheckpointTimestamp());
-         }
+         HashMap<MessageQueue, Long> currentOffsets = new HashMap<>(offsetTable.size());
  
 +        // remove the unassigned queues to avoid reading a wrong offset when the source restarts
 +        Set<MessageQueue> assignedQueues = consumer.fetchMessageQueuesInBalance(topic);
 +        offsetTable.entrySet().removeIf(item -> !assignedQueues.contains(item.getKey()));
 +
          for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet()) {
              unionOffsetStates.add(Tuple2.of(entry.getKey(), entry.getValue()));
+             currentOffsets.put(entry.getKey(), entry.getValue());
+         }
+ 
+         pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
+ 
+         if (LOG.isDebugEnabled()) {
+             LOG.debug("Snapshotted state, last processed offsets: {}, checkpoint id: {}, timestamp: {}",
+                 offsetTable, context.getCheckpointId(), context.getCheckpointTimestamp());
          }
      }