Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/04/29 11:13:58 UTC

[GitHub] [pulsar] eolivelli opened a new pull request, #15391: DRAFT - [enh] [broker] EntryFilter (PIP-105) - support per-Consumer filtering

eolivelli opened a new pull request, #15391:
URL: https://github.com/apache/pulsar/pull/15391

   ### Motivation
   
   In some cases a Consumer rejects a message and negatively acknowledges it.
   With this change you can emulate this behaviour on the broker and save resources: the Entry is not sent to the Consumer, and the broker anticipates the "negative acknowledge".
   
   ### Modifications
   - Allow the EntryFilter to access the Consumer who is going to receive the message (add Consumer to FilterContext)
   - Add a new FilterResult, RESCHEDULE, which means that the filter wants the message to be scheduled to "another consumer" or its processing to be postponed
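
As an illustration of the two modifications listed above, here is a minimal sketch of a per-consumer filter. It deliberately does not use the Pulsar classes: `Context`, its `messageProperties` and `consumerMetadata` maps, and the `region` key are hypothetical stand-ins chosen for the example, and only the three result names mirror the ones discussed in this PR.

```java
import java.util.Map;

// Self-contained sketch; none of these types are the real Pulsar interfaces.
final class PerConsumerFilterSketch {
    // Mirrors the result names discussed in the PR.
    enum FilterResult { ACCEPT, REJECT, RESCHEDULE }

    // Hypothetical context: message properties plus metadata of the target consumer.
    static final class Context {
        final Map<String, String> messageProperties;
        final Map<String, String> consumerMetadata;

        Context(Map<String, String> messageProperties, Map<String, String> consumerMetadata) {
            this.messageProperties = messageProperties;
            this.consumerMetadata = consumerMetadata;
        }
    }

    // Hypothetical rule: deliver only when the message region matches the consumer region.
    static FilterResult filter(Context ctx) {
        String actual = ctx.messageProperties.get("region");
        String wanted = ctx.consumerMetadata.get("region");
        if (actual == null) {
            // In this sketch no consumer can use the message: drop it for everyone.
            return FilterResult.REJECT;
        }
        if (wanted == null || wanted.equals(actual)) {
            return FilterResult.ACCEPT;
        }
        // Wrong consumer for this message: ask for it to be scheduled again later.
        return FilterResult.RESCHEDULE;
    }
}
```

The point of the sketch is only that the decision depends on the consumer as well as on the message, which is what adding the Consumer to the context makes possible.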
    
   ### Verifying this change
   
   TODO: add tests
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] eolivelli commented on a diff in pull request #15391: [enh] [broker] EntryFilter (PIP-105) - support per-Consumer filtering

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #15391:
URL: https://github.com/apache/pulsar/pull/15391#discussion_r867356798


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java:
##########
@@ -147,13 +149,20 @@ public int filterEntriesForConsumer(Optional<EntryWrapper[]> entryWrapper, int e
             msgMetadata = msgMetadata == null
                     ? Commands.peekMessageMetadata(metadataAndPayload, subscription.toString(), -1)
                     : msgMetadata;
+            EntryFilter.FilterResult filterResult = EntryFilter.FilterResult.ACCEPT;
             if (CollectionUtils.isNotEmpty(entryFilters)) {
-                fillContext(filterContext, msgMetadata, subscription);
-                if (EntryFilter.FilterResult.REJECT == getFilterResult(filterContext, entry, entryFilters)) {
+                fillContext(filterContext, msgMetadata, subscription, consumer);
+                filterResult = getFilterResult(filterContext, entry, entryFilters);
+                if (filterResult == EntryFilter.FilterResult.REJECT) {
                     entriesToFiltered.add(entry.getPosition());
                     entries.set(i, null);
                     entry.release();
                     continue;
+                } else if (filterResult == EntryFilter.FilterResult.RESCHEDULE) {
+                    entriesToRedeliver.add((PositionImpl) entry.getPosition());
+                    entries.set(i, null);
+                    entry.release();
+                    continue;

Review Comment:
   This won't happen: in each round we process the Entry only once, and it is put in the set of messages to be redelivered, so we never enter an infinite loop.
   
   In the next round the Dispatcher chooses a Consumer again (it may choose the same one, for instance if you configure priority).
   
   As for the read entry requests to the bookies: shouldn't we keep reading the entry from the local cache?
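
The claim that a round visits each Entry only once can be sketched as a single pass that sorts positions into three buckets. This is a simplified model, not the dispatcher code: `RoundOutcome`, `runRound`, and the use of plain `long` positions are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongFunction;

// Simplified model of one dispatch round; not the AbstractBaseDispatcher code.
final class DispatchRoundSketch {
    enum FilterResult { ACCEPT, REJECT, RESCHEDULE }

    // Buckets filled during a single pass over the batch.
    static final class RoundOutcome {
        final List<Long> toDispatch = new ArrayList<>();
        final List<Long> filteredOut = new ArrayList<>();
        final List<Long> toRedeliver = new ArrayList<>();
    }

    // Each position is evaluated exactly once; RESCHEDULE is recorded, never retried in this pass.
    static RoundOutcome runRound(List<Long> positions, LongFunction<FilterResult> filter) {
        RoundOutcome outcome = new RoundOutcome();
        for (long position : positions) {
            switch (filter.apply(position)) {
                case REJECT:
                    outcome.filteredOut.add(position);
                    break;
                case RESCHEDULE:
                    outcome.toRedeliver.add(position);
                    break;
                default:
                    outcome.toDispatch.add(position);
                    break;
            }
        }
        return outcome;
    }
}
```

Because the loop has no inner retry, the pass terminates after one visit per position regardless of what the filter returns; whatever lands in `toRedeliver` is only looked at again in a later round.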








[GitHub] [pulsar] eolivelli commented on pull request #15391: [enh] [broker] EntryFilter (PIP-105) - support per-Consumer filtering

Posted by GitBox <gi...@apache.org>.
eolivelli commented on PR #15391:
URL: https://github.com/apache/pulsar/pull/15391#issuecomment-1117101549

   @hangc0276 I have added tests, PTAL




[GitHub] [pulsar] codelipenghui commented on a diff in pull request #15391: [enh] [broker] EntryFilter (PIP-105) - support per-Consumer filtering

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #15391:
URL: https://github.com/apache/pulsar/pull/15391#discussion_r867850751


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java:
##########
@@ -227,6 +236,15 @@ && trackDelayedDelivery(entry.getLedgerId(), entry.getEntryId(), msgMetadata)) {
                 ((AbstractTopic) topic).addFilteredEntriesCount(filtered);
             }
         }
+        if (CollectionUtils.isNotEmpty(entriesToRedeliver)) {
+            this.subscription.getTopic().getBrokerService().getPulsar().getExecutor()
+                    .schedule(() -> {
+                        // simulate the Consumer rejected the message
+                        subscription
+                                .redeliverUnacknowledgedMessages(consumer, entriesToRedeliver);
+                    }, 1, TimeUnit.SECONDS);
+
+        }

Review Comment:
   But with a single active consumer, it will rewind the cursor multiple times. That is unlike a Shared subscription, which redelivers only the given messages.





[GitHub] [pulsar] eolivelli commented on pull request #15391: DRAFT - [enh] [broker] EntryFilter (PIP-105) - support per-Consumer filtering

Posted by GitBox <gi...@apache.org>.
eolivelli commented on PR #15391:
URL: https://github.com/apache/pulsar/pull/15391#issuecomment-1113185488

   @hangc0276 this is only a draft, I am adding tests.
   




[GitHub] [pulsar] eolivelli commented on pull request #15391: [enh] [broker] EntryFilter (PIP-105) - support per-Consumer filtering

Posted by GitBox <gi...@apache.org>.
eolivelli commented on PR #15391:
URL: https://github.com/apache/pulsar/pull/15391#issuecomment-1118523844

   @codelipenghui I have sent the message to dev@pulsar. Thanks
   
   I have also updated the patch.
   PTAL




[GitHub] [pulsar] codelipenghui commented on a diff in pull request #15391: [enh] [broker] EntryFilter (PIP-105) - support per-Consumer filtering

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #15391:
URL: https://github.com/apache/pulsar/pull/15391#discussion_r867356547


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java:
##########
@@ -147,13 +149,20 @@ public int filterEntriesForConsumer(Optional<EntryWrapper[]> entryWrapper, int e
             msgMetadata = msgMetadata == null
                     ? Commands.peekMessageMetadata(metadataAndPayload, subscription.toString(), -1)
                     : msgMetadata;
+            EntryFilter.FilterResult filterResult = EntryFilter.FilterResult.ACCEPT;
             if (CollectionUtils.isNotEmpty(entryFilters)) {
-                fillContext(filterContext, msgMetadata, subscription);
-                if (EntryFilter.FilterResult.REJECT == getFilterResult(filterContext, entry, entryFilters)) {
+                fillContext(filterContext, msgMetadata, subscription, consumer);
+                filterResult = getFilterResult(filterContext, entry, entryFilters);
+                if (filterResult == EntryFilter.FilterResult.REJECT) {
                     entriesToFiltered.add(entry.getPosition());
                     entries.set(i, null);
                     entry.release();
                     continue;
+                } else if (filterResult == EntryFilter.FilterResult.RESCHEDULE) {
+                    entriesToRedeliver.add((PositionImpl) entry.getPosition());
+                    entries.set(i, null);
+                    entry.release();
+                    continue;

Review Comment:
   Suppose we have 3 consumers and all of them return `RESCHEDULE` here:
   
   consumer 0 -> consumer 1
   consumer 1 -> consumer 2
   
   Will consumer 2 trigger a redelivery to consumer 0 again? If a lot of such messages accumulate, this will generate many read entry requests to the bookies, and it will never stop.
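
The scenario can be modelled with a small simulation that counts how often an entry is read before some consumer accepts it. This is only a sketch under stated assumptions: the round-robin consumer choice, the `readsUntilAccepted` helper, and the `maxRounds` cap are all hypothetical and do not describe how the real dispatcher selects consumers.

```java
import java.util.function.IntPredicate;

// Toy model of repeated rounds; the consumer selection is an assumption, not Pulsar's logic.
final class RedeliveryRoundsSketch {
    // Returns the number of reads before a consumer accepts, or maxRounds if none ever does.
    static int readsUntilAccepted(int consumerCount, IntPredicate accepts, int maxRounds) {
        int reads = 0;
        for (int round = 0; round < maxRounds; round++) {
            reads++; // every round reads the entry again
            int consumer = round % consumerCount; // assumed round-robin choice
            if (accepts.test(consumer)) {
                return reads;
            }
            // Otherwise the filter answered RESCHEDULE and the entry waits for the next round.
        }
        return reads;
    }
}
```

With three consumers that never accept, the read count simply equals the number of rounds simulated, which is the accumulation the question is about; if consumer 2 accepts, the count stops at three under the round-robin assumption.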
   





[GitHub] [pulsar] eolivelli commented on a diff in pull request #15391: [enh] [broker] EntryFilter (PIP-105) - support per-Consumer filtering

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #15391:
URL: https://github.com/apache/pulsar/pull/15391#discussion_r867357016


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java:
##########
@@ -227,6 +236,15 @@ && trackDelayedDelivery(entry.getLedgerId(), entry.getEntryId(), msgMetadata)) {
                 ((AbstractTopic) topic).addFilteredEntriesCount(filtered);
             }
         }
+        if (CollectionUtils.isNotEmpty(entriesToRedeliver)) {
+            this.subscription.getTopic().getBrokerService().getPulsar().getExecutor()
+                    .schedule(() -> {
+                        // simulate the Consumer rejected the message
+                        subscription
+                                .redeliverUnacknowledgedMessages(consumer, entriesToRedeliver);
+                    }, 1, TimeUnit.SECONDS);
+
+        }

Review Comment:
   With "Single active consumer" the outcome is the same: in that case we are deferring the processing of the message (as when the consumer negatively acks the message on the client because it cannot currently be processed for some reason).
   
   Unfortunately the EntryFilter cannot return a value for the delay period without breaking the API (we only have an enum).
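
To make the delay constraint concrete, here is a minimal sketch of deferring a batch with a delay chosen by the caller, since an enum result has no field to carry one. It uses only `java.util.concurrent`; `scheduleRedelivery`, the `redeliver` callback, and the `long` positions are hypothetical stand-ins for the scheduled call shown in the diff above, which uses a fixed 1 second.

```java
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Sketch only; the callback stands in for the subscription redelivery call.
final class DelayedRedeliverySketch {
    // An enum value cannot carry a per-message delay, so the delay is decided by the caller.
    enum FilterResult { ACCEPT, REJECT, RESCHEDULE }

    // Schedules one redelivery callback for the whole batch after the given delay.
    static ScheduledFuture<?> scheduleRedelivery(ScheduledExecutorService executor,
                                                 List<Long> positions,
                                                 Consumer<List<Long>> redeliver,
                                                 long delay,
                                                 TimeUnit unit) {
        // Copy the batch so later changes to the caller's list do not affect the task.
        List<Long> snapshot = List.copyOf(positions);
        return executor.schedule(() -> redeliver.accept(snapshot), delay, unit);
    }
}
```

A caller would pass its own executor and something like `1, TimeUnit.SECONDS`; making the delay configurable per filter would need a richer return type than the enum, which is the API break mentioned above.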
   
   





[GitHub] [pulsar] github-actions[bot] commented on pull request #15391: DRAFT - [enh] [broker] EntryFilter (PIP-105) - support per-Consumer filtering

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #15391:
URL: https://github.com/apache/pulsar/pull/15391#issuecomment-1113185213

   @eolivelli:Thanks for your contribution. For this PR, do we need to update docs?
   (The [PR template contains info about doc](https://github.com/apache/pulsar/blob/master/.github/PULL_REQUEST_TEMPLATE.md#documentation), which helps others know more about the changes. Can you provide doc-related info in this and future PR descriptions? Thanks)






[GitHub] [pulsar] codelipenghui commented on a diff in pull request #15391: [enh] [broker] EntryFilter (PIP-105) - support per-Consumer filtering

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #15391:
URL: https://github.com/apache/pulsar/pull/15391#discussion_r867356129


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java:
##########
@@ -227,6 +236,15 @@ && trackDelayedDelivery(entry.getLedgerId(), entry.getEntryId(), msgMetadata)) {
                 ((AbstractTopic) topic).addFilteredEntriesCount(filtered);
             }
         }
+        if (CollectionUtils.isNotEmpty(entriesToRedeliver)) {
+            this.subscription.getTopic().getBrokerService().getPulsar().getExecutor()
+                    .schedule(() -> {
+                        // simulate the Consumer rejected the message
+                        subscription
+                                .redeliverUnacknowledgedMessages(consumer, entriesToRedeliver);
+                    }, 1, TimeUnit.SECONDS);
+
+        }

Review Comment:
   Do we need to check the subscription type here? The single active consumer subscription mode will behave differently.





[GitHub] [pulsar] codelipenghui commented on pull request #15391: [enh] [broker] EntryFilter (PIP-105) - support per-Consumer filtering

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on PR #15391:
URL: https://github.com/apache/pulsar/pull/15391#issuecomment-1118489873

   I just had a Slack conversation with @eolivelli.
   
   This is an API change for the EntryFilter (it adds a new result type) that is not mentioned in PIP-105.
   We should discuss it on the dev mailing list to make sure everyone understands the definition of the new behavior consistently.




[GitHub] [pulsar] codelipenghui commented on a diff in pull request #15391: [enh] [broker] EntryFilter (PIP-105) - support per-Consumer filtering

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #15391:
URL: https://github.com/apache/pulsar/pull/15391#discussion_r867858865


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java:
##########
@@ -147,13 +149,20 @@ public int filterEntriesForConsumer(Optional<EntryWrapper[]> entryWrapper, int e
             msgMetadata = msgMetadata == null
                     ? Commands.peekMessageMetadata(metadataAndPayload, subscription.toString(), -1)
                     : msgMetadata;
+            EntryFilter.FilterResult filterResult = EntryFilter.FilterResult.ACCEPT;
             if (CollectionUtils.isNotEmpty(entryFilters)) {
-                fillContext(filterContext, msgMetadata, subscription);
-                if (EntryFilter.FilterResult.REJECT == getFilterResult(filterContext, entry, entryFilters)) {
+                fillContext(filterContext, msgMetadata, subscription, consumer);
+                filterResult = getFilterResult(filterContext, entry, entryFilters);
+                if (filterResult == EntryFilter.FilterResult.REJECT) {
                     entriesToFiltered.add(entry.getPosition());
                     entries.set(i, null);
                     entry.release();
                     continue;
+                } else if (filterResult == EntryFilter.FilterResult.RESCHEDULE) {
+                    entriesToRedeliver.add((PositionImpl) entry.getPosition());
+                    entries.set(i, null);
+                    entry.release();
+                    continue;

Review Comment:
   > This won't happen, because for each round we process the Entry only once, and it will be put in the set of messages to be redelivered. So we are not entering an infinite loop.
   
   Yes, it's not an infinite loop within one dispatch round, but the message will be redelivered again and again until there is a consumer who can accept it, right? Just to make sure I understand it correctly.
   
   The reason I am asking is that, after we introduce `RESCHEDULE`, how do we determine whether a message should be filtered out or redelivered? If you have an example from your JMS integration, it may help me understand how to use them together.
   
   > For the read entry requests to the bookies: shouldn't we keep reading the entry from the local cache ?
   
   We don't have a catch-up read cache for now. That is a different story: client-side redelivery might also read the entry from the bookies, so it's not a blocker.





[GitHub] [pulsar] eolivelli merged pull request #15391: [enh] [broker] EntryFilter (PIP-105) - support per-Consumer filtering

Posted by GitBox <gi...@apache.org>.
eolivelli merged PR #15391:
URL: https://github.com/apache/pulsar/pull/15391

