Posted to issues@rocketmq.apache.org by "Jaskey Lam (JIRA)" <ji...@apache.org> on 2017/03/01 06:30:46 UTC

[jira] [Comment Edited] (ROCKETMQ-112) MQ client CONSUME_FROM_LAST_OFFSET dont work

    [ https://issues.apache.org/jira/browse/ROCKETMQ-112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15889595#comment-15889595 ] 

Jaskey Lam edited comment on ROCKETMQ-112 at 3/1/17 6:30 AM:
-------------------------------------------------------------

[~zhaoziyan] please reformat your comment according to https://jira.atlassian.com/secure/WikiRendererHelpAction.jspa?section=all

When a new consumer group starts, CONSUME_FROM_LAST_OFFSET works most of the time; please refer to queryConsumerOffset in ConsumerManageProcessor.java.

The broker returns the min offset only when the topic is still quite new (minOffset == 0) and there is no large accumulation (checkInDiskByConsumeOffset returns false).

But indeed, this is a bit confusing. I suggest we honor CONSUME_FROM_LAST_OFFSET strictly in this case.

{code}
        if (offset >= 0) {
            responseHeader.setOffset(offset);
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
        } else {
            long minOffset =
                this.brokerController.getMessageStore().getMinOffsetInQuque(requestHeader.getTopic(),
                    requestHeader.getQueueId());
            if (minOffset <= 0
                && !this.brokerController.getMessageStore().checkInDiskByConsumeOffset(
                requestHeader.getTopic(), requestHeader.getQueueId(), 0)) {
                responseHeader.setOffset(0L);
                response.setCode(ResponseCode.SUCCESS);
                response.setRemark(null);
            } else {
                response.setCode(ResponseCode.QUERY_NOT_FOUND);
                response.setRemark("Not found, V3_0_6_SNAPSHOT maybe this group consumer boot first");
            }
        }

{code}
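
To make the interaction between the broker branch above and the client's CONSUME_FROM_LAST_OFFSET branch easier to follow, here is a minimal sketch in plain Java. It is not RocketMQ code: the class and method names are hypothetical, the store lookups are replaced by plain parameters, and it assumes that a QUERY_NOT_FOUND response reaches the client as -1 from offsetStore.readOffset.

```java
// Hypothetical, simplified model of the two decisions discussed above.
// None of these names are RocketMQ APIs; -1 stands in for QUERY_NOT_FOUND.
final class OffsetDecisionSketch {

    /** Stand-in for "no offset found" (QUERY_NOT_FOUND on the broker side). */
    static final long NOT_FOUND = -1L;

    /**
     * Mirrors the broker branch quoted above.
     *
     * @param storedOffset     committed offset of the group, negative if none
     * @param minOffset        smallest offset still available in the queue
     * @param offsetZeroOnDisk assumed result of checkInDiskByConsumeOffset(topic, queueId, 0)
     */
    static long brokerQueryOffset(long storedOffset, long minOffset, boolean offsetZeroOnDisk) {
        if (storedOffset >= 0) {
            return storedOffset;          // the group has consumed before
        }
        if (minOffset <= 0 && !offsetZeroOnDisk) {
            return 0L;                    // young topic, little backlog: start from 0
        }
        return NOT_FOUND;                 // let the client decide
    }

    /** Mirrors the client's CONSUME_FROM_LAST_OFFSET branch for a non-retry topic. */
    static long clientStartOffset(long brokerAnswer, long maxOffset) {
        if (brokerAnswer >= 0) {
            return brokerAnswer;          // trust whatever the broker returned
        }
        if (brokerAnswer == NOT_FOUND) {
            return maxOffset;             // first start: begin at the end of the queue
        }
        return -1L;
    }

    public static void main(String[] args) {
        long maxOffset = 500L;
        // New group, young topic, offset 0 not on disk: the client starts at 0.
        System.out.println(clientStartOffset(brokerQueryOffset(-1L, 0L, false), maxOffset));
        // New group, offset 0 already on disk: the client starts at maxOffset.
        System.out.println(clientStartOffset(brokerQueryOffset(-1L, 0L, true), maxOffset));
        // New group, older messages already deleted (minOffset > 0): maxOffset again.
        System.out.println(clientStartOffset(brokerQueryOffset(-1L, 100L, false), maxOffset));
        // Existing group: resumes from its committed offset.
        System.out.println(clientStartOffset(brokerQueryOffset(42L, 0L, false), maxOffset));
    }
}
```

Under these assumptions, only the first case makes a brand-new group start from 0 instead of the max offset, which matches the behavior described in the issue.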


> MQ client CONSUME_FROM_LAST_OFFSET dont work
> --------------------------------------------
>
>                 Key: ROCKETMQ-112
>                 URL: https://issues.apache.org/jira/browse/ROCKETMQ-112
>             Project: Apache RocketMQ
>          Issue Type: Bug
>          Components: rocketmq-client
>            Reporter: zhaoziyan
>            Assignee: Xiaorui Wang
>
>         case CONSUME_FROM_LAST_OFFSET: {
>             long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
>             if (lastOffset >= 0) {
>                 result = lastOffset;
>             }
>             // First start,no offset
>             else if (-1 == lastOffset) {
>                 if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
>                     result = 0L;
>                 }
>                 else {
>                     try {
>                         result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
>                     }
>                     catch (MQClientException e) {
>                         result = -1;
>                     }
>                 }
>             }
>             else {
>                 result = -1;
>             }
>             break;
>         }
> offsetStore.readOffset returns the minOffset, not the maxOffset, so CONSUME_FROM_LAST_OFFSET doesn't work.


