Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/07/28 05:58:09 UTC
[pulsar] branch master updated: Fix race condition on close consumer while reconnect to broker. (#7589)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0b37b0c Fix race condition on close consumer while reconnect to broker. (#7589)
0b37b0c is described below
commit 0b37b0c76dd2faaa9b8cc8d5b316ff35a307cfc1
Author: lipenghui <pe...@apache.org>
AuthorDate: Tue Jul 28 13:57:46 2020 +0800
Fix race condition on close consumer while reconnect to broker. (#7589)
### Modifications
Add a state check when the consumer's connection is opened. If the consumer state is Closing or Closed, we don’t need to send the subscribe command; the consumer is marked Closed and its resources are cleaned up instead.
---
.../src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java | 8 ++++++++
1 file changed, 8 insertions(+)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index ba54a1a..76e0726 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -729,6 +729,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
     @Override
     public void connectionOpened(final ClientCnx cnx) {
+        if (getState() == State.Closing || getState() == State.Closed) {
+            setState(State.Closed);
+            closeConsumerTasks();
+            client.cleanupConsumer(this);
+            failPendingReceive();
+            clearReceiverQueue();
+            return;
+        }
         setClientCnx(cnx);
         cnx.registerConsumer(consumerId, this);
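
For illustration only, the guard added above can be sketched outside of Pulsar as a small self-contained Java program. This is not the ConsumerImpl code: the class ReconnectStateCheckSketch, the nested SketchConsumer, its closeAsync() method, and the subscribe counter are hypothetical names invented for this sketch, and the real cleanup calls (closeConsumerTasks, cleanupConsumer, failPendingReceive, clearReceiverQueue) are reduced to a comment. It only shows the idea that a reconnect callback arriving after close has started should finish the close rather than subscribe again.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified model of the state check; not the Pulsar client API.
public class ReconnectStateCheckSketch {

    enum State { Connecting, Ready, Closing, Closed }

    static class SketchConsumer {
        private final AtomicReference<State> state =
                new AtomicReference<>(State.Connecting);
        private int subscribeCommandsSent = 0;

        State getState() {
            return state.get();
        }

        int getSubscribeCommandsSent() {
            return subscribeCommandsSent;
        }

        // Assumption: close only marks the consumer as Closing here.
        void closeAsync() {
            state.set(State.Closing);
        }

        // Called when a (re)connection to the broker is established.
        // Returns true if a subscribe command would have been sent.
        boolean connectionOpened() {
            State current = state.get();
            if (current == State.Closing || current == State.Closed) {
                // Finish the close instead of re-subscribing. The real patch
                // also releases consumer resources at this point.
                state.set(State.Closed);
                return false;
            }
            subscribeCommandsSent++;
            state.set(State.Ready);
            return true;
        }
    }

    public static void main(String[] args) {
        SketchConsumer open = new SketchConsumer();
        // Normal reconnect: prints "true Ready"
        System.out.println(open.connectionOpened() + " " + open.getState());

        SketchConsumer closing = new SketchConsumer();
        closing.closeAsync();
        // Reconnect racing with close: prints "false Closed"
        System.out.println(closing.connectionOpened() + " " + closing.getState());
    }
}
```

In this sketch a consumer that is already closing never sends a subscribe command on reconnect, which is the behavior the commit message describes.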