You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/11/25 04:51:46 UTC

[GitHub] [pulsar] 315157973 opened a new pull request #12970: [PIP-105] Part-2 Support pluggable entry filter in Dispatcher

315157973 opened a new pull request #12970:
URL: https://github.com/apache/pulsar/pull/12970


   ### Motivation
   see https://github.com/apache/pulsar/issues/12269
   
   ### Modifications
   Add pluggable entry filter
   
   ### Documentation
   - [x] `doc-required`
     
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #12970: [PIP-105] Part-2 Support pluggable entry filter in Dispatcher

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #12970:
URL: https://github.com/apache/pulsar/pull/12970#discussion_r759015193



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/EntryFilterTest.java
##########
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.plugin;
+
+
+import java.util.List;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.pulsar.common.api.proto.KeyValue;
+
+public class EntryFilterTest implements EntryFilter {
+    @Override
+    public FilterResult filterEntry(Entry entry, FilterContext context) {
+        if (context.getMsgMetadata() == null || context.getMsgMetadata().getPropertiesCount() <= 0) {
+            return FilterResult.ACCEPT;
+        }
+        List<KeyValue> list = context.getMsgMetadata().getPropertiesList();
+        // filter by string
+        for (KeyValue keyValue : list) {
+            if ("ACCEPT".equalsIgnoreCase(keyValue.getKey())) {
+                return FilterResult.ACCEPT;
+            } else if ("REJECT".equalsIgnoreCase(keyValue.getKey())){
+                return FilterResult.REJECT;
+            }
+        }
+        return null;

Review comment:
       What happens in case of a "null" result?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java
##########
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.plugin;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import com.google.common.collect.ImmutableMap;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+
+@Slf4j
+public class EntryFilterProvider {
+
+    static final String ENTRY_FILTER_DEFINITION_FILE = "entry_filter.yml";
+
+    /**
+     * create entry filter instance.
+     */
+    public static ImmutableMap<String, EntryFilterWithClassLoader> createEntryFilters(
+            ServiceConfiguration conf) throws IOException {
+        EntryFilterDefinitions definitions = searchForEntryFilters(conf.getEntryFiltersDirectory(),
+                conf.getNarExtractionDirectory());
+        ImmutableMap.Builder<String, EntryFilterWithClassLoader> builder = ImmutableMap.builder();
+        for (String filterName : conf.getEntryFilterNames()) {
+            EntryFilterMetaData metaData = definitions.getFilters().get(filterName);
+            if (null == metaData) {
+                throw new RuntimeException("No entry filter is found for name `" + filterName
+                        + "`. Available entry filters are : " + definitions.getFilters());
+            }
+            EntryFilterWithClassLoader filter;
+            filter = load(metaData, conf.getNarExtractionDirectory());
+            if (filter != null) {
+                builder.put(filterName, filter);
+            }
+            log.info("Successfully loaded entry filter for name `{}`", filterName);
+        }
+        return builder.build();
+    }
+
+    private static EntryFilterDefinitions searchForEntryFilters(String entryFiltersDirectory,
+                                                                            String narExtractionDirectory)
+            throws IOException {
+        Path path = Paths.get(entryFiltersDirectory).toAbsolutePath();
+        log.info("Searching for entry filters in {}", path);
+
+        EntryFilterDefinitions entryFilterDefinitions = new EntryFilterDefinitions();
+        if (!path.toFile().exists()) {
+            log.warn("Pulsar entry filters directory not found");

Review comment:
       log.info

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/EntryFilterTest.java
##########
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.plugin;
+
+
+import java.util.List;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.pulsar.common.api.proto.KeyValue;
+
+public class EntryFilterTest implements EntryFilter {
+    @Override
+    public FilterResult filterEntry(Entry entry, FilterContext context) {
+        if (context.getMsgMetadata() == null || context.getMsgMetadata().getPropertiesCount() <= 0) {
+            return FilterResult.ACCEPT;
+        }
+        List<KeyValue> list = context.getMsgMetadata().getPropertiesList();
+        // filter by string
+        for (KeyValue keyValue : list) {
+            if ("ACCEPT".equalsIgnoreCase(keyValue.getKey())) {
+                return FilterResult.ACCEPT;
+            } else if ("REJECT".equalsIgnoreCase(keyValue.getKey())){
+                return FilterResult.REJECT;
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public void close() {
+

Review comment:
       Can we verify that the close() method has been called?
   You can use a static variable.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
##########
@@ -183,12 +215,35 @@ public int filterEntriesForConsumer(Optional<EntryWrapper[]> entryWrapper, int e
                 interceptor.beforeSendMessage(subscription, entry, ackSet, msgMetadata);
             }
         }
+        if (CollectionUtils.isNotEmpty(entriesToFiltered)) {
+            subscription.acknowledgeMessage(entriesToFiltered, AckType.Individual,
+                    Collections.emptyMap());
+        }
+
         sendMessageInfo.setTotalMessages(totalMessages);
         sendMessageInfo.setTotalBytes(totalBytes);
         sendMessageInfo.setTotalChunkedMessages(totalChunkedMessages);
         return totalEntries;
     }
 
+    private EntryFilter.FilterResult getFilterResult(FilterContext filterContext, Entry entry) {

Review comment:
       nit: static




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli merged pull request #12970: [PIP-105] Part-2 Support pluggable entry filter in Dispatcher

Posted by GitBox <gi...@apache.org>.
eolivelli merged pull request #12970:
URL: https://github.com/apache/pulsar/pull/12970


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on a change in pull request #12970: [PIP-105] Part-2 Support pluggable entry filter in Dispatcher

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #12970:
URL: https://github.com/apache/pulsar/pull/12970#discussion_r757309992



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
##########
@@ -189,6 +220,32 @@ public int filterEntriesForConsumer(Optional<EntryWrapper[]> entryWrapper, int e
         return totalEntries;
     }
 
+    private EntryFilter.FilterResult getFilterResult(FilterContext filterContext, Entry entry) {
+        EntryFilter.FilterResult result = EntryFilter.FilterResult.REJECT;
+        for (EntryFilter entryFilter : entryFilters) {
+            if (entryFilter.filterEntry(entry, filterContext) == EntryFilter.FilterResult.ACCEPT) {
+                result = EntryFilter.FilterResult.ACCEPT;

Review comment:
       Should we return ACCEPT only if all the filters accepted the entry? It is more like the FilterChain of Jetty: http://www.servlets.com/javadoc/javax/servlet/FilterChain.html

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java
##########
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.plugin;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import com.google.common.collect.ImmutableMap;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.intercept.BrokerInterceptor;
+import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+
+@Slf4j
+public class EntryFilterProvider {
+
+    static final String ENTRY_FILTER_DEFINITION_FILE = "entry_filter.yml";
+
+    /**
+     * create entry filter instance.
+     */
+    public static ImmutableMap<String, EntryFilterWithClassLoader> createEntryFilters(
+            ServiceConfiguration conf) throws IOException {
+        EntryFilterDefinitions definitions = searchForEntryFilters(conf.getEntryFiltersDirectory(),
+                conf.getNarExtractionDirectory());
+        ImmutableMap.Builder<String, EntryFilterWithClassLoader> builder = ImmutableMap.builder();
+        conf.getEntryFilterNames().forEach(filterName -> {
+            EntryFilterMetaData metaData = definitions.getFilters().get(filterName);
+            if (null == metaData) {
+                throw new RuntimeException("No entry filter is found for name `" + filterName
+                        + "`. Available entry filters are : " + definitions.getFilters());
+            }
+            EntryFilterWithClassLoader filter;
+            try {
+                filter = load(metaData, conf.getNarExtractionDirectory());
+                if (filter != null) {
+                    builder.put(filterName, filter);
+                }
+                log.info("Successfully loaded entry filter for name `{}`", filterName);
+            } catch (IOException e) {
+                log.error("Failed to load the entry filter for name `" + filterName + "`", e);
+                throw new RuntimeException("Failed to load the broker interceptor for name `" + filterName + "`");

Review comment:
       Can we throw the IOException directly? Otherwise we will lose the original stack trace if we encounter an IOException.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/FilterContext.java
##########
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.plugin;
+
+import lombok.Data;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
+import org.apache.pulsar.broker.service.EntryBatchSizes;
+import org.apache.pulsar.broker.service.SendMessageInfo;
+import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.broker.service.SubscriptionOption;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+
+@Data
+public class FilterContext {
+    private EntryBatchSizes batchSizes;
+    private SendMessageInfo sendMessageInfo;
+    private EntryBatchIndexesAcks indexesAcks;
+    private ManagedCursor cursor;
+    private boolean isReplayRead;
+    private Subscription subscription;
+    private SubscriptionOption subscriptionOption;
+    private MessageMetadata msgMetadata;

Review comment:
       Do we need to expose all of them to the filter? I don't think the plugin needs to filter the messages by batchSizes, sendMessageInfo, indexesAcks, cursor, or isReplayRead.
   
   And if we expose Subscription, SubscriptionOption, and MessageMetadata, users will get write access. Maybe it would be more reasonable to expose only the topic name, the subscription name, the subscription type, and a read-only MessageMetadata (or a copy)? A filter should always operate in read-only mode.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
##########
@@ -113,6 +130,7 @@ public int filterEntriesForConsumer(Optional<EntryWrapper[]> entryWrapper, int e
         long totalBytes = 0;
         int totalChunkedMessages = 0;
         int totalEntries = 0;
+        FilterContext filterContext = new FilterContext();

Review comment:
       And if the broker doesn't have a filter, we should not create such objects. Could we use a FilterContextDisabled instead?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
##########
@@ -127,6 +145,19 @@ public int filterEntriesForConsumer(Optional<EntryWrapper[]> entryWrapper, int e
             msgMetadata = msgMetadata == null
                     ? Commands.peekMessageMetadata(metadataAndPayload, subscription.toString(), -1)
                     : msgMetadata;
+            if (CollectionUtils.isNotEmpty(entryFilters)) {
+                fillContext(filterContext, batchSizes, sendMessageInfo, indexesAcks, cursor, isReplayRead,
+                        msgMetadata, subscription);
+                EntryFilter.FilterResult result = getFilterResult(filterContext, entry);
+                if (EntryFilter.FilterResult.REJECT == result) {
+                    PositionImpl pos = (PositionImpl) entry.getPosition();
+                    entries.set(i, null);
+                    entry.release();
+                    subscription.acknowledgeMessage(Collections.singletonList(pos), AckType.Individual,

Review comment:
       If we have 100 entries to dispatch and 50 of them are filtered out, we can improve this by calling subscription.acknowledgeMessage() only once.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
##########
@@ -113,6 +130,7 @@ public int filterEntriesForConsumer(Optional<EntryWrapper[]> entryWrapper, int e
         long totalBytes = 0;
         int totalChunkedMessages = 0;
         int totalEntries = 0;
+        FilterContext filterContext = new FilterContext();

Review comment:
       Should we reuse the FilterContext here? Otherwise we will create more objects during message dispatching.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] 315157973 commented on pull request #12970: [PIP-105] Part-2 Support pluggable entry filter in Dispatcher

Posted by GitBox <gi...@apache.org>.
315157973 commented on pull request #12970:
URL: https://github.com/apache/pulsar/pull/12970#issuecomment-979750178


   Hello, I have addressed the review comments as requested; please take another look @eolivelli 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] 315157973 commented on pull request #12970: [PIP-105] Part-2 Support pluggable entry filter in Dispatcher

Posted by GitBox <gi...@apache.org>.
315157973 commented on pull request #12970:
URL: https://github.com/apache/pulsar/pull/12970#issuecomment-979960491


   @codelipenghui @eolivelli PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #12970: [PIP-105] Part-2 Support pluggable entry filter in Dispatcher

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #12970:
URL: https://github.com/apache/pulsar/pull/12970#discussion_r757711869



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterWithClassLoader.java
##########
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.plugin;
+
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.pulsar.common.nar.NarClassLoader;
+
+public class EntryFilterWithClassLoader implements EntryFilter {
+    private final EntryFilter entryFilter;
+    private final NarClassLoader classLoader;
+
+    public EntryFilterWithClassLoader(EntryFilter entryFilter, NarClassLoader classLoader) {
+        this.entryFilter = entryFilter;
+        this.classLoader = classLoader;
+    }
+
+    @Override
+    public FilterResult filterEntry(Entry entry, FilterContext context) {
+        return entryFilter.filterEntry(entry, context);
+    }

Review comment:
       Please add a close() method that closes the ClassLoader and then calls the close method of the wrapped object.
   Otherwise we have a leak.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java
##########
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.plugin;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import com.google.common.collect.ImmutableMap;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.intercept.BrokerInterceptor;
+import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+
+@Slf4j
+public class EntryFilterProvider {
+
+    static final String ENTRY_FILTER_DEFINITION_FILE = "entry_filter.yml";
+
+    /**
+     * create entry filter instance.
+     */
+    public static ImmutableMap<String, EntryFilterWithClassLoader> createEntryFilters(
+            ServiceConfiguration conf) throws IOException {
+        EntryFilterDefinitions definitions = searchForEntryFilters(conf.getEntryFiltersDirectory(),
+                conf.getNarExtractionDirectory());
+        ImmutableMap.Builder<String, EntryFilterWithClassLoader> builder = ImmutableMap.builder();
+        for (String filterName : conf.getEntryFilterNames()) {
+            EntryFilterMetaData metaData = definitions.getFilters().get(filterName);
+            if (null == metaData) {
+                throw new RuntimeException("No entry filter is found for name `" + filterName
+                        + "`. Available entry filters are : " + definitions.getFilters());
+            }
+            EntryFilterWithClassLoader filter;
+            filter = load(metaData, conf.getNarExtractionDirectory());
+            if (filter != null) {
+                builder.put(filterName, filter);
+            }
+            log.info("Successfully loaded entry filter for name `{}`", filterName);
+        }
+        return builder.build();
+    }
+
+    private static EntryFilterDefinitions searchForEntryFilters(String entryFiltersDirectory,
+                                                                            String narExtractionDirectory)
+            throws IOException {
+        Path path = Paths.get(entryFiltersDirectory).toAbsolutePath();
+        log.info("Searching for entry filters in {}", path);
+
+        EntryFilterDefinitions entryFilterDefinitions = new EntryFilterDefinitions();
+        if (!path.toFile().exists()) {
+            log.warn("Pulsar entry filters directory not found");
+            return entryFilterDefinitions;
+        }
+
+        try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.nar")) {
+            for (Path archive : stream) {
+                try {
+                    EntryFilterDefinition def =
+                            getEntryFilterDefinition(archive.toString(), narExtractionDirectory);
+                    log.info("Found entry filter from {} : {}", archive, def);
+
+                    checkArgument(StringUtils.isNotBlank(def.getName()));
+                    checkArgument(StringUtils.isNotBlank(def.getEntryFilterClass()));
+
+                    EntryFilterMetaData metadata = new EntryFilterMetaData();
+                    metadata.setDefinition(def);
+                    metadata.setArchivePath(archive);
+
+                    entryFilterDefinitions.getFilters().put(def.getName(), metadata);
+                } catch (Throwable t) {
+                    log.warn("Failed to load entry filters from {}."
+                            + " It is OK however if you want to use this entry filters,"
+                            + " please make sure you put the correct entry filter NAR"
+                            + " package in the entry filter directory.", archive, t);
+                }
+            }
+        }
+
+        return entryFilterDefinitions;
+    }
+
+    private static EntryFilterDefinition getEntryFilterDefinition(String narPath,
+                                                                              String narExtractionDirectory)
+            throws IOException {
+        try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet(),
+                narExtractionDirectory)) {
+            return getEntryFilterDefinition(ncl);
+        }
+    }
+
+    private static EntryFilterDefinition getEntryFilterDefinition(NarClassLoader ncl) throws IOException {
+        String configStr = ncl.getServiceDefinition(ENTRY_FILTER_DEFINITION_FILE);
+
+        return ObjectMapperFactory.getThreadLocalYaml().readValue(
+                configStr, EntryFilterDefinition.class
+        );
+    }
+
+    private static EntryFilterWithClassLoader load(EntryFilterMetaData metadata,
+                                                               String narExtractionDirectory)
+            throws IOException {
+        NarClassLoader ncl = NarClassLoader.getFromArchive(
+                metadata.getArchivePath().toAbsolutePath().toFile(),
+                Collections.emptySet(),
+                BrokerInterceptor.class.getClassLoader(), narExtractionDirectory);
+
+        EntryFilterDefinition def = getEntryFilterDefinition(ncl);
+        if (StringUtils.isBlank(def.getEntryFilterClass())) {
+            throw new IOException("Entry filters `" + def.getName() + "` does NOT provide a broker"
+                    + " interceptors implementation");
+        }
+
+        try {
+            Class entryFilterClass = ncl.loadClass(def.getEntryFilterClass());
+            Object filter = entryFilterClass.getDeclaredConstructor().newInstance();
+            if (!(filter instanceof EntryFilter)) {
+                throw new IOException("Class " + def.getEntryFilterClass()
+                        + " does not implement entry filter interface");
+            }
+            EntryFilter pi = (EntryFilter) filter;
+            return new EntryFilterWithClassLoader(pi, ncl);
+        } catch (Throwable t) {
+            return null;

Review comment:
       Please log the error and fail.
   If the user configured a filter, then we must load it.
   If the broker starts without the filter, we will have unexpected behaviour, such as messages being dispatched without permission.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilter.java
##########
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.plugin;
+
+import org.apache.bookkeeper.mledger.Entry;
+
+public interface EntryFilter {

Review comment:
       Please add a close() to allow lifecycle management, see my comments below




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on a change in pull request #12970: [PIP-105] Part-2 Support pluggable entry filter in Dispatcher

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #12970:
URL: https://github.com/apache/pulsar/pull/12970#discussion_r756625685



##########
File path: conf/broker.conf
##########
@@ -411,6 +411,11 @@ dispatcherReadFailureBackoffMandatoryStopTimeInMs=0
 # Precise dispathcer flow control according to history message number of each entry
 preciseDispatcherFlowControl=false
 
+# Class name of Pluggable entry filter that can decide whether the entry needs to be filtered
+# You can use this class to decide which entries can be sent to consumers.
+# Multiple classes need to be separated by commas.
+entryFilterClassNames=

Review comment:
       Do we need a directory to maintain all the filters? Otherwise, we need to copy the filter jar to the lib dir.

##########
File path: conf/broker.conf
##########
@@ -411,6 +411,11 @@ dispatcherReadFailureBackoffMandatoryStopTimeInMs=0
 # Precise dispathcer flow control according to history message number of each entry
 preciseDispatcherFlowControl=false
 
+# Class name of Pluggable entry filter that can decide whether the entry needs to be filtered
+# You can use this class to decide which entries can be sent to consumers.
+# Multiple classes need to be separated by commas.
+entryFilterClassNames=

Review comment:
       Another point: should we load the class with a separate classloader that extends the Pulsar classloader?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] 315157973 commented on a change in pull request #12970: [PIP-105] Part-2 Support pluggable entry filter in Dispatcher

Posted by GitBox <gi...@apache.org>.
315157973 commented on a change in pull request #12970:
URL: https://github.com/apache/pulsar/pull/12970#discussion_r759023869



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/EntryFilterTest.java
##########
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.plugin;
+
+
+import java.util.List;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.pulsar.common.api.proto.KeyValue;
+
+public class EntryFilterTest implements EntryFilter {
+    @Override
+    public FilterResult filterEntry(Entry entry, FilterContext context) {
+        if (context.getMsgMetadata() == null || context.getMsgMetadata().getPropertiesCount() <= 0) {
+            return FilterResult.ACCEPT;
+        }
+        List<KeyValue> list = context.getMsgMetadata().getPropertiesList();
+        // filter by string
+        for (KeyValue keyValue : list) {
+            if ("ACCEPT".equalsIgnoreCase(keyValue.getKey())) {
+                return FilterResult.ACCEPT;
+            } else if ("REJECT".equalsIgnoreCase(keyValue.getKey())){
+                return FilterResult.REJECT;
+            }
+        }
+        return null;

Review comment:
       Entry will be accepted




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] Anonymitaet commented on a change in pull request #12970: [PIP-105] Part-2 Support pluggable entry filter in Dispatcher

Posted by GitBox <gi...@apache.org>.
Anonymitaet commented on a change in pull request #12970:
URL: https://github.com/apache/pulsar/pull/12970#discussion_r756625509



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryFilter.java
##########
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import lombok.Data;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+
+public interface EntryFilter {
+    /**
+     * Broker will determine whether to filter out this Entry based on the return value of this method.

Review comment:
       ```suggestion
        * Broker determines whether to filter out this entry based on the return value of this method.
   ```

##########
File path: conf/broker.conf
##########
@@ -411,6 +411,11 @@ dispatcherReadFailureBackoffMandatoryStopTimeInMs=0
 # Precise dispathcer flow control according to history message number of each entry
 preciseDispatcherFlowControl=false
 
+# Class name of Pluggable entry filter that can decide whether the entry needs to be filtered

Review comment:
       ```suggestion
   # Class name of pluggable entry filter that decides whether the entry needs to be filtered
   ```

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryFilter.java
##########
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import lombok.Data;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+
+public interface EntryFilter {
+    /**
+     * Broker will determine whether to filter out this Entry based on the return value of this method.
+     * Please do not deserialize the entire Entry in this method,
+     * which will have a great impact on Broker's memory and CPU.

Review comment:
       ```suggestion
        * which has a great impact on the broker's memory and CPU.
   ```

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryFilter.java
##########
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import lombok.Data;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+
+public interface EntryFilter {
+    /**
+     * Broker will determine whether to filter out this Entry based on the return value of this method.
+     * Please do not deserialize the entire Entry in this method,

Review comment:
       ```suggestion
        * Do not deserialize the entire entry in this method,
   ```

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryFilter.java
##########
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import lombok.Data;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+
+public interface EntryFilter {
+    /**
+     * Broker will determine whether to filter out this Entry based on the return value of this method.
+     * Please do not deserialize the entire Entry in this method,
+     * which will have a great impact on Broker's memory and CPU.
+     * @param entry
+     * @param context
+     * @return
+     */
+    FilterResult filterEntry(Entry entry, FilterContext context);
+
+    @Data
+    class FilterContext {
+
+        private EntryBatchSizes batchSizes;
+        private SendMessageInfo sendMessageInfo;
+        private EntryBatchIndexesAcks indexesAcks;
+        private ManagedCursor cursor;
+        private boolean isReplayRead;
+        private Subscription subscription;
+        private SubscriptionOption subscriptionOption;
+        private MessageMetadata msgMetadata;
+
+        public void reset() {
+            batchSizes = null;
+            sendMessageInfo = null;
+            indexesAcks = null;
+            cursor = null;
+            isReplayRead = false;
+            subscription = null;
+            subscriptionOption = null;
+            msgMetadata = null;
+        }
+    }
+
+    enum FilterResult {
+        /**
+         * deliver to the Consumer.

Review comment:
       ```suggestion
            * deliver to the consumer.
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] 315157973 commented on a change in pull request #12970: [PIP-105] Part-2 Support pluggable entry filter in Dispatcher

Posted by GitBox <gi...@apache.org>.
315157973 commented on a change in pull request #12970:
URL: https://github.com/apache/pulsar/pull/12970#discussion_r759028431



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/EntryFilterTest.java
##########
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.plugin;
+
+
+import java.util.List;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.pulsar.common.api.proto.KeyValue;
+
+public class EntryFilterTest implements EntryFilter {
+    @Override
+    public FilterResult filterEntry(Entry entry, FilterContext context) {
+        if (context.getMsgMetadata() == null || context.getMsgMetadata().getPropertiesCount() <= 0) {
+            return FilterResult.ACCEPT;
+        }
+        List<KeyValue> list = context.getMsgMetadata().getPropertiesList();
+        // filter by string
+        for (KeyValue keyValue : list) {
+            if ("ACCEPT".equalsIgnoreCase(keyValue.getKey())) {
+                return FilterResult.ACCEPT;
+            } else if ("REJECT".equalsIgnoreCase(keyValue.getKey())){
+                return FilterResult.REJECT;
+            }
+        }
+        return null;

Review comment:
       Returning ACCEPT or null will be regarded as ACCEPT. I added this description to the comment of the interface.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] 315157973 commented on pull request #12970: [PIP-105] Part-2 Support pluggable entry filter in Dispatcher

Posted by GitBox <gi...@apache.org>.
315157973 commented on pull request #12970:
URL: https://github.com/apache/pulsar/pull/12970#issuecomment-981241081


   @eolivelli PTAL
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] 315157973 commented on pull request #12970: [PIP-105] Part-2 Support pluggable entry filter in Dispatcher

Posted by GitBox <gi...@apache.org>.
315157973 commented on pull request #12970:
URL: https://github.com/apache/pulsar/pull/12970#issuecomment-980480807


   @eolivelli All done, thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] 315157973 commented on a change in pull request #12970: [PIP-105] Part-2 Support pluggable entry filter in Dispatcher

Posted by GitBox <gi...@apache.org>.
315157973 commented on a change in pull request #12970:
URL: https://github.com/apache/pulsar/pull/12970#discussion_r757269321



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryFilter.java
##########
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.TreeMap;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.nar.NarClassLoader;
+
+public interface EntryFilter {
+
+    /**
+     * Broker determines whether to filter out this entry based on the return value of this method.
+     * Do not deserialize the entire entry in this method,
+     * which has a great impact on the broker's memory and CPU.
+     * @param entry
+     * @param context
+     * @return
+     */
+    FilterResult filterEntry(Entry entry, FilterContext context);
+
+    @Data
+    class FilterContext {
+
+        private EntryBatchSizes batchSizes;
+        private SendMessageInfo sendMessageInfo;
+        private EntryBatchIndexesAcks indexesAcks;
+        private ManagedCursor cursor;
+        private boolean isReplayRead;
+        private Subscription subscription;
+        private SubscriptionOption subscriptionOption;
+        private MessageMetadata msgMetadata;
+
+        public void reset() {

Review comment:
       I think Context does not need to be made into an interface; just make it a class directly.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] wolfstudy commented on pull request #12970: [PIP-105] Part-2 Support pluggable entry filter in Dispatcher

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on pull request #12970:
URL: https://github.com/apache/pulsar/pull/12970#issuecomment-982236233


   ping @eolivelli PTAL thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #12970: [PIP-105] Part-2 Support pluggable entry filter in Dispatcher

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #12970:
URL: https://github.com/apache/pulsar/pull/12970#discussion_r757260819



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryFilter.java
##########
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.TreeMap;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.nar.NarClassLoader;
+
+public interface EntryFilter {
+
+    /**
+     * Broker determines whether to filter out this entry based on the return value of this method.
+     * Do not deserialize the entire entry in this method,
+     * which has a great impact on the broker's memory and CPU.
+     * @param entry
+     * @param context
+     * @return
+     */
+    FilterResult filterEntry(Entry entry, FilterContext context);
+
+    @Data
+    class FilterContext {
+
+        private EntryBatchSizes batchSizes;
+        private SendMessageInfo sendMessageInfo;
+        private EntryBatchIndexesAcks indexesAcks;
+        private ManagedCursor cursor;
+        private boolean isReplayRead;
+        private Subscription subscription;
+        private SubscriptionOption subscriptionOption;
+        private MessageMetadata msgMetadata;
+
+        public void reset() {

Review comment:
       This method is only for the implementation, do not keep it here

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryFilter.java
##########
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.TreeMap;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.nar.NarClassLoader;
+
+public interface EntryFilter {
+
+    /**
+     * Broker determines whether to filter out this entry based on the return value of this method.
+     * Do not deserialize the entire entry in this method,
+     * which has a great impact on the broker's memory and CPU.
+     * @param entry
+     * @param context
+     * @return
+     */
+    FilterResult filterEntry(Entry entry, FilterContext context);
+
+    @Data
+    class FilterContext {

Review comment:
       Here you should only declare an interface.
       Move the implementation somewhere else.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryFilter.java
##########
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.TreeMap;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.nar.NarClassLoader;
+
+public interface EntryFilter {
+
+    /**
+     * Broker determines whether to filter out this entry based on the return value of this method.
+     * Do not deserialize the entire entry in this method,
+     * which has a great impact on the broker's memory and CPU.
+     * @param entry
+     * @param context
+     * @return
+     */
+    FilterResult filterEntry(Entry entry, FilterContext context);
+
+    @Data
+    class FilterContext {
+
+        private EntryBatchSizes batchSizes;
+        private SendMessageInfo sendMessageInfo;
+        private EntryBatchIndexesAcks indexesAcks;
+        private ManagedCursor cursor;
+        private boolean isReplayRead;
+        private Subscription subscription;
+        private SubscriptionOption subscriptionOption;
+        private MessageMetadata msgMetadata;
+
+        public void reset() {
+            batchSizes = null;
+            sendMessageInfo = null;
+            indexesAcks = null;
+            cursor = null;
+            isReplayRead = false;
+            subscription = null;
+            subscriptionOption = null;
+            msgMetadata = null;
+        }
+    }
+
+    enum FilterResult {
+        /**
+         * deliver to the consumer.
+         */
+        ACCEPT,
+        /**
+         * skip the message.
+         */
+        REJECT,
+    }
+
+    @Data
+    class EntryFilterDefinitions {
+        private final Map<String, EntryFilterMetaData> filters = new TreeMap<>();
+    }
+
+    @Data
+    @NoArgsConstructor
+    class EntryFilterMetaData {

Review comment:
       Move this to the implementation; do not expose it in the public interface.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryFilter.java
##########
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.TreeMap;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.nar.NarClassLoader;
+
+public interface EntryFilter {
+
+    /**
+     * Broker determines whether to filter out this entry based on the return value of this method.
+     * Do not deserialize the entire entry in this method,
+     * which has a great impact on the broker's memory and CPU.
+     * @param entry
+     * @param context
+     * @return
+     */
+    FilterResult filterEntry(Entry entry, FilterContext context);
+
+    @Data
+    class FilterContext {
+
+        private EntryBatchSizes batchSizes;
+        private SendMessageInfo sendMessageInfo;
+        private EntryBatchIndexesAcks indexesAcks;
+        private ManagedCursor cursor;
+        private boolean isReplayRead;
+        private Subscription subscription;
+        private SubscriptionOption subscriptionOption;
+        private MessageMetadata msgMetadata;
+
+        public void reset() {
+            batchSizes = null;
+            sendMessageInfo = null;
+            indexesAcks = null;
+            cursor = null;
+            isReplayRead = false;
+            subscription = null;
+            subscriptionOption = null;
+            msgMetadata = null;
+        }
+    }
+
+    enum FilterResult {
+        /**
+         * deliver to the consumer.
+         */
+        ACCEPT,
+        /**
+         * skip the message.
+         */
+        REJECT,
+    }
+
+    @Data
+    class EntryFilterDefinitions {
+        private final Map<String, EntryFilterMetaData> filters = new TreeMap<>();
+    }
+
+    @Data
+    @NoArgsConstructor
+    class EntryFilterMetaData {
+
+        /**
+         * The definition of the broker interceptor.
+         */
+        private EntryFilterDefinition definition;
+
+        /**
+         * The path to the handler package.
+         */
+        private Path archivePath;
+    }
+
+    @Data
+    @NoArgsConstructor
+    class EntryFilterDefinition {

Review comment:
       Move this to the implementation; do not expose it in the public interface.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryFilter.java
##########
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.TreeMap;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.nar.NarClassLoader;
+
+public interface EntryFilter {
+
+    /**
+     * Broker determines whether to filter out this entry based on the return value of this method.
+     * Do not deserialize the entire entry in this method,
+     * which has a great impact on the broker's memory and CPU.
+     * @param entry
+     * @param context
+     * @return
+     */
+    FilterResult filterEntry(Entry entry, FilterContext context);
+
+    @Data
+    class FilterContext {
+
+        private EntryBatchSizes batchSizes;
+        private SendMessageInfo sendMessageInfo;
+        private EntryBatchIndexesAcks indexesAcks;
+        private ManagedCursor cursor;
+        private boolean isReplayRead;
+        private Subscription subscription;
+        private SubscriptionOption subscriptionOption;
+        private MessageMetadata msgMetadata;
+
+        public void reset() {
+            batchSizes = null;
+            sendMessageInfo = null;
+            indexesAcks = null;
+            cursor = null;
+            isReplayRead = false;
+            subscription = null;
+            subscriptionOption = null;
+            msgMetadata = null;
+        }
+    }
+
+    enum FilterResult {
+        /**
+         * deliver to the consumer.
+         */
+        ACCEPT,
+        /**
+         * skip the message.
+         */
+        REJECT,
+    }
+
+    @Data
+    class EntryFilterDefinitions {

Review comment:
       Move this to the implementation; do not expose it in the public interface.

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/EntryFilterForTest2.java
##########
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.plugin;
+
+
+import java.util.List;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.pulsar.broker.service.EntryFilter;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.common.api.proto.KeyValue;
+
+public class EntryFilterForTest2 implements EntryFilter {

Review comment:
       I am not sure that Maven will select this class, as its name does not end with Test.
   Please rename it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] 315157973 commented on a change in pull request #12970: [PIP-105] Part-2 Support pluggable entry filter in Dispatcher

Posted by GitBox <gi...@apache.org>.
315157973 commented on a change in pull request #12970:
URL: https://github.com/apache/pulsar/pull/12970#discussion_r757725096



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterWithClassLoader.java
##########
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.plugin;
+
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.pulsar.common.nar.NarClassLoader;
+
+public class EntryFilterWithClassLoader implements EntryFilter {
+    private final EntryFilter entryFilter;
+    private final NarClassLoader classLoader;
+
+    public EntryFilterWithClassLoader(EntryFilter entryFilter, NarClassLoader classLoader) {
+        this.entryFilter = entryFilter;
+        this.classLoader = classLoader;
+    }
+
+    @Override
+    public FilterResult filterEntry(Entry entry, FilterContext context) {
+        return entryFilter.filterEntry(entry, context);
+    }

Review comment:
       Good catch




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org