You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2020/04/03 08:30:51 UTC
[rocketmq] branch develop updated: [ISSUE #1912]Polish the
committed offset logic for the lite pull consumer.
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new dc815cc [ISSUE #1912]Polish the committed offset logic for the lite pull consumer.
dc815cc is described below
commit dc815cc581f9e9dd36efa926fc7087ff1ecbf1a6
Author: Heng Du <du...@apache.org>
AuthorDate: Fri Apr 3 16:30:39 2020 +0800
[ISSUE #1912]Polish the committed offset logic for the lite pull consumer.
[ISSUE #1912]Polish the committed offset logic for the lite pull consumer.
---
.../apache/rocketmq/client/consumer/DefaultLitePullConsumer.java | 6 +++---
.../rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java | 2 +-
2 files changed, 4 insertions(+), 4 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
index ef76cfd..6718eb5 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
@@ -279,7 +279,7 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
@Override
public Long committed(MessageQueue messageQueue) throws MQClientException {
- return this.defaultLitePullConsumerImpl.committed(messageQueue);
+ return this.defaultLitePullConsumerImpl.committed(queueWithNamespace(messageQueue));
}
@Override
@@ -289,12 +289,12 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
@Override
public void seekToBegin(MessageQueue messageQueue) throws MQClientException {
- this.defaultLitePullConsumerImpl.seekToBegin(messageQueue);
+ this.defaultLitePullConsumerImpl.seekToBegin(queueWithNamespace(messageQueue));
}
@Override
public void seekToEnd(MessageQueue messageQueue) throws MQClientException {
- this.defaultLitePullConsumerImpl.seekToEnd(messageQueue);
+ this.defaultLitePullConsumerImpl.seekToEnd(queueWithNamespace(messageQueue));
}
@Override
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index 8ad7a6b..f54078f 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -631,7 +631,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
public long committed(MessageQueue messageQueue) throws MQClientException {
checkServiceState();
- long offset = this.offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE);
+ long offset = this.offsetStore.readOffset(messageQueue, ReadOffsetType.MEMORY_FIRST_THEN_STORE);
if (offset == -2)
throw new MQClientException("Fetch consume offset from broker exception", null);
return offset;