You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/08/24 11:18:21 UTC
[rocketmq-flink] 08/33: Optimizing update offset code logic
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
commit f74cc2709208efa82588701554fb30410fd33469
Author: Jennifer-sarah <42...@users.noreply.github.com>
AuthorDate: Fri Mar 22 00:48:05 2019 +0800
Optimizing update offset code logic
Optimizing update offset code logic
---
.../org/apache/rocketmq/flink/RocketMQSource.java | 19 +++++++++++++------
1 file changed, 13 insertions(+), 6 deletions(-)
diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
index 14b8042..f610efe 100644
--- a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
+++ b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.MQPullConsumerScheduleService;
import org.apache.rocketmq.client.consumer.PullResult;
@@ -85,6 +86,7 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states";
private transient volatile boolean restored;
+ private transient boolean enableCheckpoint;
public RocketMQSource(KeyValueDeserializationSchema<OUT> schema, Properties props) {
this.schema = schema;
@@ -103,6 +105,8 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
Validate.notEmpty(topic, "Consumer topic can not be empty");
Validate.notEmpty(group, "Consumer group can not be empty");
+ this.enableCheckpoint = ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled();
+
if (offsetTable == null) {
offsetTable = new ConcurrentHashMap<>();
}
@@ -243,7 +247,9 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
private void putMessageQueueOffset(MessageQueue mq, long offset) throws MQClientException {
offsetTable.put(mq, offset);
- consumer.updateConsumeOffset(mq, offset);
+ if (!enableCheckpoint) {
+ consumer.updateConsumeOffset(mq, offset);
+ }
}
@Override
@@ -285,14 +291,14 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
unionOffsetStates.clear();
+ for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet()) {
+ unionOffsetStates.add(Tuple2.of(entry.getKey(), entry.getValue()));
+ }
+
if (LOG.isDebugEnabled()) {
LOG.debug("Snapshotted state, last processed offsets: {}, checkpoint id: {}, timestamp: {}",
offsetTable, context.getCheckpointId(), context.getCheckpointTimestamp());
}
-
- for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet()) {
- unionOffsetStates.add(Tuple2.of(entry.getKey(), entry.getValue()));
- }
}
@Override
@@ -330,7 +336,8 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
}
@Override
- public void notifyCheckpointComplete(long l) throws Exception {
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ // consumer.c
if (!runningChecker.isRunning()) {
LOG.debug("notifyCheckpointComplete() called on closed source; returning null.");
return;