Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/04/01 11:49:26 UTC

[GitHub] [rocketmq] ZJRui opened a new issue #4086: the release of the ProcessQueue lock is more suitable after the consume offset is submitted, not just after the listener has consumed the message

ZJRui opened a new issue #4086:
URL: https://github.com/apache/rocketmq/issues/4086


   
   **BUG REPORT**   maybe
   
   1. Please describe the issue you observed:
   
   For sequential message consumption, it seems more suitable to release the ProcessQueue lock after the consume offset is submitted, not immediately after the listener has consumed the message.
   
   When a consumer that consumes sequentially releases the lock on a MessageQueue, it uses the ProcessQueue lock to check whether a message is currently being consumed.
   
   If so, the consumer must wait for the ProcessQueue lock to be released before releasing the MessageQueue lock, so that sequential messages are not consumed repeatedly after the MessageQueue is reassigned.
   
   In the current implementation, the lock is released as soon as the listener has finished consuming the message, before the consume offset has been submitted. If the MessageQueue lock is released at this point, other consumers can still consume again the messages whose offset has not been submitted.
   
   Is this logic correct, and is it necessary to move the release of the ProcessQueue lock to after the consume offset is committed?
   
   [current implementation](https://github.com/apache/rocketmq/blob/e3b67484b0ea3172cf0555e39efe36c0030c53b3/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java#L490-L508)
   
   2. Please tell us about your environment:
   4.9.3
   
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] ZJRui commented on issue #4086: the release of the ProcessQueue lock is more suitable after the consume offset is submitted, not just after the listener has consumed the message

Posted by GitBox <gi...@apache.org>.
ZJRui commented on issue #4086:
URL: https://github.com/apache/rocketmq/issues/4086#issuecomment-1086543695


   > I'm not sure about the purpose of `consumeLock` when consuming messages. It seems that `objLock` is enough. @guyinyou @panzhi33 can you help confirm this?
   > 
   > [use of consumeLock](https://github.com/apache/rocketmq/blob/e3b67484b0ea3172cf0555e39efe36c0030c53b3/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java#L490-L508)
   
   
   First: I mean "other consumer process", not "other consume thread in one consumer".
   
   Second: I was wrong, and my proposal is wrong.
   
   There is no need to move the release of the ProcessQueue lock to after the consume offset is committed, because the offset submitted in ConsumeRequest is only written to local memory; it is not actually submitted to the Broker at that point.
   
   Therefore, there is no essential difference between releasing the ProcessQueue lock after the listener processes the message and releasing it after the offset is submitted in ConsumeRequest. Neither case actually commits the offset to the Broker.
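
   The distinction between an offset held in local memory and an offset known to the Broker can be sketched as follows. This is a minimal illustration under stated assumptions, not RocketMQ's actual code: `LocalOffsetStoreSketch`, `updateOffset`, `persistToBroker`, and `brokerView` are hypothetical names, and the Broker is stood in for by a second in-memory map.

   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;

   // Hypothetical sketch: all names here are illustrative, not RocketMQ's API.
   class LocalOffsetStoreSketch {
       // Offsets "submitted" by the consume thread live only in this map.
       private final Map<String, Long> localOffsets = new ConcurrentHashMap<>();
       // Stand-in for what the Broker has been told so far.
       private final Map<String, Long> brokerOffsets = new ConcurrentHashMap<>();

       // Called right after the listener returns: memory only, no network call.
       void updateOffset(String queue, long offset) {
           localOffsets.merge(queue, offset, Long::max);
       }

       // Called later (for example by a periodic task): only now does the Broker see the offset.
       void persistToBroker() {
           brokerOffsets.putAll(localOffsets);
       }

       // Returns the offset the Broker knows about, or -1 if it knows none.
       long brokerView(String queue) {
           return brokerOffsets.getOrDefault(queue, -1L);
       }

       public static void main(String[] args) {
           LocalOffsetStoreSketch store = new LocalOffsetStoreSketch();
           store.updateOffset("queue-0", 42L);
           System.out.println(store.brokerView("queue-0")); // -1: the Broker has not been told yet
           store.persistToBroker();
           System.out.println(store.brokerView("queue-0")); // 42
       }
   }
   ```

   In this sketch, releasing a lock before or after `updateOffset` makes no difference to what another process could learn from the Broker, because only `persistToBroker` changes the Broker's view.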
   
   
   Third: I think the ProcessQueue lock does the following.
   The ProcessQueue lock represents a state: messages in the current ProcessQueue are being processed.
   
   When the RebalanceImpl thread reassigns MessageQueues and a MessageQueue no longer belongs to this sequential consumer, the consumer needs to send a request to the Broker to release the lock on that MessageQueue. But if the current consumer is still consuming a message when the MessageQueue lock is released, a consumer in another process can obtain the MessageQueue and start pulling and consuming messages from it, which causes the MessageQueue to be consumed by consumers in two processes at the same time.
   
   With the ProcessQueue lock, the current consumer checks the lock status of the ProcessQueue after the reallocation, when it is about to release the MessageQueue lock. If the ProcessQueue is locked, the release of the MessageQueue lock is delayed.
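
   The delayed release described above can be sketched with a timed `tryLock`. This is only an illustration under assumptions, not the client's real rebalance code: `DelayedUnlockSketch`, `tryReleaseQueue`, `queueLockHeld`, and `demo` are hypothetical names, and the Broker-side MessageQueue lock is reduced to a boolean flag.

   ```java
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.locks.Lock;
   import java.util.concurrent.locks.ReentrantLock;

   // Hypothetical sketch: class, field, and method names are illustrative only.
   class DelayedUnlockSketch {
       // Held by the consume thread while the listener is running.
       private final Lock consumeLock = new ReentrantLock();
       // Stand-in for the MessageQueue lock held on the Broker.
       private volatile boolean queueLockHeld = true;

       Lock getConsumeLock() { return consumeLock; }
       boolean isQueueLockHeld() { return queueLockHeld; }

       // Called by the rebalance thread when the queue is no longer assigned here.
       // Returns false if a message is still being consumed, so the caller can retry later.
       boolean tryReleaseQueue(long waitMillis) {
           try {
               if (consumeLock.tryLock(waitMillis, TimeUnit.MILLISECONDS)) {
                   try {
                       queueLockHeld = false; // nobody is consuming, safe to release
                       return true;
                   } finally {
                       consumeLock.unlock();
                   }
               }
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
           }
           return false;
       }

       // Tries to release once while a listener is running and once after it finishes.
       static boolean[] demo() {
           DelayedUnlockSketch queue = new DelayedUnlockSketch();
           CountDownLatch consuming = new CountDownLatch(1);
           CountDownLatch finish = new CountDownLatch(1);
           Thread consumer = new Thread(() -> {
               queue.getConsumeLock().lock();
               try {
                   consuming.countDown();
                   finish.await(); // simulates a slow listener
               } catch (InterruptedException e) {
                   Thread.currentThread().interrupt();
               } finally {
                   queue.getConsumeLock().unlock();
               }
           });
           consumer.start();
           try {
               consuming.await();
               boolean whileConsuming = queue.tryReleaseQueue(50);
               finish.countDown();
               consumer.join();
               boolean afterConsuming = queue.tryReleaseQueue(50);
               return new boolean[] {whileConsuming, afterConsuming};
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new IllegalStateException(e);
           }
       }

       public static void main(String[] args) {
           boolean[] released = demo();
           System.out.println("released while consuming: " + released[0]); // false
           System.out.println("released after consuming: " + released[1]); // true
       }
   }
   ```

   In this sketch the first attempt fails because the consume thread holds the lock, so the MessageQueue lock stays in place until the listener has returned.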
   
   
   Fourth: Based on the discussion in the third point, I am puzzled about why consumeLock (a ReentrantLock) in ProcessQueue is used to represent this state, instead of simply using a volatile boolean.
   
   





[GitHub] [rocketmq] MatrixHB removed a comment on issue #4086: the release of the ProcessQueue lock is more suitable after the consume offset is submitted, not just after the listener has consumed the message

Posted by GitBox <gi...@apache.org>.
MatrixHB removed a comment on issue #4086:
URL: https://github.com/apache/rocketmq/issues/4086#issuecomment-1085943494


   In the client code for sequential consumption, the following 3 points prevent sequential messages from getting out of order.
   1. Lock the MessageQueue so that only one consumer owns it.
   Use the following check in `ProcessQueue`:
   ```
       public boolean isLocked() {
           return locked;
       }
   ```
   You can study the client code `org.apache.rocketmq.client.impl.MQClientAPIImpl#lockBatchMQ` and the broker code `org.apache.rocketmq.broker.client.rebalance.RebalanceLockManager#tryLockBatch` further. Try to understand how they ensure that the sequential consumer for each queue is static, and that load balancing will not cause the consumer of a queue to be replaced.
   Therefore, what you described, "there will still be other consumers repeatedly consuming the message", will not happen.
   
   2. Lock treeMapLock when multiple consume threads take messages concurrently.
   `org.apache.rocketmq.client.impl.consumer.ProcessQueue#takeMessages`
   ```
               this.treeMapLock.writeLock().lockInterruptibly();
               try {
                   if (!this.msgTreeMap.isEmpty()) {
                       for (int i = 0; i < batchSize; i++) {
                           Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry();
                           if (entry != null) {
                               result.add(entry.getValue());
                           } else {
                               break;
                           }
                       }
                   }
               } finally {
                   this.treeMapLock.writeLock().unlock();
               }
   ```
   
   3. Lock consumeLock when multiple consume threads consume messages concurrently.
   ```
                               try {
                                   this.processQueue.getConsumeLock().lock();
                                   status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                               } finally {
                                   this.processQueue.getConsumeLock().unlock();
                               }
   ```
   Therefore, the lock you mentioned in this issue serves the concurrent consumption by multiple threads in the same consumer, and it can be released after consumption.
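
   Point 1 above (only one consumer holds the lock on a MessageQueue) can be illustrated with a minimal sketch of a broker-side lock table. This is an illustration under assumptions rather than the actual `RebalanceLockManager`: `QueueLockTableSketch`, `tryLock`, and `unlock` are hypothetical names, queues and clients are plain strings, and details a real broker would need, such as lock expiry, are left out.

   ```java
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.ConcurrentMap;

   // Hypothetical sketch of a broker-side queue lock table; not the real implementation.
   class QueueLockTableSketch {
       // Queue name -> id of the client that currently owns the lock.
       private final ConcurrentMap<String, String> owners = new ConcurrentHashMap<>();

       // Succeeds if the queue is free or already owned by the same client.
       boolean tryLock(String queue, String clientId) {
           String current = owners.putIfAbsent(queue, clientId);
           return current == null || current.equals(clientId);
       }

       // Only the current owner can release the lock.
       boolean unlock(String queue, String clientId) {
           return owners.remove(queue, clientId);
       }

       public static void main(String[] args) {
           QueueLockTableSketch table = new QueueLockTableSketch();
           System.out.println(table.tryLock("queue-0", "consumer-A")); // true
           System.out.println(table.tryLock("queue-0", "consumer-B")); // false: A owns the queue
           System.out.println(table.unlock("queue-0", "consumer-A"));  // true
           System.out.println(table.tryLock("queue-0", "consumer-B")); // true: the queue is free again
       }
   }
   ```

   Under these assumptions, a second consumer process cannot start consuming a queue until the first one has released its lock.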
   
   If you have any questions, we can discuss more.
   
   
   
   





[GitHub] [rocketmq] MatrixHB commented on issue #4086: the release of the ProcessQueue lock is more suitable after the consume offset is submitted, not just after the listener has consumed the message

Posted by GitBox <gi...@apache.org>.
MatrixHB commented on issue #4086:
URL: https://github.com/apache/rocketmq/issues/4086#issuecomment-1086485737


   I'm not sure about the purpose of `consumeLock` when consuming messages. It seems that `objLock` is enough. @guyinyou @panzhi33 can you help confirm this?
   
   [use of consumeLock](https://github.com/apache/rocketmq/blob/e3b67484b0ea3172cf0555e39efe36c0030c53b3/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java#L490-L508)





[GitHub] [rocketmq] MatrixHB commented on issue #4086: the release of the ProcessQueue lock is more suitable after the consume offset is submitted, not just after the listener has consumed the message

Posted by GitBox <gi...@apache.org>.
MatrixHB commented on issue #4086:
URL: https://github.com/apache/rocketmq/issues/4086#issuecomment-1086058078


   You said "there will still be other consumers repeatedly consuming the message". Do you mean another consumer process, or another consume thread in one consumer?
   
   If you mean "other consumer process", there is nothing to worry about.
   You can study the client code `org.apache.rocketmq.client.impl.MQClientAPIImpl#lockBatchMQ` and the broker code `org.apache.rocketmq.broker.client.rebalance.RebalanceLockManager#tryLockBatch` further. They ensure that the sequential consumer for each queue is static, and that load balancing will not cause the consumer of a queue to be replaced.
   
   If you mean "other consume thread in one consumer", please pay attention to the synchronized lock in `ConsumeRequest`.
   ```
   final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
   synchronized (objLock) {
          ....
          status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
   }
   ```
   This ensures that only one ConsumeRequest thread can consume messages from each queue at a time. When a message is consumed, it is removed from the ProcessQueue; see `org.apache.rocketmq.client.impl.consumer.ProcessQueue#takeMessages`. Therefore, another thread cannot consume the same message again.
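
   The per-queue monitor idea can be sketched as below. This is a simplified illustration, not the client's actual `MessageQueueLock`: `PerQueueLockSketch` and `consume` are hypothetical names, and queues are identified by plain strings for brevity.

   ```java
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.ConcurrentMap;

   // Hypothetical sketch: one monitor object per queue, looked up by queue name.
   class PerQueueLockSketch {
       private final ConcurrentMap<String, Object> lockTable = new ConcurrentHashMap<>();

       // Always returns the same monitor object for the same queue name.
       Object fetchLockObject(String queueName) {
           return lockTable.computeIfAbsent(queueName, k -> new Object());
       }

       // Runs the task while holding the queue's monitor, so tasks for one queue
       // never overlap, while tasks for different queues may run in parallel.
       void consume(String queueName, Runnable task) {
           synchronized (fetchLockObject(queueName)) {
               task.run();
           }
       }

       public static void main(String[] args) {
           PerQueueLockSketch locks = new PerQueueLockSketch();
           // Same queue: same monitor. Different queues: different monitors.
           System.out.println(locks.fetchLockObject("queue-0") == locks.fetchLockObject("queue-0")); // true
           System.out.println(locks.fetchLockObject("queue-0") == locks.fetchLockObject("queue-1")); // false
           locks.consume("queue-0", () -> System.out.println("consuming queue-0"));
       }
   }
   ```

   Because every thread working on the same queue synchronizes on the same object, at most one of them can be inside the listener call for that queue at any moment.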
   
   
   
   
   

