Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/11/03 18:59:23 UTC

[GitHub] [pulsar] merlimat commented on a change in pull request #12536: [pulsar-broker] Broker extensions to allow operators of enterprise wide cluster better control and flexibility

merlimat commented on a change in pull request #12536:
URL: https://github.com/apache/pulsar/pull/12536#discussion_r742145102



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
##########
@@ -77,6 +77,7 @@
     private int newEntriesCheckDelayInMillis = 10;
     private Clock clock = Clock.systemUTC();
     private ManagedLedgerInterceptor managedLedgerInterceptor;
+    private ManagedLedgerInterceptor managedLedgerPayloadProcessor;

Review comment:
       If we're using the same interface, couldn't we have a single `managedLedgerInterceptor` instance?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java
##########
@@ -102,6 +108,78 @@ public void beforeSendMessage(Subscription subscription,
         }
     }
 
+    @Override
+    public void consumerCreated(ServerCnx cnx,
+                                 Consumer consumer,
+                                 Map<String, String> metadata) {
+        for (BrokerInterceptorWithClassLoader value : interceptors.values()) {

Review comment:
       I know this already matches the style of the other methods here, but we should try to avoid allocating an iterator instance when there are no interceptors, e.g.:
   
   ```java
   if (interceptors.isEmpty()) {
     return;
   }
   
   for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
        // .... 
   }
   ```

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
##########
@@ -221,7 +221,7 @@ private void asyncReadEntry0(ReadHandle lh, PositionImpl position, final ReadEnt
                             Iterator<LedgerEntry> iterator = ledgerEntries.iterator();
                             if (iterator.hasNext()) {
                                 LedgerEntry ledgerEntry = iterator.next();
-                                EntryImpl returnEntry = EntryImpl.create(ledgerEntry);
+                                EntryImpl returnEntry = EntryImpl.create(ledgerEntry, ml.getConfig().getManagedLedgerPayloadProcessor());

Review comment:
       Maybe we can store the interceptor as a member variable of `EntryCacheImpl`.

##########
File path: pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java
##########
@@ -264,4 +264,11 @@ PulsarAdminBuilder authentication(String authPluginClassName, Map<String, String
      */
     PulsarAdminBuilder setContextClassLoader(ClassLoader clientBuilderClassLoader);
 
+    /**
+     *  Set extra headers.
+     * @param extraClientHeaders
+     * @return
+     */
+    PulsarAdminBuilder extraClientHeaders(String extraClientHeaders);

Review comment:
       I think this change should go in a separate PR since it's a bit different from the payload interceptor change.
   
   Also, it's not clear from an API perspective how the extra headers are going to be used or what the format of that string is.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
##########
@@ -307,6 +308,7 @@ public TransportCnx getCnx() {
     }
 
     private static final class MessagePublishContext implements PublishContext, Runnable {
+        Map<String, Object> propertyMap = new HashMap<>();

Review comment:
       It's not clear why we need the property map here. We should also try to avoid the allocation when it's not being used.
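
One way to defer the allocation is to create the map only on the first write. The following is a minimal sketch, not the PR's code: `LazyPropertyContext` is a hypothetical stand-in for `MessagePublishContext`, and `getProperty` and `hasAllocatedMap` are assumed helpers added only for illustration. Only `setProperty` appears in the diff under review.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for MessagePublishContext; only the property handling is shown.
final class LazyPropertyContext {
    // Stays null until a property is actually set, so the common path allocates nothing.
    private Map<String, Object> propertyMap;

    void setProperty(String key, Object value) {
        if (propertyMap == null) {
            propertyMap = new HashMap<>();
        }
        propertyMap.put(key, value);
    }

    // Assumed accessor: returns null when no property was ever set.
    Object getProperty(String key) {
        return propertyMap == null ? null : propertyMap.get(key);
    }

    // Illustration only: reports whether the map has been allocated yet.
    boolean hasAllocatedMap() {
        return propertyMap != null;
    }
}
```

If the context object is pooled and reused, the field would presumably also need to be reset when the object is recycled.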

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerPayloadProcessor.java
##########
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.intercept;
+
+import io.netty.buffer.ByteBuf;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.impl.OpAddEntry;
+import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.common.intercept.MessagePayloadProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ManagedLedgerPayloadProcessor implements ManagedLedgerInterceptor {
+    private static final Logger log = LoggerFactory.getLogger(ManagedLedgerPayloadProcessor.class);
+    private static final String INDEX = "index";
+    private ServiceConfiguration serviceConfig;
+
+
+    private final Set<MessagePayloadProcessor> brokerEntryPayloadProcessors;
+    public ManagedLedgerPayloadProcessor(ServiceConfiguration serviceConfig,
+                                         Set<MessagePayloadProcessor> brokerEntryPayloadProcessors) {
+        this.serviceConfig = serviceConfig;
+        this.brokerEntryPayloadProcessors = brokerEntryPayloadProcessors;
+    }
+    @Override
+    public OpAddEntry beforeAddEntry(OpAddEntry op, int numberOfMessages) {
+        return null;
+    }
+    @Override
+    public ByteBuf beforeStoreEntryToLedger(OpAddEntry op, ByteBuf ledgerData) {
+        Map<String, Object> contextMap = new HashMap<>();
+        ByteBuf dataToStore = ledgerData;
+
+        for (MessagePayloadProcessor payloadProcessor : brokerEntryPayloadProcessors) {
+            dataToStore = payloadProcessor.interceptIn(
+                serviceConfig.getClusterName(),
+                dataToStore, contextMap);
+            if (op.getCtx() instanceof Topic.PublishContext){
+                Topic.PublishContext ctx = (Topic.PublishContext) op.getCtx();
+                contextMap.forEach((k, v) -> {
+                    ctx.setProperty(k, v);
+                });
+            }
+        }
+        return dataToStore;
+    }
+
+    @Override
+    public ByteBuf beforeCacheEntryFromLedger(ByteBuf ledgerData){
+        ByteBuf dataToCache = ledgerData;
+        for (MessagePayloadProcessor payloadProcessor : brokerEntryPayloadProcessors) {
+            dataToCache = payloadProcessor.interceptOut(dataToCache);
+        }
+        return dataToCache;
+    }
+
+    @Override
+    public void onManagedLedgerPropertiesInitialize(Map<String, String> propertiesMap) {
+    }
+    @Override
+    public CompletableFuture<Void> onManagedLedgerLastLedgerInitialize(String name, LedgerHandle lh) {
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        promise.complete(null);
+        return promise;

Review comment:
       ```suggestion
           return CompletableFuture.completedFuture(null);
   ```

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/intercept/ManagedLedgerInterceptor.java
##########
@@ -58,4 +59,23 @@
      * @param propertiesMap  map of properties.
      */
     void onUpdateManagedLedgerInfo(Map<String, String> propertiesMap);
+
+    /**
+     * Intercept after entry is read from ledger, before it gets cached.
+     * @param ledgerData data from ledger
+     * @return data after processing, if any
+     */
+    default ByteBuf beforeCacheEntryFromLedger(ByteBuf ledgerData){

Review comment:
       I think we need to be very explicit (and careful :) ) here about the reference-count expectations for the buffers, to avoid memory leaks or use-after-destroyed scenarios. 
   

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -1420,11 +1435,15 @@ protected void handleAck(CommandAck ack) {
         final long consumerId = ack.getConsumerId();
 
         if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
+            Consumer consumer = consumerFuture.getNow(null);

Review comment:
       Since we have the `consumer` variable here, we can also change the next line to avoid doing `getNow()` again. 
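
The pattern being suggested can be sketched in isolation: resolve the completed future once into a local and reuse that local afterwards. This is a self-contained illustration using a `CompletableFuture<String>`; the class and method names are hypothetical and are not part of `ServerCnx`.

```java
import java.util.concurrent.CompletableFuture;

final class ResolvedFutureExample {
    // Returns the resolved value's length, or -1 when the future is
    // missing, still pending, failed, or completed with null.
    static int lengthIfReady(CompletableFuture<String> future) {
        if (future != null && future.isDone() && !future.isCompletedExceptionally()) {
            // Resolve once, then reuse the local instead of calling getNow() again.
            String value = future.getNow(null);
            return value == null ? -1 : value.length();
        }
        return -1;
    }
}
```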

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java
##########
@@ -102,6 +108,78 @@ public void beforeSendMessage(Subscription subscription,
         }
     }
 
+    @Override
+    public void consumerCreated(ServerCnx cnx,
+                                 Consumer consumer,
+                                 Map<String, String> metadata) {
+        for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
+            value.consumerCreated(
+                    cnx,
+                    consumer,
+                    metadata);
+        }
+    }
+
+    @Override
+    public void producerCreated(ServerCnx cnx, Producer producer,
+                                 Map<String, String> metadata){
+        for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
+            value.producerCreated(cnx, producer, metadata);
+        }
+    }
+
+    @Override
+    public void messageProduced(ServerCnx cnx, Producer producer, long startTimeNs, long ledgerId,
+                                 long entryId, Rate rateIn,
+                                 Topic.PublishContext publishContext) {
+        for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
+            value.messageProduced(cnx, producer, startTimeNs, ledgerId, entryId, rateIn, publishContext);
+        }
+    }
+
+    @Override
+    public  void messageDispatched(ServerCnx cnx, Consumer consumer, long ledgerId,
+                                   long entryId, ByteBuf headersAndPayload) {
+        for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
+            value.messageDispatched(cnx, consumer, ledgerId, entryId, headersAndPayload);
+        }
+    }
+
+    @Override
+    public void messageAcked(ServerCnx cnx, Consumer consumer,
+                              CommandAck ackCmd) {
+        for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
+            value.messageAcked(cnx, consumer, ackCmd);
+        }
+    }
+
+    @Override
+    public boolean delegateSuperUserCheck(){
+        for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
+            if (value.delegateSuperUserCheck()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public boolean isSuperUser(PulsarService pulsarService, String appId, String originalPrincipal){

Review comment:
       I think this would be better suited to a custom authorization provider, where there is already an `isSuperUser()` method. 

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java
##########
@@ -47,12 +48,22 @@ protected EntryImpl newObject(Handle<EntryImpl> handle) {
     ByteBuf data;
 
     public static EntryImpl create(LedgerEntry ledgerEntry) {
+        return create(ledgerEntry,null);
+    }
+
+    public static EntryImpl create(LedgerEntry ledgerEntry, ManagedLedgerInterceptor managedLedgerInterceptor) {
         EntryImpl entry = RECYCLER.get();
         entry.timestamp = System.nanoTime();
         entry.ledgerId = ledgerEntry.getLedgerId();
         entry.entryId = ledgerEntry.getEntryId();
         entry.data = ledgerEntry.getEntryBuffer();
-        entry.data.retain();
+        ByteBuf cacheBuf = entry.data;
+        if(managedLedgerInterceptor != null)
+            cacheBuf = managedLedgerInterceptor.beforeCacheEntryFromLedger(entry.data);
+        if(cacheBuf != null && cacheBuf != entry.data)
+            entry.data = cacheBuf;
+        else
+            entry.data.retain();

Review comment:
       Instead of checking the equality of the buffer instance, the interceptor should have well-defined semantics for the expected reference count of the returned buffer.
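
To illustrate what such a contract could look like, here is a minimal sketch under one possible convention: the input buffer is only borrowed, and the returned buffer always carries one reference owned by the caller. Everything in it is an assumption for illustration. `CountedBuf` is a hypothetical stand-in for a reference-counted buffer such as Netty's `ByteBuf`, and `ReadProcessor` stands in for the interceptor method. The PR may settle on a different rule.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a reference-counted buffer (e.g. Netty's ByteBuf).
final class CountedBuf {
    private final AtomicInteger refCnt = new AtomicInteger(1);
    final byte[] bytes;

    CountedBuf(byte[] bytes) { this.bytes = bytes; }

    CountedBuf retain() { refCnt.incrementAndGet(); return this; }
    void release() { refCnt.decrementAndGet(); }
    int refCnt() { return refCnt.get(); }
}

// Assumed contract: the input is borrowed (the processor never releases it),
// and the returned buffer always carries one reference that the caller owns.
interface ReadProcessor {
    CountedBuf beforeCache(CountedBuf borrowed);
}

final class OwnershipExample {
    // Pass-through: returns the same buffer, so it retains it on the caller's behalf.
    static final ReadProcessor PASS_THROUGH = borrowed -> borrowed.retain();

    // Transforming: returns a fresh buffer (refCnt 1) and leaves the input untouched.
    static final ReadProcessor COPYING = borrowed -> new CountedBuf(borrowed.bytes.clone());

    // No identity check is needed: whatever comes back is owned by the caller.
    static CountedBuf toCache(CountedBuf fromLedger, ReadProcessor processor) {
        return processor == null ? fromLedger.retain() : processor.beforeCache(fromLedger);
    }
}
```

With this convention the call site treats every processor the same way, and the pass-through and transforming cases differ only inside the processor.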



