You are viewing a plain text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/09/28 08:58:03 UTC
[rocketmq-connect] branch master updated: [ISSUE #330] fix OFFSET_ILLEGAL error when RmqSourceTask pulls messages (#331)
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new 34f60629 [ISSUE #330] fix OFFSET_ILLEGAL error when RmqSourceTask pulls messages (#331)
34f60629 is described below
commit 34f6062936ae993fa32215e6a170402f299ca89f
Author: Slideee <ye...@corp.netease.com>
AuthorDate: Wed Sep 28 16:57:57 2022 +0800
[ISSUE #330] fix OFFSET_ILLEGAL error when RmqSourceTask pulls messages (#331)
---
.../src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java | 6 ++++++
1 file changed, 6 insertions(+)
diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
index 78d2457e..d74761b7 100644
--- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
+++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
@@ -146,6 +146,12 @@ public class RmqSourceTask extends SourceTask {
PullResult pullResult = consumer.pull(taskTopicConfig, "*",
this.mqOffsetMap.get(taskTopicConfig), 32);
switch (pullResult.getPullStatus()) {
+ case OFFSET_ILLEGAL: {
+ if (this.mqOffsetMap.get(taskTopicConfig) < pullResult.getNextBeginOffset()) {
+ this.mqOffsetMap.put(taskTopicConfig, pullResult.getNextBeginOffset());
+ }
+ break;
+ }
case FOUND: {
this.mqOffsetMap.put(taskTopicConfig, pullResult.getNextBeginOffset());
List<MessageExt> msgs = pullResult.getMsgFoundList();