You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/08/24 11:18:21 UTC

[rocketmq-flink] 08/33: Optimizing update offset code logic

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git

commit f74cc2709208efa82588701554fb30410fd33469
Author: Jennifer-sarah <42...@users.noreply.github.com>
AuthorDate: Fri Mar 22 00:48:05 2019 +0800

    Optimizing update offset code logic
    
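    Only call consumer.updateConsumeOffset() from putMessageQueueOffset()
    when Flink checkpointing is disabled, fill the union offset state
    before the debug log in snapshotState(), and rename the
    notifyCheckpointComplete() parameter to checkpointId.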
---
 .../org/apache/rocketmq/flink/RocketMQSource.java     | 19 +++++++++++++------
 1 file changed, 13 insertions(+), 6 deletions(-)

diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
index 14b8042..f610efe 100644
--- a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
+++ b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
 import org.apache.rocketmq.client.consumer.MQPullConsumerScheduleService;
 import org.apache.rocketmq.client.consumer.PullResult;
@@ -85,6 +86,7 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
     private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states";
 
     private transient volatile boolean restored;
+    private transient boolean enableCheckpoint;
 
     public RocketMQSource(KeyValueDeserializationSchema<OUT> schema, Properties props) {
         this.schema = schema;
@@ -103,6 +105,8 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
         Validate.notEmpty(topic, "Consumer topic can not be empty");
         Validate.notEmpty(group, "Consumer group can not be empty");
 
+        this.enableCheckpoint = ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled();
+
         if (offsetTable == null) {
             offsetTable = new ConcurrentHashMap<>();
         }
@@ -243,7 +247,9 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
 
     private void putMessageQueueOffset(MessageQueue mq, long offset) throws MQClientException {
         offsetTable.put(mq, offset);
-        consumer.updateConsumeOffset(mq, offset);
+        if (!enableCheckpoint) {
+            consumer.updateConsumeOffset(mq, offset);
+        }
     }
 
     @Override
@@ -285,14 +291,14 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
 
         unionOffsetStates.clear();
 
+        for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet()) {
+            unionOffsetStates.add(Tuple2.of(entry.getKey(), entry.getValue()));
+        }
+
         if (LOG.isDebugEnabled()) {
             LOG.debug("Snapshotted state, last processed offsets: {}, checkpoint id: {}, timestamp: {}",
                     offsetTable, context.getCheckpointId(), context.getCheckpointTimestamp());
         }
-
-        for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet()) {
-            unionOffsetStates.add(Tuple2.of(entry.getKey(), entry.getValue()));
-        }
     }
 
     @Override
@@ -330,7 +336,8 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
     }
 
     @Override
-    public void notifyCheckpointComplete(long l) throws Exception {
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        // consumer.c
         if (!runningChecker.isRunning()) {
             LOG.debug("notifyCheckpointComplete() called on closed source; returning null.");
             return;