You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2023/10/14 13:43:47 UTC

[pulsar] branch master updated: [fix][broker] make closing producer thread-safe while updating recently closed producer (#21355)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a5f4c1e0e61 [fix][broker] make closing producer thread-safe while updating recently closed producer (#21355)
a5f4c1e0e61 is described below

commit a5f4c1e0e612e7568bb4b6be1e41f71bc4ac4ff2
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Sat Oct 14 06:43:38 2023 -0700

    [fix][broker] make closing producer thread-safe while updating recently closed producer (#21355)
---
 .../src/main/java/org/apache/pulsar/broker/service/ServerCnx.java  | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 0517fff0f03..34542d56938 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -46,7 +46,6 @@ import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
@@ -56,6 +55,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
@@ -186,7 +186,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
     private final BrokerService service;
     private final SchemaRegistryService schemaService;
     private final String listenerName;
-    private final HashMap<Long, Long> recentlyClosedProducers;
+    private final Map<Long, Long> recentlyClosedProducers;
     private final ConcurrentLongHashMap<CompletableFuture<Producer>> producers;
     private final ConcurrentLongHashMap<CompletableFuture<Consumer>> consumers;
     private final boolean enableSubscriptionPatternEvaluation;
@@ -291,7 +291,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                 .expectedItems(8)
                 .concurrencyLevel(1)
                 .build();
-        this.recentlyClosedProducers = new HashMap<>();
+        this.recentlyClosedProducers = new ConcurrentHashMap<>();
         this.replicatorPrefix = conf.getReplicatorPrefix();
         this.maxNonPersistentPendingMessages = conf.getMaxConcurrentNonPersistentMessagePerConnection();
         this.schemaValidationEnforced = conf.isSchemaValidationEnforced();
@@ -2984,7 +2984,6 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
 
     @Override
     public void closeProducer(Producer producer) {
-        assert ctx.executor().inEventLoop();
         // removes producer-connection from map and send close command to producer
         safelyRemoveProducer(producer);
         if (getRemoteEndpointProtocolVersion() >= v5.getValue()) {