You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/07/20 01:32:02 UTC

[rocketmq] branch develop updated: [ISSUE #2708] fix offset rollback when fetch offset from broker exception

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new d7c2d25  [ISSUE #2708] fix offset rollback when fetch offset from broker exception
     new 5b945a9  Merge pull request #3158 from Zanglei06/fix_rmq_client_offset_dev
d7c2d25 is described below

commit d7c2d25c2a7344506ee001f7dfaf72f62c5f6c47
Author: zanglei <za...@kuaishou.com>
AuthorDate: Fri Jul 16 11:01:15 2021 +0800

    [ISSUE #2708] fix offset rollback when fetch offset from broker exception
---
 .../rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java     | 2 +-
 .../rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java       | 2 +-
 .../java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java   | 2 +-
 .../apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java    | 3 ++-
 4 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index e835be1..d28d23a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -785,7 +785,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
                 long offset = 0L;
                 try {
                     offset = nextPullOffset(messageQueue);
-                } catch (MQClientException e) {
+                } catch (Exception e) {
                     log.error("Failed to get next pull offset", e);
                     scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_ON_EXCEPTION, TimeUnit.MILLISECONDS);
                     return;
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index bb0b7f1..59b8deb 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -273,7 +273,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
                     long offset = -1L;
                     try {
                         offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue());
-                    } catch (MQClientException e) {
+                    } catch (Exception e) {
                         this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
                         log.error("Failed to compute pull offset, pullResult: {}", pullRequest, e);
                         return;
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
index 833d465..7677d8b 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
@@ -378,7 +378,7 @@ public abstract class RebalanceImpl {
                 long nextOffset = -1L;
                 try {
                     nextOffset = this.computePullFromWhereWithException(mq);
-                } catch (MQClientException e) {
+                } catch (Exception e) {
                     log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);
                     continue;
                 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
index 286c684..8fe9400 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
@@ -102,7 +102,8 @@ public class RebalanceLitePullImpl extends RebalanceImpl {
                         try {
                             result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
                         } catch (MQClientException e) {
-                            result = -1;
+                            log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e);
+                            throw e;
                         }
                     }
                 } else {