You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/11/30 11:01:46 UTC
[pulsar] branch master updated: [PIP-105] Part-2 Support pluggable entry filter in Dispatcher (#12970)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new dac47cb [PIP-105] Part-2 Support pluggable entry filter in Dispatcher (#12970)
dac47cb is described below
commit dac47cb8360c71597a2d4404c60667a769073b6e
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Tue Nov 30 19:00:19 2021 +0800
[PIP-105] Part-2 Support pluggable entry filter in Dispatcher (#12970)
---
conf/broker.conf | 8 +
.../apache/pulsar/broker/ServiceConfiguration.java | 16 ++
.../broker/service/AbstractBaseDispatcher.java | 56 +++++++
.../pulsar/broker/service/AbstractTopic.java | 1 +
.../pulsar/broker/service/BrokerService.java | 18 +++
.../org/apache/pulsar/broker/service/Topic.java | 6 +
.../service/persistent/PersistentSubscription.java | 3 +-
.../pulsar/broker/service/plugin/EntryFilter.java | 53 +++++++
.../service/plugin/EntryFilterDefinition.java | 42 ++++++
.../service/plugin/EntryFilterDefinitions.java | 28 ++++
.../broker/service/plugin/EntryFilterMetaData.java | 37 +++++
.../broker/service/plugin/EntryFilterProvider.java | 152 +++++++++++++++++++
.../service/plugin/EntryFilterWithClassLoader.java | 50 +++++++
.../broker/service/plugin/FilterContext.java | 36 +++++
.../pulsar/broker/service/plugin/package-info.java | 19 +++
.../broker/service/plugin/EntryFilter2Test.java | 51 +++++++
.../broker/service/plugin/EntryFilterTest.java | 48 ++++++
.../broker/service/plugin/FilterEntryTest.java | 162 +++++++++++++++++++++
18 files changed, 785 insertions(+), 1 deletion(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 9d660c6..4496be4 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -411,6 +411,14 @@ dispatcherReadFailureBackoffMandatoryStopTimeInMs=0
# Precise dispathcer flow control according to history message number of each entry
preciseDispatcherFlowControl=false
+# Names of the pluggable entry filters that decide whether an entry needs to be filtered out.
+# Each name must match the `name` declared in the entry_filter.yml of a NAR found in entryFiltersDirectory.
+# Entries rejected by a filter are not sent to consumers. Multiple names need to be separated by commas.
+entryFilterNames=
+
+# The directory for all the entry filter implementations
+entryFiltersDirectory=
+
# Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic
maxConcurrentLookupRequest=50000
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index efc9f43..906ef3f 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -836,6 +836,22 @@ public class ServiceConfiguration implements PulsarConfiguration {
private boolean preciseDispatcherFlowControl = false;
@FieldContext(
+        dynamic = true,
+        category = CATEGORY_SERVER,
+        doc = "Names of the pluggable entry filters that decide whether an entry needs to be filtered. "
+                + "You can use these filters to decide which entries can be sent to consumers. "
+                + "Multiple names need to be separated by commas."
+    )
+    // Each configured name is looked up among the entry filter definitions discovered in
+    // entryFiltersDirectory. NOTE(review): the filters appear to be instantiated once when the
+    // broker service is created, so a dynamic update may not take effect at runtime - confirm.
+    private List<String> entryFilterNames = new ArrayList<>();
+
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_SERVER,
+ doc = " The directory for all the entry filter implementations."
+ )
+ private String entryFiltersDirectory = "";
+
+ @FieldContext(
category = CATEGORY_SERVER,
doc = "Whether to use streaming read dispatcher. Currently is in preview and can be changed " +
"in subsequent release."
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index b53de2f..8970ec6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -19,18 +19,26 @@
package org.apache.pulsar.broker.service;
+import com.google.common.collect.ImmutableList;
import io.netty.buffer.ByteBuf;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.service.plugin.EntryFilter;
+import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
+import org.apache.pulsar.broker.service.plugin.FilterContext;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.MessageMetadata;
@@ -48,11 +56,25 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
protected final ServiceConfiguration serviceConfig;
protected final boolean dispatchThrottlingOnBatchMessageEnabled;
+ /**
+ * Entry filters in Broker.
+ * Not set to final, for the convenience of testing mock.
+ */
+ protected ImmutableList<EntryFilterWithClassLoader> entryFilters;
+ protected final FilterContext filterContext;
protected AbstractBaseDispatcher(Subscription subscription, ServiceConfiguration serviceConfig) {
this.subscription = subscription;
this.serviceConfig = serviceConfig;
this.dispatchThrottlingOnBatchMessageEnabled = serviceConfig.isDispatchThrottlingOnBatchMessageEnabled();
+ if (subscription != null && subscription.getTopic() != null && MapUtils.isNotEmpty(subscription.getTopic()
+ .getBrokerService().getEntryFilters())) {
+ this.entryFilters = subscription.getTopic().getBrokerService().getEntryFilters().values().asList();
+ this.filterContext = new FilterContext();
+ } else {
+ this.entryFilters = ImmutableList.of();
+ this.filterContext = FilterContext.FILTER_CONTEXT_DISABLED;
+ }
}
/**
@@ -113,6 +135,7 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
long totalBytes = 0;
int totalChunkedMessages = 0;
int totalEntries = 0;
+ List<Position> entriesToFiltered = CollectionUtils.isNotEmpty(entryFilters) ? new ArrayList<>() : null;
for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) {
Entry entry = entries.get(i);
if (entry == null) {
@@ -127,6 +150,15 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
msgMetadata = msgMetadata == null
? Commands.peekMessageMetadata(metadataAndPayload, subscription.toString(), -1)
: msgMetadata;
+ if (CollectionUtils.isNotEmpty(entryFilters)) {
+ fillContext(filterContext, msgMetadata, subscription);
+ if (EntryFilter.FilterResult.REJECT == getFilterResult(filterContext, entry, entryFilters)) {
+ entriesToFiltered.add(entry.getPosition());
+ entries.set(i, null);
+ entry.release();
+ continue;
+ }
+ }
if (!isReplayRead && msgMetadata != null && msgMetadata.hasTxnidMostBits()
&& msgMetadata.hasTxnidLeastBits()) {
if (Markers.isTxnMarker(msgMetadata)) {
@@ -183,12 +215,36 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
interceptor.beforeSendMessage(subscription, entry, ackSet, msgMetadata);
}
}
+ if (CollectionUtils.isNotEmpty(entriesToFiltered)) {
+ subscription.acknowledgeMessage(entriesToFiltered, AckType.Individual,
+ Collections.emptyMap());
+ }
+
sendMessageInfo.setTotalMessages(totalMessages);
sendMessageInfo.setTotalBytes(totalBytes);
sendMessageInfo.setTotalChunkedMessages(totalChunkedMessages);
return totalEntries;
}
+    /**
+     * Runs the entry through the filters in order and stops at the first rejection.
+     *
+     * @param filterContext context handed to every filter
+     * @param entry the entry being evaluated
+     * @param entryFilters the filters to consult
+     * @return {@code REJECT} as soon as one filter rejects the entry; {@code ACCEPT} otherwise,
+     *         including when a filter returns {@code null} or when there are no filters
+     */
+    private static EntryFilter.FilterResult getFilterResult(FilterContext filterContext, Entry entry,
+            ImmutableList<EntryFilterWithClassLoader> entryFilters) {
+        for (EntryFilter candidate : entryFilters) {
+            if (EntryFilter.FilterResult.REJECT == candidate.filterEntry(entry, filterContext)) {
+                return EntryFilter.FilterResult.REJECT;
+            }
+        }
+        return EntryFilter.FilterResult.ACCEPT;
+    }
+
+    /**
+     * Prepares the reusable filter context for the next entry: clears whatever the previous
+     * entry left behind, then stores the current subscription and message metadata.
+     *
+     * @param context the context instance to repopulate
+     * @param msgMetadata metadata of the entry about to be filtered
+     * @param subscription the subscription the entry is being dispatched for
+     */
+    private void fillContext(FilterContext context, MessageMetadata msgMetadata,
+            Subscription subscription) {
+        context.reset();
+        context.setSubscription(subscription);
+        context.setMsgMetadata(msgMetadata);
+    }
+
/**
* Determine whether the number of consumers on the subscription reaches the threshold.
* @return
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 26c591d..ac83775 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -318,6 +318,7 @@ public abstract class AbstractTopic implements Topic {
}
+ @Override
public BrokerService getBrokerService() {
return brokerService;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 7b8ae80..7fa59bf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -26,6 +26,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.broker.PulsarService.isTransactionSystemTopic;
import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
@@ -114,6 +115,8 @@ import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.SystemTopic;
+import org.apache.pulsar.broker.service.plugin.EntryFilterProvider;
+import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.prometheus.metrics.ObserverGauge;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
@@ -267,6 +270,7 @@ public class BrokerService implements Closeable {
private boolean preciseTopicPublishRateLimitingEnable;
private final LongAdder pausedConnections = new LongAdder();
private BrokerInterceptor interceptor;
+ private ImmutableMap<String, EntryFilterWithClassLoader> entryFilters;
private Set<BrokerEntryMetadataInterceptor> brokerEntryMetadataInterceptors;
@@ -299,6 +303,9 @@ public class BrokerService implements Closeable {
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-stats-updater"));
this.authorizationService = new AuthorizationService(
pulsar.getConfiguration(), pulsar().getPulsarResources());
+ if (!pulsar.getConfiguration().getEntryFilterNames().isEmpty()) {
+ this.entryFilters = EntryFilterProvider.createEntryFilters(pulsar.getConfiguration());
+ }
pulsar.getLocalMetadataStore().registerListener(this::handleMetadataChanges);
pulsar.getConfigurationMetadataStore().registerListener(this::handleMetadataChanges);
@@ -712,6 +719,17 @@ public class BrokerService implements Closeable {
}
});
+ //close entry filters
+ if (entryFilters != null) {
+ entryFilters.forEach((name, filter) -> {
+ try {
+ filter.close();
+ } catch (Exception e) {
+ log.warn("Error shutting down entry filter {}", name, e);
+ }
+ });
+ }
+
CompletableFuture<CompletableFuture<Void>> cancellableDownstreamFutureReference = new CompletableFuture<>();
log.info("Event loops shutting down gracefully...");
List<CompletableFuture<?>> shutdownEventLoops = new ArrayList<>();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 9db4111..89cc448 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -294,4 +294,10 @@ public interface Topic {
*/
CompletableFuture<Void> truncate();
+ /**
+ * Get BrokerService.
+ * @return
+ */
+ BrokerService getBrokerService();
+
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 2c23bee..061d038 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -48,6 +48,7 @@ import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
@@ -158,7 +159,7 @@ public class PersistentSubscription implements Subscription {
this.fullName = MoreObjects.toStringHelper(this).add("topic", topicName).add("name", subName).toString();
this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, subscriptionName, cursor, this);
this.setReplicated(replicated);
- this.subscriptionProperties = subscriptionProperties == null
+ this.subscriptionProperties = MapUtils.isEmpty(subscriptionProperties)
? new HashMap<>() : Collections.unmodifiableMap(subscriptionProperties);
if (topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled()
&& !checkTopicIsEventsNames(TopicName.get(topicName))) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilter.java
new file mode 100644
index 0000000..40e6644
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilter.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.plugin;
+
+import org.apache.bookkeeper.mledger.Entry;
+
+/**
+ * Broker-side plugin consulted by the dispatcher for each entry before it is sent to consumers.
+ */
+public interface EntryFilter {
+
+ /**
+ * 1. Broker determines whether to filter out this entry based on the return value of this method.
+ * 2. Do not deserialize the entire entry in this method,
+ * which has a great impact on the broker's memory and CPU.
+ * 3. Return ACCEPT or null will be regarded as ACCEPT.
+ * @param entry the entry that is about to be dispatched
+ * @param context the subscription and message metadata for this entry; the dispatcher resets and
+ * refills the same context instance for every entry, so do not keep a reference to it
+ * @return {@link FilterResult#REJECT} to filter the entry out; any other value (including null)
+ * lets the entry through
+ */
+ FilterResult filterEntry(Entry entry, FilterContext context);
+
+ /**
+ * Close the entry filter and release whatever it holds.
+ */
+ void close();
+
+
+ /**
+ * Decision returned by {@link #filterEntry(Entry, FilterContext)}.
+ */
+ enum FilterResult {
+ /**
+ * deliver to the consumer.
+ */
+ ACCEPT,
+ /**
+ * skip the message: the dispatcher does not send the entry and acknowledges it
+ * individually on the subscription.
+ */
+ REJECT,
+ }
+
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterDefinition.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterDefinition.java
new file mode 100644
index 0000000..5df3944
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterDefinition.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.plugin;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Descriptor of an entry filter, read as YAML from the {@code entry_filter.yml}
+ * service definition of the filter's NAR archive.
+ */
+@Data
+@NoArgsConstructor
+public class EntryFilterDefinition {
+
+ /**
+ * The name of the entry filter. Must not be blank; it is the key that the configured
+ * entry filter names are matched against.
+ */
+ private String name;
+
+ /**
+ * The description of the entry filter to be used for user help.
+ */
+ private String description;
+
+ /**
+ * The class name for the entry filter. Must not be blank; the class is loaded from the
+ * NAR and has to implement {@link EntryFilter}.
+ */
+ private String entryFilterClass;
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterDefinitions.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterDefinitions.java
new file mode 100644
index 0000000..9aa3113
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterDefinitions.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.plugin;
+
+import java.util.Map;
+import java.util.TreeMap;
+import lombok.Data;
+
+/**
+ * The set of entry filters discovered in the entry filters directory.
+ */
+@Data
+public class EntryFilterDefinitions {
+ // Discovered filters keyed by the name from their definition; the TreeMap keeps them sorted by name.
+ private final Map<String, EntryFilterMetaData> filters = new TreeMap<>();
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterMetaData.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterMetaData.java
new file mode 100644
index 0000000..babaa80
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterMetaData.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.plugin;
+
+import java.nio.file.Path;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Pairs an entry filter definition with the location of the NAR archive it was read from.
+ */
+@Data
+@NoArgsConstructor
+public class EntryFilterMetaData {
+ /**
+ * The definition of the entry filter.
+ */
+ private EntryFilterDefinition definition;
+
+ /**
+ * The path to the NAR archive that contains the entry filter.
+ */
+ private Path archivePath;
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java
new file mode 100644
index 0000000..9e19a57
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.plugin;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import com.google.common.collect.ImmutableMap;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+
+/**
+ * Discovers entry filter NAR archives on disk and instantiates the filters that are
+ * enabled in the broker configuration.
+ */
+@Slf4j
+public class EntryFilterProvider {
+
+    /** Name of the service definition file each entry filter NAR has to provide. */
+    static final String ENTRY_FILTER_DEFINITION_FILE = "entry_filter.yml";
+
+    /**
+     * Creates one entry filter instance per name configured in {@code entryFilterNames}.
+     *
+     * @param conf broker configuration providing the filter names, the filters directory and
+     *             the NAR extraction directory
+     * @return the loaded filters keyed by their configured name, in configuration order
+     * @throws IOException if the filters directory cannot be listed or a filter fails to load
+     * @throws RuntimeException if a configured name matches none of the discovered filters
+     */
+    public static ImmutableMap<String, EntryFilterWithClassLoader> createEntryFilters(
+            ServiceConfiguration conf) throws IOException {
+        EntryFilterDefinitions definitions = searchForEntryFilters(conf.getEntryFiltersDirectory(),
+                conf.getNarExtractionDirectory());
+        ImmutableMap.Builder<String, EntryFilterWithClassLoader> builder = ImmutableMap.builder();
+        for (String filterName : conf.getEntryFilterNames()) {
+            EntryFilterMetaData metaData = definitions.getFilters().get(filterName);
+            if (null == metaData) {
+                throw new RuntimeException("No entry filter is found for name `" + filterName
+                        + "`. Available entry filters are : " + definitions.getFilters());
+            }
+            // load() either returns a usable filter or throws, so no null check is needed here.
+            builder.put(filterName, load(metaData, conf.getNarExtractionDirectory()));
+            log.info("Successfully loaded entry filter for name `{}`", filterName);
+        }
+        return builder.build();
+    }
+
+    /**
+     * Scans the given directory for {@code *.nar} archives and collects the definition of every
+     * archive that carries a valid entry filter definition. Archives that cannot be read or
+     * whose definition is invalid are logged and skipped.
+     *
+     * @param entryFiltersDirectory directory to scan
+     * @param narExtractionDirectory directory the NAR archives are extracted into
+     * @return the discovered definitions; empty if the directory does not exist
+     * @throws IOException if the directory cannot be listed
+     */
+    private static EntryFilterDefinitions searchForEntryFilters(String entryFiltersDirectory,
+                                                                String narExtractionDirectory)
+            throws IOException {
+        Path path = Paths.get(entryFiltersDirectory).toAbsolutePath();
+        log.info("Searching for entry filters in {}", path);
+
+        EntryFilterDefinitions entryFilterDefinitions = new EntryFilterDefinitions();
+        if (!path.toFile().exists()) {
+            log.info("Pulsar entry filters directory not found");
+            return entryFilterDefinitions;
+        }
+
+        try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.nar")) {
+            for (Path archive : stream) {
+                try {
+                    EntryFilterDefinition def =
+                            getEntryFilterDefinition(archive.toString(), narExtractionDirectory);
+                    log.info("Found entry filter from {} : {}", archive, def);
+
+                    checkArgument(StringUtils.isNotBlank(def.getName()));
+                    checkArgument(StringUtils.isNotBlank(def.getEntryFilterClass()));
+
+                    EntryFilterMetaData metadata = new EntryFilterMetaData();
+                    metadata.setDefinition(def);
+                    metadata.setArchivePath(archive);
+
+                    entryFilterDefinitions.getFilters().put(def.getName(), metadata);
+                } catch (Throwable t) {
+                    // Deliberately best-effort: one broken archive must not hide the others.
+                    log.warn("Failed to load entry filters from {}."
+                            + " It is OK however if you want to use this entry filters,"
+                            + " please make sure you put the correct entry filter NAR"
+                            + " package in the entry filter directory.", archive, t);
+                }
+            }
+        }
+
+        return entryFilterDefinitions;
+    }
+
+    /**
+     * Reads the entry filter definition of a NAR archive, closing the temporary class loader
+     * once the definition has been read.
+     */
+    private static EntryFilterDefinition getEntryFilterDefinition(String narPath,
+                                                                  String narExtractionDirectory)
+            throws IOException {
+        try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet(),
+                narExtractionDirectory)) {
+            return getEntryFilterDefinition(ncl);
+        }
+    }
+
+    /**
+     * Parses the YAML service definition exposed by the given NAR class loader.
+     */
+    private static EntryFilterDefinition getEntryFilterDefinition(NarClassLoader ncl) throws IOException {
+        String configStr = ncl.getServiceDefinition(ENTRY_FILTER_DEFINITION_FILE);
+
+        return ObjectMapperFactory.getThreadLocalYaml().readValue(
+                configStr, EntryFilterDefinition.class
+        );
+    }
+
+    /**
+     * Opens the NAR archive of a filter and instantiates its filter class.
+     * On success the class loader is owned by the returned wrapper; on any failure it is
+     * closed here so that a broken archive does not leak the loader.
+     *
+     * @param metadata definition and archive location of the filter to load
+     * @param narExtractionDirectory directory the NAR archive is extracted into
+     * @return the instantiated filter together with its class loader
+     * @throws IOException if the definition is unusable or the class cannot be instantiated
+     */
+    private static EntryFilterWithClassLoader load(EntryFilterMetaData metadata,
+                                                   String narExtractionDirectory)
+            throws IOException {
+        NarClassLoader ncl = NarClassLoader.getFromArchive(
+                metadata.getArchivePath().toAbsolutePath().toFile(),
+                Collections.emptySet(),
+                EntryFilter.class.getClassLoader(), narExtractionDirectory);
+
+        boolean loaded = false;
+        try {
+            EntryFilterDefinition def = getEntryFilterDefinition(ncl);
+            if (StringUtils.isBlank(def.getEntryFilterClass())) {
+                throw new IOException("Entry filters `" + def.getName() + "` does NOT provide a entry"
+                        + " filters implementation");
+            }
+            EntryFilterWithClassLoader filter = newEntryFilter(def, ncl);
+            loaded = true;
+            return filter;
+        } finally {
+            if (!loaded) {
+                // Nobody else holds the loader yet; close it without masking the original failure.
+                try {
+                    ncl.close();
+                } catch (IOException closeFailure) {
+                    log.warn("Failed to close class loader of entry filter archive {}",
+                            metadata.getArchivePath(), closeFailure);
+                }
+            }
+        }
+    }
+
+    /**
+     * Instantiates the filter class named in the definition through its no-arg constructor.
+     */
+    private static EntryFilterWithClassLoader newEntryFilter(EntryFilterDefinition def, NarClassLoader ncl)
+            throws IOException {
+        try {
+            Class<?> entryFilterClass = ncl.loadClass(def.getEntryFilterClass());
+            Object filter = entryFilterClass.getDeclaredConstructor().newInstance();
+            if (!(filter instanceof EntryFilter)) {
+                throw new IOException("Class " + def.getEntryFilterClass()
+                        + " does not implement entry filter interface");
+            }
+            return new EntryFilterWithClassLoader((EntryFilter) filter, ncl);
+        } catch (IOException e) {
+            throw e;
+        } catch (Exception e) {
+            log.error("Failed to load class {}", def.getEntryFilterClass(), e);
+            throw new IOException(e);
+        }
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterWithClassLoader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterWithClassLoader.java
new file mode 100644
index 0000000..8c2569c
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterWithClassLoader.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.plugin;
+
+import java.io.IOException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.pulsar.common.nar.NarClassLoader;
+
+/**
+ * An {@link EntryFilter} bundled with the NAR class loader it was loaded from, so that both
+ * can be released together.
+ */
+@Slf4j
+public class EntryFilterWithClassLoader implements EntryFilter {
+    private final EntryFilter entryFilter;
+    private final NarClassLoader classLoader;
+
+    /**
+     * @param entryFilter the filter instance to delegate to
+     * @param classLoader the class loader the filter was loaded from; closed by {@link #close()}
+     */
+    public EntryFilterWithClassLoader(EntryFilter entryFilter, NarClassLoader classLoader) {
+        this.entryFilter = entryFilter;
+        this.classLoader = classLoader;
+    }
+
+    /**
+     * Delegates the decision to the wrapped filter.
+     */
+    @Override
+    public FilterResult filterEntry(Entry entry, FilterContext context) {
+        return entryFilter.filterEntry(entry, context);
+    }
+
+    /**
+     * Closes the wrapped filter and then its class loader. The class loader is closed even when
+     * the filter's own {@code close()} throws; a failure to close the class loader is only logged.
+     */
+    @Override
+    public void close() {
+        try {
+            entryFilter.close();
+        } finally {
+            try {
+                classLoader.close();
+            } catch (IOException e) {
+                log.error("close EntryFilterWithClassLoader failed", e);
+            }
+        }
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/FilterContext.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/FilterContext.java
new file mode 100644
index 0000000..e520e10
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/FilterContext.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.plugin;
+
+import lombok.Data;
+import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+
+/**
+ * Per-entry information handed to an {@link EntryFilter}: the subscription the entry is being
+ * dispatched for and the metadata of the message.
+ */
+@Data
+public class FilterContext {
+ private Subscription subscription;
+ private MessageMetadata msgMetadata;
+
+ /**
+ * Clears both fields so the instance can be refilled for the next entry.
+ */
+ public void reset() {
+ subscription = null;
+ msgMetadata = null;
+ }
+
+ // Shared placeholder assigned to dispatchers that have no entry filters.
+ // NOTE(review): the instance is mutable like any other FilterContext; callers are expected
+ // not to populate it.
+ public static final FilterContext FILTER_CONTEXT_DISABLED = new FilterContext();
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/package-info.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/package-info.java
new file mode 100644
index 0000000..05e54ca
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.plugin;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/EntryFilter2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/EntryFilter2Test.java
new file mode 100644
index 0000000..dbcaa16
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/EntryFilter2Test.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.plugin;
+
+
+import java.util.List;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.common.api.proto.KeyValue;
+
+public class EntryFilter2Test implements EntryFilter {
+ @Override
+ public FilterResult filterEntry(Entry entry, FilterContext context) {
+ if (context.getMsgMetadata() == null || context.getMsgMetadata().getPropertiesCount() <= 0) {
+ return FilterResult.ACCEPT;
+ }
+ List<KeyValue> list = context.getMsgMetadata().getPropertiesList();
+ // filter by subscription properties
+ PersistentSubscription subscription = (PersistentSubscription) context.getSubscription();
+ if (!MapUtils.isEmpty(subscription.getSubscriptionProperties())) {
+ for (KeyValue keyValue : list) {
+ if(subscription.getSubscriptionProperties().containsKey(keyValue.getKey())){
+ return FilterResult.ACCEPT;
+ }
+ }
+ }
+ return FilterResult.REJECT;
+ }
+
+ @Override
+ public void close() {
+
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/EntryFilterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/EntryFilterTest.java
new file mode 100644
index 0000000..812d49a
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/EntryFilterTest.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.plugin;
+
+
+import java.util.List;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.pulsar.common.api.proto.KeyValue;
+
+public class EntryFilterTest implements EntryFilter {
+ @Override
+ public FilterResult filterEntry(Entry entry, FilterContext context) {
+ if (context.getMsgMetadata() == null || context.getMsgMetadata().getPropertiesCount() <= 0) {
+ return FilterResult.ACCEPT;
+ }
+ List<KeyValue> list = context.getMsgMetadata().getPropertiesList();
+ // filter by string
+ for (KeyValue keyValue : list) {
+ if ("ACCEPT".equalsIgnoreCase(keyValue.getKey())) {
+ return FilterResult.ACCEPT;
+ } else if ("REJECT".equalsIgnoreCase(keyValue.getKey())){
+ return FilterResult.REJECT;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public void close() {
+
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
new file mode 100644
index 0000000..0338935
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.plugin;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertNotNull;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.service.AbstractBaseDispatcher;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.service.Dispatcher;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.nar.NarClassLoader;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class FilterEntryTest extends BrokerTestBase {
+ @BeforeMethod
+ @Override
+ protected void setup() throws Exception {
+ baseSetup();
+ }
+
+ @AfterMethod
+ @Override
+ protected void cleanup() throws Exception {
+ internalCleanup();
+ }
+
+ public void testFilter() throws Exception {
+
+ String topic = "persistent://prop/ns-abc/topic" + UUID.randomUUID();
+ String subName = "sub";
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic)
+ .subscriptionName(subName).subscribe();
+ // mock entry filters
+ PersistentSubscription subscription = (PersistentSubscription) pulsar.getBrokerService()
+ .getTopicReference(topic).get().getSubscription(subName);
+ Dispatcher dispatcher = subscription.getDispatcher();
+ Field field = AbstractBaseDispatcher.class.getDeclaredField("entryFilters");
+ field.setAccessible(true);
+ NarClassLoader narClassLoader = mock(NarClassLoader.class);
+ EntryFilter filter1 = new EntryFilterTest();
+ EntryFilterWithClassLoader loader1 = spy(new EntryFilterWithClassLoader(filter1, narClassLoader));
+ EntryFilter filter2 = new EntryFilter2Test();
+ EntryFilterWithClassLoader loader2 = spy(new EntryFilterWithClassLoader(filter2, narClassLoader));
+ field.set(dispatcher, ImmutableList.of(loader1, loader2));
+
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .enableBatching(false).topic(topic).create();
+ for (int i = 0; i < 10; i++) {
+ producer.send("test");
+ }
+
+ int counter = 0;
+ while (true) {
+ Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
+ if (message != null) {
+ counter++;
+ consumer.acknowledge(message);
+ } else {
+ break;
+ }
+ }
+ // All normal messages can be received
+ assertEquals(10, counter);
+ MessageIdImpl lastMsgId = null;
+ for (int i = 0; i < 10; i++) {
+ lastMsgId = (MessageIdImpl) producer.newMessage().property("REJECT", "").value("1").send();
+ }
+ counter = 0;
+ while (true) {
+ Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
+ if (message != null) {
+ counter++;
+ consumer.acknowledge(message);
+ } else {
+ break;
+ }
+ }
+ // REJECT messages are filtered out
+ assertEquals(0, counter);
+
+ // All messages should be acked, check the MarkDeletedPosition
+ assertNotNull(lastMsgId);
+ MessageIdImpl finalLastMsgId = lastMsgId;
+ Awaitility.await().untilAsserted(() -> {
+ PositionImpl position = (PositionImpl) subscription.getCursor().getMarkDeletedPosition();
+ assertEquals(position.getLedgerId(), finalLastMsgId.getLedgerId());
+ assertEquals(position.getEntryId(), finalLastMsgId.getEntryId());
+ });
+ consumer.close();
+
+ Map<String, String> map = new HashMap<>();
+ map.put("1","1");
+ map.put("2","2");
+ consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic).subscriptionProperties(map)
+ .subscriptionName(subName).subscribe();
+ for (int i = 0; i < 10; i++) {
+ producer.newMessage().property(String.valueOf(i), String.valueOf(i)).value("1").send();
+ }
+ counter = 0;
+ while (true) {
+ Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
+ if (message != null) {
+ counter++;
+ consumer.acknowledge(message);
+ } else {
+ break;
+ }
+ }
+ assertEquals(2, counter);
+
+ producer.close();
+ consumer.close();
+
+ BrokerService brokerService = pulsar.getBrokerService();
+ Field field1 = BrokerService.class.getDeclaredField("entryFilters");
+ field1.setAccessible(true);
+ field1.set(brokerService, ImmutableMap.of("1", loader1, "2", loader2));
+ cleanup();
+ verify(loader1, times(1)).close();
+ verify(loader2, times(1)).close();
+
+ }
+
+}