You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/01/21 21:53:29 UTC

[pulsar] branch master updated: PLSR-1456: Fix race condition on producer/consumer maps in ServerCnx (#9256)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e4c3ec  PLSR-1456: Fix race condition on producer/consumer maps in ServerCnx (#9256)
1e4c3ec is described below

commit 1e4c3ec4f55ad0b0729f2849915f6b4d9e426bb1
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Thu Jan 21 22:52:45 2021 +0100

    PLSR-1456: Fix race condition on producer/consumer maps in ServerCnx (#9256)
    
    ServerCnx had a callback that was called from Producer/Consumer which
    would remove the producer/consumer from its map using only the
    ID. However, it is possible for this callback to run after the
    producer/consumer has already been removed from the map and another
    producer/consumer has been added in its place.
    
    The solution is to use both the key and value when removing from the
    map.
    
    The change also updates the log messages to include the producerId and
    consumerId in a consistent format, so that all log messages for an
    individual producerId/consumerId can be found more easily.
    
    A test has been changed because the test was depending on the broken
    behaviour. What was happening was that the fail topic producer was
    failing to create a producer, and when this happened it removed the
    producer future for the successful producer. Then, when the third
    producer tried to connect, it managed to create the producer on
    the connection, but failed as there was already a producer with that
    name on the topic. The correct behaviour is that it should see the
    successful producer future for that ID and respond with success.
    
    Co-authored-by: Ivan Kelly <ik...@splunk.com>
---
 .../apache/pulsar/broker/service/ServerCnx.java    | 124 +++++++++++++--------
 .../pulsar/broker/service/ServerCnxTest.java       |   6 +-
 2 files changed, 79 insertions(+), 51 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index aea1f6b..8a4d5a9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -901,8 +901,9 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                         if (existingConsumerFuture != null) {
                             if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) {
                                 Consumer consumer = existingConsumerFuture.getNow(null);
-                                log.info("[{}] Consumer with the same id {} is already created: {}", remoteAddress,
-                                        consumerId, consumer);
+                                log.info("[{}] Consumer with the same id is already created:"
+                                         + " consumerId={}, consumer={}",
+                                         remoteAddress, consumerId, consumer);
                                 commandSender.sendSuccessResponse(requestId);
                                 return null;
                             } else {
@@ -911,14 +912,14 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                                 // client timeout is lower the broker timeouts. We need to wait until the previous
                                 // consumer
                                 // creation request either complete or fails.
-                                log.warn("[{}][{}][{}] Consumer with id {} is already present on the connection",
-                                        remoteAddress, topicName, subscriptionName, consumerId);
+                                log.warn("[{}][{}][{}] Consumer with id is already present on the connection,"
+                                         + " consumerId={}", remoteAddress, topicName, subscriptionName, consumerId);
                                 ServerError error = null;
                                 if (!existingConsumerFuture.isDone()) {
                                     error = ServerError.ServiceNotReady;
                                 } else {
                                     error = getErrorCode(existingConsumerFuture);
-                                    consumers.remove(consumerId);
+                                    consumers.remove(consumerId, existingConsumerFuture);
                                 }
                                 commandSender.sendErrorResponse(requestId, error,
                                         "Consumer is already present on the connection");
@@ -995,11 +996,13 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                                                     exception.getCause().getMessage());
                                         }
                                     } else if (exception.getCause() instanceof BrokerServiceException) {
-                                        log.warn("[{}][{}][{}] Failed to create consumer: {}", remoteAddress, topicName,
-                                                subscriptionName, exception.getCause().getMessage());
+                                        log.warn("[{}][{}][{}] Failed to create consumer: consumerId={}, {}",
+                                                 remoteAddress, topicName, subscriptionName,
+                                                 consumerId,  exception.getCause().getMessage());
                                     } else {
-                                        log.warn("[{}][{}][{}] Failed to create consumer: {}", remoteAddress, topicName,
-                                                subscriptionName, exception.getCause().getMessage(), exception);
+                                        log.warn("[{}][{}][{}] Failed to create consumer: consumerId={}, {}",
+                                                 remoteAddress, topicName, subscriptionName,
+                                                 consumerId, exception.getCause().getMessage(), exception);
                                     }
 
                                     // If client timed out, the future would have been completed by subsequent close.
@@ -1095,10 +1098,11 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                         if (existingProducerFuture != null) {
                             if (existingProducerFuture.isDone() && !existingProducerFuture.isCompletedExceptionally()) {
                                 Producer producer = existingProducerFuture.getNow(null);
-                                log.info("[{}] Producer with the same id {} is already created: {}", remoteAddress,
-                                        producerId, producer);
+                                log.info("[{}] Producer with the same id is already created:"
+                                         + " producerId={}, producer={}", remoteAddress, producerId, producer);
                                 commandSender.sendProducerSuccessResponse(requestId, producer.getProducerName(),
                                         producer.getSchemaVersion());
+
                                 return null;
                             } else {
                                 // There was an early request to create a producer with
@@ -1114,12 +1118,13 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                                 } else {
                                     error = getErrorCode(existingProducerFuture);
                                     // remove producer with producerId as it's already completed with exception
-                                    producers.remove(producerId);
+                                    producers.remove(producerId, existingProducerFuture);
                                 }
-                                log.warn("[{}][{}] Producer with id {} is already present on the connection",
-                                        remoteAddress, producerId, topicName);
+                                log.warn("[{}][{}] Producer with id is already present on the connection,"
+                                         + " producerId={}", remoteAddress, topicName, producerId);
                                 commandSender.sendErrorResponse(requestId, error,
-                                        "Producer is already present on the connection");
+                                                                "Producer is already present on the connection");
+
                                 return null;
                             }
                         }
