You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/11/26 06:46:51 UTC

[GitHub] [pulsar] eolivelli commented on a change in pull request #12970: [PIP-105] Part-2 Support pluggable entry filter in Dispatcher

eolivelli commented on a change in pull request #12970:
URL: https://github.com/apache/pulsar/pull/12970#discussion_r757260819



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryFilter.java
##########
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.TreeMap;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.nar.NarClassLoader;
+
+public interface EntryFilter {
+
+    /**
+     * Broker determines whether to filter out this entry based on the return value of this method.
+     * Do not deserialize the entire entry in this method,
+     * which has a great impact on the broker's memory and CPU.
+     * @param entry
+     * @param context
+     * @return
+     */
+    FilterResult filterEntry(Entry entry, FilterContext context);
+
+    @Data
+    class FilterContext {
+
+        private EntryBatchSizes batchSizes;
+        private SendMessageInfo sendMessageInfo;
+        private EntryBatchIndexesAcks indexesAcks;
+        private ManagedCursor cursor;
+        private boolean isReplayRead;
+        private Subscription subscription;
+        private SubscriptionOption subscriptionOption;
+        private MessageMetadata msgMetadata;
+
+        public void reset() {

Review comment:
       This method is only for the implementation; do not keep it here.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryFilter.java
##########
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.TreeMap;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.nar.NarClassLoader;
+
+public interface EntryFilter {
+
+    /**
+     * Broker determines whether to filter out this entry based on the return value of this method.
+     * Do not deserialize the entire entry in this method,
+     * which has a great impact on the broker's memory and CPU.
+     * @param entry
+     * @param context
+     * @return
+     */
+    FilterResult filterEntry(Entry entry, FilterContext context);
+
+    @Data
+    class FilterContext {

Review comment:
       Here you should only declare an interface.
   Move the implementation somewhere else.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryFilter.java
##########
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.TreeMap;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.nar.NarClassLoader;
+
+public interface EntryFilter {
+
+    /**
+     * Broker determines whether to filter out this entry based on the return value of this method.
+     * Do not deserialize the entire entry in this method,
+     * which has a great impact on the broker's memory and CPU.
+     * @param entry
+     * @param context
+     * @return
+     */
+    FilterResult filterEntry(Entry entry, FilterContext context);
+
+    @Data
+    class FilterContext {
+
+        private EntryBatchSizes batchSizes;
+        private SendMessageInfo sendMessageInfo;
+        private EntryBatchIndexesAcks indexesAcks;
+        private ManagedCursor cursor;
+        private boolean isReplayRead;
+        private Subscription subscription;
+        private SubscriptionOption subscriptionOption;
+        private MessageMetadata msgMetadata;
+
+        public void reset() {
+            batchSizes = null;
+            sendMessageInfo = null;
+            indexesAcks = null;
+            cursor = null;
+            isReplayRead = false;
+            subscription = null;
+            subscriptionOption = null;
+            msgMetadata = null;
+        }
+    }
+
+    enum FilterResult {
+        /**
+         * deliver to the consumer.
+         */
+        ACCEPT,
+        /**
+         * skip the message.
+         */
+        REJECT,
+    }
+
+    @Data
+    class EntryFilterDefinitions {
+        private final Map<String, EntryFilterMetaData> filters = new TreeMap<>();
+    }
+
+    @Data
+    @NoArgsConstructor
+    class EntryFilterMetaData {

Review comment:
       Move this to the implementation; do not expose it in the public interface.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryFilter.java
##########
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.TreeMap;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.nar.NarClassLoader;
+
+public interface EntryFilter {
+
+    /**
+     * Broker determines whether to filter out this entry based on the return value of this method.
+     * Do not deserialize the entire entry in this method,
+     * which has a great impact on the broker's memory and CPU.
+     * @param entry
+     * @param context
+     * @return
+     */
+    FilterResult filterEntry(Entry entry, FilterContext context);
+
+    @Data
+    class FilterContext {
+
+        private EntryBatchSizes batchSizes;
+        private SendMessageInfo sendMessageInfo;
+        private EntryBatchIndexesAcks indexesAcks;
+        private ManagedCursor cursor;
+        private boolean isReplayRead;
+        private Subscription subscription;
+        private SubscriptionOption subscriptionOption;
+        private MessageMetadata msgMetadata;
+
+        public void reset() {
+            batchSizes = null;
+            sendMessageInfo = null;
+            indexesAcks = null;
+            cursor = null;
+            isReplayRead = false;
+            subscription = null;
+            subscriptionOption = null;
+            msgMetadata = null;
+        }
+    }
+
+    enum FilterResult {
+        /**
+         * deliver to the consumer.
+         */
+        ACCEPT,
+        /**
+         * skip the message.
+         */
+        REJECT,
+    }
+
+    @Data
+    class EntryFilterDefinitions {
+        private final Map<String, EntryFilterMetaData> filters = new TreeMap<>();
+    }
+
+    @Data
+    @NoArgsConstructor
+    class EntryFilterMetaData {
+
+        /**
+         * The definition of the broker interceptor.
+         */
+        private EntryFilterDefinition definition;
+
+        /**
+         * The path to the handler package.
+         */
+        private Path archivePath;
+    }
+
+    @Data
+    @NoArgsConstructor
+    class EntryFilterDefinition {

Review comment:
       Move this to the implementation; do not expose it in the public interface.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryFilter.java
##########
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.TreeMap;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.nar.NarClassLoader;
+
+public interface EntryFilter {
+
+    /**
+     * Broker determines whether to filter out this entry based on the return value of this method.
+     * Do not deserialize the entire entry in this method,
+     * which has a great impact on the broker's memory and CPU.
+     * @param entry
+     * @param context
+     * @return
+     */
+    FilterResult filterEntry(Entry entry, FilterContext context);
+
+    @Data
+    class FilterContext {
+
+        private EntryBatchSizes batchSizes;
+        private SendMessageInfo sendMessageInfo;
+        private EntryBatchIndexesAcks indexesAcks;
+        private ManagedCursor cursor;
+        private boolean isReplayRead;
+        private Subscription subscription;
+        private SubscriptionOption subscriptionOption;
+        private MessageMetadata msgMetadata;
+
+        public void reset() {
+            batchSizes = null;
+            sendMessageInfo = null;
+            indexesAcks = null;
+            cursor = null;
+            isReplayRead = false;
+            subscription = null;
+            subscriptionOption = null;
+            msgMetadata = null;
+        }
+    }
+
+    enum FilterResult {
+        /**
+         * deliver to the consumer.
+         */
+        ACCEPT,
+        /**
+         * skip the message.
+         */
+        REJECT,
+    }
+
+    @Data
+    class EntryFilterDefinitions {

Review comment:
       Move this to the implementation; do not expose it in the public interface.

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/EntryFilterForTest2.java
##########
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.plugin;
+
+
+import java.util.List;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.pulsar.broker.service.EntryFilter;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.common.api.proto.KeyValue;
+
+public class EntryFilterForTest2 implements EntryFilter {

Review comment:
       I am not sure that Maven selects this class, as the name does not end with Test.
   Please rename it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org