Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/04/01 14:14:39 UTC

[GitHub] [rocketmq] MatrixHB removed a comment on issue #4086: the release of the ProcessQueue lock is more suitable after the consume offset is submitted, not just after the listener has consumed the message

MatrixHB removed a comment on issue #4086:
URL: https://github.com/apache/rocketmq/issues/4086#issuecomment-1085943494


   In the client code for ordered (sequential) consumption, the following three mechanisms keep ordered messages from being consumed out of order.
   1. Lock the MessageQueue so that only one consumer owns it.
   The client checks this lock with the following method in `ProcessQueue`:
   ```
       public boolean isLocked() {
           return locked;
       }
   ```
   For more detail, study the client code `org.apache.rocketmq.client.impl.MQClientAPIImpl#lockBatchMQ` and the broker code `org.apache.rocketmq.broker.client.rebalance.RebalanceLockManager#tryLockBatch`. They show how the ordered consumer for each queue is kept fixed, so that load balancing does not cause the consumer of a queue to be replaced.
   Therefore, what you described ("there will still be other consumers repeatedly consuming the message") will not happen.
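   As a rough illustration of point 1, here is a minimal sketch of a per-queue lock table in which only one client can own a queue at a time. It is not the code of `RebalanceLockManager`; the class `QueueLockTable` and its methods are hypothetical names, and details such as consumer groups and lock expiry are deliberately left out.

   ```java
   import java.util.concurrent.ConcurrentHashMap;

   /** Hypothetical, simplified per-queue lock table; not RocketMQ's actual implementation. */
   class QueueLockTable {
       // queue name -> id of the client that currently owns the lock
       private final ConcurrentHashMap<String, String> owners = new ConcurrentHashMap<>();

       /** Returns true if clientId owns the lock on the queue after this call. */
       boolean tryLock(String queue, String clientId) {
           // putIfAbsent is atomic: it returns null if we became the owner,
           // otherwise it returns the existing owner and changes nothing.
           String current = owners.putIfAbsent(queue, clientId);
           return current == null || current.equals(clientId);
       }

       /** Releases the lock only if clientId is the current owner. */
       boolean unlock(String queue, String clientId) {
           return owners.remove(queue, clientId);
       }
   }
   ```

   In this sketch a second client calling `tryLock` for a queue that is already owned simply gets `false`, which is the property the argument above relies on: a queue never has two ordered consumers at once.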
   
   2. Lock `treeMapLock` when multiple consume threads take messages concurrently.
   See `org.apache.rocketmq.client.impl.consumer.ProcessQueue#takeMessages`:
   ```
               this.treeMapLock.writeLock().lockInterruptibly();
               try {
                   if (!this.msgTreeMap.isEmpty()) {
                       for (int i = 0; i < batchSize; i++) {
                           Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry();
                           if (entry != null) {
                               result.add(entry.getValue());
                           } else {
                               break;
                           }
                       }
                   }
               } finally {
                   this.treeMapLock.writeLock().unlock();
               }
   ```
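   The excerpt above can be reduced to a small self-contained sketch. The class `OffsetBuffer` and the use of `String` bodies are assumptions made for illustration; only the pattern (write lock around `pollFirstEntry` on a `TreeMap` keyed by offset) comes from the quoted code.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Map;
   import java.util.TreeMap;
   import java.util.concurrent.locks.ReentrantReadWriteLock;

   /** Hypothetical stand-in for the message buffer; keys are queue offsets. */
   class OffsetBuffer {
       private final ReentrantReadWriteLock treeMapLock = new ReentrantReadWriteLock();
       private final TreeMap<Long, String> msgTreeMap = new TreeMap<>();

       void put(long offset, String body) {
           treeMapLock.writeLock().lock();
           try {
               msgTreeMap.put(offset, body);
           } finally {
               treeMapLock.writeLock().unlock();
           }
       }

       /** Removes and returns up to batchSize messages, lowest offset first. */
       List<String> take(int batchSize) {
           List<String> result = new ArrayList<>();
           treeMapLock.writeLock().lock();
           try {
               for (int i = 0; i < batchSize; i++) {
                   Map.Entry<Long, String> entry = msgTreeMap.pollFirstEntry();
                   if (entry == null) {
                       break; // buffer drained
                   }
                   result.add(entry.getValue());
               }
           } finally {
               treeMapLock.writeLock().unlock();
           }
           return result;
       }
   }
   ```

   Because each batch is removed under the write lock, two threads calling `take` at the same time receive disjoint batches, each one contiguous in offset order.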
   
   3. Lock `consumeLock` when multiple consume threads consume messages concurrently.
   ```
                               try {
                                   this.processQueue.getConsumeLock().lock();
                                   status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                               } finally {
                                   this.processQueue.getConsumeLock().unlock();
                               }
   ```
   Therefore, the lock you mentioned in this issue exists to serialize consumption across multiple threads within the same consumer, so it can be released as soon as the listener has finished consuming.
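   To make point 3 concrete, here is a minimal sketch of a listener call guarded by a consume lock. `SerializedConsumer` and its `Consumer<List<String>>` listener are hypothetical names chosen for this example; they stand in for the consume service and `messageListener` and are not part of the RocketMQ API.

   ```java
   import java.util.List;
   import java.util.concurrent.locks.ReentrantLock;
   import java.util.function.Consumer;

   /** Hypothetical wrapper that lets only one thread run the listener at a time. */
   class SerializedConsumer {
       private final ReentrantLock consumeLock = new ReentrantLock();
       private final Consumer<List<String>> listener;

       SerializedConsumer(Consumer<List<String>> listener) {
           this.listener = listener;
       }

       void consume(List<String> msgs) {
           consumeLock.lock();
           try {
               // Only one thread of this consumer can be inside the listener here.
               listener.accept(msgs);
           } finally {
               // Released right after the listener returns.
               consumeLock.unlock();
           }
       }
   }
   ```

   The lock in this sketch only arbitrates between threads of one consumer process. Exclusion between different consumers is the job of the queue lock described in point 1.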
   
   If you have any questions, we can discuss more.
   
   
   
   

