Posted to issues@rocketmq.apache.org by "Jaskey Lam (JIRA)" <ji...@apache.org> on 2017/03/01 06:30:46 UTC
[jira] [Comment Edited] (ROCKETMQ-112) MQ client CONSUME_FROM_LAST_OFFSET dont work
[ https://issues.apache.org/jira/browse/ROCKETMQ-112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15889595#comment-15889595 ]
Jaskey Lam edited comment on ROCKETMQ-112 at 3/1/17 6:30 AM:
-------------------------------------------------------------
[~zhaoziyan] please reformat your comment according to https://jira.atlassian.com/secure/WikiRendererHelpAction.jspa?section=all
When a new consumer group starts, CONSUME_FROM_LAST_OFFSET works most of the time; please refer to queryConsumerOffset in ConsumerManageProcessor.java.
The broker returns the min offset (0) only when the topic is still quite new (minOffset == 0) and there is no large accumulation (checkInDiskByConsumeOffset returns false).
This is admittedly a bit confusing, though. I suggest we strictly honor CONSUME_FROM_LAST_OFFSET in this case as well.
{code}
if (offset >= 0) {
    // A committed offset exists for this group and queue: return it as-is.
    responseHeader.setOffset(offset);
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
} else {
    long minOffset =
        this.brokerController.getMessageStore().getMinOffsetInQuque(requestHeader.getTopic(),
            requestHeader.getQueueId());
    // No committed offset: if the queue still begins at 0 and offset 0 is not
    // reported as being on disk, tell the consumer to start from 0.
    if (minOffset <= 0
        && !this.brokerController.getMessageStore().checkInDiskByConsumeOffset(
            requestHeader.getTopic(), requestHeader.getQueueId(), 0)) {
        responseHeader.setOffset(0L);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
    } else {
        // Otherwise report "not found" and leave the decision to the client.
        response.setCode(ResponseCode.QUERY_NOT_FOUND);
        response.setRemark("Not found, V3_0_6_SNAPSHOT maybe this group consumer boot first");
    }
}
{code}
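To make the interaction concrete, here is a rough, self-contained sketch of how the broker branch above combines with the client-side CONSUME_FROM_LAST_OFFSET case quoted in the issue description. It is not RocketMQ code: the class name, the method names, and the NOT_FOUND sentinel are made up for illustration, and it assumes that a QUERY_NOT_FOUND response reaches the client as -1 from readOffset.

```java
public class OffsetQuerySketch {
    /** Hypothetical sentinel standing in for a QUERY_NOT_FOUND response. */
    static final long NOT_FOUND = -1L;

    /**
     * Broker side: same branch structure as the snippet above.
     * storedOffset is the committed offset (negative if none),
     * offsetZeroOnDisk stands in for checkInDiskByConsumeOffset(topic, queueId, 0).
     */
    static long brokerQueryOffset(long storedOffset, long minOffset, boolean offsetZeroOnDisk) {
        if (storedOffset >= 0) {
            return storedOffset;
        }
        if (minOffset <= 0 && !offsetZeroOnDisk) {
            return 0L; // young topic: the broker answers 0 instead of "not found"
        }
        return NOT_FOUND;
    }

    /**
     * Client side: CONSUME_FROM_LAST_OFFSET for a non-retry topic.
     * Assumes "not found" is seen as -1, in which case the client uses maxOffset.
     */
    static long clientStartOffset(long brokerAnswer, long maxOffset) {
        if (brokerAnswer >= 0) {
            return brokerAnswer;
        }
        return maxOffset; // first start, no offset
    }

    public static void main(String[] args) {
        // New group on a young topic: 500 messages, queue still starts at 0, nothing on disk.
        long young = clientStartOffset(brokerQueryOffset(-1, 0, false), 500);
        // New group on a topic whose offset 0 is already on disk.
        long old = clientStartOffset(brokerQueryOffset(-1, 0, true), 500_000);
        System.out.println("young topic starts at " + young); // 0, not 500
        System.out.println("old topic starts at " + old);     // 500000
    }
}
```

Under these assumptions, the new group on the young topic starts from 0 even though the client asked for the last offset, which is the behavior reported in this issue.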
> MQ client CONSUME_FROM_LAST_OFFSET dont work
> --------------------------------------------
>
> Key: ROCKETMQ-112
> URL: https://issues.apache.org/jira/browse/ROCKETMQ-112
> Project: Apache RocketMQ
> Issue Type: Bug
> Components: rocketmq-client
> Reporter: zhaoziyan
> Assignee: Xiaorui Wang
>
> case CONSUME_FROM_LAST_OFFSET: {
>     long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
>     if (lastOffset >= 0) {
>         result = lastOffset;
>     }
>     // First start, no offset
>     else if (-1 == lastOffset) {
>         if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
>             result = 0L;
>         }
>         else {
>             try {
>                 result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
>             }
>             catch (MQClientException e) {
>                 result = -1;
>             }
>         }
>     }
>     else {
>         result = -1;
>     }
>     break;
> }
> offsetStore.readOffset returns minOffset, not maxOffset,
> so CONSUME_FROM_LAST_OFFSET doesn't work.