You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/22 11:53:16 UTC
[pulsar] 07/07: [Broker] Fix race conditions in closing producers and consumers (#13428)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit da61b915d9f648754b40dc928430746d89d00cbb
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Wed Dec 22 07:05:48 2021 +0200
[Broker] Fix race conditions in closing producers and consumers (#13428)
- closing ServerCnx while producers or consumers are created can lead
to a producer or consumer never getting removed from the topic's
list of producers
(cherry picked from commit 3316db5a52cdeee49bc90fe18baac28d5688bfe8)
---
.../apache/pulsar/broker/service/ServerCnx.java | 24 ++++++++++++++--------
1 file changed, 15 insertions(+), 9 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index a5d55bf..dbab10d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -292,6 +292,11 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
// Connection is gone, close the producers immediately
producers.forEach((__, producerFuture) -> {
+ // prevent race conditions in completing producers
+ if (!producerFuture.isDone()
+ && producerFuture.completeExceptionally(new IllegalStateException("Connection closed."))) {
+ return;
+ }
if (producerFuture.isDone() && !producerFuture.isCompletedExceptionally()) {
Producer producer = producerFuture.getNow(null);
producer.closeNow(true);
@@ -299,17 +304,18 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
});
consumers.forEach((__, consumerFuture) -> {
- Consumer consumer;
- if (consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
- consumer = consumerFuture.getNow(null);
- } else {
+ // prevent race conditions in completing consumers
+ if (!consumerFuture.isDone()
+ && consumerFuture.completeExceptionally(new IllegalStateException("Connection closed."))) {
return;
}
-
- try {
- consumer.close();
- } catch (BrokerServiceException e) {
- log.warn("Consumer {} was already closed: {}", consumer, e);
+ if (consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
+ Consumer consumer = consumerFuture.getNow(null);
+ try {
+ consumer.close();
+ } catch (BrokerServiceException e) {
+ log.warn("Consumer {} was already closed: {}", consumer, e);
+ }
}
});
this.service.getPulsarStats().recordConnectionClose();