You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/12/21 18:37:30 UTC

[pulsar] branch branch-2.9 updated: [improve][broker] Omit making a copy of CommandAck when there are no broker interceptors (#18997)

This is an automated email from the ASF dual-hosted git repository.

mmarshall pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 948fe354678 [improve][broker] Omit making a copy of CommandAck when there are no broker interceptors (#18997)
948fe354678 is described below

commit 948fe3546786431e64e7a1002f2de40cac24224f
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Tue Dec 20 20:47:52 2022 +0200

    [improve][broker] Omit making a copy of CommandAck when there are no broker interceptors (#18997)
    
    (cherry picked from commit 1154d0a8703bcf3fbc6e0c6f9df1f189ae09ef64)
    (cherry picked from commit b75f0682f91ea8e6c8b6712e4e50da779d466fc8)
    (cherry picked from commit cc781f7e79216f0c2d3152ca33c2920ec0ccff41)
---
 .../org/apache/pulsar/broker/PulsarService.java    |  8 +++-
 .../pulsar/broker/intercept/BrokerInterceptor.java | 38 ------------------
 .../broker/intercept/BrokerInterceptors.java       |  2 +-
 .../org/apache/pulsar/broker/service/Producer.java |  7 +++-
 .../broker/service/PulsarCommandSenderImpl.java    | 10 +++--
 .../apache/pulsar/broker/service/ServerCnx.java    | 46 +++++++++++-----------
 .../pulsar/broker/web/PreInterceptFilter.java      |  4 +-
 .../broker/service/MessageCumulativeAckTest.java   | 14 +++----
 8 files changed, 50 insertions(+), 79 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 76ad1a9484e..67f3b55cbfc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -698,8 +698,12 @@ public class PulsarService implements AutoCloseable, ShutdownService {
             this.defaultOffloader = createManagedLedgerOffloader(
                     OffloadPoliciesImpl.create(this.getConfiguration().getProperties()));
             this.brokerInterceptor = BrokerInterceptors.load(config);
-            brokerService.setInterceptor(getBrokerInterceptor());
-            this.brokerInterceptor.initialize(this);
+            // use getter to support mocking getBrokerInterceptor method in tests
+            BrokerInterceptor interceptor = getBrokerInterceptor();
+            if (interceptor != null) {
+                brokerService.setInterceptor(interceptor);
+                interceptor.initialize(this);
+            }
             brokerService.start();
 
             // Load additional servlets
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java
index 1e440b883da..cac1e66b53f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java
@@ -160,44 +160,6 @@ public interface BrokerInterceptor extends AutoCloseable {
      */
     void initialize(PulsarService pulsarService) throws Exception;
 
-    BrokerInterceptor DISABLED = new BrokerInterceptorDisabled();
-
-    /**
-     * Broker interceptor disabled implementation.
-     */
-    class BrokerInterceptorDisabled implements BrokerInterceptor {
-
-        @Override
-        public void onPulsarCommand(BaseCommand command, ServerCnx cnx) throws InterceptException {
-            // no-op
-        }
-
-        @Override
-        public void onConnectionClosed(ServerCnx cnx) {
-            // no-op
-        }
-
-        @Override
-        public void onWebserviceRequest(ServletRequest request) {
-            // no-op
-        }
-
-        @Override
-        public void onWebserviceResponse(ServletRequest request, ServletResponse response) {
-            // no-op
-        }
-
-        @Override
-        public void initialize(PulsarService pulsarService) throws Exception {
-            // no-op
-        }
-
-        @Override
-        public void close() {
-            // no-op
-        }
-    }
-
     /**
      * Close this broker interceptor.
      */
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java
index 878f2cb5320..28aca5603e6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java
@@ -89,7 +89,7 @@ public class BrokerInterceptors implements BrokerInterceptor {
         if (interceptors != null && !interceptors.isEmpty()) {
             return new BrokerInterceptors(interceptors);
         } else {
-            return DISABLED;
+            return null;
         }
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index 8631bd2bbe6..720b27036f8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -37,6 +37,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import org.apache.pulsar.broker.intercept.BrokerInterceptor;
 import org.apache.pulsar.broker.service.BrokerServiceException.TopicClosedException;
 import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException;
 import org.apache.pulsar.broker.service.Topic.PublishContext;
@@ -67,6 +68,7 @@ public class Producer {
     private final boolean userProvidedProducerName;
     private final long producerId;
     private final String appId;
+    private final BrokerInterceptor brokerInterceptor;
     private Rate msgIn;
     private Rate chunkedMessageRate;
     // it records msg-drop rate only for non-persistent topic
@@ -138,6 +140,7 @@ public class Producer {
         this.topicEpoch = topicEpoch;
 
         this.clientAddress = cnx.clientSourceAddress();
+        this.brokerInterceptor = cnx.getBrokerService().getInterceptor();
     }
 
     /**
@@ -469,8 +472,8 @@ public class Producer {
                 producer.chunkedMessageRate.recordEvent();
             }
             producer.publishOperationCompleted();
-            if (producer.cnx.getBrokerService().getInterceptor() != null){
-                producer.cnx.getBrokerService().getInterceptor().messageProduced(
+            if (producer.brokerInterceptor != null) {
+                producer.brokerInterceptor.messageProduced(
                         (ServerCnx) producer.cnx, producer, startTimeNs, ledgerId, entryId, this);
             }
             recycle();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
index 03decb45435..ff7241c7ac3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
@@ -300,10 +300,12 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
     }
 
     private void safeIntercept(BaseCommand command, ServerCnx cnx) {
-        try {
-            this.interceptor.onPulsarCommand(command, cnx);
-        } catch (Exception e) {
-            log.error("Failed to execute command {} on broker interceptor.", command.getType(), e);
+        if (this.interceptor != null) {
+            try {
+                this.interceptor.onPulsarCommand(command, cnx);
+            } catch (Exception e) {
+                log.error("Failed to execute command {} on broker interceptor.", command.getType(), e);
+            }
         }
     }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index cfbbef1ea0b..4c8f074969c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -52,7 +52,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import javax.naming.AuthenticationException;
 import javax.net.ssl.SSLSession;
-import lombok.val;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
@@ -159,6 +158,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
     private final String listenerName;
     private final ConcurrentLongHashMap<CompletableFuture<Producer>> producers;
     private final ConcurrentLongHashMap<CompletableFuture<Consumer>> consumers;
+    private final BrokerInterceptor brokerInterceptor;
     private State state;
     private volatile boolean isActive = true;
     String authRole = null;
@@ -267,6 +267,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                 / conf.getNumIOThreads();
         this.resumeThresholdPendingBytesPerThread = this.maxPendingBytesPerThread / 2;
         this.connectionController = new ConnectionController.DefaultConnectionController(conf);
+        this.brokerInterceptor = this.service != null ? this.service.getInterceptor() : null;
     }
 
     @Override
@@ -283,7 +284,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         }
         log.info("New connection from {}", remoteAddress);
         this.ctx = ctx;
-        this.commandSender = new PulsarCommandSenderImpl(getBrokerService().getInterceptor(), this);
+        this.commandSender = new PulsarCommandSenderImpl(brokerInterceptor, this);
         this.service.getPulsarStats().recordConnectionCreate();
         cnxsPerThread.get().add(this);
     }
@@ -294,7 +295,6 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         connectionController.decreaseConnection(ctx.channel().remoteAddress());
         isActive = false;
         log.info("Closed connection from {}", remoteAddress);
-        BrokerInterceptor brokerInterceptor = getBrokerService().getInterceptor();
         if (brokerInterceptor != null) {
             brokerInterceptor.onConnectionClosed(this);
         }
@@ -634,7 +634,6 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         if (isNotBlank(clientVersion) && !clientVersion.contains(" ") /* ignore default version: pulsar client */) {
             this.clientVersion = clientVersion.intern();
         }
-        BrokerInterceptor brokerInterceptor = getBrokerService().getInterceptor();
         if (brokerInterceptor != null) {
             brokerInterceptor.onConnectionCreated(this);
         }
@@ -1066,8 +1065,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                                 log.info("[{}] Created subscription on topic {} / {}",
                                         remoteAddress, topicName, subscriptionName);
                                 commandSender.sendSuccessResponse(requestId);
-                                if (getBrokerService().getInterceptor() != null){
-                                    getBrokerService().getInterceptor().consumerCreated(this, consumer, metadata);
+                                if (brokerInterceptor != null) {
+                                    brokerInterceptor.consumerCreated(this, consumer, metadata);
                                 }
                             } else {
                                 // The consumer future was completed before by a close command
@@ -1343,9 +1342,9 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                     commandSender.sendProducerSuccessResponse(requestId, producerName,
                             producer.getLastSequenceId(), producer.getSchemaVersion(),
                             newTopicEpoch, true /* producer is ready now */);
-                    if (getBrokerService().getInterceptor() != null) {
-                        getBrokerService().getInterceptor().
-                            producerCreated(this, producer, metadata);
+                    if (brokerInterceptor != null) {
+                        brokerInterceptor.
+                                producerCreated(this, producer, metadata);
                     }
                     return;
                 } else {
@@ -1393,9 +1392,9 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                 commandSender.sendProducerSuccessResponse(requestId, producerName,
                         producer.getLastSequenceId(), producer.getSchemaVersion(),
                         Optional.empty(), false/* producer is not ready now */);
-                if (getBrokerService().getInterceptor() != null) {
-                    getBrokerService().getInterceptor().
-                       producerCreated(this, producer, metadata);
+                if (brokerInterceptor != null) {
+                    brokerInterceptor.
+                            producerCreated(this, producer, metadata);
                 }
             }
         });
@@ -1475,7 +1474,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         final boolean hasRequestId = ack.hasRequestId();
         final long requestId = hasRequestId ? ack.getRequestId() : 0;
         final long consumerId = ack.getConsumerId();
-        final CommandAck finalAck = getBrokerService().getInterceptor() != null ? new CommandAck().copyFrom(ack) : null;
+        // It is necessary to make a copy of the CommandAck instance for the interceptor.
+        final CommandAck copyOfAckForInterceptor = brokerInterceptor != null ? new CommandAck().copyFrom(ack) : null;
 
         if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
             Consumer consumer = consumerFuture.getNow(null);
@@ -1484,8 +1484,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                             ctx.writeAndFlush(Commands.newAckResponse(
                                     requestId, null, null, consumerId));
                         }
-                        if (getBrokerService().getInterceptor() != null) {
-                            getBrokerService().getInterceptor().messageAcked(this, consumer, finalAck);
+                        if (brokerInterceptor != null) {
+                            brokerInterceptor.messageAcked(this, consumer, copyOfAckForInterceptor);
                         }
                     }).exceptionally(e -> {
                         if (hasRequestId) {
@@ -2417,8 +2417,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
 
     @Override
     protected void interceptCommand(BaseCommand command) throws InterceptException {
-        if (getBrokerService().getInterceptor() != null) {
-            getBrokerService().getInterceptor().onPulsarCommand(command, this);
+        if (brokerInterceptor != null) {
+            brokerInterceptor.onPulsarCommand(command, this);
         }
     }
 
@@ -2704,17 +2704,15 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                 ackSet);
         ByteBufPair res = Commands.serializeCommandMessageWithSize(command, metadataAndPayload);
         try {
-            val brokerInterceptor = getBrokerService().getInterceptor();
             if (brokerInterceptor != null) {
                 brokerInterceptor.onPulsarCommand(command, this);
-
-                CompletableFuture<Consumer> consumerFuture = consumers.get(consumerId);
-                if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
-                    Consumer consumer = consumerFuture.getNow(null);
+            }
+            CompletableFuture<Consumer> consumerFuture = consumers.get(consumerId);
+            if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
+                Consumer consumer = consumerFuture.getNow(null);
+                if (brokerInterceptor != null) {
                     brokerInterceptor.messageDispatched(this, consumer, ledgerId, entryId, metadataAndPayload);
                 }
-            } else {
-                log.debug("BrokerInterceptor is not set in newMessageAndIntercept");
             }
         } catch (Exception e) {
             log.error("Exception occur when intercept messages.", e);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PreInterceptFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PreInterceptFilter.java
index e4e7bbc67ca..388c740358a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PreInterceptFilter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PreInterceptFilter.java
@@ -66,7 +66,9 @@ public class PreInterceptFilter implements Filter {
         }
         try {
             RequestWrapper requestWrapper = new RequestWrapper((HttpServletRequest) servletRequest);
-            interceptor.onWebserviceRequest(requestWrapper);
+            if (interceptor != null) {
+                interceptor.onWebserviceRequest(requestWrapper);
+            }
             filterChain.doFilter(requestWrapper, servletResponse);
         } catch (InterceptException e) {
             exceptionHandler.handle(servletResponse, e);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
index 846cc1df58a..a9ca01bd8b4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
@@ -97,6 +97,12 @@ public class MessageCumulativeAckTest {
             doReturn(pulsarResources).when(pulsar).getPulsarResources();
         });
 
+        eventLoopGroup = new NioEventLoopGroup();
+        brokerService = spyWithClassAndConstructorArgs(BrokerService.class, pulsar, eventLoopGroup);
+        PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
+            doReturn(brokerService).when(pulsar).getBrokerService();
+        });
+
         serverCnx = spyWithClassAndConstructorArgs(ServerCnx.class, pulsar);
         doReturn(true).when(serverCnx).isActive();
         doReturn(true).when(serverCnx).isWritable();
@@ -104,13 +110,7 @@ public class MessageCumulativeAckTest {
         when(serverCnx.getRemoteEndpointProtocolVersion()).thenReturn(ProtocolVersion.v12.getValue());
         when(serverCnx.ctx()).thenReturn(mock(ChannelHandlerContext.class));
         doReturn(new PulsarCommandSenderImpl(null, serverCnx))
-            .when(serverCnx).getCommandSender();
-
-        eventLoopGroup = new NioEventLoopGroup();
-        brokerService = spyWithClassAndConstructorArgs(BrokerService.class, pulsar, eventLoopGroup);
-        PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
-            doReturn(brokerService).when(pulsar).getBrokerService();
-        });
+                .when(serverCnx).getCommandSender();
 
         String topicName = TopicName.get("MessageCumulativeAckTest").toString();
         PersistentTopic persistentTopic = new PersistentTopic(topicName, mock(ManagedLedger.class), brokerService);