Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/12/13 11:30:43 UTC

[GitHub] [pulsar] codelipenghui commented on a change in pull request #12536: [pulsar-broker] Broker extensions to allow operators of enterprise wide cluster better control and flexibility

codelipenghui commented on a change in pull request #12536:
URL: https://github.com/apache/pulsar/pull/12536#discussion_r767658589



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -2620,6 +2639,12 @@ public ByteBufPair newMessageAndIntercept(long consumerId, long ledgerId, long e
             val brokerInterceptor = getBrokerService().getInterceptor();
             if (brokerInterceptor != null) {
                 brokerInterceptor.onPulsarCommand(command, this);
+
+                CompletableFuture<Consumer> consumerFuture = consumers.get(consumerId);
+                if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
+                    Consumer consumer = consumerFuture.getNow(null);
+                    brokerInterceptor.messageDispatched(this, consumer, ledgerId, entryId, metadataAndPayload);

Review comment:
       `metadataAndPayload` does not contain the redeliveryCount, ackSet, etc. I think the interceptor might need these in the future.
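
To illustrate the point about redeliveryCount and ackSet, here is a minimal, self-contained sketch of a dispatch hook that receives those values alongside the ledger and entry IDs. It also reuses the guard from the diff (`isDone()` and not `isCompletedExceptionally()` before `getNow(null)`). All names here (`DispatchSketch`, `DispatchInfo`, `DispatchInterceptor`, `completedValue`, `dispatch`) are hypothetical and are not Pulsar APIs; the consumer is modelled as a plain `String` to keep the example standalone.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: none of these types exist in Pulsar.
final class DispatchSketch {

    /** Hypothetical per-dispatch details that metadataAndPayload does not carry. */
    static final class DispatchInfo {
        final long ledgerId;
        final long entryId;
        final int redeliveryCount;
        final long[] ackSet;

        DispatchInfo(long ledgerId, long entryId, int redeliveryCount, long[] ackSet) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
            this.redeliveryCount = redeliveryCount;
            this.ackSet = ackSet;
        }
    }

    /** Hypothetical interceptor hook that is handed the extra dispatch details. */
    interface DispatchInterceptor {
        void messageDispatched(String consumerName, DispatchInfo info);
    }

    /** Returns the future's value only if it has already completed normally. */
    static <T> Optional<T> completedValue(CompletableFuture<T> future) {
        if (future != null && future.isDone() && !future.isCompletedExceptionally()) {
            return Optional.ofNullable(future.getNow(null));
        }
        return Optional.empty();
    }

    /** Calls the interceptor if the consumer is ready; returns whether it was called. */
    static boolean dispatch(CompletableFuture<String> consumerFuture,
                            DispatchInfo info,
                            DispatchInterceptor interceptor) {
        Optional<String> consumer = completedValue(consumerFuture);
        consumer.ifPresent(name -> interceptor.messageDispatched(name, info));
        return consumer.isPresent();
    }

    public static void main(String[] args) {
        DispatchInterceptor printer = (name, info) ->
                System.out.println(name + " got " + info.ledgerId + ":" + info.entryId
                        + " redeliveryCount=" + info.redeliveryCount);
        DispatchInfo info = new DispatchInfo(5L, 7L, 2, new long[] {0b1011L});

        // Completed consumer future: the interceptor is invoked.
        dispatch(CompletableFuture.completedFuture("consumer-1"), info, printer);
        // Pending consumer future: the interceptor is skipped.
        dispatch(new CompletableFuture<>(), info, printer);
    }
}
```

Grouping the extra fields in one object is only one possible design; its appeal is that further fields could be added later without changing the hook's signature.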

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
##########
@@ -98,6 +98,13 @@ default long getNumberOfMessages() {
         default boolean isMarkerMessage() {
             return false;
         }
+
+        default void setProperty(String propertyName, Object value) {
+        }
+
+        default Object getProperty(String propertyName) {
+            return null;
+        }

Review comment:
       This doesn't look related to this PR?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java
##########
@@ -58,6 +65,66 @@ default void beforeSendMessage(Subscription subscription,
                                    MessageMetadata msgMetadata) {
     }
 
+    /**
+     * Called by the broker when a new connection is created.
+     */
+    default void onConnectionCreated(ServerCnx cnx){
+    }
+
+    /**
+     * Called by the broker when a new producer is created.
+     */
+    default void producerCreated(ServerCnx cnx, Producer producer,
+                                 Map<String, String> metadata){
+    }
+
+    /**
+     * Intercept after a consumer is created.
+     *
+     * @param cnx client Connection
+     * @param consumer Consumer object
+     * @param metadata A map of metadata
+     */
+    default void consumerCreated(ServerCnx cnx,
+                                 Consumer consumer,
+                                 Map<String, String> metadata) {
+    }
+
+    /**
+     * Intercept after a message is produced.
+     *
+     * @param cnx client Connection
+     * @param producer Producer object
+     * @param publishContext Publish Context
+     */
+    default void messageProduced(ServerCnx cnx, Producer producer, long startTimeNs, long ledgerId,

Review comment:
       Could you please provide more context about why we should add `rateIn` as a param for this method?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java
##########
@@ -102,6 +108,78 @@ public void beforeSendMessage(Subscription subscription,
         }
     }
 
+    @Override
+    public void consumerCreated(ServerCnx cnx,
+                                 Consumer consumer,
+                                 Map<String, String> metadata) {
+        for (BrokerInterceptorWithClassLoader value : interceptors.values()) {

Review comment:
       +1



