You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@rocketmq.apache.org by "Jaskey Lam (JIRA)" <ji...@apache.org> on 2017/02/24 05:17:44 UTC

[jira] [Updated] (ROCKETMQ-110) consumeConcurrentlyMaxSpan has a very limited role

     [ https://issues.apache.org/jira/browse/ROCKETMQ-110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jaskey Lam updated ROCKETMQ-110:
--------------------------------
    Description: 
Actually, in the current implementation of  consumeConcurrentlyMaxSpan , the role it plays is very limited.

In my opinion, RocketMQ hopes to solve a problem that if some messages blocks( may be possibly considered as dead lock or dead loop),say message with offset 100 blocked,  and the rest whose offset is 101 to 2100 are very healthy, in the case that if the process is killed since it can not be shutdown normally if dead lock or dead loop is really happens when consuming, the later messages 101 to 2100 which is consumed sucessfully will be repeated to consume again since the consumer offset will still remains at 100.

So to reduce the influence of repeated message numbers, flow control should be taken.  

But the current implementaion is to compare the span of last message of the first message to consumeConcurrentlyMaxSpan.

In the above example, the span is 2000 and flow control may do action to make it pause for one cycle for 50ms, but next time when the message 2100 and the rest of healthy message  consumed successfully , the fisrt key and the last key will be the same, 110, and the max span will be considered as 0, pull operation will continue.


    public long getMaxSpan() {
        try {
            this.lockTreeMap.readLock().lockInterruptibly();
            try {
                if (!this.msgTreeMap.isEmpty()) {
                    return this.msgTreeMap.lastKey() - this.msgTreeMap.firstKey();
                }
            } finally {
                this.lockTreeMap.readLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("getMaxSpan exception", e);
        }

        return 0;
    }

So for single message problem, consumeConcurrentlyMaxSpan will help nothing but in my opion this is what flow control should also takes into considerations.

I suggest maxSpan should be lastConsumedOffset(does not record now) - firstConsumingOffset(the first key of msgTreeMap).


  was:
Actually, in the current implementation of  consumeConcurrentlyMaxSpan , the role it plays is very limited.

In my opinion, RocketMQ hopes to solve a problem that if some messages blocks( may be possibly considered as dead lock or dead loop),say message with offset 100 blocked,  and the rest whose offset is 101 to 2100 are very healthy, in the case that if the process is killed since it can not be shutdown normally if dead lock or dead loop is really happens when consuming, the later messages 101 to 2100 which is consumed sucessfully will be repeated to consume again since the consumer offset will still remains at 100.

So to reduce the influence of repeated message numbers, flow control should be taken.  

But the current implementaion is to compare the span of last message of the first message to consumeConcurrentlyMaxSpan.

In the above example, the span is 2000 and flow control may do action to make it pause for one cycle for 50ms, but next time when the message 2100 and the rest of healthy message  consumed successfully , the fisrt key and the last key will be the same, 110, and the max span will be considered as 0, pull operation will continue.


    public long getMaxSpan() {
        try {
            this.lockTreeMap.readLock().lockInterruptibly();
            try {
                if (!this.msgTreeMap.isEmpty()) {
                    return this.msgTreeMap.lastKey() - this.msgTreeMap.firstKey();
                }
            } finally {
                this.lockTreeMap.readLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("getMaxSpan exception", e);
        }

        return 0;
    }

So for single message problem, consumeConcurrentlyMaxSpan will help nothing but in my opion this is what flow control should also takes into considerations.

I suggest maxSpan should be lastConsumedOffset - firstConsumingOffset(the first key of msgTreeMap).



> consumeConcurrentlyMaxSpan has a very limited role
> --------------------------------------------------
>
>                 Key: ROCKETMQ-110
>                 URL: https://issues.apache.org/jira/browse/ROCKETMQ-110
>             Project: Apache RocketMQ
>          Issue Type: Improvement
>          Components: rocketmq-client
>    Affects Versions: 4.0.0-incubating
>            Reporter: Jaskey Lam
>            Assignee: Xiaorui Wang
>
> Actually, in the current implementation of  consumeConcurrentlyMaxSpan , the role it plays is very limited.
> In my opinion, RocketMQ hopes to solve a problem that if some messages blocks( may be possibly considered as dead lock or dead loop),say message with offset 100 blocked,  and the rest whose offset is 101 to 2100 are very healthy, in the case that if the process is killed since it can not be shutdown normally if dead lock or dead loop is really happens when consuming, the later messages 101 to 2100 which is consumed sucessfully will be repeated to consume again since the consumer offset will still remains at 100.
> So to reduce the influence of repeated message numbers, flow control should be taken.  
> But the current implementaion is to compare the span of last message of the first message to consumeConcurrentlyMaxSpan.
> In the above example, the span is 2000 and flow control may do action to make it pause for one cycle for 50ms, but next time when the message 2100 and the rest of healthy message  consumed successfully , the fisrt key and the last key will be the same, 110, and the max span will be considered as 0, pull operation will continue.
>     public long getMaxSpan() {
>         try {
>             this.lockTreeMap.readLock().lockInterruptibly();
>             try {
>                 if (!this.msgTreeMap.isEmpty()) {
>                     return this.msgTreeMap.lastKey() - this.msgTreeMap.firstKey();
>                 }
>             } finally {
>                 this.lockTreeMap.readLock().unlock();
>             }
>         } catch (InterruptedException e) {
>             log.error("getMaxSpan exception", e);
>         }
>         return 0;
>     }
> So for single message problem, consumeConcurrentlyMaxSpan will help nothing but in my opion this is what flow control should also takes into considerations.
> I suggest maxSpan should be lastConsumedOffset(does not record now) - firstConsumingOffset(the first key of msgTreeMap).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)