@@ -1201,8 +1206,9 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
 
                                         producers.remove(producerId, producerFuture);
                                 }).exceptionally(ex -> {
-                                    log.warn("[{}] Failed to add producer {}: {}",
-                                            remoteAddress, producer, ex.getMessage());
+                                    log.error("[{}] Failed to add producer to topic {}: producerId={}, {}",
+                                              remoteAddress, topicName, producerId, ex.getMessage());
+
                                     producer.closeNow(true);
                                     if (producerFuture.completeExceptionally(ex)) {
                                         commandSender.sendErrorResponse(requestId,
@@ -1231,7 +1237,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
 
                             if (!(cause instanceof ServiceUnitNotReadyException)) {
                                 // Do not print stack traces for expected exceptions
-                                log.error("[{}] Failed to create topic {}", remoteAddress, topicName, exception);
+                                log.error("[{}] Failed to create topic {}, producerId={}",
+                                          remoteAddress, topicName, producerId, exception);
                             }
 
                             // If client timed out, the future would have been completed
@@ -1474,7 +1481,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
 
         CompletableFuture<Producer> producerFuture = producers.get(producerId);
         if (producerFuture == null) {
-            log.warn("[{}] Producer {} was not registered on the connection", remoteAddress, producerId);
+            log.warn("[{}] Producer was not registered on the connection. producerId={}", remoteAddress, producerId);
             commandSender.sendErrorResponse(requestId, ServerError.UnknownError,
                     "Producer was not registered on the connection");
             return;
@@ -1484,12 +1491,14 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                 .completeExceptionally(new IllegalStateException("Closed producer before creation was complete"))) {
             // We have received a request to close the producer before it was actually completed, we have marked the
             // producer future as failed and we can tell the client the close operation was successful.
-            log.info("[{}] Closed producer {} before its creation was completed", remoteAddress, producerId);
+            log.info("[{}] Closed producer before its creation was completed. producerId={}",
+                     remoteAddress, producerId);
             commandSender.sendSuccessResponse(requestId);
             producers.remove(producerId, producerFuture);
             return;
         } else if (producerFuture.isCompletedExceptionally()) {
-            log.info("[{}] Closed producer {} that already failed to be created", remoteAddress, producerId);
+            log.info("[{}] Closed producer that already failed to be created. producerId={}",
+                     remoteAddress, producerId);
             commandSender.sendSuccessResponse(requestId);
             producers.remove(producerId, producerFuture);
             return;
@@ -1497,11 +1506,13 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
 
         // Proceed with normal close, the producer
         Producer producer = producerFuture.getNow(null);
-        log.info("[{}][{}] Closing producer on cnx {}", producer.getTopic(), producer.getProducerName(), remoteAddress);
+        log.info("[{}][{}] Closing producer on cnx {}. producerId={}",
+                 producer.getTopic(), producer.getProducerName(), remoteAddress, producerId);
 
         producer.close(true).thenAccept(v -> {
-            log.info("[{}][{}] Closed producer on cnx {}", producer.getTopic(), producer.getProducerName(),
-                    remoteAddress);
+            log.info("[{}][{}] Closed producer on cnx {}. producerId={}",
+                     producer.getTopic(), producer.getProducerName(),
+                     remoteAddress, producerId);
             commandSender.sendSuccessResponse(requestId);
             producers.remove(producerId, producerFuture);
         });
@@ -1510,14 +1521,14 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
     @Override
     protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
         checkArgument(state == State.Connected);
-        log.info("[{}] Closing consumer: {}", remoteAddress, closeConsumer.getConsumerId());
+        log.info("[{}] Closing consumer: consumerId={}", remoteAddress, closeConsumer.getConsumerId());
 
         long requestId = closeConsumer.getRequestId();
         long consumerId = closeConsumer.getConsumerId();
 
         CompletableFuture<Consumer> consumerFuture = consumers.get(consumerId);
         if (consumerFuture == null) {
-            log.warn("[{}] Consumer was not registered on the connection: {}", consumerId, remoteAddress);
+            log.warn("[{}] Consumer was not registered on the connection: consumerId={}", remoteAddress, consumerId);
             commandSender.sendErrorResponse(requestId, ServerError.MetadataError, "Consumer not found");
             return;
         }
@@ -1527,13 +1538,15 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
             // We have received a request to close the consumer before it was actually completed, we have marked the
             // consumer future as failed and we can tell the client the close operation was successful. When the actual
             // create operation will complete, the new consumer will be discarded.
-            log.info("[{}] Closed consumer {} before its creation was completed", remoteAddress, consumerId);
+            log.info("[{}] Closed consumer before its creation was completed. consumerId={}",
+                     remoteAddress, consumerId);
             commandSender.sendSuccessResponse(requestId);
             return;
         }
 
         if (consumerFuture.isCompletedExceptionally()) {
-            log.info("[{}] Closed consumer {} that already failed to be created", remoteAddress, consumerId);
+            log.info("[{}] Closed consumer that already failed to be created. consumerId={}",
+                     remoteAddress, consumerId);
             commandSender.sendSuccessResponse(requestId);
             return;
         }
@@ -1544,7 +1557,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
             consumer.close();
             consumers.remove(consumerId, consumerFuture);
             commandSender.sendSuccessResponse(requestId);
-            log.info("[{}] Closed consumer {}", remoteAddress, consumer);
+            log.info("[{}] Closed consumer, consumerId={}", remoteAddress, consumerId);
         } catch (BrokerServiceException e) {
             log.warn("[{]] Error closing consumer {} : {}", remoteAddress, consumer, e);
             commandSender.sendErrorResponse(requestId, BrokerServiceException.getClientErrorCode(e), e.getMessage());
@@ -1943,13 +1956,9 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
 
     public void closeProducer(Producer producer) {
         // removes producer-connection from map and send close command to producer
-        if (log.isDebugEnabled()) {
-            log.debug("[{}] Removed producer: {}", remoteAddress, producer);
-        }
-        long producerId = producer.getProducerId();
-        producers.remove(producerId);
+        safelyRemoveProducer(producer);
         if (remoteEndpointProtocolVersion >= v5.getValue()) {
-            ctx.writeAndFlush(Commands.newCloseProducer(producerId, -1L));
+            ctx.writeAndFlush(Commands.newCloseProducer(producer.getProducerId(), -1L));
         } else {
             close();
         }
@@ -1959,13 +1968,9 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
     @Override
     public void closeConsumer(Consumer consumer) {
         // removes consumer-connection from map and send close command to consumer
-        if (log.isDebugEnabled()) {
-            log.debug("[{}] Removed consumer: {}", remoteAddress, consumer);
-        }
-        long consumerId = consumer.consumerId();
-        consumers.remove(consumerId);
+        safelyRemoveConsumer(consumer);
         if (remoteEndpointProtocolVersion >= v5.getValue()) {
-            ctx.writeAndFlush(Commands.newCloseConsumer(consumerId, -1L));
+            ctx.writeAndFlush(Commands.newCloseConsumer(consumer.consumerId(), -1L));
         } else {
             close();
         }
@@ -1986,19 +1991,42 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
 
     @Override
     public void removedConsumer(Consumer consumer) {
-        if (log.isDebugEnabled()) {
-            log.debug("[{}] Removed consumer: {}", remoteAddress, consumer);
-        }
-
-        consumers.remove(consumer.consumerId());
+        safelyRemoveConsumer(consumer);
     }
 
     @Override
     public void removedProducer(Producer producer) {
+        safelyRemoveProducer(producer);
+    }
+
+    private void safelyRemoveProducer(Producer producer) {
+        long producerId = producer.getProducerId();
         if (log.isDebugEnabled()) {
-            log.debug("[{}] Removed producer: {}", remoteAddress, producer);
+            log.debug("[{}] Removed producer: producerId={}, producer={}", remoteAddress, producerId, producer);
+        }
+        CompletableFuture<Producer> future = producers.get(producerId);
+        if (future != null) {
+            future.whenComplete((producer2, exception) -> {
+                    if (exception != null || producer2 == producer) {
+                        producers.remove(producerId, future);
+                    }
+                });
+        }
+    }
+
+    private void safelyRemoveConsumer(Consumer consumer) {
+        long consumerId = consumer.consumerId();
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Removed consumer: consumerId={}, consumer={}", remoteAddress, consumerId, consumer);
+        }
+        CompletableFuture<Consumer> future = consumers.get(consumerId);
+        if (future != null) {
+            future.whenComplete((consumer2, exception) -> {
+                    if (exception != null || consumer2 == consumer) {
+                        consumers.remove(consumerId, future);
+                    }
+                });
         }
-        producers.remove(producer.getProducerId());
     }
 
     @Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 7ff0c14..4bdbacb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -954,10 +954,10 @@ public class ServerCnxTest {
                 producerName, Collections.emptyMap());
         channel.writeInbound(createProducer3);
 
-        // 3rd producer fails because 2nd is already connected
+        // 3rd producer succeeds because 2nd is already connected
         response = getResponse();
-        assertEquals(response.getClass(), CommandError.class);
-        assertEquals(((CommandError) response).getRequestId(), 4);
+        assertEquals(response.getClass(), CommandProducerSuccess.class);
+        assertEquals(((CommandProducerSuccess) response).getRequestId(), 4);
 
         Thread.sleep(500);