Posted to dev@pulsar.apache.org by Enrico Olivelli <eo...@gmail.com> on 2022/07/14 07:34:08 UTC

PIP-187 Add API to analyse a subscription backlog and provide an accurate value

Hello,
this is a PIP to implement a tool to analyse the subscription backlog

Link: https://github.com/apache/pulsar/issues/16597
Prototype: https://github.com/apache/pulsar/pull/16545

Below you can find the proposal (I will amend the GH issue while we
discuss, as usual)

Enrico

Motivation

Currently there is no way to get an accurate backlog for a subscription:

- you only have the number of "entries", not messages
- server-side filters (PIP-105) may filter out some messages

Having the number of entries is sometimes not enough, because with
batch messages the amount of work on the Consumers is proportional to
the number of messages, which may vary from entry to entry.

Goal

The idea of this patch is to provide a dedicated API (REST,
pulsar-admin, and Java PulsarAdmin) to "analyse" a subscription and
provide detailed information about what is expected to be delivered to
Consumers.

The operation will be quite expensive because we have to load the
messages from storage and pass them to the filters, but due to the
dynamic nature of Pulsar subscriptions there is no other way to have
this value.

One good strategy for monitoring/alerting is to set up alerts on the
usual "stats" and use this new API to inspect the subscription more
deeply, typically by issuing a manual command.

API Changes

internal ManagedCursor API:

CompletableFuture<ScanOutcome> scan(Predicate<Entry> condition, long maxEntries, long timeOutMs);

This method scans the Cursor from the lastMarkDelete position to the tail.
There are a time limit and a maxEntries limit; these are needed in
order to prevent huge (and useless) scans.
The Predicate can stop the scan if it doesn't want to continue the
processing for some reason.
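
For illustration only, a minimal sketch of how a caller could use this
method; the matchesFilter(...) helper and the printed ScanOutcome handling
are assumptions, not part of the proposal:

    // Illustrative sketch: count the backlog entries that match some condition.
    // "cursor" is an open ManagedCursor; matchesFilter(...) is a hypothetical helper.
    java.util.concurrent.atomic.AtomicLong matching = new java.util.concurrent.atomic.AtomicLong();
    cursor.scan(entry -> {
        if (matchesFilter(entry)) {
            matching.incrementAndGet();
        }
        return true; // return false to stop the scan early
    }, 10_000, 120_000)
          .thenAccept(outcome -> System.out.println(
                  "Scan finished with " + outcome + ", matching entries: " + matching.get()));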

New REST API:

    @GET
    @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/analiseBacklog")
    @ApiOperation(value = "Analyse a subscription, by scanning all the unprocessed messages")
    public void analiseSubscriptionBacklog(
            @Suspended final AsyncResponse asyncResponse,
            @ApiParam(value = "Specify the tenant", required = true)
            @PathParam("tenant") String tenant,
            @ApiParam(value = "Specify the namespace", required = true)
            @PathParam("namespace") String namespace,
            @ApiParam(value = "Specify topic name", required = true)
            @PathParam("topic") @Encoded String encodedTopic,
            @ApiParam(value = "Subscription", required = true)
            @PathParam("subName") String encodedSubName,
            @ApiParam(value = "Is authentication required to perform this operation")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
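
A manual invocation could look like the following; the /admin/v2/persistent
prefix, host, port and names are illustrative assumptions, not defined by
this PIP:

    curl -s http://broker:8080/admin/v2/persistent/public/default/my-topic/subscription/my-sub/analiseBacklog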

API response model:

public class AnaliseSubscriptionBacklogResult {
    private long entries;
    private long messages;

    private long filterRejectedEntries;
    private long filterAcceptedEntries;
    private long filterRescheduledEntries;

    private long filterRejectedMessages;
    private long filterAcceptedMessages;
    private long filterRescheduledMessages;

    private boolean aborted;
}

The response contains "aborted=true" if the request has been aborted
by some internal limitation, like a timeout or the scan hitting the max
number of entries.
We are not going to provide more details about the reason for the stop:
it would make the API too detailed and harder to maintain. Also, you
will find the details in the broker logs.
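
An illustrative response body, with made-up values:

    {
      "entries": 120,
      "messages": 12000,
      "filterRejectedEntries": 20,
      "filterAcceptedEntries": 95,
      "filterRescheduledEntries": 5,
      "filterRejectedMessages": 2000,
      "filterAcceptedMessages": 9500,
      "filterRescheduledMessages": 500,
      "aborted": false
    }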

New PulsarAdmin API:

    /**
     * Analyse subscription backlog.
     * This is a potentially expensive operation, as it requires
     * reading the messages from storage.
     * This function takes into consideration batch messages
     * and also Subscription filters.
     * @param topic
     *            Topic name
     * @param subscriptionName
     *            the subscription
     * @return an accurate analysis of the backlog
     * @throws PulsarAdminException
     *            Unexpected error
     */
    AnaliseSubscriptionBacklogResult analiseSubscriptionBacklog(String topic, String subscriptionName)
            throws PulsarAdminException;

    /**
     * Analyse subscription backlog (asynchronous version).
     * This is a potentially expensive operation, as it requires
     * reading the messages from storage.
     * This function takes into consideration batch messages
     * and also Subscription filters.
     * @param topic
     *            Topic name
     * @param subscriptionName
     *            the subscription
     * @return an accurate analysis of the backlog
     */
    CompletableFuture<AnaliseSubscriptionBacklogResult> analiseSubscriptionBacklogAsync(String topic,
            String subscriptionName);
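
As a usage sketch only: where the method is exposed (admin.topics() below)
and the getters on the result object are assumptions, not a final API:

    // Hedged usage sketch; admin.topics() and the result getters are assumptions.
    PulsarAdmin admin = PulsarAdmin.builder()
            .serviceHttpUrl("http://broker:8080")
            .build();
    AnaliseSubscriptionBacklogResult result =
            admin.topics().analiseSubscriptionBacklog("persistent://public/default/my-topic", "my-sub");
    System.out.println("entries=" + result.getEntries()
            + " messages=" + result.getMessages()
            + " aborted=" + result.isAborted());
    admin.close();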

A pulsar-admin command will be added as well, as usual.

New configuration entries in broker.conf:

@FieldContext(
        category = CATEGORY_POLICIES,
        doc = "Maximum time to spend while scanning a subscription to calculate the accurate backlog"
)
private long subscriptionBacklogScanMaxTimeMs = 1000 * 60 * 2L;
@FieldContext(
        category = CATEGORY_POLICIES,
        doc = "Maximum number of entries to process while scanning a subscription to calculate the accurate backlog"
)
private long subscriptionBacklogScanMaxEntries = 10_000;
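
In broker.conf these settings would presumably appear under the same names;
the values below are just the defaults shown above (2 minutes and 10,000
entries):

    subscriptionBacklogScanMaxTimeMs=120000
    subscriptionBacklogScanMaxEntries=10000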

Implementation

The implementation is pretty straightforward:

- add a new API in ManagedCursor to do the scan
- add the REST API
- implement in PersistentSubscription an analiseBacklog method that does the scan

Then the PersistentSubscription runs the scan (a rough sketch follows below):

- it applies the filters if they are present
- it considers individuallyDeletedMessages
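
The sketch below is illustrative only; apart from scan(...), the helper
names (individuallyDeleted, numMessagesInEntry, filtersAccept, the add*
methods) are hypothetical, and the "rescheduled" bucket is omitted for
brevity:

    // Illustrative sketch of what analiseBacklog could do, not the actual implementation.
    AnaliseSubscriptionBacklogResult result = new AnaliseSubscriptionBacklogResult();
    cursor.scan(entry -> {
        if (individuallyDeleted(entry.getPosition())) {
            // already acknowledged individually: not part of the backlog
            return true;
        }
        int numMessages = numMessagesInEntry(entry); // batch size from the entry metadata
        if (filtersAccept(entry)) {                  // run the subscription filters
            result.addAcceptedEntry(numMessages);
        } else {
            result.addRejectedEntry(numMessages);
        }
        return true; // keep scanning until maxEntries or the timeout is hit
    }, maxEntries, maxTimeMs).join();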

Non-trivial problem regarding the Dispatcher:
The Filters are loaded by an AbstractBaseDispatcher, but
PersistentSubscription starts a Dispatcher only when the first
consumer is connected.
This happens because the Subscription itself doesn't have a type
(Failover, Exclusive, Shared...) or KeySharedMetadata; all of this is
decided by the first consumer that connects (after the load of the topic,
so the subscription type may change after a topic unload).
This PIP won't fix this "problem", so in case of a missing
Dispatcher we are going to use an ephemeral Dispatcher without a Type.
Maybe in the future it will be better to persist the Subscription Type
and other metadata; this way we can create the Dispatcher while
instantiating the Subscription.

Rejected Alternatives

We could somehow store counters about the number of logical
messages during writes. But that does not work for a few reasons:

- you cannot know which subscriptions will be created in a topic
- a subscription can be created from the past (Earliest)
- subscription filters may change over time: they are usually configured
  using Subscription Properties, and those properties are dynamic
- doing computations on the write path (like running filters) kills
  latency and throughput

Use a client to clone the subscription and consume data.
This doesn't work because you would have to transfer the data to the client,
and this is possibly a huge amount of work and a waste of resources.

Re: PIP-187 Add API to analyse a subscription backlog and provide an accurate value

Posted by Asaf Mesika <as...@gmail.com>.
On Thu, Jul 21, 2022 at 10:36 AM Enrico Olivelli <eo...@gmail.com>
wrote:

> Il giorno gio 21 lug 2022 alle ore 06:25 PengHui Li
> <pe...@apache.org> ha scritto:
> >
> > > What if the topic owner creates an internal subscription, consumes the
> > messages, and updates a count per filter.
> >
> > I agree with this approach. If we need to scan all the backlogs to
> > calculate the
> > accurate backlogs for each operation, it's so expensive and difficult to
> > apply to
> > the production environment. With the counter for each
> filter(subscription)
> > and only
> > re-scan the data after the filter changes will reduce a lot of overhead.
>
> This approach is very expensive when you don't need this feature.
> Because in order to have this value you have to read everything (once
> per subscription),
> especially when you have a subscription without consumers and the
> topic is being written.
>
> As you can't know when you will need this value, you are going to
> waste resources in order to calculate
> something that you won't use.
>
>
> This API to analyze a subscription is not meant to be used often, but
> only to debug (manually) problems.
>

I would emphasize that last sentence in the PIP, and especially in the
documentation, to clarify that this operation is very expensive
(perhaps in capital letters) and should be used mostly for manual debugging
and not for actual monitoring.
One option to make it hard to miss: "analyzeBacklog" -->
"fullScanBacklogAnalysis" or "AnalyzeBacklogUsingFullScan"


>
> Also, the filter may be dependent on some other environment variables,
> like the wall clock time,
> if you have a filter that depends on time and you pre-calculate the
> backlog your counter won't be correct.
>
I wasn't aware of that - I re-read PIP-105 and then realized the filter has
complete freedom. The filter I had in mind was more constrained in nature
(like a WHERE clause in SQL).


>
> >
> > If we want to expose the accurate backlogs in the Prometheus endpoint,
> > it's almost impossible.
>
> I don't think this is actually possible if you want to take into
> consideration the filters.
> We are in the case of general purpose filtering (actually we allow
> Java code to be plugged into the broker),
> so pre-calculating the counters won't work well.
>
>
> Enrico
>
> >
> > Thanks,
> > Penghui
> >
> > On Wed, Jul 20, 2022 at 11:23 PM Asaf Mesika <as...@gmail.com>
> wrote:
> >
> > > On Wed, Jul 20, 2022 at 5:46 PM Enrico Olivelli <eo...@gmail.com>
> > > wrote:
> > >
> > > > Asaf,
> > > >
> > > > Il giorno mer 20 lug 2022 alle ore 15:40 Asaf Mesika
> > > > <as...@gmail.com> ha scritto:
> > > > >
> > > > > I'm not sure I understand the context exactly:
> > > > >
> > > > > You say today we can only know the number of entries, hence we'll
> have
> > > a
> > > > > wrong number of backlog for subscription since:
> > > > > 1. One entry contains multiple messages (batch message)
> > > > > 2. Subscription may contain a filter, which requires you to read
> the
> > > > entire
> > > > > backlog to know it
> > > >
> > > > correct
> > > >
> > > > >
> > > > > There are two things I don't understand:
> > > > >
> > > > > 1. We're adding an observability API, which you need to pay all the
> > > read
> > > > > cost just to know the count. I presume people would want to run
> this
> > > more
> > > > > than once. So they will read same data multiple times - why would a
> > > user
> > > > be
> > > > > willing to pay such a hefty price?
> > > >
> > > > sometimes it is the case, because processing a message may have a
> high
> > > > cost.
> > > > So having 10 entries of 100 messages is not correctly representing
> the
> > > > amount of work that must be done by the consumers
> > > > and so the user may wish to have an exact count.
> > > >
> > > > Having the filters adds more complexity because you cannot predict
> how
> > > > many entries will be filtered out
> > > >
> > > >
> > > > So it's mainly serving that specific use case of reading the entire
> > > messages over and over (every interval) is an order of magnitude less
> > > expensive than the processing itself.
> > >
> > >
> > > > > 2. If the user needs to know an accurate backlog, can't they use
> the
> > > > > ability to create a very large number of topics, thus they will
> know an
> > > > > accurate backlog without the huge cost?
> > > >
> > > > I can't understand why creating many topics will help.
> > > > instead with filters it is very likely that you have only fewer
> topics
> > > > with many subscriptions with different filters
> > > >
> > > > as you don't know the filters while writing you cannot route the
> > > > messages to some topic
> > > > also you would need to write the message to potentially multiple
> > > > topics, and that would be a huge write amplification
> > > > (think about a topic with 100 subscriptions)
> > > >
> > > > Yes, I haven't thought about that.
> > > What I was thinking is that those filters are mutually exclusive
> therefore
> > > topics, but in your case, if you have 100 different filters, and they
> > > overlap, yes it would be way more expensive to write them 100 times.
> > >
> > > >
> > > > > I have an idea, if that's ok:
> > > > >
> > > > > What if you can keep, as you said in your document, a metric
> counting
> > > > > messages per filter upon write.
> > > > This is not possible as described above
> > > >
> > >
> > > You wrote above that:
> > >
> > > ---
> > > you cannot know which subscriptions will be created in a topic
> > > subscription can be created from the past (Earliest)
> > > subscription filters may change over time: they are usually configured
> > > using Subscription Properties, and those properties are dynamic
> > > doing computations on the write path (like running filters) kills
> > > latency and throughput
> > >
> > > Use a client to clone the subscription and consume data.
> > > This doesn't work because you have to transfer the data to the client,
> > > and this is possibly a huge amount of work and a waste of resources.
> > > ---
> > >
> > > What if we don't do it directly on the write path.
> > > What if the topic owner creates an internal subscription, consumes the
> > > messages, and updates a count per filter.
> > > Thus, those computation will have less effect directly on the write
> path.
> > >
> > > I'm trying to compare the cost of computations with consuming all the
> > > messages, again and again, running filter computation for them, every
> > > interval (say 1min).
> > > The amount of computation in the latter would be more costly, no?
> > >
> > >
> > > > When you update the filter / add a filter
> > > > > by adding a new subscription, you can run code that reads from the
> > > > > beginning of the subscription (first unacked message) to catch up
> and
> > > > then
> > > > > continues. This may be done async, so the metric will take some
> time to
> > > > > catch up.
> > > > > Amortized, it has less cost on the system overall, if compared to
> > > reading
> > > > > all the messages multiple times to get a period size of the
> > > subscription.
> > > > > Both solutions are expensive as opposed to nothing of course. Both
> has
> > > to
> > > > > be a well documented conscious choice.
> > > > > WDYT?
> > > >
> > > >
> > > > Enrico
> > > > >
> > > > > Asaf
> > > > >
> > > > >

Re: PIP-187 Add API to analyse a subscription backlog and provide an accurate value

Posted by Enrico Olivelli <eo...@gmail.com>.
Il giorno ven 22 lug 2022 alle ore 11:16 PengHui Li
<pe...@apache.org> ha scritto:
>
> > As you can't know when you will need this value, you are going to
> waste resources in order to calculate
> something that you won't use.
>
> > This API to analyze a subscription is not meant to be used often, but
> only to debug (manually) problems.
>
> Oh, I got your point.
>
> One concern is if we are using REST API to get the accurate value,
> not sure if all the messages are filtered out within the request timeout
> or the max scan entries.
>
> I'm thinking can we add a start message ID for the request? It looks like we
> can continue to get the backlogs based on the last scanned message ID.
>
> For example:
>
>  - get backlog from 1:1
>  - received 1 backlog and the last scanned message ID is 1:50000, which
> does not reach the end of topic
>  - get the backlog from 1:50001
>  - received 2 backlogs and the last scanned message ID 3:40000, which does
> not reach the end of topic
>  - get the backlog from 3:40000
>  - received 5 backlogs and reached the end of topic
>
> The accurate backlog for this subscription is 8.

Makes sense to me.

I will add an optional starting position; if it is not provided we
will scan from the lastMarkDelete position.

The result of the scan will return the first and the last positions.
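
Something like this, roughly, on the client side (illustrative only: the
extra parameter, the result accessors and where the method is exposed,
admin.topics() below, are not defined yet):

    // Hypothetical iterative usage once the optional starting position is added.
    MessageId start = MessageId.earliest;
    long totalMessages = 0;
    while (true) {
        AnaliseSubscriptionBacklogResult partial =
                admin.topics().analiseSubscriptionBacklog(topic, subscription, start); // hypothetical overload
        totalMessages += partial.getMessages();          // hypothetical accessor
        if (partial.isReachedEndOfTopic()) {             // hypothetical flag
            break;
        }
        start = partial.getLastScannedPosition();        // hypothetical accessor
    }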

Let me update the PIP

Enrico


>
> Thanks,
> Penghui

Re: PIP-187 Add API to analyse a subscription backlog and provide an accurate value

Posted by PengHui Li <pe...@apache.org>.
> As you can't know when you will need this value, you are going to
waste resources in order to calculate
something that you won't use.

> This API to analyze a subscription is not meant to be used often, but
only to debug (manually) problems.

Oh, I got your point.

One concern is that, if we are using the REST API to get the accurate value,
we are not sure whether all the messages can be scanned within the request
timeout or the max scan entries.

I'm thinking: can we add a start message ID to the request? It looks like we
could continue to get the backlogs based on the last scanned message ID.

For example:

 - get backlog from 1:1
 - received 1 backlog and the last scanned message ID is 1:50000, which
does not reach the end of topic
 - get the backlog from 1:50001
 - received 2 backlogs and the last scanned message ID 3:40000, which does
not reach the end of topic
 - get the backlog from 3:40000
 - received 5 backlogs and reached the end of topic

The accurate backlog for this subscription is 8.

Thanks,
Penghui
On Thu, Jul 21, 2022 at 3:36 PM Enrico Olivelli <eo...@gmail.com> wrote:

> Il giorno gio 21 lug 2022 alle ore 06:25 PengHui Li
> <pe...@apache.org> ha scritto:
> >
> > > What if the topic owner creates an internal subscription, consumes the
> > messages, and updates a count per filter.
> >
> > I agree with this approach. If we need to scan all the backlogs to
> > calculate the
> > accurate backlogs for each operation, it's so expensive and difficult to
> > apply to
> > the production environment. With the counter for each
> filter(subscription)
> > and only
> > re-scan the data after the filter changes will reduce a lot of overhead.
>
> This approach is very expensive when you don't need this feature.
> Because in order to have this value you have to read everything (once
> per subscription),
> especially when you have a subscription without consumers and the
> topic is being written.
>
> As you can't know when you will need this value, you are going to
> waste resources in order to calculate
> something that you won't use.
>
>
> This API to analyze a subscription is not meant to be used often, but
> only to debug (manually) problems.
>
> Also, the filter may be dependent on some other environment variables,
> like the wall clock time,
> if you have a filter that depends on time and you pre-calculate the
> backlog your counter won't be correct.
>
> >
> > If we want to expose the accurate backlogs in the Prometheus endpoint,
> > it's almost impossible.
>
> I don't think this is actually possible if you want to take into
> consideration the filters.
> We are in the case of general purpose filtering (actually we allow
> Java code to be plugged into the broker),
> so pre-calculating the counters won't work well.
>
>
> Enrico
> > > > > > latency and thoughtput
> > > > > >
> > > > > > Use a client to clone the subscription and consume data.
> > > > > > This doesn't work because you have to transfer the data to the
> > > client,
> > > > > > and this is possibly a huge amount of work and a waste of
> resources.
> > > > > >
> > > >
> > >
>

Re: PIP-187 Add API to analyse a subscription backlog and provide a accurate value

Posted by Enrico Olivelli <eo...@gmail.com>.
Il giorno gio 21 lug 2022 alle ore 06:25 PengHui Li
<pe...@apache.org> ha scritto:
>
> > What if the topic owner creates an internal subscription, consumes the
> messages, and updates a count per filter.
>
> I agree with this approach. If we need to scan all the backlogs to
> calculate the
> accurate backlogs for each operation, it's so expensive and difficult to
> apply to
> the production environment. With the counter for each filter(subscription)
> and only
> re-scan the data after the filter changes will reduce a lot of overhead.

This approach is very expensive even when you don't need the feature,
because in order to keep this value up to date you have to read
everything (once per subscription), especially when a subscription has
no consumers while the topic is still being written to.

As you can't know when you will need this value, you are going to
waste resources in order to calculate
something that you won't use.


This API to analyze a subscription is not meant to be used often, but
only to debug problems manually.
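
For example (a minimal sketch, assuming the new method ends up on
admin.topics() and that AnaliseSubscriptionBacklogResult exposes plain
getters; the service URL, topic and subscription names are placeholders),
a manual debugging session with the Java PulsarAdmin API could look like:

import org.apache.pulsar.client.admin.PulsarAdmin;

public class InspectBacklog {
    public static void main(String[] args) throws Exception {
        // Placeholder service URL; point it at the broker that owns the topic.
        try (PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080")
                .build()) {
            // Proposed API: scans the subscription and applies the filters server side.
            AnaliseSubscriptionBacklogResult result = admin.topics()
                    .analiseSubscriptionBacklog(
                            "persistent://public/default/orders", "my-sub");
            System.out.println("entries=" + result.getEntries()
                    + " messages=" + result.getMessages()
                    + " acceptedMessages=" + result.getFilterAcceptedMessages()
                    + " aborted=" + result.isAborted());
        }
    }
}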

Also, a filter may depend on other environment variables, such as the
wall clock time: if a filter depends on time and you pre-calculate the
backlog, the counter won't be correct.
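
For example, here is a rough sketch of a PIP-105 style filter whose
outcome depends on the wall clock, so no counter computed at write time
can predict what it will return at read time. The interface, package and
enum names are written from memory and should be checked against the
broker filter plugin API:

import java.time.LocalTime;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
import org.apache.pulsar.broker.service.plugin.FilterContext;

public class BusinessHoursFilter implements EntryFilter {

    @Override
    public FilterResult filterEntry(Entry entry, FilterContext context) {
        // Deliver only between 09:00 and 17:00 broker local time,
        // otherwise ask the dispatcher to reschedule the entry.
        LocalTime now = LocalTime.now();
        boolean businessHours = !now.isBefore(LocalTime.of(9, 0))
                && now.isBefore(LocalTime.of(17, 0));
        return businessHours ? FilterResult.ACCEPT : FilterResult.RESCHEDULE;
    }

    @Override
    public void close() {
        // nothing to release
    }
}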

>
> If we want to expose the accurate backlogs in the Prometheus endpoint,
> it's almost impossible.

I don't think this is actually possible if you want to take the
filters into consideration.
We are in the case of general-purpose filtering (we actually allow
Java code to be plugged into the broker),
so pre-calculating the counters won't work well.


Enrico

>
> Thanks,
> Penghui
>
> On Wed, Jul 20, 2022 at 11:23 PM Asaf Mesika <as...@gmail.com> wrote:
>
> > On Wed, Jul 20, 2022 at 5:46 PM Enrico Olivelli <eo...@gmail.com>
> > wrote:
> >
> > > Asaf,
> > >
> > > Il giorno mer 20 lug 2022 alle ore 15:40 Asaf Mesika
> > > <as...@gmail.com> ha scritto:
> > > >
> > > > I'm not sure I understand the context exactly:
> > > >
> > > > You say today we can only know the number of entries, hence we'll have
> > a
> > > > wrong number of backlog for subscription since:
> > > > 1. One entry contains multiple messages (batch message)
> > > > 2. Subscription may contain a filter, which requires you to read the
> > > entire
> > > > backlog to know it
> > >
> > > correct
> > >
> > > >
> > > > There are two things I don't understand:
> > > >
> > > > 1. We're adding an observability API, which you need to pay all the
> > read
> > > > cost just to know the count. I presume people would want to run this
> > more
> > > > than once. So they will read same data multiple times - why would a
> > user
> > > be
> > > > willing to pay such a hefty price?
> > >
> > > sometimes it is the case, because processing a message may have a high
> > > cost.
> > > So having 10 entries of 100 messages is not correctly representing the
> > > amount of work that must be done by the consumers
> > > and so the user may wish to have an exact count.
> > >
> > > Having the filters adds more complexity because you cannot predict how
> > > many entries will be filtered out
> > >
> > >
> > > So it's mainly serving that specific use case of reading the entire
> > messages over and over (every interval) is an order of magnitude less
> > expensive than the processing it self.
> >
> >
> > > > 2. If the user needs to know an accurate backlog, can't they use the
> > > > ability to create a very large number of topics, thus they will know an
> > > > accurate backlog without the huge cost?
> > >
> > > I can't understand why creating many topics will help.
> > > instead with filters it is very likely that you have only fewer topics
> > > with many subscriptions with different filters
> > >
> > > as you don't know the filters while writing you cannot route the
> > > messages to some topic
> > > also you would need to write the message to potentially multiple
> > > topics, and that would be a huge write amplification
> > > (think about a topic with 100 subscriptions)
> > >
> > > Yes, I haven't thought about that.
> > What I was thinking is that those filters are mutually exclusive therefor
> > topics, but in your case, if you have 100 different filters, and they
> > overlap, yes it would be way more expensive to write them 100 times.
> >
> > >
> > > > I have an idea, if that's ok:
> > > >
> > > > What if you can keep, as you said in your document, a metric counting
> > > > messages per filter upon write.
> > > This is not possible as described above
> > >
> >
> > You wrote above that:
> >
> > ---
> > you cannot know which subscriptions will be created in a topic
> > subscription can be created from the past (Earliest)
> > subscription filters may change over time: they are usually configured
> > using Subscription Properties, and those properties are dynamic
> > doing computations on the write path (like running filters) kills
> > latency and thoughtput
> >
> > Use a client to clone the subscription and consume data.
> > This doesn't work because you have to transfer the data to the client,
> > and this is possibly a huge amount of work and a waste of resources.
> > ---
> >
> > What if we don't do it directly on the write path.
> > What if the topic owner creates an internal subscription, consumes the
> > messages, and updates a count per filter.
> > Thus, those computation will have less effect directly on the write path.
> >
> > I'm trying to compare that cost of compuations, with consuming all the
> > messages, again and again, running filter computation for them, every
> > interval (say 1min).
> > The amount of computation in the latter would be more costly, no?
> >
> >
> > > When you update the filter / add a filter
> > > > by adding a new subscription, you can run code that reads from the
> > > > beginning of the subscription (first unacked message) to catch up and
> > > then
> > > > continues. This may be done async, so the metric will take some time to
> > > > catch up.
> > > > Amortized, it has less cost on the system overall, if compared to
> > reading
> > > > all the messages multiple times to get a period size of the
> > subscription.
> > > > Both solutions are expensive as opposed to nothing of course. Both has
> > to
> > > > be a well documented conscious choice.
> > > > WDYT?
> > >
> > >
> > > Enrico
> > > >
> > > > Asaf
> > > >
> > > >
> > > > On Thu, Jul 14, 2022 at 10:34 AM Enrico Olivelli <eo...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hello,
> > > > > this is a PIP to implement a tool to analyse the subscription backlog
> > > > >
> > > > > Link: https://github.com/apache/pulsar/issues/16597
> > > > > Prototype: https://github.com/apache/pulsar/pull/16545
> > > > >
> > > > > Below you can find the proposal (I will amend the GH issue while we
> > > > > discuss, as usual)
> > > > >
> > > > > Enrico
> > > > >
> > > > > Motivation
> > > > >
> > > > > Currently there is no way to have a accurate backlog for a
> > > subscription:
> > > > >
> > > > > you have only the number of "entries", not messages
> > > > > server side filters (PIP-105) may filter out some messages
> > > > >
> > > > > Having the number of entries is sometimes not enough because with
> > > > > batch messages the amount of work on the Consumers is proportional to
> > > > > the number of messages, that may vary from entry to entry.
> > > > >
> > > > > Goal
> > > > >
> > > > > The idea of this patch is to provide a dedicate API (REST,
> > > > > pulsar-admin, and Java PulsarAdmin) to "analise" a subscription and
> > > > > provide detailed information about that is expected to be delivered
> > to
> > > > > Consumers.
> > > > >
> > > > > The operation will be quite expensive because we have to load the
> > > > > messages from storage and pass them to the filters, but due to the
> > > > > dynamic nature of Pulsar subscriptions there is no other way to have
> > > > > this value.
> > > > >
> > > > > One good strategy to do monitoring/alerting is to setup alerts on the
> > > > > usual "stats" and use this new API to inspect the subscription
> > deeper,
> > > > > typically be issuing a manual command.
> > > > >
> > > > > API Changes
> > > > >
> > > > > internal ManagedCursor API:
> > > > >
> > > > > CompletableFuture<ScanOutcome> scan(Predicate<Entry> condition, long
> > > > > maxEntries, long timeOutMs);
> > > > >
> > > > > This method scans the Cursor from the lastMarkDelete position to the
> > > tail.
> > > > > There is a time limit and a maxEntries limit, these are needed in
> > > > > order to prevent huge (and useless) scans.
> > > > > The Predicate can stop the scan, if it doesn't want to continue the
> > > > > processing for some reasons.
> > > > >
> > > > > New REST API:
> > > > >
> > > > >     @GET
> > > > >
> > > > >
> > >
> > @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/analiseBacklog")
> > > > >     @ApiOperation(value = "Analyse a subscription, by scanning all
> > the
> > > > > unprocessed messages")
> > > > >
> > > > >     public void analiseSubscriptionBacklog(
> > > > >            @Suspended final AsyncResponse asyncResponse,
> > > > >             @ApiParam(value = "Specify the tenant", required = true)
> > > > >             @PathParam("tenant") String tenant,
> > > > >             @ApiParam(value = "Specify the namespace", required =
> > true)
> > > > >             @PathParam("namespace") String namespace,
> > > > >             @ApiParam(value = "Specify topic name", required = true)
> > > > >             @PathParam("topic") @Encoded String encodedTopic,
> > > > >             @ApiParam(value = "Subscription", required = true)
> > > > >             @PathParam("subName") String encodedSubName,
> > > > >             @ApiParam(value = "Is authentication required to perform
> > > > > this operation")
> > > > >             @QueryParam("authoritative") @DefaultValue("false")
> > > > > boolean authoritative) {
> > > > >
> > > > > API response model:
> > > > >
> > > > > public class AnaliseSubscriptionBacklogResult {
> > > > >     private long entries;
> > > > >     private long messages;
> > > > >
> > > > >     private long filterRejectedEntries;
> > > > >     private long filterAcceptedEntries;
> > > > >     private long filterRescheduledEntries;
> > > > >
> > > > >     private long filterRejectedMessages;
> > > > >     private long filterAcceptedMessages;
> > > > >     private long filterRescheduledMessages;
> > > > >
> > > > >     private boolean aborted;
> > > > >
> > > > > The response contains "aborted=true" is the request has been aborted
> > > > > by some internal limitations, like a timeout or the scan hit the max
> > > > > number of entries.
> > > > > We are not going to provide more details about the reason of the
> > stop.
> > > > > It will make the API too detailed and harder to maintain. Also, in
> > the
> > > > > logs of the broker you will find the details.
> > > > >
> > > > > New PulsarAdmin API:
> > > > >
> > > > > /**
> > > > >      * Analise subscription backlog.
> > > > >      * This is a potentially expensive operation, as it requires
> > > > >      * to read the messages from storage.
> > > > >      * This function takes into consideration batch messages
> > > > >      * and also Subscription filters.
> > > > >      * @param topic
> > > > >      *            Topic name
> > > > >      * @param subscriptionName
> > > > >      *            the subscription
> > > > >      * @return an accurate analysis of the backlog
> > > > >      * @throws PulsarAdminException
> > > > >      *            Unexpected error
> > > > >      */
> > > > >     AnaliseSubscriptionBacklogResult
> > analiseSubscriptionBacklog(String
> > > > > topic, String subscriptionName)
> > > > >             throws PulsarAdminException;
> > > > >
> > > > >     /**
> > > > >      * Analise subscription backlog.
> > > > >      * This is a potentially expensive operation, as it requires
> > > > >      * to read the messages from storage.
> > > > >      * This function takes into consideration batch messages
> > > > >      * and also Subscription filters.
> > > > >      * @param topic
> > > > >      *            Topic name
> > > > >      * @param subscriptionName
> > > > >      *            the subscription
> > > > >      * @return an accurate analysis of the backlog
> > > > >      * @throws PulsarAdminException
> > > > >      *            Unexpected error
> > > > >      */
> > > > >     CompletableFuture<AnaliseSubscriptionBacklogResult>
> > > > > analiseSubscriptionBacklogAsync(String topic,
> > > > >
> > > > >                  String subscriptionName);
> > > > >
> > > > > A pulsar-admin command will be added as well as usual.
> > > > >
> > > > > New configuration entries in broker.conf:
> > > > >
> > > > > @FieldContext(
> > > > >          category = CATEGORY_POLICIES,
> > > > >          doc = "Maximum time to spend while scanning a subscription
> > to
> > > > > calculate the accurate backlog"
> > > > >  )
> > > > >  private long subscriptionBacklogScanMaxTimeMs = 1000 * 60 * 2L;
> > > > >  @FieldContext(
> > > > >          category = CATEGORY_POLICIES,
> > > > >          doc = "Maximum number of entries to process while scanning a
> > > > > subscription to calculate the accurate backlog"
> > > > >  )
> > > > >  private long subscriptionBacklogScanMaxEntries = 10_000;
> > > > >
> > > > > Implementation
> > > > >
> > > > > The implementation is pretty straightforward:
> > > > >
> > > > > add a new API in ManagedCursor to do the Scan
> > > > > add the REST API
> > > > > implement in PersistentSubscription a analiseBacklog method that does
> > > the
> > > > > scan
> > > > >
> > > > > The the PersistentSubscription runs the scan:
> > > > >
> > > > > it applies the filters if they are present
> > > > > it considers individuallyDeletedMessages
> > > > >
> > > > > Non trivial problem regarding the Dispatcher:
> > > > > The Filters are loaded by a AbstractBaseDispatcher, but
> > > > > PersistentSubscription starts a Dispatcher only when the first
> > > > > consumer is connecter.
> > > > > This happens because the Subscription itself doesn't have a type
> > > > > (Failover,Exclusive,Shared...) and KeySharedMetadata, all this stuff
> > > > > is decided by the first consumer coming (after the load of the topic,
> > > > > so the subscription type may change after a topic unload).
> > > > > This PIP won't fix this "problem", and so in case of missing
> > > > > Dispatcher we are going to use a ephemeral Dispatcher without Type.
> > > > > Maybe in the future it will be better to persist the Subscription
> > Type
> > > > > and other metadata, this way we can create the Dispatcher while
> > > > > instantiating the Subscription.
> > > > >
> > > > > Reject Alternatives
> > > > >
> > > > > We could store somehow some counter about the number of logical
> > > > > messages during writes. But that does not work for a few reasons:
> > > > >
> > > > > you cannot know which subscriptions will be created in a topic
> > > > > subscription can be created from the past (Earliest)
> > > > > subscription filters may change over time: they are usually
> > configured
> > > > > using Subscription Properties, and those properties are dynamic
> > > > > doing computations on the write path (like running filters) kills
> > > > > latency and thoughtput
> > > > >
> > > > > Use a client to clone the subscription and consume data.
> > > > > This doesn't work because you have to transfer the data to the
> > client,
> > > > > and this is possibly a huge amount of work and a waste of resources.
> > > > >
> > >
> >

Re: PIP-187 Add API to analyse a subscription backlog and provide a accurate value

Posted by PengHui Li <pe...@apache.org>.
> What if the topic owner creates an internal subscription, consumes the
messages, and updates a count per filter.

I agree with this approach. If we need to scan the whole backlog to
calculate the accurate backlog for each operation, it is too expensive
and difficult to apply in a production environment. Keeping a counter
for each filter (subscription) and only re-scanning the data after the
filter changes would reduce a lot of overhead.
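
For reference, a very rough sketch of this alternative (not part of the
PIP): an internal consumer kept by the topic owner that maintains a
per-filter counter instead of scanning on demand. The subscription name,
the filter predicate, and how the counter is exposed are placeholders:

import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;

public class PerFilterCounter {

    private final AtomicLong matchingMessages = new AtomicLong();

    public void run(String topic, Predicate<Message<byte[]>> filter) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")   // placeholder
                .build();
        // Hypothetical internal subscription used only for counting.
        Consumer<byte[]> consumer = client.newConsumer()
                .topic(topic)
                .subscriptionName("__filter-counter")
                .subscribe();
        while (true) {
            Message<byte[]> msg = consumer.receive();
            if (filter.test(msg)) {
                // Counter that could be exposed as a metric; it has to be
                // rebuilt from scratch whenever the filter changes.
                matchingMessages.incrementAndGet();
            }
            consumer.acknowledge(msg);
        }
    }
}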

If we want to expose the accurate backlog in the Prometheus endpoint,
it's almost impossible.

Thanks,
Penghui

On Wed, Jul 20, 2022 at 11:23 PM Asaf Mesika <as...@gmail.com> wrote:

> On Wed, Jul 20, 2022 at 5:46 PM Enrico Olivelli <eo...@gmail.com>
> wrote:
>
> > Asaf,
> >
> > Il giorno mer 20 lug 2022 alle ore 15:40 Asaf Mesika
> > <as...@gmail.com> ha scritto:
> > >
> > > I'm not sure I understand the context exactly:
> > >
> > > You say today we can only know the number of entries, hence we'll have
> a
> > > wrong number of backlog for subscription since:
> > > 1. One entry contains multiple messages (batch message)
> > > 2. Subscription may contain a filter, which requires you to read the
> > entire
> > > backlog to know it
> >
> > correct
> >
> > >
> > > There are two things I don't understand:
> > >
> > > 1. We're adding an observability API, which you need to pay all the
> read
> > > cost just to know the count. I presume people would want to run this
> more
> > > than once. So they will read same data multiple times - why would a
> user
> > be
> > > willing to pay such a hefty price?
> >
> > sometimes it is the case, because processing a message may have a high
> > cost.
> > So having 10 entries of 100 messages is not correctly representing the
> > amount of work that must be done by the consumers
> > and so the user may wish to have an exact count.
> >
> > Having the filters adds more complexity because you cannot predict how
> > many entries will be filtered out
> >
> >
> > So it's mainly serving that specific use case of reading the entire
> messages over and over (every interval) is an order of magnitude less
> expensive than the processing it self.
>
>
> > > 2. If the user needs to know an accurate backlog, can't they use the
> > > ability to create a very large number of topics, thus they will know an
> > > accurate backlog without the huge cost?
> >
> > I can't understand why creating many topics will help.
> > instead with filters it is very likely that you have only fewer topics
> > with many subscriptions with different filters
> >
> > as you don't know the filters while writing you cannot route the
> > messages to some topic
> > also you would need to write the message to potentially multiple
> > topics, and that would be a huge write amplification
> > (think about a topic with 100 subscriptions)
> >
> > Yes, I haven't thought about that.
> What I was thinking is that those filters are mutually exclusive therefor
> topics, but in your case, if you have 100 different filters, and they
> overlap, yes it would be way more expensive to write them 100 times.
>
> >
> > > I have an idea, if that's ok:
> > >
> > > What if you can keep, as you said in your document, a metric counting
> > > messages per filter upon write.
> > This is not possible as described above
> >
>
> You wrote above that:
>
> ---
> you cannot know which subscriptions will be created in a topic
> subscription can be created from the past (Earliest)
> subscription filters may change over time: they are usually configured
> using Subscription Properties, and those properties are dynamic
> doing computations on the write path (like running filters) kills
> latency and thoughtput
>
> Use a client to clone the subscription and consume data.
> This doesn't work because you have to transfer the data to the client,
> and this is possibly a huge amount of work and a waste of resources.
> ---
>
> What if we don't do it directly on the write path.
> What if the topic owner creates an internal subscription, consumes the
> messages, and updates a count per filter.
> Thus, those computation will have less effect directly on the write path.
>
> I'm trying to compare that cost of compuations, with consuming all the
> messages, again and again, running filter computation for them, every
> interval (say 1min).
> The amount of computation in the latter would be more costly, no?
>
>
> > When you update the filter / add a filter
> > > by adding a new subscription, you can run code that reads from the
> > > beginning of the subscription (first unacked message) to catch up and
> > then
> > > continues. This may be done async, so the metric will take some time to
> > > catch up.
> > > Amortized, it has less cost on the system overall, if compared to
> reading
> > > all the messages multiple times to get a period size of the
> subscription.
> > > Both solutions are expensive as opposed to nothing of course. Both has
> to
> > > be a well documented conscious choice.
> > > WDYT?
> >
> >
> > Enrico
> > >
> > > Asaf
> > >
> > >
> > > On Thu, Jul 14, 2022 at 10:34 AM Enrico Olivelli <eo...@gmail.com>
> > > wrote:
> > >
> > > > Hello,
> > > > this is a PIP to implement a tool to analyse the subscription backlog
> > > >
> > > > Link: https://github.com/apache/pulsar/issues/16597
> > > > Prototype: https://github.com/apache/pulsar/pull/16545
> > > >
> > > > Below you can find the proposal (I will amend the GH issue while we
> > > > discuss, as usual)
> > > >
> > > > Enrico
> > > >
> > > > Motivation
> > > >
> > > > Currently there is no way to have a accurate backlog for a
> > subscription:
> > > >
> > > > you have only the number of "entries", not messages
> > > > server side filters (PIP-105) may filter out some messages
> > > >
> > > > Having the number of entries is sometimes not enough because with
> > > > batch messages the amount of work on the Consumers is proportional to
> > > > the number of messages, that may vary from entry to entry.
> > > >
> > > > Goal
> > > >
> > > > The idea of this patch is to provide a dedicate API (REST,
> > > > pulsar-admin, and Java PulsarAdmin) to "analise" a subscription and
> > > > provide detailed information about that is expected to be delivered
> to
> > > > Consumers.
> > > >
> > > > The operation will be quite expensive because we have to load the
> > > > messages from storage and pass them to the filters, but due to the
> > > > dynamic nature of Pulsar subscriptions there is no other way to have
> > > > this value.
> > > >
> > > > One good strategy to do monitoring/alerting is to setup alerts on the
> > > > usual "stats" and use this new API to inspect the subscription
> deeper,
> > > > typically be issuing a manual command.
> > > >
> > > > API Changes
> > > >
> > > > internal ManagedCursor API:
> > > >
> > > > CompletableFuture<ScanOutcome> scan(Predicate<Entry> condition, long
> > > > maxEntries, long timeOutMs);
> > > >
> > > > This method scans the Cursor from the lastMarkDelete position to the
> > tail.
> > > > There is a time limit and a maxEntries limit, these are needed in
> > > > order to prevent huge (and useless) scans.
> > > > The Predicate can stop the scan, if it doesn't want to continue the
> > > > processing for some reasons.
> > > >
> > > > New REST API:
> > > >
> > > >     @GET
> > > >
> > > >
> >
> @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/analiseBacklog")
> > > >     @ApiOperation(value = "Analyse a subscription, by scanning all
> the
> > > > unprocessed messages")
> > > >
> > > >     public void analiseSubscriptionBacklog(
> > > >            @Suspended final AsyncResponse asyncResponse,
> > > >             @ApiParam(value = "Specify the tenant", required = true)
> > > >             @PathParam("tenant") String tenant,
> > > >             @ApiParam(value = "Specify the namespace", required =
> true)
> > > >             @PathParam("namespace") String namespace,
> > > >             @ApiParam(value = "Specify topic name", required = true)
> > > >             @PathParam("topic") @Encoded String encodedTopic,
> > > >             @ApiParam(value = "Subscription", required = true)
> > > >             @PathParam("subName") String encodedSubName,
> > > >             @ApiParam(value = "Is authentication required to perform
> > > > this operation")
> > > >             @QueryParam("authoritative") @DefaultValue("false")
> > > > boolean authoritative) {
> > > >
> > > > API response model:
> > > >
> > > > public class AnaliseSubscriptionBacklogResult {
> > > >     private long entries;
> > > >     private long messages;
> > > >
> > > >     private long filterRejectedEntries;
> > > >     private long filterAcceptedEntries;
> > > >     private long filterRescheduledEntries;
> > > >
> > > >     private long filterRejectedMessages;
> > > >     private long filterAcceptedMessages;
> > > >     private long filterRescheduledMessages;
> > > >
> > > >     private boolean aborted;
> > > >
> > > > The response contains "aborted=true" is the request has been aborted
> > > > by some internal limitations, like a timeout or the scan hit the max
> > > > number of entries.
> > > > We are not going to provide more details about the reason of the
> stop.
> > > > It will make the API too detailed and harder to maintain. Also, in
> the
> > > > logs of the broker you will find the details.
> > > >
> > > > New PulsarAdmin API:
> > > >
> > > > /**
> > > >      * Analise subscription backlog.
> > > >      * This is a potentially expensive operation, as it requires
> > > >      * to read the messages from storage.
> > > >      * This function takes into consideration batch messages
> > > >      * and also Subscription filters.
> > > >      * @param topic
> > > >      *            Topic name
> > > >      * @param subscriptionName
> > > >      *            the subscription
> > > >      * @return an accurate analysis of the backlog
> > > >      * @throws PulsarAdminException
> > > >      *            Unexpected error
> > > >      */
> > > >     AnaliseSubscriptionBacklogResult
> analiseSubscriptionBacklog(String
> > > > topic, String subscriptionName)
> > > >             throws PulsarAdminException;
> > > >
> > > >     /**
> > > >      * Analise subscription backlog.
> > > >      * This is a potentially expensive operation, as it requires
> > > >      * to read the messages from storage.
> > > >      * This function takes into consideration batch messages
> > > >      * and also Subscription filters.
> > > >      * @param topic
> > > >      *            Topic name
> > > >      * @param subscriptionName
> > > >      *            the subscription
> > > >      * @return an accurate analysis of the backlog
> > > >      * @throws PulsarAdminException
> > > >      *            Unexpected error
> > > >      */
> > > >     CompletableFuture<AnaliseSubscriptionBacklogResult>
> > > > analiseSubscriptionBacklogAsync(String topic,
> > > >
> > > >                  String subscriptionName);
> > > >
> > > > A pulsar-admin command will be added as well as usual.
> > > >
> > > > New configuration entries in broker.conf:
> > > >
> > > > @FieldContext(
> > > >          category = CATEGORY_POLICIES,
> > > >          doc = "Maximum time to spend while scanning a subscription
> to
> > > > calculate the accurate backlog"
> > > >  )
> > > >  private long subscriptionBacklogScanMaxTimeMs = 1000 * 60 * 2L;
> > > >  @FieldContext(
> > > >          category = CATEGORY_POLICIES,
> > > >          doc = "Maximum number of entries to process while scanning a
> > > > subscription to calculate the accurate backlog"
> > > >  )
> > > >  private long subscriptionBacklogScanMaxEntries = 10_000;
> > > >
> > > > Implementation
> > > >
> > > > The implementation is pretty straightforward:
> > > >
> > > > add a new API in ManagedCursor to do the Scan
> > > > add the REST API
> > > > implement in PersistentSubscription a analiseBacklog method that does
> > the
> > > > scan
> > > >
> > > > The the PersistentSubscription runs the scan:
> > > >
> > > > it applies the filters if they are present
> > > > it considers individuallyDeletedMessages
> > > >
> > > > Non trivial problem regarding the Dispatcher:
> > > > The Filters are loaded by a AbstractBaseDispatcher, but
> > > > PersistentSubscription starts a Dispatcher only when the first
> > > > consumer is connecter.
> > > > This happens because the Subscription itself doesn't have a type
> > > > (Failover,Exclusive,Shared...) and KeySharedMetadata, all this stuff
> > > > is decided by the first consumer coming (after the load of the topic,
> > > > so the subscription type may change after a topic unload).
> > > > This PIP won't fix this "problem", and so in case of missing
> > > > Dispatcher we are going to use a ephemeral Dispatcher without Type.
> > > > Maybe in the future it will be better to persist the Subscription
> Type
> > > > and other metadata, this way we can create the Dispatcher while
> > > > instantiating the Subscription.
> > > >
> > > > Reject Alternatives
> > > >
> > > > We could store somehow some counter about the number of logical
> > > > messages during writes. But that does not work for a few reasons:
> > > >
> > > > you cannot know which subscriptions will be created in a topic
> > > > subscription can be created from the past (Earliest)
> > > > subscription filters may change over time: they are usually
> configured
> > > > using Subscription Properties, and those properties are dynamic
> > > > doing computations on the write path (like running filters) kills
> > > > latency and thoughtput
> > > >
> > > > Use a client to clone the subscription and consume data.
> > > > This doesn't work because you have to transfer the data to the
> client,
> > > > and this is possibly a huge amount of work and a waste of resources.
> > > >
> >
>

Re: PIP-187 Add API to analyse a subscription backlog and provide a accurate value

Posted by Asaf Mesika <as...@gmail.com>.
On Wed, Jul 20, 2022 at 5:46 PM Enrico Olivelli <eo...@gmail.com> wrote:

> Asaf,
>
> Il giorno mer 20 lug 2022 alle ore 15:40 Asaf Mesika
> <as...@gmail.com> ha scritto:
> >
> > I'm not sure I understand the context exactly:
> >
> > You say today we can only know the number of entries, hence we'll have a
> > wrong number of backlog for subscription since:
> > 1. One entry contains multiple messages (batch message)
> > 2. Subscription may contain a filter, which requires you to read the
> entire
> > backlog to know it
>
> correct
>
> >
> > There are two things I don't understand:
> >
> > 1. We're adding an observability API, which you need to pay all the read
> > cost just to know the count. I presume people would want to run this more
> > than once. So they will read same data multiple times - why would a user
> be
> > willing to pay such a hefty price?
>
> sometimes it is the case, because processing a message may have a high
> cost.
> So having 10 entries of 100 messages is not correctly representing the
> amount of work that must be done by the consumers
> and so the user may wish to have an exact count.
>
> Having the filters adds more complexity because you cannot predict how
> many entries will be filtered out
>
>
So it's mainly serving that specific use case, where reading the
entire set of messages over and over (every interval) is an order of
magnitude less expensive than the processing itself.


> > 2. If the user needs to know an accurate backlog, can't they use the
> > ability to create a very large number of topics, thus they will know an
> > accurate backlog without the huge cost?
>
> I can't understand why creating many topics will help.
> instead with filters it is very likely that you have only fewer topics
> with many subscriptions with different filters
>
> as you don't know the filters while writing you cannot route the
> messages to some topic
> also you would need to write the message to potentially multiple
> topics, and that would be a huge write amplification
> (think about a topic with 100 subscriptions)
>
Yes, I haven't thought about that.
What I was thinking is that those filters are mutually exclusive and
could therefore be split into separate topics, but in your case, if you
have 100 different filters and they overlap, yes, it would be way more
expensive to write the messages 100 times.

>
> > I have an idea, if that's ok:
> >
> > What if you can keep, as you said in your document, a metric counting
> > messages per filter upon write.
> This is not possible as described above
>

You wrote above that:

---
you cannot know which subscriptions will be created in a topic
subscription can be created from the past (Earliest)
subscription filters may change over time: they are usually configured
using Subscription Properties, and those properties are dynamic
doing computations on the write path (like running filters) kills
latency and thoughtput

Use a client to clone the subscription and consume data.
This doesn't work because you have to transfer the data to the client,
and this is possibly a huge amount of work and a waste of resources.
---

What if we don't do it directly on the write path?
What if the topic owner creates an internal subscription, consumes the
messages, and updates a count per filter?
Those computations would then have less direct effect on the write path.

I'm trying to compare that cost of computation with consuming all the
messages, again and again, running the filter computation for them
every interval (say 1 min).
The amount of computation in the latter would be more costly, no?


> When you update the filter / add a filter
> > by adding a new subscription, you can run code that reads from the
> > beginning of the subscription (first unacked message) to catch up and
> then
> > continues. This may be done async, so the metric will take some time to
> > catch up.
> > Amortized, it has less cost on the system overall, if compared to reading
> > all the messages multiple times to get a period size of the subscription.
> > Both solutions are expensive as opposed to nothing of course. Both has to
> > be a well documented conscious choice.
> > WDYT?
>
>
> Enrico
> >
> > Asaf
> >
> >
> > On Thu, Jul 14, 2022 at 10:34 AM Enrico Olivelli <eo...@gmail.com>
> > wrote:
> >
> > > Hello,
> > > this is a PIP to implement a tool to analyse the subscription backlog
> > >
> > > Link: https://github.com/apache/pulsar/issues/16597
> > > Prototype: https://github.com/apache/pulsar/pull/16545
> > >
> > > Below you can find the proposal (I will amend the GH issue while we
> > > discuss, as usual)
> > >
> > > Enrico
> > >
> > > Motivation
> > >
> > > Currently there is no way to have a accurate backlog for a
> subscription:
> > >
> > > you have only the number of "entries", not messages
> > > server side filters (PIP-105) may filter out some messages
> > >
> > > Having the number of entries is sometimes not enough because with
> > > batch messages the amount of work on the Consumers is proportional to
> > > the number of messages, that may vary from entry to entry.
> > >
> > > Goal
> > >
> > > The idea of this patch is to provide a dedicate API (REST,
> > > pulsar-admin, and Java PulsarAdmin) to "analise" a subscription and
> > > provide detailed information about that is expected to be delivered to
> > > Consumers.
> > >
> > > The operation will be quite expensive because we have to load the
> > > messages from storage and pass them to the filters, but due to the
> > > dynamic nature of Pulsar subscriptions there is no other way to have
> > > this value.
> > >
> > > One good strategy to do monitoring/alerting is to setup alerts on the
> > > usual "stats" and use this new API to inspect the subscription deeper,
> > > typically be issuing a manual command.
> > >
> > > API Changes
> > >
> > > internal ManagedCursor API:
> > >
> > > CompletableFuture<ScanOutcome> scan(Predicate<Entry> condition, long
> > > maxEntries, long timeOutMs);
> > >
> > > This method scans the Cursor from the lastMarkDelete position to the
> tail.
> > > There is a time limit and a maxEntries limit, these are needed in
> > > order to prevent huge (and useless) scans.
> > > The Predicate can stop the scan, if it doesn't want to continue the
> > > processing for some reasons.
> > >
> > > New REST API:
> > >
> > >     @GET
> > >
> > >
> @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/analiseBacklog")
> > >     @ApiOperation(value = "Analyse a subscription, by scanning all the
> > > unprocessed messages")
> > >
> > >     public void analiseSubscriptionBacklog(
> > >            @Suspended final AsyncResponse asyncResponse,
> > >             @ApiParam(value = "Specify the tenant", required = true)
> > >             @PathParam("tenant") String tenant,
> > >             @ApiParam(value = "Specify the namespace", required = true)
> > >             @PathParam("namespace") String namespace,
> > >             @ApiParam(value = "Specify topic name", required = true)
> > >             @PathParam("topic") @Encoded String encodedTopic,
> > >             @ApiParam(value = "Subscription", required = true)
> > >             @PathParam("subName") String encodedSubName,
> > >             @ApiParam(value = "Is authentication required to perform
> > > this operation")
> > >             @QueryParam("authoritative") @DefaultValue("false")
> > > boolean authoritative) {
> > >
> > > API response model:
> > >
> > > public class AnaliseSubscriptionBacklogResult {
> > >     private long entries;
> > >     private long messages;
> > >
> > >     private long filterRejectedEntries;
> > >     private long filterAcceptedEntries;
> > >     private long filterRescheduledEntries;
> > >
> > >     private long filterRejectedMessages;
> > >     private long filterAcceptedMessages;
> > >     private long filterRescheduledMessages;
> > >
> > >     private boolean aborted;
> > >
> > > The response contains "aborted=true" is the request has been aborted
> > > by some internal limitations, like a timeout or the scan hit the max
> > > number of entries.
> > > We are not going to provide more details about the reason of the stop.
> > > It will make the API too detailed and harder to maintain. Also, in the
> > > logs of the broker you will find the details.
> > >
> > > New PulsarAdmin API:
> > >
> > > /**
> > >      * Analise subscription backlog.
> > >      * This is a potentially expensive operation, as it requires
> > >      * to read the messages from storage.
> > >      * This function takes into consideration batch messages
> > >      * and also Subscription filters.
> > >      * @param topic
> > >      *            Topic name
> > >      * @param subscriptionName
> > >      *            the subscription
> > >      * @return an accurate analysis of the backlog
> > >      * @throws PulsarAdminException
> > >      *            Unexpected error
> > >      */
> > >     AnaliseSubscriptionBacklogResult analiseSubscriptionBacklog(String
> > > topic, String subscriptionName)
> > >             throws PulsarAdminException;
> > >
> > >     /**
> > >      * Analise subscription backlog.
> > >      * This is a potentially expensive operation, as it requires
> > >      * to read the messages from storage.
> > >      * This function takes into consideration batch messages
> > >      * and also Subscription filters.
> > >      * @param topic
> > >      *            Topic name
> > >      * @param subscriptionName
> > >      *            the subscription
> > >      * @return an accurate analysis of the backlog
> > >      * @throws PulsarAdminException
> > >      *            Unexpected error
> > >      */
> > >     CompletableFuture<AnaliseSubscriptionBacklogResult>
> > > analiseSubscriptionBacklogAsync(String topic,
> > >
> > >                  String subscriptionName);
> > >
> > > A pulsar-admin command will be added as well as usual.
> > >
> > > New configuration entries in broker.conf:
> > >
> > > @FieldContext(
> > >          category = CATEGORY_POLICIES,
> > >          doc = "Maximum time to spend while scanning a subscription to
> > > calculate the accurate backlog"
> > >  )
> > >  private long subscriptionBacklogScanMaxTimeMs = 1000 * 60 * 2L;
> > >  @FieldContext(
> > >          category = CATEGORY_POLICIES,
> > >          doc = "Maximum number of entries to process while scanning a
> > > subscription to calculate the accurate backlog"
> > >  )
> > >  private long subscriptionBacklogScanMaxEntries = 10_000;
> > >
> > > Implementation
> > >
> > > The implementation is pretty straightforward:
> > >
> > > add a new API in ManagedCursor to do the Scan
> > > add the REST API
> > > implement in PersistentSubscription a analiseBacklog method that does
> the
> > > scan
> > >
> > > The the PersistentSubscription runs the scan:
> > >
> > > it applies the filters if they are present
> > > it considers individuallyDeletedMessages
> > >
> > > Non trivial problem regarding the Dispatcher:
> > > The Filters are loaded by a AbstractBaseDispatcher, but
> > > PersistentSubscription starts a Dispatcher only when the first
> > > consumer is connecter.
> > > This happens because the Subscription itself doesn't have a type
> > > (Failover,Exclusive,Shared...) and KeySharedMetadata, all this stuff
> > > is decided by the first consumer coming (after the load of the topic,
> > > so the subscription type may change after a topic unload).
> > > This PIP won't fix this "problem", and so in case of missing
> > > Dispatcher we are going to use a ephemeral Dispatcher without Type.
> > > Maybe in the future it will be better to persist the Subscription Type
> > > and other metadata, this way we can create the Dispatcher while
> > > instantiating the Subscription.
> > >
> > > Reject Alternatives
> > >
> > > We could store somehow some counter about the number of logical
> > > messages during writes. But that does not work for a few reasons:
> > >
> > > you cannot know which subscriptions will be created in a topic
> > > subscription can be created from the past (Earliest)
> > > subscription filters may change over time: they are usually configured
> > > using Subscription Properties, and those properties are dynamic
> > > doing computations on the write path (like running filters) kills
> > > latency and thoughtput
> > >
> > > Use a client to clone the subscription and consume data.
> > > This doesn't work because you have to transfer the data to the client,
> > > and this is possibly a huge amount of work and a waste of resources.
> > >
>

Re: PIP-187 Add API to analyse a subscription backlog and provide a accurate value

Posted by Enrico Olivelli <eo...@gmail.com>.
Asaf,

Il giorno mer 20 lug 2022 alle ore 15:40 Asaf Mesika
<as...@gmail.com> ha scritto:
>
> I'm not sure I understand the context exactly:
>
> You say today we can only know the number of entries, hence we'll have a
> wrong number of backlog for subscription since:
> 1. One entry contains multiple messages (batch message)
> 2. Subscription may contain a filter, which requires you to read the entire
> backlog to know it

correct

>
> There are two things I don't understand:
>
> 1. We're adding an observability API, which you need to pay all the read
> cost just to know the count. I presume people would want to run this more
> than once. So they will read same data multiple times - why would a user be
> willing to pay such a hefty price?

Sometimes it is the case, because processing a message may have a high cost.
So having 10 entries of 100 messages each does not correctly represent
the amount of work that must be done by the consumers,
and so the user may wish to have an exact count.

Having the filters adds more complexity because you cannot predict how
many entries will be filtered out.


> 2. If the user needs to know an accurate backlog, can't they use the
> ability to create a very large number of topics, thus they will know an
> accurate backlog without the huge cost?

I can't understand why creating many topics would help.
Instead, with filters it is very likely that you have fewer topics
with many subscriptions, each with a different filter.

As you don't know the filters while writing, you cannot route the
messages to a specific topic.
Also, you would need to write each message to potentially multiple
topics, and that would be a huge write amplification
(think about a topic with 100 subscriptions).

>
> I have an idea, if that's ok:
>
> What if you can keep, as you said in your document, a metric counting
> messages per filter upon write.
This is not possible as described above

When you update the filter / add a filter
> by adding a new subscription, you can run code that reads from the
> beginning of the subscription (first unacked message) to catch up and then
> continues. This may be done async, so the metric will take some time to
> catch up.
> Amortized, it has less cost on the system overall, if compared to reading
> all the messages multiple times to get a period size of the subscription.
> Both solutions are expensive as opposed to nothing of course. Both has to
> be a well documented conscious choice.
> WDYT?


Enrico
>
> Asaf
>
>
> On Thu, Jul 14, 2022 at 10:34 AM Enrico Olivelli <eo...@gmail.com>
> wrote:
>
> > Hello,
> > this is a PIP to implement a tool to analyse the subscription backlog
> >
> > Link: https://github.com/apache/pulsar/issues/16597
> > Prototype: https://github.com/apache/pulsar/pull/16545
> >
> > Below you can find the proposal (I will amend the GH issue while we
> > discuss, as usual)
> >
> > Enrico
> >
> > Motivation
> >
> > Currently there is no way to have a accurate backlog for a subscription:
> >
> > you have only the number of "entries", not messages
> > server side filters (PIP-105) may filter out some messages
> >
> > Having the number of entries is sometimes not enough because with
> > batch messages the amount of work on the Consumers is proportional to
> > the number of messages, that may vary from entry to entry.
> >
> > Goal
> >
> > The idea of this patch is to provide a dedicate API (REST,
> > pulsar-admin, and Java PulsarAdmin) to "analise" a subscription and
> > provide detailed information about that is expected to be delivered to
> > Consumers.
> >
> > The operation will be quite expensive because we have to load the
> > messages from storage and pass them to the filters, but due to the
> > dynamic nature of Pulsar subscriptions there is no other way to have
> > this value.
> >
> > One good strategy to do monitoring/alerting is to setup alerts on the
> > usual "stats" and use this new API to inspect the subscription deeper,
> > typically be issuing a manual command.
> >
> > API Changes
> >
> > internal ManagedCursor API:
> >
> > CompletableFuture<ScanOutcome> scan(Predicate<Entry> condition, long
> > maxEntries, long timeOutMs);
> >
> > This method scans the Cursor from the lastMarkDelete position to the tail.
> > There is a time limit and a maxEntries limit, these are needed in
> > order to prevent huge (and useless) scans.
> > The Predicate can stop the scan, if it doesn't want to continue the
> > processing for some reasons.
> >
> > New REST API:
> >
> >     @GET
> >
> > @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/analiseBacklog")
> >     @ApiOperation(value = "Analyse a subscription, by scanning all the
> > unprocessed messages")
> >
> >     public void analiseSubscriptionBacklog(
> >            @Suspended final AsyncResponse asyncResponse,
> >             @ApiParam(value = "Specify the tenant", required = true)
> >             @PathParam("tenant") String tenant,
> >             @ApiParam(value = "Specify the namespace", required = true)
> >             @PathParam("namespace") String namespace,
> >             @ApiParam(value = "Specify topic name", required = true)
> >             @PathParam("topic") @Encoded String encodedTopic,
> >             @ApiParam(value = "Subscription", required = true)
> >             @PathParam("subName") String encodedSubName,
> >             @ApiParam(value = "Is authentication required to perform
> > this operation")
> >             @QueryParam("authoritative") @DefaultValue("false")
> > boolean authoritative) {
> >
> > API response model:
> >
> > public class AnaliseSubscriptionBacklogResult {
> >     private long entries;
> >     private long messages;
> >
> >     private long filterRejectedEntries;
> >     private long filterAcceptedEntries;
> >     private long filterRescheduledEntries;
> >
> >     private long filterRejectedMessages;
> >     private long filterAcceptedMessages;
> >     private long filterRescheduledMessages;
> >
> >     private boolean aborted;
> >
> > The response contains "aborted=true" is the request has been aborted
> > by some internal limitations, like a timeout or the scan hit the max
> > number of entries.
> > We are not going to provide more details about the reason of the stop.
> > It will make the API too detailed and harder to maintain. Also, in the
> > logs of the broker you will find the details.
> >
> > New PulsarAdmin API:
> >
> > /**
> >      * Analise subscription backlog.
> >      * This is a potentially expensive operation, as it requires
> >      * to read the messages from storage.
> >      * This function takes into consideration batch messages
> >      * and also Subscription filters.
> >      * @param topic
> >      *            Topic name
> >      * @param subscriptionName
> >      *            the subscription
> >      * @return an accurate analysis of the backlog
> >      * @throws PulsarAdminException
> >      *            Unexpected error
> >      */
> >     AnaliseSubscriptionBacklogResult analiseSubscriptionBacklog(String
> > topic, String subscriptionName)
> >             throws PulsarAdminException;
> >
> >     /**
> >      * Analise subscription backlog.
> >      * This is a potentially expensive operation, as it requires
> >      * to read the messages from storage.
> >      * This function takes into consideration batch messages
> >      * and also Subscription filters.
> >      * @param topic
> >      *            Topic name
> >      * @param subscriptionName
> >      *            the subscription
> >      * @return an accurate analysis of the backlog
> >      * @throws PulsarAdminException
> >      *            Unexpected error
> >      */
> >     CompletableFuture<AnaliseSubscriptionBacklogResult>
> > analiseSubscriptionBacklogAsync(String topic,
> >
> >                  String subscriptionName);
> >
> > A pulsar-admin command will be added as well as usual.
> >
> > New configuration entries in broker.conf:
> >
> > @FieldContext(
> >          category = CATEGORY_POLICIES,
> >          doc = "Maximum time to spend while scanning a subscription to
> > calculate the accurate backlog"
> >  )
> >  private long subscriptionBacklogScanMaxTimeMs = 1000 * 60 * 2L;
> >  @FieldContext(
> >          category = CATEGORY_POLICIES,
> >          doc = "Maximum number of entries to process while scanning a
> > subscription to calculate the accurate backlog"
> >  )
> >  private long subscriptionBacklogScanMaxEntries = 10_000;
> >
> > Implementation
> >
> > The implementation is pretty straightforward:
> >
> > add a new API in ManagedCursor to do the Scan
> > add the REST API
> > implement in PersistentSubscription a analiseBacklog method that does the
> > scan
> >
> > The the PersistentSubscription runs the scan:
> >
> > it applies the filters if they are present
> > it considers individuallyDeletedMessages
> >
> > Non trivial problem regarding the Dispatcher:
> > The Filters are loaded by a AbstractBaseDispatcher, but
> > PersistentSubscription starts a Dispatcher only when the first
> > consumer is connecter.
> > This happens because the Subscription itself doesn't have a type
> > (Failover,Exclusive,Shared...) and KeySharedMetadata, all this stuff
> > is decided by the first consumer coming (after the load of the topic,
> > so the subscription type may change after a topic unload).
> > This PIP won't fix this "problem", and so in case of missing
> > Dispatcher we are going to use a ephemeral Dispatcher without Type.
> > Maybe in the future it will be better to persist the Subscription Type
> > and other metadata, this way we can create the Dispatcher while
> > instantiating the Subscription.
> >
> > Reject Alternatives
> >
> > We could store somehow some counter about the number of logical
> > messages during writes. But that does not work for a few reasons:
> >
> > you cannot know which subscriptions will be created in a topic
> > subscription can be created from the past (Earliest)
> > subscription filters may change over time: they are usually configured
> > using Subscription Properties, and those properties are dynamic
> > doing computations on the write path (like running filters) kills
> > latency and thoughtput
> >
> > Use a client to clone the subscription and consume data.
> > This doesn't work because you have to transfer the data to the client,
> > and this is possibly a huge amount of work and a waste of resources.
> >

Re: PIP-187 Add API to analyse a subscription backlog and provide a accurate value

Posted by Asaf Mesika <as...@gmail.com>.
I'm not sure I understand the context exactly:

You say today we can only know the number of entries, hence we'll have a
wrong backlog number for a subscription, since:
1. One entry contains multiple messages (batch message)
2. A subscription may contain a filter, which requires you to read the
entire backlog to know the real count

There are two things I don't understand:

1. We're adding an observability API for which you need to pay the full read
cost just to know the count. I presume people would want to run this more
than once, so they will read the same data multiple times - why would a user
be willing to pay such a hefty price?
2. If the user needs to know an accurate backlog, can't they use the
ability to create a very large number of topics, so that they would know an
accurate backlog without the huge cost?

I have an idea, if that's ok:

What if you kept, as you said in your document, a metric counting
messages per filter upon write? When you update a filter / add a filter
by adding a new subscription, you could run code that reads from the
beginning of the subscription (first unacked message) to catch up and then
continues. This could be done asynchronously, so the metric would take some
time to catch up.
Amortized, it has a lower cost on the system overall compared to reading
all the messages multiple times to get a periodic size of the subscription.
Both solutions are expensive compared to doing nothing, of course. Both have
to be a well-documented, conscious choice.
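To make the idea a bit more concrete, a very rough sketch (every name below is hypothetical, none of this is an existing Pulsar API): one counter of logical messages per subscription filter, updated on the write path, reset and replayed asynchronously when the filter changes.

    import java.util.concurrent.atomic.AtomicLong;

    class PerFilterBacklogCounter {
        private final AtomicLong messages = new AtomicLong();

        // write path: the broker evaluates the subscription's filter against
        // the new entry and accounts for the logical messages it carries
        void onPublish(boolean filterAccepts, int numMessagesInBatch) {
            if (filterAccepts) {
                messages.addAndGet(numMessagesInBatch);
            }
        }

        // filter added or changed: reset, then a background task replays the
        // entries from the first unacked message, calling onPublish() for each
        void resetBeforeReplay() {
            messages.set(0);
        }

        // value exported as the backlog metric
        long estimatedBacklogMessages() {
            return messages.get();
        }
    }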
WDYT?

Asaf



Re: PIP-187 Add API to analyse a subscription backlog and provide a accurate value

Posted by Enrico Olivelli <eo...@gmail.com>.
Thank you all

Let me start an official VOTE thread

Enrico

On Tue, 19 Jul 2022 at 15:10, mattison chao <ma...@apache.org> wrote:

> +1,
> This feature is very useful for users.
>
> Best,
> Mattison

Re: PIP-187 Add API to analyse a subscription backlog and provide a accurate value

Posted by mattison chao <ma...@apache.org>.
+1,
This feature is very useful for users.

Best,
Mattison

On Mon, 18 Jul 2022 at 18:11, Nicolò Boschi <bo...@gmail.com> wrote:

> +1, good work, I think it's very valuable for users for monitoring
> purposes.
>
> Nicolò Boschi

Re: PIP-187 Add API to analyse a subscription backlog and provide a accurate value

Posted by Nicolò Boschi <bo...@gmail.com>.
+1, good work, I think it's very valuable for users for monitoring
purposes.

Nicolò Boschi


On Mon, 18 Jul 2022 at 11:47, Lothruin Mirwen <lothruin.mirwen@gmail.com> wrote:

> This should be really helpful to monitor real backlog in production
> environments.
> Currently we are stuck at the "entries" count, but it doesn't describe well
> how much real message lag we have.
>
> +1, excited to see it work in our production environments
>
> Diego Salvi

Re: PIP-187 Add API to analyse a subscription backlog and provide a accurate value

Posted by Lothruin Mirwen <lo...@gmail.com>.
This should be really helpful to monitor real backlog in production
environments.
Currently we are stuck at the "entries" count, but it doesn't describe well
how much real message lag we have.

+1, excited to see it work in our production environments

Diego Salvi

On Mon, 18 Jul 2022 at 11:18, Enrico Olivelli <eolivelli@gmail.com> wrote:

> Any comments?
>
> FYI I have updated the PIP after addressing some feedback on the PR:
> - no more need to create a dummy Dispatcher
> - now we are reading entries in batches
>
> I would like to see this in 2.11, please.
>
> Enrico

Re: PIP-187 Add API to analyse a subscription backlog and provide a accurate value

Posted by Enrico Olivelli <eo...@gmail.com>.
Any comments?

FYI I have updated the PIP after addressing some feedback on the PR:
- no more need to create a dummy Dispatcher
- now we are reading entries in batches

I would like to see this in 2.11, please.

Enrico
