You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/07/28 05:58:09 UTC

[pulsar] branch master updated: Fix race condition on close consumer while reconnect to broker. (#7589)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b37b0c  Fix race condition on close consumer while reconnect to broker. (#7589)
0b37b0c is described below

commit 0b37b0c76dd2faaa9b8cc8d5b316ff35a307cfc1
Author: lipenghui <pe...@apache.org>
AuthorDate: Tue Jul 28 13:57:46 2020 +0800

    Fix race condition on close consumer while reconnect to broker. (#7589)
    
    ### Modifications
    
    Add state check when connection opened of the consumer. If the consumer state is closing or closed, we don’t need to send the subscribe command
---
 .../src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index ba54a1a..76e0726 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -729,6 +729,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
     @Override
     public void connectionOpened(final ClientCnx cnx) {
+        if (getState() == State.Closing || getState() == State.Closed) {
+            setState(State.Closed);
+            closeConsumerTasks();
+            client.cleanupConsumer(this);
+            failPendingReceive();
+            clearReceiverQueue();
+            return;
+        }
         setClientCnx(cnx);
         cnx.registerConsumer(consumerId, this);