You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/09/28 08:58:03 UTC

[rocketmq-connect] branch master updated: [ISSUE #330] fix OFFSET_ILLEGAL error when RmqSourceTask pulls messages (#331)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new 34f60629 [ISSUE #330] fix OFFSET_ILLEGAL error when RmqSourceTask pulls messages (#331)
34f60629 is described below

commit 34f6062936ae993fa32215e6a170402f299ca89f
Author: Slideee <ye...@corp.netease.com>
AuthorDate: Wed Sep 28 16:57:57 2022 +0800

    [ISSUE #330] fix OFFSET_ILLEGAL error when RmqSourceTask pulls messages (#331)
---
 .../src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
index 78d2457e..d74761b7 100644
--- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
+++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
@@ -146,6 +146,12 @@ public class RmqSourceTask extends SourceTask {
                     PullResult pullResult = consumer.pull(taskTopicConfig, "*",
                         this.mqOffsetMap.get(taskTopicConfig), 32);
                     switch (pullResult.getPullStatus()) {
+                        case OFFSET_ILLEGAL: {
+                            if (this.mqOffsetMap.get(taskTopicConfig) < pullResult.getNextBeginOffset()) {
+                                this.mqOffsetMap.put(taskTopicConfig, pullResult.getNextBeginOffset());
+                            }
+                            break;
+                        }
                         case FOUND: {
                             this.mqOffsetMap.put(taskTopicConfig, pullResult.getNextBeginOffset());
                             List<MessageExt> msgs = pullResult.getMsgFoundList();