You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/11/30 11:01:46 UTC

[pulsar] branch master updated: [PIP-105] Part-2 Support pluggable entry filter in Dispatcher (#12970)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new dac47cb  [PIP-105] Part-2 Support pluggable entry filter in Dispatcher (#12970)
dac47cb is described below

commit dac47cb8360c71597a2d4404c60667a769073b6e
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Tue Nov 30 19:00:19 2021 +0800

    [PIP-105] Part-2 Support pluggable entry filter in Dispatcher (#12970)
---
 conf/broker.conf                                   |   8 +
 .../apache/pulsar/broker/ServiceConfiguration.java |  16 ++
 .../broker/service/AbstractBaseDispatcher.java     |  56 +++++++
 .../pulsar/broker/service/AbstractTopic.java       |   1 +
 .../pulsar/broker/service/BrokerService.java       |  18 +++
 .../org/apache/pulsar/broker/service/Topic.java    |   6 +
 .../service/persistent/PersistentSubscription.java |   3 +-
 .../pulsar/broker/service/plugin/EntryFilter.java  |  53 +++++++
 .../service/plugin/EntryFilterDefinition.java      |  42 ++++++
 .../service/plugin/EntryFilterDefinitions.java     |  28 ++++
 .../broker/service/plugin/EntryFilterMetaData.java |  37 +++++
 .../broker/service/plugin/EntryFilterProvider.java | 152 +++++++++++++++++++
 .../service/plugin/EntryFilterWithClassLoader.java |  50 +++++++
 .../broker/service/plugin/FilterContext.java       |  36 +++++
 .../pulsar/broker/service/plugin/package-info.java |  19 +++
 .../broker/service/plugin/EntryFilter2Test.java    |  51 +++++++
 .../broker/service/plugin/EntryFilterTest.java     |  48 ++++++
 .../broker/service/plugin/FilterEntryTest.java     | 162 +++++++++++++++++++++
 18 files changed, 785 insertions(+), 1 deletion(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 9d660c6..4496be4 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -411,6 +411,14 @@ dispatcherReadFailureBackoffMandatoryStopTimeInMs=0
 # Precise dispathcer flow control according to history message number of each entry
 preciseDispatcherFlowControl=false
 
+# Class name of Pluggable entry filter that can decide whether the entry needs to be filtered
+# You can use this class to decide which entries can be sent to consumers.
+# Multiple classes need to be separated by commas.
+entryFilterNames=
+
+# The directory for all the entry filter implementations
+entryFiltersDirectory=
+
 # Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic
 maxConcurrentLookupRequest=50000
 
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index efc9f43..906ef3f 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -836,6 +836,22 @@ public class ServiceConfiguration implements PulsarConfiguration {
     private boolean preciseDispatcherFlowControl = false;
 
     @FieldContext(
+         dynamic = true,
+         category = CATEGORY_SERVER,
+         doc = " Class name of pluggable entry filter that decides whether the entry needs to be filtered."
+                 + "You can use this class to decide which entries can be sent to consumers."
+                 + "Multiple names need to be separated by commas."
+    )
+    private List<String> entryFilterNames = new ArrayList<>();
+
+    @FieldContext(
+         dynamic = true,
+         category = CATEGORY_SERVER,
+         doc = " The directory for all the entry filter implementations."
+    )
+    private String entryFiltersDirectory = "";
+
+    @FieldContext(
         category = CATEGORY_SERVER,
         doc = "Whether to use streaming read dispatcher. Currently is in preview and can be changed " +
                 "in subsequent release."
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index b53de2f..8970ec6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -19,18 +19,26 @@
 
 package org.apache.pulsar.broker.service;
 
+import com.google.common.collect.ImmutableList;
 import io.netty.buffer.ByteBuf;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.intercept.BrokerInterceptor;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.service.plugin.EntryFilter;
+import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
+import org.apache.pulsar.broker.service.plugin.FilterContext;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
@@ -48,11 +56,25 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
 
     protected final ServiceConfiguration serviceConfig;
     protected final boolean dispatchThrottlingOnBatchMessageEnabled;
+    /**
+     * Entry filters in Broker.
+     * Not set to final, for the convenience of testing mock.
+     */
+    protected ImmutableList<EntryFilterWithClassLoader> entryFilters;
+    protected final FilterContext filterContext;
 
     protected AbstractBaseDispatcher(Subscription subscription, ServiceConfiguration serviceConfig) {
         this.subscription = subscription;
         this.serviceConfig = serviceConfig;
         this.dispatchThrottlingOnBatchMessageEnabled = serviceConfig.isDispatchThrottlingOnBatchMessageEnabled();
+        if (subscription != null && subscription.getTopic() != null && MapUtils.isNotEmpty(subscription.getTopic()
+                .getBrokerService().getEntryFilters())) {
+            this.entryFilters = subscription.getTopic().getBrokerService().getEntryFilters().values().asList();
+            this.filterContext = new FilterContext();
+        } else {
+            this.entryFilters = ImmutableList.of();
+            this.filterContext = FilterContext.FILTER_CONTEXT_DISABLED;
+        }
     }
 
     /**
@@ -113,6 +135,7 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
         long totalBytes = 0;
         int totalChunkedMessages = 0;
         int totalEntries = 0;
+        List<Position> entriesToFiltered = CollectionUtils.isNotEmpty(entryFilters) ? new ArrayList<>() : null;
         for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) {
             Entry entry = entries.get(i);
             if (entry == null) {
@@ -127,6 +150,15 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
             msgMetadata = msgMetadata == null
                     ? Commands.peekMessageMetadata(metadataAndPayload, subscription.toString(), -1)
                     : msgMetadata;
+            if (CollectionUtils.isNotEmpty(entryFilters)) {
+                fillContext(filterContext, msgMetadata, subscription);
+                if (EntryFilter.FilterResult.REJECT == getFilterResult(filterContext, entry, entryFilters)) {
+                    entriesToFiltered.add(entry.getPosition());
+                    entries.set(i, null);
+                    entry.release();
+                    continue;
+                }
+            }
             if (!isReplayRead && msgMetadata != null && msgMetadata.hasTxnidMostBits()
                     && msgMetadata.hasTxnidLeastBits()) {
                 if (Markers.isTxnMarker(msgMetadata)) {
@@ -183,12 +215,36 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
                 interceptor.beforeSendMessage(subscription, entry, ackSet, msgMetadata);
             }
         }
+        if (CollectionUtils.isNotEmpty(entriesToFiltered)) {
+            subscription.acknowledgeMessage(entriesToFiltered, AckType.Individual,
+                    Collections.emptyMap());
+        }
+
         sendMessageInfo.setTotalMessages(totalMessages);
         sendMessageInfo.setTotalBytes(totalBytes);
         sendMessageInfo.setTotalChunkedMessages(totalChunkedMessages);
         return totalEntries;
     }
 
+    private static EntryFilter.FilterResult getFilterResult(FilterContext filterContext, Entry entry,
+                                                            ImmutableList<EntryFilterWithClassLoader> entryFilters) {
+        EntryFilter.FilterResult result = EntryFilter.FilterResult.ACCEPT;
+        for (EntryFilter entryFilter : entryFilters) {
+            if (entryFilter.filterEntry(entry, filterContext) == EntryFilter.FilterResult.REJECT) {
+                result = EntryFilter.FilterResult.REJECT;
+                break;
+            }
+        }
+        return result;
+    }
+
+    private void fillContext(FilterContext context, MessageMetadata msgMetadata,
+                             Subscription subscription) {
+        context.reset();
+        context.setMsgMetadata(msgMetadata);
+        context.setSubscription(subscription);
+    }
+
     /**
      * Determine whether the number of consumers on the subscription reaches the threshold.
      * @return
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 26c591d..ac83775 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -318,6 +318,7 @@ public abstract class AbstractTopic implements Topic {
     }
 
 
+    @Override
     public BrokerService getBrokerService() {
         return brokerService;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 7b8ae80..7fa59bf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -26,6 +26,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.broker.PulsarService.isTransactionSystemTopic;
 import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
@@ -114,6 +115,8 @@ import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
 import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.persistent.SystemTopic;
+import org.apache.pulsar.broker.service.plugin.EntryFilterProvider;
+import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
 import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
 import org.apache.pulsar.broker.stats.prometheus.metrics.ObserverGauge;
 import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
@@ -267,6 +270,7 @@ public class BrokerService implements Closeable {
     private boolean preciseTopicPublishRateLimitingEnable;
     private final LongAdder pausedConnections = new LongAdder();
     private BrokerInterceptor interceptor;
+    private ImmutableMap<String, EntryFilterWithClassLoader> entryFilters;
 
     private Set<BrokerEntryMetadataInterceptor> brokerEntryMetadataInterceptors;
 
@@ -299,6 +303,9 @@ public class BrokerService implements Closeable {
                 .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-stats-updater"));
         this.authorizationService = new AuthorizationService(
                 pulsar.getConfiguration(), pulsar().getPulsarResources());
+        if (!pulsar.getConfiguration().getEntryFilterNames().isEmpty()) {
+            this.entryFilters = EntryFilterProvider.createEntryFilters(pulsar.getConfiguration());
+        }
 
         pulsar.getLocalMetadataStore().registerListener(this::handleMetadataChanges);
         pulsar.getConfigurationMetadataStore().registerListener(this::handleMetadataChanges);
@@ -712,6 +719,17 @@ public class BrokerService implements Closeable {
                 }
             });
 
+            //close entry filters
+            if (entryFilters != null) {
+                entryFilters.forEach((name, filter) -> {
+                    try {
+                        filter.close();
+                    } catch (Exception e) {
+                        log.warn("Error shutting down entry filter {}", name, e);
+                    }
+                });
+            }
+
             CompletableFuture<CompletableFuture<Void>> cancellableDownstreamFutureReference = new CompletableFuture<>();
             log.info("Event loops shutting down gracefully...");
             List<CompletableFuture<?>> shutdownEventLoops = new ArrayList<>();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 9db4111..89cc448 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -294,4 +294,10 @@ public interface Topic {
      */
     CompletableFuture<Void> truncate();
 
+    /**
+     * Get BrokerService.
+     * @return
+     */
+    BrokerService getBrokerService();
+
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 2c23bee..061d038 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -48,6 +48,7 @@ import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.intercept.BrokerInterceptor;
@@ -158,7 +159,7 @@ public class PersistentSubscription implements Subscription {
         this.fullName = MoreObjects.toStringHelper(this).add("topic", topicName).add("name", subName).toString();
         this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, subscriptionName, cursor, this);
         this.setReplicated(replicated);
-        this.subscriptionProperties = subscriptionProperties == null
+        this.subscriptionProperties = MapUtils.isEmpty(subscriptionProperties)
                 ? new HashMap<>() : Collections.unmodifiableMap(subscriptionProperties);
         if (topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled()
                 && !checkTopicIsEventsNames(TopicName.get(topicName))) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilter.java
new file mode 100644
index 0000000..40e6644
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilter.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.plugin;
+
+import org.apache.bookkeeper.mledger.Entry;
+
+public interface EntryFilter {
+
+    /**
+     * 1. Broker determines whether to filter out this entry based on the return value of this method.
+     * 2. Do not deserialize the entire entry in this method,
+     * which has a great impact on the broker's memory and CPU.
+     * 3. Return ACCEPT or null will be regarded as ACCEPT.
+     * @param entry
+     * @param context
+     * @return
+     */
+    FilterResult filterEntry(Entry entry, FilterContext context);
+
+    /**
+     * close the entry filter.
+     */
+    void close();
+
+
+    enum FilterResult {
+        /**
+         * deliver to the consumer.
+         */
+        ACCEPT,
+        /**
+         * skip the message.
+         */
+        REJECT,
+    }
+
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterDefinition.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterDefinition.java
new file mode 100644
index 0000000..5df3944
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterDefinition.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.plugin;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@NoArgsConstructor
+public class EntryFilterDefinition {
+
+    /**
+     * The name of the entry filter.
+     */
+    private String name;
+
+    /**
+     * The description of the entry filter to be used for user help.
+     */
+    private String description;
+
+    /**
+     * The class name for the entry filter.
+     */
+    private String entryFilterClass;
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterDefinitions.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterDefinitions.java
new file mode 100644
index 0000000..9aa3113
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterDefinitions.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.plugin;
+
+import java.util.Map;
+import java.util.TreeMap;
+import lombok.Data;
+
+@Data
+public class EntryFilterDefinitions {
+    private final Map<String, EntryFilterMetaData> filters = new TreeMap<>();
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterMetaData.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterMetaData.java
new file mode 100644
index 0000000..babaa80
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterMetaData.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.plugin;
+
+import java.nio.file.Path;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@NoArgsConstructor
+public class EntryFilterMetaData {
+    /**
+     * The definition of the entry filter.
+     */
+    private EntryFilterDefinition definition;
+
+    /**
+     * The path to the handler package.
+     */
+    private Path archivePath;
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java
new file mode 100644
index 0000000..9e19a57
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.plugin;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import com.google.common.collect.ImmutableMap;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+
+@Slf4j
+public class EntryFilterProvider {
+
+    static final String ENTRY_FILTER_DEFINITION_FILE = "entry_filter.yml";
+
+    /**
+     * create entry filter instance.
+     */
+    public static ImmutableMap<String, EntryFilterWithClassLoader> createEntryFilters(
+            ServiceConfiguration conf) throws IOException {
+        EntryFilterDefinitions definitions = searchForEntryFilters(conf.getEntryFiltersDirectory(),
+                conf.getNarExtractionDirectory());
+        ImmutableMap.Builder<String, EntryFilterWithClassLoader> builder = ImmutableMap.builder();
+        for (String filterName : conf.getEntryFilterNames()) {
+            EntryFilterMetaData metaData = definitions.getFilters().get(filterName);
+            if (null == metaData) {
+                throw new RuntimeException("No entry filter is found for name `" + filterName
+                        + "`. Available entry filters are : " + definitions.getFilters());
+            }
+            EntryFilterWithClassLoader filter;
+            filter = load(metaData, conf.getNarExtractionDirectory());
+            if (filter != null) {
+                builder.put(filterName, filter);
+            }
+            log.info("Successfully loaded entry filter for name `{}`", filterName);
+        }
+        return builder.build();
+    }
+
+    private static EntryFilterDefinitions searchForEntryFilters(String entryFiltersDirectory,
+                                                                            String narExtractionDirectory)
+            throws IOException {
+        Path path = Paths.get(entryFiltersDirectory).toAbsolutePath();
+        log.info("Searching for entry filters in {}", path);
+
+        EntryFilterDefinitions entryFilterDefinitions = new EntryFilterDefinitions();
+        if (!path.toFile().exists()) {
+            log.info("Pulsar entry filters directory not found");
+            return entryFilterDefinitions;
+        }
+
+        try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.nar")) {
+            for (Path archive : stream) {
+                try {
+                    EntryFilterDefinition def =
+                            getEntryFilterDefinition(archive.toString(), narExtractionDirectory);
+                    log.info("Found entry filter from {} : {}", archive, def);
+
+                    checkArgument(StringUtils.isNotBlank(def.getName()));
+                    checkArgument(StringUtils.isNotBlank(def.getEntryFilterClass()));
+
+                    EntryFilterMetaData metadata = new EntryFilterMetaData();
+                    metadata.setDefinition(def);
+                    metadata.setArchivePath(archive);
+
+                    entryFilterDefinitions.getFilters().put(def.getName(), metadata);
+                } catch (Throwable t) {
+                    log.warn("Failed to load entry filters from {}."
+                            + " It is OK however if you want to use this entry filters,"
+                            + " please make sure you put the correct entry filter NAR"
+                            + " package in the entry filter directory.", archive, t);
+                }
+            }
+        }
+
+        return entryFilterDefinitions;
+    }
+
+    private static EntryFilterDefinition getEntryFilterDefinition(String narPath,
+                                                                              String narExtractionDirectory)
+            throws IOException {
+        try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet(),
+                narExtractionDirectory)) {
+            return getEntryFilterDefinition(ncl);
+        }
+    }
+
+    private static EntryFilterDefinition getEntryFilterDefinition(NarClassLoader ncl) throws IOException {
+        String configStr = ncl.getServiceDefinition(ENTRY_FILTER_DEFINITION_FILE);
+
+        return ObjectMapperFactory.getThreadLocalYaml().readValue(
+                configStr, EntryFilterDefinition.class
+        );
+    }
+
+    private static EntryFilterWithClassLoader load(EntryFilterMetaData metadata,
+                                                               String narExtractionDirectory)
+            throws IOException {
+        NarClassLoader ncl = NarClassLoader.getFromArchive(
+                metadata.getArchivePath().toAbsolutePath().toFile(),
+                Collections.emptySet(),
+                EntryFilter.class.getClassLoader(), narExtractionDirectory);
+
+        EntryFilterDefinition def = getEntryFilterDefinition(ncl);
+        if (StringUtils.isBlank(def.getEntryFilterClass())) {
+            throw new IOException("Entry filters `" + def.getName() + "` does NOT provide a entry"
+                    + " filters implementation");
+        }
+
+        try {
+            Class entryFilterClass = ncl.loadClass(def.getEntryFilterClass());
+            Object filter = entryFilterClass.getDeclaredConstructor().newInstance();
+            if (!(filter instanceof EntryFilter)) {
+                throw new IOException("Class " + def.getEntryFilterClass()
+                        + " does not implement entry filter interface");
+            }
+            EntryFilter pi = (EntryFilter) filter;
+            return new EntryFilterWithClassLoader(pi, ncl);
+        } catch (Exception e) {
+            if (e instanceof IOException) {
+                throw (IOException) e;
+            }
+            log.error("Failed to load class {}", def.getEntryFilterClass(), e);
+            throw new IOException(e);
+        }
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterWithClassLoader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterWithClassLoader.java
new file mode 100644
index 0000000..8c2569c
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterWithClassLoader.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.plugin;
+
+import java.io.IOException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.pulsar.common.nar.NarClassLoader;
+
+@Slf4j
+public class EntryFilterWithClassLoader implements EntryFilter {
+    private final EntryFilter entryFilter;
+    private final NarClassLoader classLoader;
+
+    public EntryFilterWithClassLoader(EntryFilter entryFilter, NarClassLoader classLoader) {
+        this.entryFilter = entryFilter;
+        this.classLoader = classLoader;
+    }
+
+    @Override
+    public FilterResult filterEntry(Entry entry, FilterContext context) {
+        return entryFilter.filterEntry(entry, context);
+    }
+
+    @Override
+    public void close() {
+        entryFilter.close();
+        try {
+            classLoader.close();
+        } catch (IOException e) {
+            log.error("close EntryFilterWithClassLoader failed", e);
+        }
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/FilterContext.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/FilterContext.java
new file mode 100644
index 0000000..e520e10
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/FilterContext.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.plugin;
+
+import lombok.Data;
+import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+
+@Data
+public class FilterContext {
+    private Subscription subscription;
+    private MessageMetadata msgMetadata;
+
+    public void reset() {
+        subscription = null;
+        msgMetadata = null;
+    }
+
+    public static final FilterContext FILTER_CONTEXT_DISABLED = new FilterContext();
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/package-info.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/package-info.java
new file mode 100644
index 0000000..05e54ca
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.plugin;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/EntryFilter2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/EntryFilter2Test.java
new file mode 100644
index 0000000..dbcaa16
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/EntryFilter2Test.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.plugin;
+
+
+import java.util.List;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.common.api.proto.KeyValue;
+
+public class EntryFilter2Test implements EntryFilter {
+    @Override
+    public FilterResult filterEntry(Entry entry, FilterContext context) {
+        if (context.getMsgMetadata() == null || context.getMsgMetadata().getPropertiesCount() <= 0) {
+            return FilterResult.ACCEPT;
+        }
+        List<KeyValue> list = context.getMsgMetadata().getPropertiesList();
+        // filter by subscription properties
+        PersistentSubscription subscription = (PersistentSubscription) context.getSubscription();
+        if (!MapUtils.isEmpty(subscription.getSubscriptionProperties())) {
+            for (KeyValue keyValue : list) {
+                if(subscription.getSubscriptionProperties().containsKey(keyValue.getKey())){
+                    return FilterResult.ACCEPT;
+                }
+            }
+        }
+        return FilterResult.REJECT;
+    }
+
+    @Override
+    public void close() {
+
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/EntryFilterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/EntryFilterTest.java
new file mode 100644
index 0000000..812d49a
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/EntryFilterTest.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.plugin;
+
+
+import java.util.List;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.pulsar.common.api.proto.KeyValue;
+
+public class EntryFilterTest implements EntryFilter {
+    @Override
+    public FilterResult filterEntry(Entry entry, FilterContext context) {
+        if (context.getMsgMetadata() == null || context.getMsgMetadata().getPropertiesCount() <= 0) {
+            return FilterResult.ACCEPT;
+        }
+        List<KeyValue> list = context.getMsgMetadata().getPropertiesList();
+        // filter by string
+        for (KeyValue keyValue : list) {
+            if ("ACCEPT".equalsIgnoreCase(keyValue.getKey())) {
+                return FilterResult.ACCEPT;
+            } else if ("REJECT".equalsIgnoreCase(keyValue.getKey())){
+                return FilterResult.REJECT;
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public void close() {
+
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
new file mode 100644
index 0000000..0338935
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.plugin;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertNotNull;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.service.AbstractBaseDispatcher;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.service.Dispatcher;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.nar.NarClassLoader;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class FilterEntryTest extends BrokerTestBase {
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        baseSetup();
+    }
+
+    @AfterMethod
+    @Override
+    protected void cleanup() throws Exception {
+        internalCleanup();
+    }
+
+    public void testFilter() throws Exception {
+
+        String topic = "persistent://prop/ns-abc/topic" + UUID.randomUUID();
+        String subName = "sub";
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic)
+                .subscriptionName(subName).subscribe();
+        // mock entry filters
+        PersistentSubscription subscription = (PersistentSubscription) pulsar.getBrokerService()
+                .getTopicReference(topic).get().getSubscription(subName);
+        Dispatcher dispatcher = subscription.getDispatcher();
+        Field field = AbstractBaseDispatcher.class.getDeclaredField("entryFilters");
+        field.setAccessible(true);
+        NarClassLoader narClassLoader = mock(NarClassLoader.class);
+        EntryFilter filter1 = new EntryFilterTest();
+        EntryFilterWithClassLoader loader1 = spy(new EntryFilterWithClassLoader(filter1, narClassLoader));
+        EntryFilter filter2 = new EntryFilter2Test();
+        EntryFilterWithClassLoader loader2 = spy(new EntryFilterWithClassLoader(filter2, narClassLoader));
+        field.set(dispatcher, ImmutableList.of(loader1, loader2));
+
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .enableBatching(false).topic(topic).create();
+        for (int i = 0; i < 10; i++) {
+            producer.send("test");
+        }
+
+        int counter = 0;
+        while (true) {
+            Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
+            if (message != null) {
+                counter++;
+                consumer.acknowledge(message);
+            } else {
+                break;
+            }
+        }
+        // All normal messages can be received
+        assertEquals(10, counter);
+        MessageIdImpl lastMsgId = null;
+        for (int i = 0; i < 10; i++) {
+            lastMsgId = (MessageIdImpl) producer.newMessage().property("REJECT", "").value("1").send();
+        }
+        counter = 0;
+        while (true) {
+            Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
+            if (message != null) {
+                counter++;
+                consumer.acknowledge(message);
+            } else {
+                break;
+            }
+        }
+        // REJECT messages are filtered out
+        assertEquals(0, counter);
+
+        // All messages should be acked, check the MarkDeletedPosition
+        assertNotNull(lastMsgId);
+        MessageIdImpl finalLastMsgId = lastMsgId;
+        Awaitility.await().untilAsserted(() -> {
+            PositionImpl position = (PositionImpl) subscription.getCursor().getMarkDeletedPosition();
+            assertEquals(position.getLedgerId(), finalLastMsgId.getLedgerId());
+            assertEquals(position.getEntryId(), finalLastMsgId.getEntryId());
+        });
+        consumer.close();
+
+        Map<String, String> map = new HashMap<>();
+        map.put("1","1");
+        map.put("2","2");
+        consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic).subscriptionProperties(map)
+                .subscriptionName(subName).subscribe();
+        for (int i = 0; i < 10; i++) {
+            producer.newMessage().property(String.valueOf(i), String.valueOf(i)).value("1").send();
+        }
+        counter = 0;
+        while (true) {
+            Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
+            if (message != null) {
+                counter++;
+                consumer.acknowledge(message);
+            } else {
+                break;
+            }
+        }
+        assertEquals(2, counter);
+
+        producer.close();
+        consumer.close();
+
+        BrokerService brokerService = pulsar.getBrokerService();
+        Field field1 = BrokerService.class.getDeclaredField("entryFilters");
+        field1.setAccessible(true);
+        field1.set(brokerService, ImmutableMap.of("1", loader1, "2", loader2));
+        cleanup();
+        verify(loader1, times(1)).close();
+        verify(loader2, times(1)).close();
+
+    }
+
+}