Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/26 11:12:53 UTC

[pulsar] branch branch-2.7 updated (2cb726d -> 139060b)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from 2cb726d  Fix the interceptor that not handle boundary for multipart/form-data (#9247)
     new 094643b  Fix interceptor disabled in ResponseHandlerFilter.java (#9252)
     new 139060b  PLSR-1456: Fix race condition on producer/consumer maps in ServerCnx (#9256)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/pulsar/broker/service/ServerCnx.java    | 120 +++++++++++++--------
 .../pulsar/broker/web/ResponseHandlerFilter.java   |   3 +-
 .../broker/intercept/InterceptFilterOutTest.java   |  26 +++++
 .../pulsar/broker/service/ServerCnxTest.java       |   6 +-
 4 files changed, 105 insertions(+), 50 deletions(-)


[pulsar] 02/02: PLSR-1456: Fix race condition on producer/consumer maps in ServerCnx (#9256)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 139060b9820a0bbda5414d1b154b9a73f74546b8
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Thu Jan 21 22:52:45 2021 +0100

    PLSR-1456: Fix race condition on producer/consumer maps in ServerCnx (#9256)
    
    ServerCnx had a callback that was called from Producer/Consumer which
    would remove the producer/consumer from its map using only the
    ID. However, it is possible that this callback runs when the
    producer/consumer had already been removed from the map and another
    producer/consumer added in its place.
    
    The solution is to use both the key and value when removing from the
    map.
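
The difference between removing by key alone and removing by key and value can be sketched as follows. This is a minimal illustration, not the ServerCnx code: it assumes `java.util.concurrent.ConcurrentHashMap` as a stand-in for the broker's producer/consumer maps, and the class and method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch; ConcurrentHashMap stands in for the maps held by ServerCnx.
class KeyValueRemoveSketch {

    // Simulates a stale callback that removes by ID only.
    // Returns true if the newer registration is still in the map afterwards.
    static boolean survivesKeyOnlyRemove() {
        ConcurrentHashMap<Long, CompletableFuture<String>> map = new ConcurrentHashMap<>();
        CompletableFuture<String> oldFuture = CompletableFuture.completedFuture("old");
        CompletableFuture<String> newFuture = CompletableFuture.completedFuture("new");

        map.put(1L, oldFuture);
        map.remove(1L, oldFuture);   // the old entry is removed...
        map.put(1L, newFuture);      // ...and a new one is registered under the same ID

        map.remove(1L);              // stale callback for the old entry: removes by key only
        return map.get(1L) == newFuture;
    }

    // Same sequence, but the stale callback passes the value it expects to remove.
    static boolean survivesKeyValueRemove() {
        ConcurrentHashMap<Long, CompletableFuture<String>> map = new ConcurrentHashMap<>();
        CompletableFuture<String> oldFuture = CompletableFuture.completedFuture("old");
        CompletableFuture<String> newFuture = CompletableFuture.completedFuture("new");

        map.put(1L, oldFuture);
        map.remove(1L, oldFuture);
        map.put(1L, newFuture);

        map.remove(1L, oldFuture);   // no-op: the mapped value is no longer oldFuture
        return map.get(1L) == newFuture;
    }

    public static void main(String[] args) {
        System.out.println("key-only remove keeps new entry:  " + survivesKeyOnlyRemove());
        System.out.println("key+value remove keeps new entry: " + survivesKeyValueRemove());
    }
}
```

In the sketch, the two-argument `remove` only succeeds when the map still holds the exact future the caller captured, which is why a late callback can no longer evict a replacement entry.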
    
    The change also updates the log messages to include the producerId and
    consumerId in a consistent format, so that all log messages for an
    individual producerId/consumerId are easier to find.
    
    A test has been changed because it was depending on the broken
    behaviour. What was happening was that the producer for the fail
    topic was failing to be created, and when this happened it removed
    the producer future for the successful producer. Then, when the third
    producer tried to connect, it managed to create the producer on
    the connection, but failed as there was already a producer with that
    name on the topic. The correct behaviour is that it should see the
    successful producer future for that ID and respond with success.
    
    Co-authored-by: Ivan Kelly <ik...@splunk.com>
    (cherry picked from commit 1e4c3ec4f55ad0b0729f2849915f6b4d9e426bb1)
---
 .../apache/pulsar/broker/service/ServerCnx.java    | 120 +++++++++++++--------
 .../pulsar/broker/service/ServerCnxTest.java       |   6 +-
 2 files changed, 77 insertions(+), 49 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index e931c94..bb169b8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -874,8 +874,9 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                         if (existingConsumerFuture != null) {
                             if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) {
                                 Consumer consumer = existingConsumerFuture.getNow(null);
-                                log.info("[{}] Consumer with the same id {} is already created: {}", remoteAddress,
-                                        consumerId, consumer);
+                                log.info("[{}] Consumer with the same id is already created:"
+                                         + " consumerId={}, consumer={}",
+                                         remoteAddress, consumerId, consumer);
                                 commandSender.sendSuccessResponse(requestId);
                                 return null;
                             } else {
@@ -884,14 +885,14 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                                 // client timeout is lower the broker timeouts. We need to wait until the previous
                                 // consumer
                                 // creation request either complete or fails.
-                                log.warn("[{}][{}][{}] Consumer with id {} is already present on the connection", remoteAddress,
-                                        topicName, subscriptionName, consumerId);
+                                log.warn("[{}][{}][{}] Consumer with id is already present on the connection,"
+                                         + " consumerId={}", remoteAddress, topicName, subscriptionName, consumerId);
                                 ServerError error = null;
                                 if(!existingConsumerFuture.isDone()) {
                                     error = ServerError.ServiceNotReady;
                                 }else {
                                     error = getErrorCode(existingConsumerFuture);
-                                    consumers.remove(consumerId);
+                                    consumers.remove(consumerId, existingConsumerFuture);
                                 }
                                 commandSender.sendErrorResponse(requestId, error,
                                         "Consumer is already present on the connection");
@@ -962,11 +963,13 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                                                     exception.getCause().getMessage());
                                         }
                                     } else if (exception.getCause() instanceof BrokerServiceException) {
-                                        log.warn("[{}][{}][{}] Failed to create consumer: {}", remoteAddress, topicName,
-                                                subscriptionName, exception.getCause().getMessage());
+                                        log.warn("[{}][{}][{}] Failed to create consumer: consumerId={}, {}",
+                                                 remoteAddress, topicName, subscriptionName,
+                                                 consumerId,  exception.getCause().getMessage());
                                     } else {
-                                        log.warn("[{}][{}][{}] Failed to create consumer: {}", remoteAddress, topicName,
-                                                subscriptionName, exception.getCause().getMessage(), exception);
+                                        log.warn("[{}][{}][{}] Failed to create consumer: consumerId={}, {}",
+                                                 remoteAddress, topicName, subscriptionName,
+                                                 consumerId, exception.getCause().getMessage(), exception);
                                     }
 
                                     // If client timed out, the future would have been completed by subsequent close.
@@ -1056,10 +1059,11 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                         if (existingProducerFuture != null) {
                             if (existingProducerFuture.isDone() && !existingProducerFuture.isCompletedExceptionally()) {
                                 Producer producer = existingProducerFuture.getNow(null);
-                                log.info("[{}] Producer with the same id {} is already created: {}", remoteAddress,
-                                        producerId, producer);
+                                log.info("[{}] Producer with the same id is already created:"
+                                         + " producerId={}, producer={}", remoteAddress, producerId, producer);
                                 commandSender.sendProducerSuccessResponse(requestId, producer.getProducerName(),
                                         producer.getSchemaVersion());
+
                                 return null;
                             } else {
                                 // There was an early request to create a producer with
@@ -1075,12 +1079,13 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                                 } else {
                                     error = getErrorCode(existingProducerFuture);
                                     // remove producer with producerId as it's already completed with exception
-                                    producers.remove(producerId);
+                                    producers.remove(producerId, existingProducerFuture);
                                 }
-                                log.warn("[{}][{}] Producer with id {} is already present on the connection", remoteAddress,
-                                        producerId, topicName);
+                                log.warn("[{}][{}] Producer with id is already present on the connection,"
+                                         + " producerId={}", remoteAddress, topicName, producerId);
                                 commandSender.sendErrorResponse(requestId, error,
-                                        "Producer is already present on the connection");
+                                                                "Producer is already present on the connection");
+
                                 return null;
                             }
                         }
@@ -1154,6 +1159,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                                             remoteAddress, producer);
                                         producerFuture.completeExceptionally(
                                             new IllegalStateException("Producer created after connection was closed"));
+
                                     }
                                 } catch (Exception ise) {
                                     log.error("[{}] Failed to add producer to topic {}: {}", remoteAddress, topicName,
@@ -1174,7 +1180,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
 
                             if (!(cause instanceof ServiceUnitNotReadyException)) {
                                 // Do not print stack traces for expected exceptions
-                                log.error("[{}] Failed to create topic {}", remoteAddress, topicName, exception);
+                                log.error("[{}] Failed to create topic {}, producerId={}",
+                                          remoteAddress, topicName, producerId, exception);
                             }
 
                             // If client timed out, the future would have been completed
@@ -1408,7 +1415,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
 
         CompletableFuture<Producer> producerFuture = producers.get(producerId);
         if (producerFuture == null) {
-            log.warn("[{}] Producer {} was not registered on the connection", remoteAddress, producerId);
+            log.warn("[{}] Producer was not registered on the connection. producerId={}", remoteAddress, producerId);
             commandSender.sendErrorResponse(requestId, ServerError.UnknownError,
                     "Producer was not registered on the connection");
             return;
@@ -1418,12 +1425,14 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                 .completeExceptionally(new IllegalStateException("Closed producer before creation was complete"))) {
             // We have received a request to close the producer before it was actually completed, we have marked the
             // producer future as failed and we can tell the client the close operation was successful.
-            log.info("[{}] Closed producer {} before its creation was completed", remoteAddress, producerId);
+            log.info("[{}] Closed producer before its creation was completed. producerId={}",
+                     remoteAddress, producerId);
             commandSender.sendSuccessResponse(requestId);
             producers.remove(producerId, producerFuture);
             return;
         } else if (producerFuture.isCompletedExceptionally()) {
-            log.info("[{}] Closed producer {} that already failed to be created", remoteAddress, producerId);
+            log.info("[{}] Closed producer that already failed to be created. producerId={}",
+                     remoteAddress, producerId);
             commandSender.sendSuccessResponse(requestId);
             producers.remove(producerId, producerFuture);
             return;
@@ -1431,11 +1440,13 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
 
         // Proceed with normal close, the producer
         Producer producer = producerFuture.getNow(null);
-        log.info("[{}][{}] Closing producer on cnx {}", producer.getTopic(), producer.getProducerName(), remoteAddress);
+        log.info("[{}][{}] Closing producer on cnx {}. producerId={}",
+                 producer.getTopic(), producer.getProducerName(), remoteAddress, producerId);
 
         producer.close(true).thenAccept(v -> {
-            log.info("[{}][{}] Closed producer on cnx {}", producer.getTopic(), producer.getProducerName(),
-                    remoteAddress);
+            log.info("[{}][{}] Closed producer on cnx {}. producerId={}",
+                     producer.getTopic(), producer.getProducerName(),
+                     remoteAddress, producerId);
             commandSender.sendSuccessResponse(requestId);
             producers.remove(producerId, producerFuture);
         });
@@ -1444,14 +1455,14 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
     @Override
     protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
         checkArgument(state == State.Connected);
-        log.info("[{}] Closing consumer: {}", remoteAddress, closeConsumer.getConsumerId());
+        log.info("[{}] Closing consumer: consumerId={}", remoteAddress, closeConsumer.getConsumerId());
 
         long requestId = closeConsumer.getRequestId();
         long consumerId = closeConsumer.getConsumerId();
 
         CompletableFuture<Consumer> consumerFuture = consumers.get(consumerId);
         if (consumerFuture == null) {
-            log.warn("[{}] Consumer was not registered on the connection: {}", consumerId, remoteAddress);
+            log.warn("[{}] Consumer was not registered on the connection: consumerId={}", remoteAddress, consumerId);
             commandSender.sendErrorResponse(requestId, ServerError.MetadataError, "Consumer not found");
             return;
         }
@@ -1461,13 +1472,15 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
             // We have received a request to close the consumer before it was actually completed, we have marked the
             // consumer future as failed and we can tell the client the close operation was successful. When the actual
             // create operation will complete, the new consumer will be discarded.
-            log.info("[{}] Closed consumer {} before its creation was completed", remoteAddress, consumerId);
+            log.info("[{}] Closed consumer before its creation was completed. consumerId={}",
+                     remoteAddress, consumerId);
             commandSender.sendSuccessResponse(requestId);
             return;
         }
 
         if (consumerFuture.isCompletedExceptionally()) {
-            log.info("[{}] Closed consumer {} that already failed to be created", remoteAddress, consumerId);
+            log.info("[{}] Closed consumer that already failed to be created. consumerId={}",
+                     remoteAddress, consumerId);
             commandSender.sendSuccessResponse(requestId);
             return;
         }
@@ -1478,7 +1491,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
             consumer.close();
             consumers.remove(consumerId, consumerFuture);
             commandSender.sendSuccessResponse(requestId);
-            log.info("[{}] Closed consumer {}", remoteAddress, consumer);
+            log.info("[{}] Closed consumer, consumerId={}", remoteAddress, consumerId);
         } catch (BrokerServiceException e) {
             log.warn("[{]] Error closing consumer {} : {}", remoteAddress, consumer, e);
             commandSender.sendErrorResponse(requestId, BrokerServiceException.getClientErrorCode(e), e.getMessage());
@@ -1879,13 +1892,9 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
 
     public void closeProducer(Producer producer) {
         // removes producer-connection from map and send close command to producer
-        if (log.isDebugEnabled()) {
-            log.debug("[{}] Removed producer: {}", remoteAddress, producer);
-        }
-        long producerId = producer.getProducerId();
-        producers.remove(producerId);
+        safelyRemoveProducer(producer);
         if (remoteEndpointProtocolVersion >= v5.getNumber()) {
-            ctx.writeAndFlush(Commands.newCloseProducer(producerId, -1L));
+            ctx.writeAndFlush(Commands.newCloseProducer(producer.getProducerId(), -1L));
         } else {
             close();
         }
@@ -1895,13 +1904,9 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
     @Override
     public void closeConsumer(Consumer consumer) {
         // removes consumer-connection from map and send close command to consumer
-        if (log.isDebugEnabled()) {
-            log.debug("[{}] Removed consumer: {}", remoteAddress, consumer);
-        }
-        long consumerId = consumer.consumerId();
-        consumers.remove(consumerId);
+        safelyRemoveConsumer(consumer);
         if (remoteEndpointProtocolVersion >= v5.getNumber()) {
-            ctx.writeAndFlush(Commands.newCloseConsumer(consumerId, -1L));
+            ctx.writeAndFlush(Commands.newCloseConsumer(consumer.consumerId(), -1L));
         } else {
             close();
         }
@@ -1922,19 +1927,42 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
 
     @Override
     public void removedConsumer(Consumer consumer) {
-        if (log.isDebugEnabled()) {
-            log.debug("[{}] Removed consumer: {}", remoteAddress, consumer);
-        }
-
-        consumers.remove(consumer.consumerId());
+        safelyRemoveConsumer(consumer);
     }
 
     @Override
     public void removedProducer(Producer producer) {
+        safelyRemoveProducer(producer);
+    }
+
+    private void safelyRemoveProducer(Producer producer) {
+        long producerId = producer.getProducerId();
         if (log.isDebugEnabled()) {
-            log.debug("[{}] Removed producer: {}", remoteAddress, producer);
+            log.debug("[{}] Removed producer: producerId={}, producer={}", remoteAddress, producerId, producer);
+        }
+        CompletableFuture<Producer> future = producers.get(producerId);
+        if (future != null) {
+            future.whenComplete((producer2, exception) -> {
+                    if (exception != null || producer2 == producer) {
+                        producers.remove(producerId, future);
+                    }
+                });
+        }
+    }
+
+    private void safelyRemoveConsumer(Consumer consumer) {
+        long consumerId = consumer.consumerId();
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Removed consumer: consumerId={}, consumer={}", remoteAddress, consumerId, consumer);
+        }
+        CompletableFuture<Consumer> future = consumers.get(consumerId);
+        if (future != null) {
+            future.whenComplete((consumer2, exception) -> {
+                    if (exception != null || consumer2 == consumer) {
+                        consumers.remove(consumerId, future);
+                    }
+                });
         }
-        producers.remove(producer.getProducerId());
     }
 
     @Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index cc7216b..ba78054 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -953,10 +953,10 @@ public class ServerCnxTest {
                 producerName, Collections.emptyMap());
         channel.writeInbound(createProducer3);
 
-        // 3rd producer fails because 2nd is already connected
+        // 3rd producer succeeds because 2nd is already connected
         response = getResponse();
-        assertEquals(response.getClass(), CommandError.class);
-        assertEquals(((CommandError) response).getRequestId(), 4);
+        assertEquals(response.getClass(), CommandProducerSuccess.class);
+        assertEquals(((CommandProducerSuccess) response).getRequestId(), 4);
 
         Thread.sleep(500);
 

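The guarded removal introduced by safelyRemoveProducer/safelyRemoveConsumer in the diff above can be sketched in isolation. This is a simplified illustration under stated assumptions: `java.util.concurrent.ConcurrentHashMap` replaces the broker's own map type, plain `Object` instances stand in for Producer/Consumer, and the class and helper names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the "remove only if it is still my entry" pattern.
class SafeRemoveSketch {

    // Removes the entry for id only if its future failed or resolved to the
    // very same instance that is being closed.
    static void safelyRemove(ConcurrentHashMap<Long, CompletableFuture<Object>> map,
                             long id, Object closing) {
        CompletableFuture<Object> future = map.get(id);
        if (future != null) {
            future.whenComplete((registered, exception) -> {
                if (exception != null || registered == closing) {
                    // Key+value remove: a no-op if the entry was replaced meanwhile.
                    map.remove(id, future);
                }
            });
        }
    }

    // Registers `registered` under an ID, runs the guarded removal for
    // `closing`, and reports whether the entry is still present.
    static boolean entrySurvives(Object registered, Object closing) {
        ConcurrentHashMap<Long, CompletableFuture<Object>> map = new ConcurrentHashMap<>();
        map.put(7L, CompletableFuture.completedFuture(registered));
        safelyRemove(map, 7L, closing);
        return map.containsKey(7L);
    }

    // A future that completed exceptionally is removed regardless of identity.
    static boolean failedEntrySurvives(Object closing) {
        ConcurrentHashMap<Long, CompletableFuture<Object>> map = new ConcurrentHashMap<>();
        CompletableFuture<Object> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("creation failed"));
        map.put(7L, failed);
        safelyRemove(map, 7L, closing);
        return map.containsKey(7L);
    }

    public static void main(String[] args) {
        Object a = new Object();
        Object b = new Object();
        System.out.println("different instance kept: " + entrySurvives(a, b));
        System.out.println("same instance kept:      " + entrySurvives(a, a));
        System.out.println("failed future kept:      " + failedEntrySurvives(a));
    }
}
```

Comparing by identity inside `whenComplete` also covers a future that is still pending when the callback is registered: the check then runs once the future completes.
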

[pulsar] 01/02: Fix interceptor disabled in ResponseHandlerFilter.java (#9252)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 094643b652561b1347339e2cca90bc0d47ac2dc3
Author: lipenghui <pe...@apache.org>
AuthorDate: Thu Jan 21 15:16:06 2021 +0800

    Fix interceptor disabled in ResponseHandlerFilter.java (#9252)
    
    ### Motivation
    
    Fix interceptor disabled in ResponseHandlerFilter.java
    
    ### Verifying this change
    
    New unit test added.
    
    (cherry picked from commit de7b59d784f5e54b0031f524b08f2f03ca88e2ca)
---
 .../pulsar/broker/web/ResponseHandlerFilter.java   |  3 ++-
 .../broker/intercept/InterceptFilterOutTest.java   | 26 ++++++++++++++++++++++
 2 files changed, 28 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ResponseHandlerFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ResponseHandlerFilter.java
index d7d16f9..9091d05 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ResponseHandlerFilter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ResponseHandlerFilter.java
@@ -66,7 +66,8 @@ public class ResponseHandlerFilter implements Filter {
                 /* connection is already invalidated */
             }
         }
-        if (!StringUtils.containsIgnoreCase(request.getContentType(), MediaType.MULTIPART_FORM_DATA)
+        if (interceptorEnabled
+                && !StringUtils.containsIgnoreCase(request.getContentType(), MediaType.MULTIPART_FORM_DATA)
                 && !StringUtils.containsIgnoreCase(request.getContentType(), MediaType.APPLICATION_OCTET_STREAM)) {
             interceptor.onWebserviceResponse(request, response);
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/InterceptFilterOutTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/InterceptFilterOutTest.java
index 53c5463..fd13409 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/InterceptFilterOutTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/InterceptFilterOutTest.java
@@ -110,6 +110,32 @@ public class InterceptFilterOutTest {
         }
     }
 
+    @Test
+    public void testShouldNotInterceptWhenInterceptorDisabled() throws Exception {
+        CounterBrokerInterceptor interceptor = new CounterBrokerInterceptor();
+        PulsarService pulsarService = Mockito.mock(PulsarService.class);
+        Mockito.doReturn("pulsar://127.0.0.1:6650").when(pulsarService).getAdvertisedAddress();
+        Mockito.doReturn(interceptor).when(pulsarService).getBrokerInterceptor();
+        ServiceConfiguration conf = Mockito.mock(ServiceConfiguration.class);
+        // Disable the broker interceptor
+        Mockito.doReturn(Sets.newHashSet()).when(conf).getBrokerInterceptors();
+        Mockito.doReturn(conf).when(pulsarService).getConfig();
+        ResponseHandlerFilter filter = new ResponseHandlerFilter(pulsarService);
+
+        HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
+        HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+        FilterChain chain = Mockito.mock(FilterChain.class);
+        Mockito.doNothing().when(chain).doFilter(Mockito.any(), Mockito.any());
+        HttpServletRequestWrapper mockInputStream = new MockRequestWrapper(request);
+        Mockito.doReturn(mockInputStream.getInputStream()).when(request).getInputStream();
+        Mockito.doReturn(new StringBuffer("http://127.0.0.1:8080")).when(request).getRequestURL();
+
+        // Should not be intercepted since the broker interceptor disabled.
+        Mockito.doReturn("application/json").when(request).getContentType();
+        filter.doFilter(request, response, chain);
+        Assert.assertEquals(interceptor.getCount(), 0);
+    }
+
     private static class MockRequestWrapper extends HttpServletRequestWrapper {
 
         public MockRequestWrapper(HttpServletRequest request) {