Posted to issues@rocketmq.apache.org by "Jaskey Lam (JIRA)" <ji...@apache.org> on 2017/04/17 12:43:42 UTC

[jira] [Created] (ROCKETMQ-180) CONSUME_FROM_TIMESTAMP will repeat the last message though timestamp is new enough

Jaskey Lam created ROCKETMQ-180:
-----------------------------------

             Summary: CONSUME_FROM_TIMESTAMP will repeat the last message though timestamp is new enough
                 Key: ROCKETMQ-180
                 URL: https://issues.apache.org/jira/browse/ROCKETMQ-180
             Project: Apache RocketMQ
          Issue Type: Bug
          Components: rocketmq-client
    Affects Versions: 4.0.0-incubating
            Reporter: Jaskey Lam
            Assignee: Xiaorui Wang


When using CONSUME_FROM_TIMESTAMP, RocketMQ searches for the offset that corresponds to the configured consume timestamp and uses it as the starting offset for the first pull.

However, the offset search always returns the offset of an existing message. As a result, the last message is delivered again even when it is older than the configured timestamp and has already been consumed.


This problem has been found in BROADCASTING mode, but the same problem should exist in CLUSTERING mode.

Problem code snippet:

{code}

            case CONSUME_FROM_TIMESTAMP: {
                long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
                if (lastOffset >= 0) {
                    result = lastOffset;
                } else if (-1 == lastOffset) {
                    if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        try {
                            result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
                        } catch (MQClientException e) {
                            result = -1;
                        }
                    } else {
                        try {
                            long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(),
                                UtilAll.YYYY_MMDD_HHMMSS).getTime();
                            result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);// Here!! The returned offset always points at a message that has already been stored.
                        } catch (MQClientException e) {
                            result = -1;
                        }
                    }
                } else {
                    result = -1;
                }
                break;
            }

{code}
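
The behavior described above can be illustrated with a small toy model. This is only a sketch and not RocketMQ's broker code: the class SearchOffsetSketch, both methods, and the list of store timestamps are hypothetical names introduced for illustration. It assumes a queue whose offset i holds a message stored at storeTimestamps.get(i), and contrasts a search that always returns an existing offset with one that returns the max offset when no message is new enough.

```java
import java.util.Arrays;
import java.util.List;

// Toy model only: hypothetical names, not part of RocketMQ.
class SearchOffsetSketch {

    // Returns the offset of the first message stored at or after `timestamp`.
    // If no such message exists, it falls back to the last existing offset,
    // which mimics the behavior the report describes.
    static long searchOffsetClamped(List<Long> storeTimestamps, long timestamp) {
        for (int i = 0; i < storeTimestamps.size(); i++) {
            if (storeTimestamps.get(i) >= timestamp) {
                return i;
            }
        }
        // Empty queue: nothing to clamp to, start at 0.
        return storeTimestamps.isEmpty() ? 0 : storeTimestamps.size() - 1;
    }

    // Same search, but returns the max offset (one past the last message)
    // when every stored message is older than `timestamp`.
    static long searchOffsetOrMax(List<Long> storeTimestamps, long timestamp) {
        for (int i = 0; i < storeTimestamps.size(); i++) {
            if (storeTimestamps.get(i) >= timestamp) {
                return i;
            }
        }
        return storeTimestamps.size();
    }

    public static void main(String[] args) {
        // Three messages at offsets 0, 1, 2, all older than the requested timestamp.
        List<Long> queue = Arrays.asList(1000L, 2000L, 3000L);
        long consumeTimestamp = 5000L;

        // Pulling from offset 2 re-delivers the last message.
        System.out.println(searchOffsetClamped(queue, consumeTimestamp)); // 2
        // Pulling from offset 3 waits for the next new message instead.
        System.out.println(searchOffsetOrMax(queue, consumeTimestamp));   // 3
    }
}
```

In this model, starting the first pull at the clamped offset repeats the message at offset 2, while starting at the max offset does not. Whether the actual fix belongs in the client or in the broker's search is not decided by this sketch.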


