You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/07/20 01:32:02 UTC
[rocketmq] branch develop updated: [ISSUE #2708] fix offset
rollback when fetch offset from broker exception
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new d7c2d25 [ISSUE #2708] fix offset rollback when fetch offset from broker exception
new 5b945a9 Merge pull request #3158 from Zanglei06/fix_rmq_client_offset_dev
d7c2d25 is described below
commit d7c2d25c2a7344506ee001f7dfaf72f62c5f6c47
Author: zanglei <za...@kuaishou.com>
AuthorDate: Fri Jul 16 11:01:15 2021 +0800
[ISSUE #2708] fix offset rollback when fetch offset from broker exception
---
.../rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java | 2 +-
.../rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java | 2 +-
.../java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java | 2 +-
.../apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java | 3 ++-
4 files changed, 5 insertions(+), 4 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index e835be1..d28d23a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -785,7 +785,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
long offset = 0L;
try {
offset = nextPullOffset(messageQueue);
- } catch (MQClientException e) {
+ } catch (Exception e) {
log.error("Failed to get next pull offset", e);
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_ON_EXCEPTION, TimeUnit.MILLISECONDS);
return;
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index bb0b7f1..59b8deb 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -273,7 +273,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
long offset = -1L;
try {
offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue());
- } catch (MQClientException e) {
+ } catch (Exception e) {
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.error("Failed to compute pull offset, pullResult: {}", pullRequest, e);
return;
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
index 833d465..7677d8b 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
@@ -378,7 +378,7 @@ public abstract class RebalanceImpl {
long nextOffset = -1L;
try {
nextOffset = this.computePullFromWhereWithException(mq);
- } catch (MQClientException e) {
+ } catch (Exception e) {
log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);
continue;
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
index 286c684..8fe9400 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
@@ -102,7 +102,8 @@ public class RebalanceLitePullImpl extends RebalanceImpl {
try {
result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
} catch (MQClientException e) {
- result = -1;
+ log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e);
+ throw e;
}
}
} else {