You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/09/15 13:53:09 UTC

[pulsar] branch master updated: [feature][broker] PIP-204: Extensions for broker interceptor (#17269)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c7b714603a8 [feature][broker] PIP-204: Extensions for broker interceptor (#17269)
c7b714603a8 is described below

commit c7b714603a8a9a8bd401e6939dc811f763d65f72
Author: AloysZhang <lo...@gmail.com>
AuthorDate: Thu Sep 15 21:52:58 2022 +0800

    [feature][broker] PIP-204: Extensions for broker interceptor (#17269)
---
 .../pulsar/broker/intercept/BrokerInterceptor.java |  54 +++++++++
 .../BrokerInterceptorWithClassLoader.java          |  39 ++++++
 .../broker/intercept/BrokerInterceptors.java       | 134 +++++++++++++--------
 .../broker/service/AbstractBaseDispatcher.java     |   2 +
 .../org/apache/pulsar/broker/service/Producer.java |  49 +++++---
 .../apache/pulsar/broker/service/ServerCnx.java    |  52 +++-----
 .../org/apache/pulsar/broker/service/Topic.java    |  12 ++
 .../broker/intercept/BrokerInterceptorTest.java    |  45 ++++++-
 .../BrokerInterceptorWithClassLoaderTest.java      |   8 ++
 .../broker/intercept/CounterBrokerInterceptor.java |  52 ++++++++
 10 files changed, 350 insertions(+), 97 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java
index 0c56c29b621..6ba72370d1e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java
@@ -52,18 +52,36 @@ public interface BrokerInterceptor extends AutoCloseable {
 
     /**
      * Intercept messages before sending them to the consumers.
+     * Deprecated, use {@link #beforeSendMessage(Subscription, Entry, long[], MessageMetadata, Consumer)} instead.
      *
      * @param subscription pulsar subscription
      * @param entry entry
      * @param ackSet entry ack bitset. it is either <tt>null</tt> or an array of long-based bitsets.
      * @param msgMetadata message metadata. The message metadata will be recycled after this call.
      */
+    @Deprecated
     default void beforeSendMessage(Subscription subscription,
                                    Entry entry,
                                    long[] ackSet,
                                    MessageMetadata msgMetadata) {
     }
 
+    /**
+     * Intercept messages before sending them to the consumers.
+     *
+     * @param subscription pulsar subscription
+     * @param entry entry
+     * @param ackSet entry ack bitset. it is either <tt>null</tt> or an array of long-based bitsets.
+     * @param msgMetadata message metadata. The message metadata will be recycled after this call.
+     * @param consumer consumer. Consumer which entry are sent to.
+     */
+    default void beforeSendMessage(Subscription subscription,
+                                   Entry entry,
+                                   long[] ackSet,
+                                   MessageMetadata msgMetadata,
+                                   Consumer consumer) {
+    }
+
     /**
      * Called by the broker when a new connection is created.
      */
@@ -77,6 +95,18 @@ public interface BrokerInterceptor extends AutoCloseable {
                                  Map<String, String> metadata){
     }
 
+    /**
+     * Called by the broker when a producer is closed.
+     *
+     * @param cnx      client Connection
+     * @param producer Producer object
+     * @param metadata A map of metadata
+     */
+    default void producerClosed(ServerCnx cnx,
+                                Producer producer,
+                                Map<String, String> metadata) {
+    }
+
     /**
      * Intercept after a consumer is created.
      *
@@ -89,6 +119,30 @@ public interface BrokerInterceptor extends AutoCloseable {
                                  Map<String, String> metadata) {
     }
 
+    /**
+     *  Called by the broker when a consumer is closed.
+     *
+     * @param cnx client Connection
+     * @param consumer Consumer object
+     * @param metadata A map of metadata
+     */
+    default void consumerClosed(ServerCnx cnx,
+                                Consumer consumer,
+                                Map<String, String> metadata) {
+    }
+
+    /**
+     * Intercept message when broker receive a send request.
+     *
+     * @param headersAndPayload entry's header and payload
+     * @param publishContext Publish Context
+     */
+    default void onMessagePublish(Producer producer,
+                                  ByteBuf headersAndPayload,
+                                  Topic.PublishContext publishContext) {
+
+    }
+
     /**
      * Intercept after a message is produced.
      *
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java
index f04446fa9a0..f309aff9a61 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java
@@ -63,6 +63,26 @@ public class BrokerInterceptorWithClassLoader implements BrokerInterceptor {
         }
     }
 
+    @Override
+    public void beforeSendMessage(Subscription subscription,
+                                  Entry entry,
+                                  long[] ackSet,
+                                  MessageMetadata msgMetadata,
+                                  Consumer consumer) {
+        try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
+            this.interceptor.beforeSendMessage(
+                    subscription, entry, ackSet, msgMetadata, consumer);
+        }
+    }
+
+    @Override
+    public void onMessagePublish(Producer producer, ByteBuf headersAndPayload,
+                                 Topic.PublishContext publishContext) {
+        try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
+            this.interceptor.onMessagePublish(producer, headersAndPayload, publishContext);
+        }
+    }
+
     @Override
     public void producerCreated(ServerCnx cnx, Producer producer,
                                 Map<String, String> metadata){
@@ -71,6 +91,15 @@ public class BrokerInterceptorWithClassLoader implements BrokerInterceptor {
         }
     }
 
+    @Override
+    public void producerClosed(ServerCnx cnx,
+                               Producer producer,
+                               Map<String, String> metadata) {
+        try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
+            this.interceptor.producerClosed(cnx, producer, metadata);
+        }
+    }
+
     @Override
     public void consumerCreated(ServerCnx cnx,
                                 Consumer consumer,
@@ -81,6 +110,16 @@ public class BrokerInterceptorWithClassLoader implements BrokerInterceptor {
         }
     }
 
+    @Override
+    public void consumerClosed(ServerCnx cnx,
+                               Consumer consumer,
+                               Map<String, String> metadata) {
+        try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
+            this.interceptor.consumerClosed(cnx, consumer, metadata);
+        }
+    }
+
+
     @Override
     public void messageProduced(ServerCnx cnx, Producer producer, long startTimeNs, long ledgerId,
                                 long entryId, Topic.PublishContext publishContext) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java
index 225066b9434..ade496c192f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java
@@ -93,17 +93,39 @@ public class BrokerInterceptors implements BrokerInterceptor {
         }
     }
 
+    @Override
+    public void onMessagePublish(Producer producer,
+                                 ByteBuf headersAndPayload,
+                                 Topic.PublishContext publishContext) {
+        if (interceptorsEnabled()) {
+            for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
+                value.onMessagePublish(producer, headersAndPayload, publishContext);
+            }
+        }
+    }
+
     @Override
     public void beforeSendMessage(Subscription subscription,
                                   Entry entry,
                                   long[] ackSet,
                                   MessageMetadata msgMetadata) {
-        for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
-            value.beforeSendMessage(
-                subscription,
-                entry,
-                ackSet,
-                msgMetadata);
+        if (interceptorsEnabled()) {
+            for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
+                value.beforeSendMessage(subscription, entry, ackSet, msgMetadata);
+            }
+        }
+    }
+
+    @Override
+    public void beforeSendMessage(Subscription subscription,
+                                  Entry entry,
+                                  long[] ackSet,
+                                  MessageMetadata msgMetadata,
+                                  Consumer consumer) {
+        if (interceptorsEnabled()) {
+            for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
+                value.beforeSendMessage(subscription, entry, ackSet, msgMetadata, consumer);
+            }
         }
     }
 
@@ -111,89 +133,103 @@ public class BrokerInterceptors implements BrokerInterceptor {
     public void consumerCreated(ServerCnx cnx,
                                  Consumer consumer,
                                  Map<String, String> metadata) {
-        if (interceptors == null || interceptors.isEmpty()) {
-            return;
+        if (interceptorsEnabled()) {
+            for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
+                value.consumerCreated(
+                        cnx,
+                        consumer,
+                        metadata);
+            }
         }
-        for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
-            value.consumerCreated(
-                    cnx,
-                    consumer,
-                    metadata);
+    }
+
+    @Override
+    public void consumerClosed(ServerCnx cnx,
+                               Consumer consumer,
+                               Map<String, String> metadata) {
+        if (interceptorsEnabled()) {
+            for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
+                value.consumerClosed(cnx, consumer, metadata);
+            }
         }
     }
 
     @Override
     public void producerCreated(ServerCnx cnx, Producer producer,
                                  Map<String, String> metadata){
-        if (interceptors == null || interceptors.isEmpty()) {
-            return;
+        if (interceptorsEnabled()) {
+            for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
+                value.producerCreated(cnx, producer, metadata);
+            }
         }
-        for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
-            value.producerCreated(cnx, producer, metadata);
+    }
+
+    @Override
+    public void producerClosed(ServerCnx cnx,
+                               Producer producer,
+                               Map<String, String> metadata) {
+        if (interceptorsEnabled()) {
+            for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
+                value.producerClosed(cnx, producer, metadata);
+            }
         }
     }
 
     @Override
     public void messageProduced(ServerCnx cnx, Producer producer, long startTimeNs, long ledgerId,
                                  long entryId, Topic.PublishContext publishContext) {
-        if (interceptors == null || interceptors.isEmpty()) {
-            return;
-        }
-        for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
-            value.messageProduced(cnx, producer, startTimeNs, ledgerId, entryId, publishContext);
+        if (interceptorsEnabled()) {
+            for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
+                value.messageProduced(cnx, producer, startTimeNs, ledgerId, entryId, publishContext);
+            }
         }
     }
 
     @Override
     public  void messageDispatched(ServerCnx cnx, Consumer consumer, long ledgerId,
                                    long entryId, ByteBuf headersAndPayload) {
-        if (interceptors == null || interceptors.isEmpty()) {
-            return;
-        }
-        for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
-            value.messageDispatched(cnx, consumer, ledgerId, entryId, headersAndPayload);
+        if (interceptorsEnabled()) {
+            for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
+                value.messageDispatched(cnx, consumer, ledgerId, entryId, headersAndPayload);
+            }
         }
     }
 
     @Override
     public void messageAcked(ServerCnx cnx, Consumer consumer,
                               CommandAck ackCmd) {
-        if (interceptors == null || interceptors.isEmpty()) {
-            return;
-        }
-        for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
-            value.messageAcked(cnx, consumer, ackCmd);
+        if (interceptorsEnabled()) {
+            for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
+                value.messageAcked(cnx, consumer, ackCmd);
+            }
         }
     }
 
     @Override
     public void txnOpened(long tcId, String txnID) {
-        if (interceptors == null || interceptors.isEmpty()) {
-            return;
-        }
-        for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
-            value.txnOpened(tcId, txnID);
+        if (interceptorsEnabled()) {
+            for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
+                value.txnOpened(tcId, txnID);
+            }
         }
     }
 
     @Override
     public void txnEnded(String txnID, long txnAction) {
-        if (interceptors == null || interceptors.isEmpty()) {
-            return;
-        }
-        for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
-            value.txnEnded(txnID, txnAction);
+        if (interceptorsEnabled()) {
+            for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
+                value.txnEnded(txnID, txnAction);
+            }
         }
     }
 
 
     @Override
     public void onConnectionCreated(ServerCnx cnx) {
-        if (interceptors == null || interceptors.isEmpty()) {
-            return;
-        }
-        for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
-            value.onConnectionCreated(cnx);
+        if (interceptorsEnabled()) {
+            for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
+                value.onConnectionCreated(cnx);
+            }
         }
     }
 
@@ -237,4 +273,8 @@ public class BrokerInterceptors implements BrokerInterceptor {
     public void close() {
         interceptors.values().forEach(BrokerInterceptorWithClassLoader::close);
     }
+
+    private boolean interceptorsEnabled() {
+        return interceptors != null && !interceptors.isEmpty();
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 9843c48f46c..3e3fba07eac 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -197,7 +197,9 @@ public abstract class AbstractBaseDispatcher extends EntryFilterSupport implemen
 
             BrokerInterceptor interceptor = subscription.interceptor();
             if (null != interceptor) {
+                // keep for compatibility if users has implemented the old interface
                 interceptor.beforeSendMessage(subscription, entry, ackSet, msgMetadata);
+                interceptor.beforeSendMessage(subscription, entry, ackSet, msgMetadata, consumer);
             }
         }
         if (CollectionUtils.isNotEmpty(entriesToFiltered)) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index f8591a8447a..b5d87a46cfc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -246,18 +246,22 @@ public class Producer {
 
     private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, long batchSize, boolean isChunked,
                                        boolean isMarker) {
-        topic.publishMessage(headersAndPayload,
-                MessagePublishContext.get(this, sequenceId, msgIn,
-                        headersAndPayload.readableBytes(), batchSize,
-                        isChunked, System.nanoTime(), isMarker));
+        MessagePublishContext messagePublishContext =
+                MessagePublishContext.get(this, sequenceId, msgIn, headersAndPayload.readableBytes(),
+                        batchSize, isChunked, System.nanoTime(), isMarker);
+        this.cnx.getBrokerService().getInterceptor()
+                .onMessagePublish(this, headersAndPayload, messagePublishContext);
+        topic.publishMessage(headersAndPayload, messagePublishContext);
     }
 
     private void publishMessageToTopic(ByteBuf headersAndPayload, long lowestSequenceId, long highestSequenceId,
                                        long batchSize, boolean isChunked, boolean isMarker) {
-        topic.publishMessage(headersAndPayload,
-                MessagePublishContext.get(this, lowestSequenceId,
-                        highestSequenceId, msgIn, headersAndPayload.readableBytes(), batchSize,
-                        isChunked, System.nanoTime(), isMarker));
+        MessagePublishContext messagePublishContext = MessagePublishContext.get(this, lowestSequenceId,
+                highestSequenceId, msgIn, headersAndPayload.readableBytes(), batchSize,
+                isChunked, System.nanoTime(), isMarker);
+        this.cnx.getBrokerService().getInterceptor()
+                .onMessagePublish(this, headersAndPayload, messagePublishContext);
+        topic.publishMessage(headersAndPayload, messagePublishContext);
     }
 
     private boolean verifyChecksum(ByteBuf headersAndPayload) {
@@ -354,6 +358,8 @@ public class Producer {
         private long highestSequenceId;
         private long originalHighestSequenceId;
 
+        private long entryTimestamp;
+
         public String getProducerName() {
             return producer.getProducerName();
         }
@@ -367,6 +373,15 @@ public class Producer {
             return chunked;
         }
 
+        @Override
+        public long getEntryTimestamp() {
+            return entryTimestamp;
+        }
+
+        @Override
+        public void setEntryTimestamp(long entryTimestamp) {
+            this.entryTimestamp = entryTimestamp;
+        }
         @Override
         public void setProperty(String propertyName, Object value){
             if (this.propertyMap == null) {
@@ -483,10 +498,8 @@ public class Producer {
                 producer.chunkedMessageRate.recordEvent();
             }
             producer.publishOperationCompleted();
-            if (producer.cnx.getBrokerService().getInterceptor() != null){
-                producer.cnx.getBrokerService().getInterceptor().messageProduced(
-                        (ServerCnx) producer.cnx, producer, startTimeNs, ledgerId, entryId, this);
-            }
+            producer.cnx.getBrokerService().getInterceptor().messageProduced(
+                    (ServerCnx) producer.cnx, producer, startTimeNs, ledgerId, entryId, this);
             recycle();
         }
 
@@ -534,6 +547,11 @@ public class Producer {
             return batchSize;
         }
 
+        @Override
+        public long getMsgSize() {
+            return  msgSize;
+        }
+
         @Override
         public boolean isMarkerMessage() {
             return isMarker;
@@ -730,9 +748,12 @@ public class Producer {
     public void publishTxnMessage(TxnID txnID, long producerId, long sequenceId, long highSequenceId,
                                   ByteBuf headersAndPayload, long batchSize, boolean isChunked, boolean isMarker) {
         checkAndStartPublish(producerId, sequenceId, headersAndPayload, batchSize);
-        topic.publishTxnMessage(txnID, headersAndPayload,
+        MessagePublishContext messagePublishContext =
                 MessagePublishContext.get(this, sequenceId, highSequenceId, msgIn,
-                        headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker));
+                        headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker);
+        this.cnx.getBrokerService().getInterceptor()
+                .onMessagePublish(this, headersAndPayload, messagePublishContext);
+        topic.publishTxnMessage(txnID, headersAndPayload, messagePublishContext);
     }
 
     public SchemaVersion getSchemaVersion() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 4fe703de878..f0db48e8869 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -310,9 +310,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         isActive = false;
         log.info("Closed connection from {}", remoteAddress);
         BrokerInterceptor brokerInterceptor = getBrokerService().getInterceptor();
-        if (brokerInterceptor != null) {
-            brokerInterceptor.onConnectionClosed(this);
-        }
+        brokerInterceptor.onConnectionClosed(this);
 
         cnxsPerThread.get().remove(this);
 
@@ -326,6 +324,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
             if (producerFuture.isDone() && !producerFuture.isCompletedExceptionally()) {
                 Producer producer = producerFuture.getNow(null);
                 producer.closeNow(true);
+                brokerInterceptor.producerClosed(this, producer, producer.getMetadata());
             }
         });
 
@@ -339,6 +338,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                 Consumer consumer = consumerFuture.getNow(null);
                 try {
                     consumer.close();
+                    brokerInterceptor.consumerClosed(this, consumer, consumer.getMetadata());
                 } catch (BrokerServiceException e) {
                     log.warn("Consumer {} was already closed: {}", consumer, e);
                 }
@@ -672,10 +672,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         if (isNotBlank(clientVersion) && !clientVersion.contains(" ") /* ignore default version: pulsar client */) {
             this.clientVersion = clientVersion.intern();
         }
-        BrokerInterceptor brokerInterceptor = getBrokerService().getInterceptor();
-        if (brokerInterceptor != null) {
-            brokerInterceptor.onConnectionCreated(this);
-        }
+        getBrokerService().getInterceptor().onConnectionCreated(this);
     }
 
     // According to auth result, send newConnected or newAuthChallenge command.
@@ -1137,9 +1134,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                                 log.info("[{}] Created subscription on topic {} / {}",
                                         remoteAddress, topicName, subscriptionName);
                                 commandSender.sendSuccessResponse(requestId);
-                                if (getBrokerService().getInterceptor() != null){
-                                    getBrokerService().getInterceptor().consumerCreated(this, consumer, metadata);
-                                }
+                                getBrokerService().getInterceptor().consumerCreated(this, consumer, metadata);
                             } else {
                                 // The consumer future was completed before by a close command
                                 try {
@@ -1480,10 +1475,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                     commandSender.sendProducerSuccessResponse(requestId, producerName,
                             producer.getLastSequenceId(), producer.getSchemaVersion(),
                             newTopicEpoch, true /* producer is ready now */);
-                    if (getBrokerService().getInterceptor() != null) {
-                        getBrokerService().getInterceptor().
+                    getBrokerService().getInterceptor().
                             producerCreated(this, producer, metadata);
-                    }
                     return;
                 } else {
                     // The producer's future was completed before by
@@ -1530,10 +1523,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                 commandSender.sendProducerSuccessResponse(requestId, producerName,
                         producer.getLastSequenceId(), producer.getSchemaVersion(),
                         Optional.empty(), false/* producer is not ready now */);
-                if (getBrokerService().getInterceptor() != null) {
-                    getBrokerService().getInterceptor().
-                       producerCreated(this, producer, metadata);
-                }
+                getBrokerService().getInterceptor().
+                        producerCreated(this, producer, metadata);
             }
         });
     }
