Posted to dev@pulsar.apache.org by Enrico Olivelli <eo...@gmail.com> on 2022/07/19 17:02:39 UTC

[VOTE] PIP-187 Add API to analyze a subscription backlog and provide an accurate value

This is the VOTE thread for PIP-187

This is the GH issue: https://github.com/apache/pulsar/issues/16597
This is the PR: https://github.com/apache/pulsar/pull/16545

The vote is open for at least 48 hours

Below you can find a copy of the text of the PIP

Best regards
Enrico


Motivation

Currently there is no way to get an accurate backlog for a subscription:

- you have only the number of "entries", not messages
- server-side filters (PIP-105) may filter out some messages

Having the number of entries is sometimes not enough, because with
batch messages the amount of work on the Consumers is proportional to
the number of messages, which may vary from entry to entry.

Goal

The idea of this patch is to provide a dedicated API (REST,
pulsar-admin, and Java PulsarAdmin) to "analyze" a subscription and
provide detailed information about what is expected to be delivered to
Consumers.

The operation will be quite expensive, because we have to load the
messages from storage and pass them to the filters; but due to the
dynamic nature of Pulsar subscriptions there is no other way to obtain
this value.

One good strategy for monitoring/alerting is to set up alerts on the
usual "stats" and use this new API to inspect the subscription more
deeply, typically by issuing a manual command.

API Changes

internal ManagedCursor API:

CompletableFuture<ScanOutcome> scan(Predicate<Entry> condition, long maxEntries, long timeOutMs);

This method scans the Cursor from the lastMarkDelete position to the tail.
There is a time limit and a maxEntries limit; these are needed in
order to prevent huge (and useless) scans.
The Predicate can stop the scan if it does not want to continue the
processing for some reason.
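The PIP does not specify the scan internals. As a rough illustration of the contract, a bounded scan with both limits could look like the sketch below; the types here are simplified stand-ins, not the broker's real classes:

```java
import java.util.List;
import java.util.function.Predicate;

// Simplified stand-in for the broker-internal outcome type; the
// constant names here are illustrative, not taken from the PIP.
enum ScanOutcome { COMPLETED, ABORTED_BY_PREDICATE, ABORTED_MAX_ENTRIES, ABORTED_TIMEOUT }

public class CursorScanSketch {
    // A bounded scan: stops when the predicate returns false, when
    // maxEntries entries have been visited, or when timeOutMs elapses.
    static ScanOutcome scan(List<String> entries, Predicate<String> condition,
                            long maxEntries, long timeOutMs) {
        long deadline = System.currentTimeMillis() + timeOutMs;
        long visited = 0;
        for (String entry : entries) {
            if (visited >= maxEntries) {
                return ScanOutcome.ABORTED_MAX_ENTRIES;
            }
            if (System.currentTimeMillis() > deadline) {
                return ScanOutcome.ABORTED_TIMEOUT;
            }
            if (!condition.test(entry)) {
                return ScanOutcome.ABORTED_BY_PREDICATE;
            }
            visited++;
        }
        return ScanOutcome.COMPLETED;
    }

    public static void main(String[] args) {
        List<String> entries = List.of("e1", "e2", "e3", "e4");
        System.out.println(scan(entries, e -> true, 10, 60_000)); // COMPLETED
        System.out.println(scan(entries, e -> true, 2, 60_000));  // ABORTED_MAX_ENTRIES
    }
}
```

Whichever limit fires first wins, which is why the response model below only carries a single aborted flag rather than a reason.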

New REST API:

    @GET
    @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/analyzeBacklog")
    @ApiOperation(value = "Analyze a subscription, by scanning all the unprocessed messages")
    public void analyzeSubscriptionBacklog(
            @Suspended final AsyncResponse asyncResponse,
            @ApiParam(value = "Specify the tenant", required = true)
            @PathParam("tenant") String tenant,
            @ApiParam(value = "Specify the namespace", required = true)
            @PathParam("namespace") String namespace,
            @ApiParam(value = "Specify topic name", required = true)
            @PathParam("topic") @Encoded String encodedTopic,
            @ApiParam(value = "Subscription", required = true)
            @PathParam("subName") String encodedSubName,
            @ApiParam(value = "Is authentication required to perform this operation")
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {

API response model:

public class AnalyzeSubscriptionBacklogResult {
    private long entries;
    private long messages;

    private long filterRejectedEntries;
    private long filterAcceptedEntries;
    private long filterRescheduledEntries;

    private long filterRejectedMessages;
    private long filterAcceptedMessages;
    private long filterRescheduledMessages;

    private boolean aborted;
}

The response contains "aborted=true" if the request has been aborted
by some internal limitation, such as a timeout or the scan hitting the
maximum number of entries.
We are not going to provide more details about the reason for the
stop: that would make the API too detailed and harder to maintain.
The details can be found in the broker logs.
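For illustration, a serialized response might look like this (the values are hypothetical, the field names come from the model above):

```json
{
  "entries": 1000,
  "messages": 5000,
  "filterRejectedEntries": 100,
  "filterAcceptedEntries": 880,
  "filterRescheduledEntries": 20,
  "filterRejectedMessages": 500,
  "filterAcceptedMessages": 4400,
  "filterRescheduledMessages": 100,
  "aborted": false
}
```

Note that "messages" is typically larger than "entries" because one entry can contain a whole batch of messages.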

New PulsarAdmin API:

    /**
     * Analyze subscription backlog.
     * This is a potentially expensive operation, as it requires
     * reading the messages from storage.
     * This function takes into consideration batch messages
     * and also Subscription filters.
     * @param topic
     *            Topic name
     * @param subscriptionName
     *            the subscription
     * @return an accurate analysis of the backlog
     * @throws PulsarAdminException
     *            Unexpected error
     */
    AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic, String subscriptionName)
            throws PulsarAdminException;

    /**
     * Analyze subscription backlog.
     * This is a potentially expensive operation, as it requires
     * reading the messages from storage.
     * This function takes into consideration batch messages
     * and also Subscription filters.
     * @param topic
     *            Topic name
     * @param subscriptionName
     *            the subscription
     * @return an accurate analysis of the backlog
     */
    CompletableFuture<AnalyzeSubscriptionBacklogResult> analyzeSubscriptionBacklogAsync(String topic,
                 String subscriptionName);

A pulsar-admin command will be added as well, as usual.
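The PIP does not spell out the command name or flags; as an assumption, an invocation might look roughly like:

```shell
pulsar-admin topics analyze-backlog \
  --subscription my-subscription \
  persistent://public/default/my-topic
```

The exact command and option names will be defined in the implementation PR.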

New configuration entries in broker.conf:

@FieldContext(
        category = CATEGORY_POLICIES,
        doc = "Maximum time to spend while scanning a subscription to calculate the accurate backlog"
)
private long subscriptionBacklogScanMaxTimeMs = 1000 * 60 * 2L;

@FieldContext(
        category = CATEGORY_POLICIES,
        doc = "Maximum number of entries to process while scanning a subscription to calculate the accurate backlog"
)
private long subscriptionBacklogScanMaxEntries = 10_000;
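Assuming the usual mapping of @FieldContext fields to broker.conf keys, these settings would appear as plain properties with the defaults above (2 minutes and 10,000 entries):

```
# Maximum time (ms) to spend while scanning a subscription to calculate the accurate backlog
subscriptionBacklogScanMaxTimeMs=120000

# Maximum number of entries to process while scanning a subscription to calculate the accurate backlog
subscriptionBacklogScanMaxEntries=10000
```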

Implementation

The implementation is pretty straightforward:

- add a new API in ManagedCursor to do the scan
- add the REST API
- implement in PersistentSubscription an analyzeBacklog method that does the scan

Then the PersistentSubscription runs the scan:

- it applies the filters, if they are present
- it takes individuallyDeletedMessages into account
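A minimal sketch of the per-entry accounting follows. The real broker applies EntryFilters to real storage entries; the entry type, filter outcomes, and deleted-position set here are simplified stand-ins:

```java
import java.util.List;
import java.util.Set;

public class BacklogAnalysisSketch {
    // Simplified filter outcomes, mirroring the result fields in the PIP.
    enum FilterResult { ACCEPT, REJECT, RESCHEDULE }

    // A stand-in entry: an id, the number of messages in its batch,
    // and the outcome the subscription filters would produce for it.
    record Entry(long id, int batchSize, FilterResult result) {}

    // Returns {entries, messages, filterAcceptedEntries, filterAcceptedMessages},
    // skipping positions that were already individually acknowledged.
    static long[] analyze(List<Entry> backlog, Set<Long> individuallyDeleted) {
        long entries = 0, messages = 0;
        long acceptedEntries = 0, acceptedMessages = 0;
        for (Entry e : backlog) {
            if (individuallyDeleted.contains(e.id())) {
                continue; // already acknowledged, not part of the backlog
            }
            entries++;
            messages += e.batchSize();
            if (e.result() == FilterResult.ACCEPT) {
                acceptedEntries++;
                acceptedMessages += e.batchSize();
            }
        }
        return new long[] {entries, messages, acceptedEntries, acceptedMessages};
    }
}
```

This also shows why the scan must run server-side: the batch size per entry and the individually-deleted ranges are only known to the broker.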

Rejected Alternatives

We could somehow store a counter of the number of logical messages
during writes. But that does not work, for a few reasons:

- you cannot know which subscriptions will be created on a topic
- subscriptions can be created starting from the past (Earliest)
- subscription filters may change over time: they are usually configured using Subscription Properties, and those properties are dynamic
- doing computations on the write path (like running filters) kills latency and throughput

Another alternative is to use a client to clone the subscription and
consume the data. This doesn't work because you have to transfer the
data to the client, which is potentially a huge amount of work and a
waste of resources.

Re: [VOTE] PIP-187 Add API to analyze a subscription backlog and provide an accurate value

Posted by Enrico Olivelli <eo...@gmail.com>.
Thank you all

I am closing this VOTE as Passed
Let's follow up on the PR for detailed feedback about the implementation

Enrico


Re: [VOTE] PIP-187 Add API to analyze a subscription backlog and provide an accurate value

Posted by mattison chao <ma...@apache.org>.
+1

Best,
Mattison


Re: [VOTE] PIP-187 Add API to analyze a subscription backlog and provide an accurate value

Posted by Haiting Jiang <ji...@apache.org>.
+1

Thanks,
Haiting

On 2022/07/23 02:00:32 PengHui Li wrote:
> +1
> 
> Penghui
> 
> On Wed, Jul 20, 2022 at 9:41 PM Asaf Mesika <as...@gmail.com> wrote:
> 
> > Sorry to barge in the vote - I forgot to send my reply on the discussion 2
> > days ago :)
> >
> >
> > On Tue, Jul 19, 2022 at 11:22 PM Nicolò Boschi <bo...@gmail.com>
> > wrote:
> >
> > > +1, thanks
> > >
> > > Nicolò Boschi
> > >
> > > Il Mar 19 Lug 2022, 22:16 Christophe Bornet <bo...@gmail.com> ha
> > > scritto:
> > >
> > > > +1
> > > >
> > > > Le mar. 19 juil. 2022 à 20:01, Andrey Yegorov <
> > > andrey.yegorov@datastax.com
> > > > >
> > > > a écrit :
> > > >
> > > > > +1
> > > > >
> > > > > On Tue, Jul 19, 2022 at 10:51 AM Dave Fisher <wa...@apache.org>
> > wrote:
> > > > >
> > > > > > +1 (binding)
> > > > > >
> > > > > > I support this enhancement for when a user occasionally requires
> > > > accurate
> > > > > > backlog stats. Once we bring this into service we can see if
> > further
> > > > > > guardrails are required.
> > > > > >
> > > > > > Regards,
> > > > > > Dave
> > > > > >
> > > > > > > On Jul 19, 2022, at 10:02 AM, Enrico Olivelli <
> > eolivelli@gmail.com
> > > >
> > > > > > wrote:
> > > > > > >
> > > > > > > This is the VOTE thread for PIP-187
> > > > > > >
> > > > > > > This is the GH issue:
> > > https://github.com/apache/pulsar/issues/16597
> > > > > > > This is the PR: https://github.com/apache/pulsar/pull/16545
> > > > > > >
> > > > > > > The vote is open for at least 48 hours
> > > > > > >
> > > > > > > Below you can find a copy of the text of the PIP
> > > > > > >
> > > > > > > Best regards
> > > > > > > Enrico
> > > > > > >
> > > > > > >
> > > > > > > Motivation
> > > > > > >
> > > > > > > Currently there is no way to have a accurate backlog for a
> > > > > subscription:
> > > > > > >
> > > > > > > you have only the number of "entries", not messages
> > > > > > > server side filters (PIP-105) may filter out some messages
> > > > > > >
> > > > > > > Having the number of entries is sometimes not enough because with
> > > > > > > batch messages the amount of work on the Consumers is
> > proportional
> > > to
> > > > > > > the number of messages, that may vary from entry to entry.
> > > > > > >
> > > > > > > Goal
> > > > > > >
> > > > > > > The idea of this patch is to provide a dedicated API (REST,
> > > > > > > pulsar-admin, and Java PulsarAdmin) to "analyze" a subscription and
> > > > > > > provide detailed information about what is expected to be delivered
> > > > > > > to Consumers.
> > > > > > >
> > > > > > > The operation will be quite expensive because we have to load the
> > > > > > > messages from storage and pass them to the filters, but due to the
> > > > > > > dynamic nature of Pulsar subscriptions there is no other way to
> > > > > > > have this value.
> > > > > > >
> > > > > > > One good strategy for monitoring/alerting is to set up alerts on
> > > > > > > the usual "stats" and use this new API to inspect the subscription
> > > > > > > more deeply, typically by issuing a manual command.
> > > > > > >
> > > > > > > API Changes
> > > > > > >
> > > > > > > internal ManagedCursor API:
> > > > > > >
> > > > > > > CompletableFuture<ScanOutcome> scan(Predicate<Entry> condition,
> > > long
> > > > > > > maxEntries, long timeOutMs);
> > > > > > >
> > > > > > > This method scans the Cursor from the lastMarkDelete position to
> > > > > > > the tail. There is a time limit and a maxEntries limit; these are
> > > > > > > needed in order to prevent huge (and useless) scans.
> > > > > > > The Predicate can stop the scan if it doesn't want to continue
> > > > > > > processing for some reason.
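To make the proposed contract concrete, here is a minimal, self-contained sketch of how such a bounded scan loop could behave. This is illustrative only: the `Entry` record and `ScanOutcome` values below are simplified stand-ins, not the real managed-ledger types.

```java
import java.util.List;
import java.util.function.Predicate;

// Illustrative sketch only: "Entry" and "ScanOutcome" are stand-ins for the
// managed-ledger types, not the actual Pulsar classes.
public class ScanSketch {
    record Entry(long ledgerId, long entryId) {}

    enum ScanOutcome { COMPLETED, ABORTED_MAX_ENTRIES, ABORTED_TIMEOUT, ABORTED_BY_PREDICATE }

    // Scans entries in order, stopping at maxEntries, at the deadline,
    // or when the predicate asks to stop (returns false).
    static ScanOutcome scan(List<Entry> entries, Predicate<Entry> condition,
                            long maxEntries, long timeOutMs) {
        long deadline = System.currentTimeMillis() + timeOutMs;
        long scanned = 0;
        for (Entry entry : entries) {
            if (scanned >= maxEntries) {
                return ScanOutcome.ABORTED_MAX_ENTRIES;
            }
            if (System.currentTimeMillis() > deadline) {
                return ScanOutcome.ABORTED_TIMEOUT;
            }
            if (!condition.test(entry)) {
                return ScanOutcome.ABORTED_BY_PREDICATE;
            }
            scanned++;
        }
        return ScanOutcome.COMPLETED;
    }
}
```

The two abort limits map directly to the configuration entries proposed later in the PIP.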
> > > > > > >
> > > > > > > New REST API:
> > > > > > >
> > > > > > >    @GET
> > > > > > >    @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/analyzeBacklog")
> > > > > > >    @ApiOperation(value = "Analyze a subscription, by scanning all
> > > > > > > the unprocessed messages")
> > > > > > >
> > > > > > >    public void analyzeSubscriptionBacklog(
> > > > > > >           @Suspended final AsyncResponse asyncResponse,
> > > > > > >            @ApiParam(value = "Specify the tenant", required =
> > true)
> > > > > > >            @PathParam("tenant") String tenant,
> > > > > > >            @ApiParam(value = "Specify the namespace", required =
> > > > true)
> > > > > > >            @PathParam("namespace") String namespace,
> > > > > > >            @ApiParam(value = "Specify topic name", required =
> > true)
> > > > > > >            @PathParam("topic") @Encoded String encodedTopic,
> > > > > > >            @ApiParam(value = "Subscription", required = true)
> > > > > > >            @PathParam("subName") String encodedSubName,
> > > > > > >            @ApiParam(value = "Is authentication required to
> > perform
> > > > > > > this operation")
> > > > > > >            @QueryParam("authoritative") @DefaultValue("false")
> > > > > > > boolean authoritative) {
> > > > > > >
> > > > > > > API response model:
> > > > > > >
> > > > > > > public class AnalyzeSubscriptionBacklogResult {
> > > > > > >    private long entries;
> > > > > > >    private long messages;
> > > > > > >
> > > > > > >    private long filterRejectedEntries;
> > > > > > >    private long filterAcceptedEntries;
> > > > > > >    private long filterRescheduledEntries;
> > > > > > >
> > > > > > >    private long filterRejectedMessages;
> > > > > > >    private long filterAcceptedMessages;
> > > > > > >    private long filterRescheduledMessages;
> > > > > > >
> > > > > > >    private boolean aborted;
> > > > > > >
> > > > > > > The response contains "aborted=true" if the request has been
> > > > > > > aborted by some internal limitation, like a timeout or the scan
> > > > > > > hitting the maximum number of entries.
> > > > > > > We are not going to provide more details about the reason for the
> > > > > > > stop: that would make the API too detailed and harder to maintain.
> > > > > > > The details will be available in the broker logs.
> > > > > > >
> > > > > > > New PulsarAdmin API:
> > > > > > >
> > > > > > > /**
> > > > > > >     * Analyze subscription backlog.
> > > > > > >     * This is a potentially expensive operation, as it requires
> > > > > > >     * to read the messages from storage.
> > > > > > >     * This function takes into consideration batch messages
> > > > > > >     * and also Subscription filters.
> > > > > > >     * @param topic
> > > > > > >     *            Topic name
> > > > > > >     * @param subscriptionName
> > > > > > >     *            the subscription
> > > > > > >     * @return an accurate analysis of the backlog
> > > > > > >     * @throws PulsarAdminException
> > > > > > >     *            Unexpected error
> > > > > > >     */
> > > > > > >    AnalyzeSubscriptionBacklogResult
> > > analyzeSubscriptionBacklog(String
> > > > > > > topic, String subscriptionName)
> > > > > > >            throws PulsarAdminException;
> > > > > > >
> > > > > > >    /**
> > > > > > >     * Analyze subscription backlog.
> > > > > > >     * This is a potentially expensive operation, as it requires
> > > > > > >     * to read the messages from storage.
> > > > > > >     * This function takes into consideration batch messages
> > > > > > >     * and also Subscription filters.
> > > > > > >     * @param topic
> > > > > > >     *            Topic name
> > > > > > >     * @param subscriptionName
> > > > > > >     *            the subscription
> > > > > > >     * @return an accurate analysis of the backlog
> > > > > > >     * @throws PulsarAdminException
> > > > > > >     *            Unexpected error
> > > > > > >     */
> > > > > > >    CompletableFuture<AnalyzeSubscriptionBacklogResult>
> > > > > > > analyzeSubscriptionBacklogAsync(String topic,
> > > > > > >
> > > > > > >                 String subscriptionName);
> > > > > > >
> > > > > > > A pulsar-admin command will be added as well as usual.
> > > > > > >
> > > > > > > New configuration entries in broker.conf:
> > > > > > >
> > > > > > > @FieldContext(
> > > > > > >         category = CATEGORY_POLICIES,
> > > > > > >         doc = "Maximum time to spend while scanning a subscription to calculate the accurate backlog"
> > > > > > > )
> > > > > > > private long subscriptionBacklogScanMaxTimeMs = 1000 * 60 * 2L;
> > > > > > > @FieldContext(
> > > > > > >         category = CATEGORY_POLICIES,
> > > > > > >         doc = "Maximum number of entries to process while scanning a subscription to calculate the accurate backlog"
> > > > > > > )
> > > > > > > private long subscriptionBacklogScanMaxEntries = 10_000;
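For reference, the equivalent broker.conf entries with the proposed defaults (2 minutes and 10,000 entries) would look like this; the names are taken from the fields above:

```properties
# Maximum time to spend while scanning a subscription to calculate the accurate backlog
subscriptionBacklogScanMaxTimeMs=120000
# Maximum number of entries to process while scanning a subscription
subscriptionBacklogScanMaxEntries=10000
```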
> > > > > > >
> > > > > > > Implementation
> > > > > > >
> > > > > > > The implementation is pretty straightforward:
> > > > > > >
> > > > > > > add a new API in ManagedCursor to do the scan
> > > > > > > add the REST API
> > > > > > > implement in PersistentSubscription an analyzeBacklog method that
> > > > > > > does the scan
> > > > > > >
> > > > > > > When the PersistentSubscription runs the scan:
> > > > > > >
> > > > > > > it applies the filters if they are present
> > > > > > > it considers individuallyDeletedMessages
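The accounting the scan has to do can be sketched as follows: entries carry batches of messages, entries present in individuallyDeletedMessages are skipped, and entries rejected by the filters are excluded. The types here are simplified, hypothetical stand-ins (plain longs and a boolean instead of Position and filter results).

```java
import java.util.List;
import java.util.Set;

// Simplified stand-ins: a real implementation would work with Position and
// filter outcomes, not plain longs and booleans.
public class BacklogAccounting {
    record Entry(long entryId, int batchSize, boolean acceptedByFilter) {}

    // Counts entries and messages still to be delivered, skipping entries that
    // were individually acknowledged and entries rejected by the filters.
    static long[] count(List<Entry> unreadEntries, Set<Long> individuallyDeleted) {
        long entries = 0;
        long messages = 0;
        for (Entry e : unreadEntries) {
            if (individuallyDeleted.contains(e.entryId())) {
                continue; // already acknowledged, not part of the backlog
            }
            if (!e.acceptedByFilter()) {
                continue; // filtered out server-side, never delivered
            }
            entries++;
            messages += e.batchSize(); // a batch entry counts as many messages
        }
        return new long[] { entries, messages };
    }
}
```

This is also where the entry/message distinction from the Motivation shows up: one skipped or accepted entry can account for many logical messages.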
> > > > > > >
> > > > > > > Rejected Alternatives
> > > > > > >
> > > > > > > We could somehow store a counter of the number of logical
> > > > > > > messages during writes. But that does not work, for a few reasons:
> > > > > > >
> > > > > > > you cannot know which subscriptions will be created in a topic
> > > > > > > subscriptions can be created from the past (Earliest)
> > > > > > > subscription filters may change over time: they are usually
> > > > > > > configured using Subscription Properties, and those properties are
> > > > > > > dynamic
> > > > > > > doing computations on the write path (like running filters) kills
> > > > > > > latency and throughput
> > > > > > >
> > > > > > > Use a client to clone the subscription and consume data.
> > > > > > > This doesn't work because you have to transfer the data to the
> > > > > > > client, and this is possibly a huge amount of work and a waste of
> > > > > > > resources.
> > > > > >
> > > > > >
> > > > >
> > > > > --
> > > > > Andrey Yegorov
> > > > >
> > > >
> > >
> >
> 

Re: [VOTE] PIP-187 Add API to analyze a subscription backlog and provide a accurate value

Posted by PengHui Li <pe...@apache.org>.
+1

Penghui


Re: [VOTE] PIP-187 Add API to analyze a subscription backlog and provide a accurate value

Posted by Asaf Mesika <as...@gmail.com>.
Sorry to barge in the vote - I forgot to send my reply on the discussion 2
days ago :)



Re: [VOTE] PIP-187 Add API to analyze a subscription backlog and provide a accurate value

Posted by Nicolò Boschi <bo...@gmail.com>.
+1, thanks

Nicolò Boschi

Il Mar 19 Lug 2022, 22:16 Christophe Bornet <bo...@gmail.com> ha
scritto:

> +1
>
> Le mar. 19 juil. 2022 à 20:01, Andrey Yegorov <andrey.yegorov@datastax.com
> >
> a écrit :
>
> > +1
> >
> > On Tue, Jul 19, 2022 at 10:51 AM Dave Fisher <wa...@apache.org> wrote:
> >
> > > +1 (binding)
> > >
> > > I support this enhancement for when a user occasionally requires
> accurate
> > > backlog stats. Once we bring this into service we can see if further
> > > guardrails are required.
> > >
> > > Regards,
> > > Dave
> > >
> > > > On Jul 19, 2022, at 10:02 AM, Enrico Olivelli <eo...@gmail.com>
> > > wrote:
> > > >
> > > > This is the VOTE thread for PIP-187
> > > >
> > > > This is the GH issue: https://github.com/apache/pulsar/issues/16597
> > > > This is the PR: https://github.com/apache/pulsar/pull/16545
> > > >
> > > > The vote is open for at least 48 hours
> > > >
> > > > Below you can find a copy of the text of the PIP
> > > >
> > > > Best regards
> > > > Enrico
> > > >
> > > >
> > > > Motivation
> > > >
> > > > Currently there is no way to have a accurate backlog for a
> > subscription:
> > > >
> > > > you have only the number of "entries", not messages
> > > > server side filters (PIP-105) may filter out some messages
> > > >
> > > > Having the number of entries is sometimes not enough because with
> > > > batch messages the amount of work on the Consumers is proportional to
> > > > the number of messages, that may vary from entry to entry.
> > > >
> > > > Goal
> > > >
> > > > The idea of this patch is to provide a dedicate API (REST,
> > > > pulsar-admin, and Java PulsarAdmin) to "analyze" a subscription and
> > > > provide detailed information about that is expected to be delivered
> to
> > > > Consumers.
> > > >
> > > > The operation will be quite expensive because we have to load the
> > > > messages from storage and pass them to the filters, but due to the
> > > > dynamic nature of Pulsar subscriptions there is no other way to have
> > > > this value.
> > > >
> > > > One good strategy to do monitoring/alerting is to setup alerts on the
> > > > usual "stats" and use this new API to inspect the subscription
> deeper,
> > > > typically be issuing a manual command.
> > > >
> > > > API Changes
> > > >
> > > > internal ManagedCursor API:
> > > >
> > > > CompletableFuture<ScanOutcome> scan(Predicate<Entry> condition, long
> > > > maxEntries, long timeOutMs);
> > > >
> > > > This method scans the Cursor from the lastMarkDelete position to the
> > > tail.
> > > > There is a time limit and a maxEntries limit, these are needed in
> > > > order to prevent huge (and useless) scans.
> > > > The Predicate can stop the scan, if it doesn't want to continue the
> > > > processing for some reasons.
> > > >
> > > > New REST API:
> > > >
> > > >    @GET
> > > >
> > >
> >
> @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/analyzeBacklog
> > > > Backlog")
> > > >    @ApiOperation(value = "Analyze a subscription, by scanning all the
> > > > unprocessed messages")
> > > >
> > > >    public void analyzeBacklog SubscriptionBacklog(
> > > >           @Suspended final AsyncResponse asyncResponse,
> > > >            @ApiParam(value = "Specify the tenant", required = true)
> > > >            @PathParam("tenant") String tenant,
> > > >            @ApiParam(value = "Specify the namespace", required =
> true)
> > > >            @PathParam("namespace") String namespace,
> > > >            @ApiParam(value = "Specify topic name", required = true)
> > > >            @PathParam("topic") @Encoded String encodedTopic,
> > > >            @ApiParam(value = "Subscription", required = true)
> > > >            @PathParam("subName") String encodedSubName,
> > > >            @ApiParam(value = "Is authentication required to perform
> > > > this operation")
> > > >            @QueryParam("authoritative") @DefaultValue("false")
> > > > boolean authoritative) {
> > > >
> > > > API response model:
> > > >
> > > > public class AnalyzeSubscriptionBacklogResult {
> > > >    private long entries;
> > > >    private long messages;
> > > >
> > > >    private long filterRejectedEntries;
> > > >    private long filterAcceptedEntries;
> > > >    private long filterRescheduledEntries;
> > > >
> > > >    private long filterRejectedMessages;
> > > >    private long filterAcceptedMessages;
> > > >    private long filterRescheduledMessages;
> > > >
> > > >    private boolean aborted;
> > > >
> > > > The response contains "aborted=true" if the request has been aborted
> > > > by some internal limitation, like a timeout or the scan hitting the
> > > > maximum number of entries.
> > > > We are not going to provide more details about the reason for the
> > > > stop: that would make the API too detailed and harder to maintain,
> > > > and the broker logs already contain the details.
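To make the counters above concrete, here is a sketch of how a client might interpret them. The field names mirror the response model; the helper methods and the "lower bound" reading of aborted results are our illustration, not part of the PIP.

```java
// Illustrative view over the response model above (subset of fields).
public class BacklogSummary {
    public long entries;
    public long messages;
    public long filterAcceptedMessages;
    public boolean aborted;

    // With batch messages one entry can hold many messages; this ratio is
    // why counting entries alone under-estimates the consumers' work.
    public double messagesPerEntry() {
        return entries == 0 ? 0.0 : (double) messages / entries;
    }

    // When the scan was aborted (timeout or maxEntries reached), the
    // counters only cover the scanned prefix, so treat them as lower bounds.
    public String describeAccepted() {
        return (aborted ? "at least " : "") + filterAcceptedMessages
                + " messages to deliver";
    }
}
```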
> > > >
> > > > New PulsarAdmin API:
> > > >
> > > > /**
> > > >     * Analyze subscription backlog.
> > > >     * This is a potentially expensive operation, as it requires
> > > >     * reading the messages from storage.
> > > >     * This function takes into consideration batch messages
> > > >     * and also Subscription filters.
> > > >     * @param topic
> > > >     *            Topic name
> > > >     * @param subscriptionName
> > > >     *            the subscription
> > > >     * @return an accurate analysis of the backlog
> > > >     * @throws PulsarAdminException
> > > >     *            Unexpected error
> > > >     */
> > > >    AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String
> > > > topic, String subscriptionName)
> > > >            throws PulsarAdminException;
> > > >
> > > >    /**
> > > >     * Analyze subscription backlog.
> > > >     * This is a potentially expensive operation, as it requires
> > > >     * reading the messages from storage.
> > > >     * This function takes into consideration batch messages
> > > >     * and also Subscription filters.
> > > >     * @param topic
> > > >     *            Topic name
> > > >     * @param subscriptionName
> > > >     *            the subscription
> > > >     * @return an accurate analysis of the backlog
> > > >     * @throws PulsarAdminException
> > > >     *            Unexpected error
> > > >     */
> > > >    CompletableFuture<AnalyzeSubscriptionBacklogResult>
> > > > analyzeSubscriptionBacklogAsync(String topic,
> > > >
> > > >                 String subscriptionName);
> > > >
> > > > A pulsar-admin command will be added as well, as usual.
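The calling pattern for the two admin methods can be sketched as below. The `completedFuture` stub stands in for the real REST round-trip, and the result class here is a pared-down stand-in; in a real client both calls would go through PulsarAdmin.

```java
import java.util.concurrent.CompletableFuture;

// Sketch of the sync/async calling patterns for the admin API above.
public class AdminUsageSketch {
    public static class AnalyzeSubscriptionBacklogResult {
        public final long messages;
        public final boolean aborted;
        public AnalyzeSubscriptionBacklogResult(long messages, boolean aborted) {
            this.messages = messages;
            this.aborted = aborted;
        }
    }

    // Async variant: returns immediately with a future.
    public static CompletableFuture<AnalyzeSubscriptionBacklogResult>
            analyzeSubscriptionBacklogAsync(String topic, String subscription) {
        // Illustrative stub result; a real call would scan the backlog broker-side.
        return CompletableFuture.completedFuture(
                new AnalyzeSubscriptionBacklogResult(42, false));
    }

    // Sync variant: typically just the async call, joined.
    public static AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(
            String topic, String subscription) {
        return analyzeSubscriptionBacklogAsync(topic, subscription).join();
    }
}
```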
> > > >
> > > > New configuration entries in broker.conf:
> > > >
> > > > @FieldContext(
> > > >         category = CATEGORY_POLICIES,
> > > >         doc = "Maximum time to spend while scanning a subscription to
> > > > calculate the accurate backlog"
> > > > )
> > > > private long subscriptionBacklogScanMaxTimeMs = 1000 * 60 * 2L;
> > > > @FieldContext(
> > > >         category = CATEGORY_POLICIES,
> > > >         doc = "Maximum number of entries to process while scanning a
> > > > subscription to calculate the accurate backlog"
> > > > )
> > > > private long subscriptionBacklogScanMaxEntries = 10_000;
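Assuming the broker.conf property names follow the field names (the usual Pulsar convention), the equivalent entries would look like this; note that 1000 * 60 * 2 is 120000 ms, i.e. two minutes:

```
subscriptionBacklogScanMaxTimeMs=120000
subscriptionBacklogScanMaxEntries=10000
```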
> > > >
> > > > Implementation
> > > >
> > > > The implementation is pretty straightforward:
> > > >
> > > > add a new API in ManagedCursor to do the Scan
> > > > add the REST API
> > > > implement in PersistentSubscription an analyzeBacklog method that
> > > > does the scan
> > > >
> > > > When the PersistentSubscription runs the scan:
> > > >
> > > > it applies the filters if they are present
> > > > it considers individuallyDeletedMessages
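The two steps above can be sketched as a simple counting loop. `Msg`, the position set, and the tag-based filter are simplified stand-ins for the PersistentSubscription internals, invented here for illustration.

```java
import java.util.Set;
import java.util.function.Predicate;

// Sketch of the scan: skip positions already acknowledged out of order
// (individuallyDeletedMessages), then apply the subscription filter, and
// count only the messages that would actually reach consumers.
public class BacklogCountSketch {
    public static class Msg {
        public final long position;
        public final String tag;
        public Msg(long position, String tag) {
            this.position = position;
            this.tag = tag;
        }
    }

    public static long countBacklog(Iterable<Msg> fromLastMarkDelete,
                                    Set<Long> individuallyDeleted,
                                    Predicate<Msg> filterAccepts) {
        long backlog = 0;
        for (Msg m : fromLastMarkDelete) {
            if (individuallyDeleted.contains(m.position)) {
                continue; // already acknowledged, not part of the backlog
            }
            if (filterAccepts.test(m)) {
                backlog++; // would actually be delivered to consumers
            }
        }
        return backlog;
    }
}
```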
> > > >
> > > > Rejected Alternatives
> > > >
> > > > We could somehow store counters for the number of logical
> > > > messages during writes. But that does not work, for a few reasons:
> > > >
> > > > you cannot know in advance which subscriptions will be created on a topic
> > > > subscriptions can be created from a position in the past (Earliest)
> > > > subscription filters may change over time: they are usually configured
> > > > using Subscription Properties, and those properties are dynamic
> > > > doing computations on the write path (like running filters) kills
> > > > latency and throughput
> > > >
> > > > Use a client to clone the subscription and consume data.
> > > > This doesn't work because you have to transfer the data to the
> client,
> > > > and this is possibly a huge amount of work and a waste of resources.
> > >
> > >
> >
> > --
> > Andrey Yegorov
> >
>

Re: [VOTE] PIP-187 Add API to analyze a subscription backlog and provide a accurate value

Posted by Christophe Bornet <bo...@gmail.com>.
+1

On Tue, Jul 19, 2022 at 20:01, Andrey Yegorov <an...@datastax.com>
wrote:

> +1
>
> On Tue, Jul 19, 2022 at 10:51 AM Dave Fisher <wa...@apache.org> wrote:
>
> > +1 (binding)
> >
> > I support this enhancement for when a user occasionally requires accurate
> > backlog stats. Once we bring this into service we can see if further
> > guardrails are required.
> >
> > Regards,
> > Dave
> >

Re: [VOTE] PIP-187 Add API to analyze a subscription backlog and provide a accurate value

Posted by Andrey Yegorov <an...@datastax.com>.
+1

On Tue, Jul 19, 2022 at 10:51 AM Dave Fisher <wa...@apache.org> wrote:

> +1 (binding)
>
> I support this enhancement for when a user occasionally requires accurate
> backlog stats. Once we bring this into service we can see if further
> guardrails are required.
>
> Regards,
> Dave

-- 
Andrey Yegorov

Re: [VOTE] PIP-187 Add API to analyze a subscription backlog and provide a accurate value

Posted by Dave Fisher <wa...@apache.org>.
+1 (binding)

I support this enhancement for when a user occasionally requires accurate backlog stats. Once we bring this into service we can see if further guardrails are required.

Regards,
Dave
