You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/12/13 12:25:53 UTC

[GitHub] [pulsar] hangc0276 commented on a change in pull request #12536: [pulsar-broker] Broker extensions to allow operators of enterprise wide cluster better control and flexibility

hangc0276 commented on a change in pull request #12536:
URL: https://github.com/apache/pulsar/pull/12536#discussion_r767610794



##########
File path: pulsar-common/src/main/java/org/apache/pulsar/common/intercept/BrokerEntryMetadataUtils.java
##########
@@ -57,4 +58,28 @@
         }
         return interceptors;
     }
+    public static <T> Set<T> loadInterceptors(

Review comment:
       We'd better merge the `loadBrokerEntryMetadataInterceptors` into `loadInterceptors` method.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java
##########
@@ -102,6 +108,78 @@ public void beforeSendMessage(Subscription subscription,
         }
     }
 
+    @Override
+    public void consumerCreated(ServerCnx cnx,
+                                 Consumer consumer,
+                                 Map<String, String> metadata) {
+        for (BrokerInterceptorWithClassLoader value : interceptors.values()) {

Review comment:
       +1 please check null or empty first.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java
##########
@@ -38,10 +42,28 @@
 
 
     private final Set<BrokerEntryMetadataInterceptor> brokerEntryMetadataInterceptors;
+    private final Set<ManagedLedgerPayloadProcessor.Processor> inputProcessors;
+    private final Set<ManagedLedgerPayloadProcessor.Processor> outputProcessors;
 
 
     public ManagedLedgerInterceptorImpl(Set<BrokerEntryMetadataInterceptor> brokerEntryMetadataInterceptors) {

Review comment:
       this method just used by tests, could we remove it?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -1420,11 +1435,15 @@ protected void handleAck(CommandAck ack) {
         final long consumerId = ack.getConsumerId();
 
         if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
+            Consumer consumer = consumerFuture.getNow(null);

Review comment:
       +1

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
##########
@@ -452,6 +479,10 @@ static MessagePublishContext get(Producer producer, long sequenceId, Rate rateIn
             callback.originalSequenceId = -1L;
             callback.startTimeNs = startTimeNs;
             callback.isMarker = isMarker;
+            if (callback.propertyMap != null) {
+                callback.propertyMap.clear();
+                callback.propertyMap = null;

Review comment:
       Could we just clear the `propertyMap` to reuse the hashMap object?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
##########
@@ -848,6 +848,10 @@ public MessageId getStartMessageId() {
         return startMessageId;
     }
 
+    public Map<String, String> getMetadata() {

Review comment:
       No usage found of the method




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org