@@ -1620,10 +1611,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                             ctx.writeAndFlush(Commands.newAckResponse(
                                     requestId, null, null, consumerId));
                         }
-                        if (getBrokerService().getInterceptor() != null) {
-                            getBrokerService().getInterceptor().messageAcked(this, consumer, ack);
-                        }
-                    }).exceptionally(e -> {
+                getBrokerService().getInterceptor().messageAcked(this, consumer, ack);
+            }).exceptionally(e -> {
                         if (hasRequestId) {
                             ctx.writeAndFlush(Commands.newAckResponse(requestId,
                                     BrokerServiceException.getClientErrorCode(e),
@@ -1816,6 +1805,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                      remoteAddress, producerId);
             commandSender.sendSuccessResponse(requestId);
             producers.remove(producerId, producerFuture);
+            getBrokerService().getInterceptor().producerClosed(this, producer, producer.getMetadata());
         });
     }
 
@@ -1859,6 +1849,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
             consumers.remove(consumerId, consumerFuture);
             commandSender.sendSuccessResponse(requestId);
             log.info("[{}] Closed consumer, consumerId={}", remoteAddress, consumerId);
+            getBrokerService().getInterceptor().consumerClosed(this, consumer, consumer.getMetadata());
         } catch (BrokerServiceException e) {
             log.warn("[{]] Error closing consumer {} : {}", remoteAddress, consumer, e);
             commandSender.sendErrorResponse(requestId, BrokerServiceException.getClientErrorCode(e), e.getMessage());
@@ -2655,9 +2646,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
 
     @Override
     protected void interceptCommand(BaseCommand command) throws InterceptException {
-        if (getBrokerService().getInterceptor() != null) {
-            getBrokerService().getInterceptor().onPulsarCommand(command, this);
-        }
+        getBrokerService().getInterceptor().onPulsarCommand(command, this);
     }
 
     public void closeProducer(Producer producer) {
@@ -2942,16 +2931,11 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         ByteBufPair res = Commands.serializeCommandMessageWithSize(command, metadataAndPayload);
         try {
             val brokerInterceptor = getBrokerService().getInterceptor();
-            if (brokerInterceptor != null) {
-                brokerInterceptor.onPulsarCommand(command, this);
-
-                CompletableFuture<Consumer> consumerFuture = consumers.get(consumerId);
-                if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
-                    Consumer consumer = consumerFuture.getNow(null);
-                    brokerInterceptor.messageDispatched(this, consumer, ledgerId, entryId, metadataAndPayload);
-                }
-            } else {
-                log.debug("BrokerInterceptor is not set in newMessageAndIntercept");
+            brokerInterceptor.onPulsarCommand(command, this);
+            CompletableFuture<Consumer> consumerFuture = consumers.get(consumerId);
+            if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
+                Consumer consumer = consumerFuture.getNow(null);
+                brokerInterceptor.messageDispatched(this, consumer, ledgerId, entryId, metadataAndPayload);
             }
         } catch (Exception e) {
             log.error("Exception occur when intercept messages.", e);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index b4f27adcc4a..312468933f5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -101,6 +101,10 @@ public interface Topic {
             return  1L;
         }
 
+        default long getMsgSize() {
+            return  -1L;
+        }
+
         default boolean isMarkerMessage() {
             return false;
         }
@@ -115,6 +119,14 @@ public interface Topic {
         default boolean isChunked() {
             return false;
         }
+
+        default long getEntryTimestamp() {
+            return -1L;
+        }
+
+        default void setEntryTimestamp(long entryTimestamp) {
+
+        }
     }
 
     CompletableFuture<Void> initialize();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java
index b2c3b8d711f..290cf19c8f9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java
@@ -146,6 +146,17 @@ public class BrokerInterceptorTest extends ProducerConsumerBase {
         Awaitility.await().until(() -> ((CounterBrokerInterceptor) listener).getProducerCount() == 1);
     }
 
+    @Test
+    public void testProducerClose() throws PulsarClientException {
+        BrokerInterceptor listener = pulsar.getBrokerInterceptor();
+        Assert.assertTrue(listener instanceof CounterBrokerInterceptor);
+        assertEquals(((CounterBrokerInterceptor) listener).getProducerCount(), 0);
+        Producer<Boolean> producer = pulsarClient.newProducer(Schema.BOOL).topic("test").create();
+        Awaitility.await().until(() -> ((CounterBrokerInterceptor) listener).getProducerCount() == 1);
+        producer.close();
+        Awaitility.await().until(() -> ((CounterBrokerInterceptor) listener).getProducerCount() == 0);
+    }
+
     @Test
     public void testConsumerCreation() throws PulsarClientException {
         BrokerInterceptor listener = pulsar.getBrokerInterceptor();
@@ -155,6 +166,35 @@ public class BrokerInterceptorTest extends ProducerConsumerBase {
         Awaitility.await().until(() -> ((CounterBrokerInterceptor) listener).getConsumerCount() == 1);
     }
 
+    @Test
+    public void testConsumerClose() throws PulsarClientException {
+        BrokerInterceptor listener = pulsar.getBrokerInterceptor();
+        Assert.assertTrue(listener instanceof CounterBrokerInterceptor);
+        assertEquals(((CounterBrokerInterceptor) listener).getConsumerCount(), 0);
+        Consumer<String> consumer = pulsarClient
+                .newConsumer(Schema.STRING).topic("test1").subscriptionName("test-sub").subscribe();
+        Awaitility.await().until(() -> ((CounterBrokerInterceptor) listener).getConsumerCount() == 1);
+        consumer.close();
+        Awaitility.await().until(() -> ((CounterBrokerInterceptor) listener).getConsumerCount() == 0);
+    }
+
+    @Test
+    public void testMessagePublishAndProduced() throws PulsarClientException {
+        BrokerInterceptor listener = pulsar.getBrokerInterceptor();
+        Assert.assertTrue(listener instanceof CounterBrokerInterceptor);
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic("test-before-send-message")
+                .create();
+
+        assertEquals(((CounterBrokerInterceptor)listener).getMessagePublishCount(),0);
+        assertEquals(((CounterBrokerInterceptor)listener).getMessageProducedCount(),0);
+        producer.send("hello world");
+        assertEquals(((CounterBrokerInterceptor)listener).getMessagePublishCount(),1);
+        assertEquals(((CounterBrokerInterceptor)listener).getMessageProducedCount(),1);
+    }
+
     @Test
     public void testBeforeSendMessage() throws PulsarClientException {
         BrokerInterceptor listener = pulsar.getBrokerInterceptor();
@@ -170,16 +210,17 @@ public class BrokerInterceptorTest extends ProducerConsumerBase {
             .subscriptionName("test")
             .subscribe();
 
-        assertEquals(((CounterBrokerInterceptor)listener).getMessagePublishCount(),0);
+        assertEquals(((CounterBrokerInterceptor)listener).getMessageProducedCount(),0);
         assertEquals(((CounterBrokerInterceptor)listener).getMessageDispatchCount(),0);
         producer.send("hello world");
-        assertEquals(((CounterBrokerInterceptor)listener).getMessagePublishCount(),1);
+        assertEquals(((CounterBrokerInterceptor)listener).getMessageProducedCount(),1);
 
         Message<String> msg = consumer.receive();
 
         assertEquals(msg.getValue(), "hello world");
 
         Awaitility.await().until(() -> ((CounterBrokerInterceptor) listener).getBeforeSendCount() == 1);
+        Awaitility.await().until(() -> ((CounterBrokerInterceptor) listener).getBeforeSendCountAtConsumerLevel() == 1);
         Awaitility.await().until(() -> ((CounterBrokerInterceptor) listener).getMessageDispatchCount() == 1);
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoaderTest.java
index 4f11792dbc2..758f5655453 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoaderTest.java
@@ -69,6 +69,12 @@ public class BrokerInterceptorWithClassLoaderTest {
             public void beforeSendMessage(Subscription subscription, Entry entry, long[] ackSet, MessageMetadata msgMetadata) {
                 assertEquals(Thread.currentThread().getContextClassLoader(), narLoader);
             }
+
+            @Override
+            public void beforeSendMessage(Subscription subscription,
+                                          Entry entry, long[] ackSet, MessageMetadata msgMetadata, Consumer consumer) {
+                assertEquals(Thread.currentThread().getContextClassLoader(), narLoader);
+            }
             @Override
             public void onConnectionCreated(ServerCnx cnx) {
                 assertEquals(Thread.currentThread().getContextClassLoader(), narLoader);
@@ -177,6 +183,8 @@ public class BrokerInterceptorWithClassLoaderTest {
         // test beforeSendMessage
         brokerInterceptorWithClassLoader
                 .beforeSendMessage(mock(Subscription.class), mock(Entry.class), null, null);
+        brokerInterceptorWithClassLoader
+                .beforeSendMessage(mock(Subscription.class), mock(Entry.class), null, null, null);
         assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader);
         // test close
         brokerInterceptorWithClassLoader.close();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
index 25953a71fbd..dd83fd2a4ce 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
@@ -50,10 +50,12 @@ import org.eclipse.jetty.server.Response;
 public class CounterBrokerInterceptor implements BrokerInterceptor {
 
     private AtomicInteger beforeSendCount = new AtomicInteger();
+    private AtomicInteger beforeSendCountAtConsumerLevel = new AtomicInteger();
     private AtomicInteger count = new AtomicInteger();
     private AtomicInteger connectionCreationCount = new AtomicInteger();
     private AtomicInteger producerCount = new AtomicInteger();
     private AtomicInteger consumerCount = new AtomicInteger();
+    private AtomicInteger messagePublishCount = new AtomicInteger();
     private AtomicInteger messageCount = new AtomicInteger();
     private AtomicInteger messageDispatchCount = new AtomicInteger();
     private AtomicInteger messageAckCount = new AtomicInteger();
@@ -104,6 +106,16 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
         producerCount.incrementAndGet();
     }
 
+    @Override
+    public void producerClosed(ServerCnx cnx, Producer producer,
+                                Map<String, String> metadata) {
+        if (log.isDebugEnabled()) {
+            log.debug("Producer with name={}, id={} closed",
+                    producer.getProducerName(), producer.getProducerId());
+        }
+        producerCount.decrementAndGet();
+    }
+
     @Override
     public void consumerCreated(ServerCnx cnx,
                                  Consumer consumer,
@@ -115,6 +127,26 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
         consumerCount.incrementAndGet();
     }
 
+    @Override
+    public void consumerClosed(ServerCnx cnx,
+                                Consumer consumer,
+                                Map<String, String> metadata) {
+        if (log.isDebugEnabled()) {
+            log.debug("Consumer with name={}, id={} closed",
+                    consumer.consumerName(), consumer.consumerId());
+        }
+        consumerCount.decrementAndGet();
+    }
+
+    @Override
+    public void onMessagePublish(Producer producer, ByteBuf headersAndPayload, Topic.PublishContext publishContext) {
+        if (log.isDebugEnabled()) {
+            log.debug("Message broker received topic={}, producer={}",
+                    producer.getTopic().getName(), producer.getProducerName());
+        }
+        messagePublishCount.incrementAndGet();
+    }
+
     @Override
     public void messageProduced(ServerCnx cnx, Producer producer, long startTimeNs, long ledgerId,
                                  long entryId,
@@ -154,6 +186,19 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
         beforeSendCount.incrementAndGet();
     }
 
+    @Override
+    public void beforeSendMessage(Subscription subscription,
+                                  Entry entry,
+                                  long[] ackSet,
+                                  MessageMetadata msgMetadata,
+                                  Consumer consumer) {
+        if (log.isDebugEnabled()) {
+            log.debug("Send message to topic {}, subscription {}, consumer {}",
+                    subscription.getTopic(), subscription.getName(), consumer.consumerName());
+        }
+        beforeSendCountAtConsumerLevel.incrementAndGet();
+    }
+
     @Override
     public void onPulsarCommand(BaseCommand command, ServerCnx cnx) {
         if (log.isDebugEnabled()) {
@@ -238,6 +283,9 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
     }
 
     public int getMessagePublishCount() {
+        return messagePublishCount.get();
+    }
+    public int getMessageProducedCount() {
         return messageCount.get();
     }
 
@@ -253,6 +301,10 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
         return beforeSendCount.get();
     }
 
+    public int getBeforeSendCountAtConsumerLevel() {
+        return beforeSendCountAtConsumerLevel.get();
+    }
+
     public int getConnectionCreationCount() {
         return connectionCreationCount.get();
     }