You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2021/07/23 10:55:05 UTC

[GitHub] [rocketmq] caigy commented on a change in pull request #2719: [ISSUE #2708] fix offset back to minLogicOffset when timeout or connect fail

caigy commented on a change in pull request #2719:
URL: https://github.com/apache/rocketmq/pull/2719#discussion_r675440779



##########
File path: client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
##########
@@ -271,6 +271,10 @@ public void pullMessage(final PullRequest pullRequest) {
             if (processQueue.isLocked()) {
                 if (!pullRequest.isLockedFirst()) {
                     final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
+                    if (offset == -3) {

Review comment:
       Also use constant instead of '-3' here.

##########
File path: client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
##########
@@ -97,6 +97,11 @@ public long readOffset(final MessageQueue mq, final ReadOffsetType type) {
                     catch (MQBrokerException e) {
                         return -1;
                     }
+                    //remotingException, should retry
+                    catch (RemotingException e) {
+                      log.warn("fetchConsumeOffsetFromBroker RemotingException, " + mq, e);
+                      return -3;

Review comment:
       Define a meaningful constant, instead of the magic number '-3'

##########
File path: client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
##########
@@ -271,6 +271,10 @@ public void pullMessage(final PullRequest pullRequest) {
             if (processQueue.isLocked()) {
                 if (!pullRequest.isLockedFirst()) {
                     final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
+                    if (offset == -3) {
+                      this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
+                      log.info("pull message later because remoting exception with broker, {}", pullRequest);
+                    }

Review comment:
       Should the procedure just return here?

##########
File path: client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
##########
@@ -271,6 +271,10 @@ public void pullMessage(final PullRequest pullRequest) {
             if (processQueue.isLocked()) {
                 if (!pullRequest.isLockedFirst()) {
                     final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());

Review comment:
       Please resolve conflicts with master.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org