You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2023/10/14 13:43:47 UTC
[pulsar] branch master updated: [fix][broker] make closing producer thread-safe while updating recently closed producer (#21355)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a5f4c1e0e61 [fix][broker] make closing producer thread-safe while updating recently closed producer (#21355)
a5f4c1e0e61 is described below
commit a5f4c1e0e612e7568bb4b6be1e41f71bc4ac4ff2
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Sat Oct 14 06:43:38 2023 -0700
[fix][broker] make closing producer thread-safe while updating recently closed producer (#21355)
---
.../src/main/java/org/apache/pulsar/broker/service/ServerCnx.java | 7 +++----
1 file changed, 3 insertions(+), 4 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 0517fff0f03..34542d56938 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -46,7 +46,6 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
@@ -56,6 +55,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
@@ -186,7 +186,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
private final BrokerService service;
private final SchemaRegistryService schemaService;
private final String listenerName;
- private final HashMap<Long, Long> recentlyClosedProducers;
+ private final Map<Long, Long> recentlyClosedProducers;
private final ConcurrentLongHashMap<CompletableFuture<Producer>> producers;
private final ConcurrentLongHashMap<CompletableFuture<Consumer>> consumers;
private final boolean enableSubscriptionPatternEvaluation;
@@ -291,7 +291,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
.expectedItems(8)
.concurrencyLevel(1)
.build();
- this.recentlyClosedProducers = new HashMap<>();
+ this.recentlyClosedProducers = new ConcurrentHashMap<>();
this.replicatorPrefix = conf.getReplicatorPrefix();
this.maxNonPersistentPendingMessages = conf.getMaxConcurrentNonPersistentMessagePerConnection();
this.schemaValidationEnforced = conf.isSchemaValidationEnforced();
@@ -2984,7 +2984,6 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
@Override
public void closeProducer(Producer producer) {
- assert ctx.executor().inEventLoop();
// removes producer-connection from map and send close command to producer
safelyRemoveProducer(producer);
if (getRemoteEndpointProtocolVersion() >= v5.getValue()) {