You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2020/04/03 08:30:51 UTC

[rocketmq] branch develop updated: [ISSUE #1912]Polish the committed offset logic for the lite pull consumer.

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new dc815cc  [ISSUE #1912]Polish the committed offset logic for the lite pull consumer.
dc815cc is described below

commit dc815cc581f9e9dd36efa926fc7087ff1ecbf1a6
Author: Heng Du <du...@apache.org>
AuthorDate: Fri Apr 3 16:30:39 2020 +0800

    [ISSUE #1912]Polish the committed offset logic for the lite pull consumer.
    
    [ISSUE #1912]Polish the committed offset logic for the lite pull consumer.
---
 .../apache/rocketmq/client/consumer/DefaultLitePullConsumer.java    | 6 +++---
 .../rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java  | 2 +-
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
index ef76cfd..6718eb5 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
@@ -279,7 +279,7 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
 
     @Override
     public Long committed(MessageQueue messageQueue) throws MQClientException {
-        return this.defaultLitePullConsumerImpl.committed(messageQueue);
+        return this.defaultLitePullConsumerImpl.committed(queueWithNamespace(messageQueue));
     }
 
     @Override
@@ -289,12 +289,12 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
 
     @Override
     public void seekToBegin(MessageQueue messageQueue) throws MQClientException {
-        this.defaultLitePullConsumerImpl.seekToBegin(messageQueue);
+        this.defaultLitePullConsumerImpl.seekToBegin(queueWithNamespace(messageQueue));
     }
 
     @Override
     public void seekToEnd(MessageQueue messageQueue) throws MQClientException {
-        this.defaultLitePullConsumerImpl.seekToEnd(messageQueue);
+        this.defaultLitePullConsumerImpl.seekToEnd(queueWithNamespace(messageQueue));
     }
 
     @Override
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index 8ad7a6b..f54078f 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -631,7 +631,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
 
     public long committed(MessageQueue messageQueue) throws MQClientException {
         checkServiceState();
-        long offset = this.offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE);
+        long offset = this.offsetStore.readOffset(messageQueue, ReadOffsetType.MEMORY_FIRST_THEN_STORE);
         if (offset == -2)
             throw new MQClientException("Fetch consume offset from broker exception", null);
         return offset;