Posted to commits@pulsar.apache.org by ch...@apache.org on 2021/08/25 07:53:27 UTC

[pulsar] 03/03: Fixed race condition on multi-topic consumer (#11764)

This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 587548ec5f9e9a9fc6d465aca3cd3fbb76b9ee64
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed Aug 25 00:33:35 2021 -0700

    Fixed race condition on multi-topic consumer (#11764)
    
    ### Motivation
    
    Under certain conditions, consumption can stall for applications that use the multi-topic consumer.
    
    The conditions to reproduce the issue are:
     * Consumer is subscribed to multiple topics, but only 1 topic has traffic
     * Messages are published in batches (no repro if no batches)
     * Receiver queue size == 1 (or another small value, to exercise the race condition)
    
    The problem is a race condition between two threads when we decide to put one of the individual consumers in the "paused" state because the shared queue is full.
    
    Just after we have checked the conditions and decided to mark the consumer as paused, the application empties the shared queue completely. From that point on, nothing re-checks whether that consumer needs to be unblocked, so it stays paused.
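The unlucky interleaving can be replayed deterministically in a single thread. This is a minimal sketch of a simplified model, not the actual `MultiTopicsConsumerImpl` code: the bounded shared queue, the `paused` set and the helper names (`LostWakeupDemo`, `applicationDrains`, `replayRace`) are hypothetical, and each "step" stands for what one of the two threads does.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

// Hypothetical, simplified model of the multi-topic consumer state:
// a bounded shared queue plus a set of paused per-topic consumers.
public class LostWakeupDemo {
    static final int CAPACITY = 1;                       // receiver queue size == 1
    static final Queue<String> sharedQueue = new ArrayDeque<>();
    static final Set<String> paused = new HashSet<>();

    // Application side: drain the shared queue, then resume paused consumers if there is room.
    static void applicationDrains() {
        sharedQueue.clear();
        if (sharedQueue.size() < CAPACITY) {
            paused.clear();                              // "resume" every consumer currently paused
        }
    }

    // Replays the racy interleaving and reports whether the consumer is left paused.
    static boolean replayRace() {
        sharedQueue.clear();
        paused.clear();
        sharedQueue.add("msg-1");                        // the shared queue is full
        // Step 1 (receiver thread): observe that the shared queue is full.
        boolean full = sharedQueue.size() >= CAPACITY;
        // Step 2 (application thread): drain the queue; nobody is paused yet, so nothing is resumed.
        applicationDrains();
        // Step 3 (receiver thread): act on the stale observation and pause the consumer.
        if (full) {
            paused.add("consumer-A");
        }
        // Nothing will ever resume consumer-A: the queue is empty and no new message arrives.
        return paused.contains("consumer-A");
    }

    public static void main(String[] args) {
        System.out.println("consumer stalled: " + replayRace()); // prints "consumer stalled: true"
    }
}
```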
    
    ### Modification
    
    Instead of introducing a synchronized block (which would be contended by many consumers), we double-check the state of the shared queue after marking the consumer as "paused". If the other thread has emptied the queue in the meantime, the re-check is guaranteed to unblock the consumer.
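The same simplified model with the re-check applied might look like this. It is a sketch under stated assumptions: `resumeIfNeeded` is a hypothetical stand-in for `resumeReceivingFromPausedConsumersIfNeeded()`, and the class and field names are illustrative, not Pulsar's. Concurrent collections are used so the pattern also carries over to real threads.

```java
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical model: lock-free "pause, then re-check" instead of a synchronized block.
public class RecheckAfterPauseDemo {
    static final int CAPACITY = 1;
    static final Queue<String> sharedQueue = new ConcurrentLinkedQueue<>();
    static final Set<String> paused = ConcurrentHashMap.newKeySet();

    // Hypothetical stand-in for resumeReceivingFromPausedConsumersIfNeeded():
    // if the shared queue has room, resume (unpause) every paused consumer. Idempotent.
    static void resumeIfNeeded() {
        if (sharedQueue.size() < CAPACITY) {
            paused.clear();
        }
    }

    // Application side: drain, then check whether anybody needs resuming.
    static void applicationDrains() {
        sharedQueue.clear();
        resumeIfNeeded();
    }

    static boolean replayRaceWithRecheck() {
        sharedQueue.clear();
        paused.clear();
        sharedQueue.add("msg-1");                        // the shared queue is full
        boolean full = sharedQueue.size() >= CAPACITY;   // step 1: receiver sees a full queue
        applicationDrains();                             // step 2: app drains; nothing paused yet
        if (full) {
            paused.add("consumer-A");                    // step 3: receiver pauses on stale information
            resumeIfNeeded();                            // step 4: the fix, re-check after publishing the pause
        }
        return paused.contains("consumer-A");
    }

    public static void main(String[] args) {
        System.out.println("consumer stalled: " + replayRaceWithRecheck()); // prints "consumer stalled: false"
    }
}
```

The re-check works because each side publishes its own change before reading the other side's state: the receiver adds itself to the paused set and then reads the queue, while the application drains the queue and then reads the paused set. Assuming these accesses are sequentially consistent (as volatile and CAS-based accesses are under the Java memory model), at least one of the two reads observes the other side's write, so at least one of them resumes the consumer. Both may do so, which is harmless as long as resuming is idempotent.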
    
    (cherry picked from commit f1d66d1a635dfcdc3335a5c8d5ea6d68e38ae71c)
---
 .../java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java  | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 727f2d6..997358d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -259,6 +259,11 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                 // mark this consumer to be resumed later: if No more space left in shared queue,
                 // or if any consumer is already paused (to create fair chance for already paused consumers)
                 pausedConsumers.add(consumer);
+
+                // Since we didn't get a mutex, the condition on the incoming queue might have changed after
+                // we have paused the current consumer. We need to re-check in order to avoid this consumer
+                // from getting stalled.
+                resumeReceivingFromPausedConsumersIfNeeded();
             } else {
                 // Schedule next receiveAsync() if the incoming queue is not full. Use a different thread to avoid
                 // recursion and stack overflow