You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@pulsar.apache.org by Enrico Olivelli <eo...@gmail.com> on 2022/08/24 15:52:50 UTC

[DISCUSS] Catch up reads with many subscriptions - saving reads from BK

Hello folks,
These days I am working on a use case in which I have many
subscriptions on the same topic (~100).
If you get to a situation with a big backlog, the consumers are no
more able to leverage the broker cache then the broker starts to read
from BookKeeper.

In 2.11 we have a first mechanism of catch-up reads cache for
backlogged consumers (when you have a few backlogged cursors we start
putting in the cache the entries that you read).

That is not enough to deal with my case because if you have many
consumers (one per partition) that connect all together, for instance
after a major problem in the application, the cache is empty and the
broker starts to overwhelm the bookies with reads.

As a short term solution I have created this patch
https://github.com/apache/pulsar/pull/17241

The patch basically saves reads from BK by preventing concurrent reads
for the same entries (actually "ranges of entries").
If the broker starts a read for a range and then comes another read
for the same range before the read completes, then we skip sending the
new read and we use the result of the BK request.

This simple patch is really effective, in some tests it reduces a lot
(from 100K reads/s to 250 reads/s) the concurrent reads to BK.
You may say that it is unlikely that the broker sends concurrent
requests for exactly the same range of entries but empirically I have
observed that it happens quite often, at least in the scenario I drew
above.

My proposal is to commit my patch as a short term solution.
I have already talked with PengHui and a better solution would be to
introduce explicitly a "Catch Up reads cache", but that is a mid term
implementation.

WDYT ?


Enrico

Re: [DISCUSS] Catch up reads with many subscriptions - saving reads from BK

Posted by Enrico Olivelli <eo...@gmail.com>.
After doing more testing I have updated the patch with a small change
but very effective
https://github.com/apache/pulsar/pull/17241

With the latest commit now we are able to attach to pending requests
for ranges that are "larger" than the requested range.
So if you have 100...200 in pending state and it comes 101..200 the
new read will be attached to the pending read.

I have attached to the PR some graphs to show that the rate of
"partial matches" is very high both in the tailing reads case and in
catch up cases.
In my test case I have 62 subscriptions on a partitioned topic with 6
partitions, on 6 brokers .

I thought about try to make it more complicated, like:
- splitting the reads to smaller chunks and aligned
- try to use a pending read that overlaps with the requested range and
start a second read to read the missing entries

Both of the two strategies are very complex and the current form is
already doing good results

I did some investigation on the overhead on allocations, the GC is not
suffering from this change in my tests

Enrico

Re: [DISCUSS] Catch up reads with many subscriptions - saving reads from BK

Posted by Enrico Olivelli <eo...@gmail.com>.
Thank you folks for your feedback

Il giorno gio 25 ago 2022 alle ore 05:56 Haiting Jiang
<ji...@gmail.com> ha scritto:
>
> Here is my idea of handing the read position shift if we want to go a bit
> further with this solution.
>
> Instead of only checking the exact match of entry range match
> (PendingReadKey in the PR),
> we can check if the requesting range is covered by several ongoing read
> requests, and merge
>  the result of these requests.
>
> For example, currently there are already inflight request reads of entries
> [10,20] and [20,30],
> and now comes a new request [15,25], it can just merge the result from
> [10,20] and [20,30],
> and avoids sending the request to BK.
>
> As for the complexity checking procedure, I think we can sort all the
> inflight requests by startEntryId
> and apply a binary search here, to reduce the time complexity to O(logN)
> where N is the
> max entry size in one ledger.

This is an interesting idea.
We are doing more testing to empirically understand how frequently we
have overlapping
ranges.
Because if it is not frequent to have such overlapping ranges then we
will waste resources.

Currently my explanation about this scenario is that groups of
subscriptions are close to each other
and nobody is able to leverage the entries cached by the others and
they try to read the same entries.

By default we have this default of 100 in dispatcherMaxReadBatchSize
and that limits the ranges to be up to 100 entries.

When there are no errors in reads and the consumers work well all the
subscriptions quickly reach the max value (100).
So if you are lucky they read the same ranges.
If you are not lucky, because they start from different positions then
this feature does not help.
By the way this means that it is likely that you have:
100...200,200...300,180...280....
but not
100...200, 120...180
So it is unlikely that you see a range that is totally included in a
pending read.

a note about "merging":
the main problem is that we have to choose the "best way" of combining
what's going on and what you can do:
A) use parts of the pending reads (how many reads do you want to concatenate?)
B) add one (or two) new read to fill the missing entries and merge
C) start a new read from scratch

Also, if you go for B) and not for C) other new readers may miss a
chance to leverage your range

I will do further testing in order to understand if merging two
existing pending reads improves the situation.



Enrico

>
> Thanks,
> Haiting
>
> On Thu, Aug 25, 2022 at 11:40 AM Haiting Jiang <ji...@gmail.com>
> wrote:
>
> > +1 for this optimization for the specific scenario.
> >
> > This simple patch is really effective, in some tests it reduces a lot
> >> (from 100K reads/s to 250 reads/s) the concurrent reads to BK.
> >> You may say that it is unlikely that the broker sends concurrent
> >> requests for exactly the same range of entries but empirically I have
> >> observed that it happens quite often, at least in the scenario I drew
> >> above.
> >
> >
> > This is quite an interesting test result. My guess is that all the
> > subscriptions
> > have exactly the same backlog and all the messages are consumed instantly
> > and successfully. So that they can read the same range of entries.
> > Once there are some consuming delays or retries due to failures, I believe
> > there will be read position shifts and the BK read requests will go up.
> >
> > Thanks,
> > Haiting
> >
> > On Thu, Aug 25, 2022 at 12:12 AM Michael Marshall <mm...@apache.org>
> > wrote:
> >
> >> +1 I support merging the temporary solution.
> >>
> >> > Just one point, please make sure the change will not introduce too much
> >> > heap memory overhead.
> >>
> >> If we see that there is extra pressure, I wonder if we can dynamically
> >> determine when to deduplicate requests. A broker only needs this
> >> feature when a topic has multiple subscriptions. I haven't double
> >> checked if this information is easily accessible to the cache.
> >>
> >> One of the interesting benefits of this optimization is that it will
> >> decrease direct memory pressure. When we do not deduplicate reads, the
> >> same entry will be duplicated in the bookie's netty direct memory
> >> pool. With this design, an entry will be materialized once with many
> >> different ByteBuffer's objects pointing to it.
> >>
> >> Thanks,
> >> Michael
> >>
> >> On Wed, Aug 24, 2022 at 11:00 AM PengHui Li <pe...@apache.org> wrote:
> >> >
> >> > +1 for we can have a short-term solution first.
> >> >
> >> > Just one point, please make sure the change will not introduce too much
> >> > heap memory overhead.
> >> >
> >> > Thanks,
> >> > Penghui
> >> >
> >> > On Wed, Aug 24, 2022 at 11:54 PM Enrico Olivelli <eo...@gmail.com>
> >> > wrote:
> >> >
> >> > > Hello folks,
> >> > > These days I am working on a use case in which I have many
> >> > > subscriptions on the same topic (~100).
> >> > > If you get to a situation with a big backlog, the consumers are no
> >> > > more able to leverage the broker cache then the broker starts to read
> >> > > from BookKeeper.
> >> > >
> >> > > In 2.11 we have a first mechanism of catch-up reads cache for
> >> > > backlogged consumers (when you have a few backlogged cursors we start
> >> > > putting in the cache the entries that you read).
> >> > >
> >> > > That is not enough to deal with my case because if you have many
> >> > > consumers (one per partition) that connect all together, for instance
> >> > > after a major problem in the application, the cache is empty and the
> >> > > broker starts to overwhelm the bookies with reads.
> >> > >
> >> > > As a short term solution I have created this patch
> >> > > https://github.com/apache/pulsar/pull/17241
> >> > >
> >> > > The patch basically saves reads from BK by preventing concurrent reads
> >> > > for the same entries (actually "ranges of entries").
> >> > > If the broker starts a read for a range and then comes another read
> >> > > for the same range before the read completes, then we skip sending the
> >> > > new read and we use the result of the BK request.
> >> > >
> >> > > This simple patch is really effective, in some tests it reduces a lot
> >> > > (from 100K reads/s to 250 reads/s) the concurrent reads to BK.
> >> > > You may say that it is unlikely that the broker sends concurrent
> >> > > requests for exactly the same range of entries but empirically I have
> >> > > observed that it happens quite often, at least in the scenario I drew
> >> > > above.
> >> > >
> >> > > My proposal is to commit my patch as a short term solution.
> >> > > I have already talked with PengHui and a better solution would be to
> >> > > introduce explicitly a "Catch Up reads cache", but that is a mid term
> >> > > implementation.
> >> > >
> >> > > WDYT ?
> >> > >
> >> > >
> >> > > Enrico
> >> > >
> >>
> >

Re: [DISCUSS] Catch up reads with many subscriptions - saving reads from BK

Posted by Haiting Jiang <ji...@gmail.com>.
Here is my idea of handing the read position shift if we want to go a bit
further with this solution.

Instead of only checking the exact match of entry range match
(PendingReadKey in the PR),
we can check if the requesting range is covered by several ongoing read
requests, and merge
 the result of these requests.

For example, currently there are already inflight request reads of entries
[10,20] and [20,30],
and now comes a new request [15,25], it can just merge the result from
[10,20] and [20,30],
and avoids sending the request to BK.

As for the complexity checking procedure, I think we can sort all the
inflight requests by startEntryId
and apply a binary search here, to reduce the time complexity to O(logN)
where N is the
max entry size in one ledger.

Thanks,
Haiting

On Thu, Aug 25, 2022 at 11:40 AM Haiting Jiang <ji...@gmail.com>
wrote:

> +1 for this optimization for the specific scenario.
>
> This simple patch is really effective, in some tests it reduces a lot
>> (from 100K reads/s to 250 reads/s) the concurrent reads to BK.
>> You may say that it is unlikely that the broker sends concurrent
>> requests for exactly the same range of entries but empirically I have
>> observed that it happens quite often, at least in the scenario I drew
>> above.
>
>
> This is quite an interesting test result. My guess is that all the
> subscriptions
> have exactly the same backlog and all the messages are consumed instantly
> and successfully. So that they can read the same range of entries.
> Once there are some consuming delays or retries due to failures, I believe
> there will be read position shifts and the BK read requests will go up.
>
> Thanks,
> Haiting
>
> On Thu, Aug 25, 2022 at 12:12 AM Michael Marshall <mm...@apache.org>
> wrote:
>
>> +1 I support merging the temporary solution.
>>
>> > Just one point, please make sure the change will not introduce too much
>> > heap memory overhead.
>>
>> If we see that there is extra pressure, I wonder if we can dynamically
>> determine when to deduplicate requests. A broker only needs this
>> feature when a topic has multiple subscriptions. I haven't double
>> checked if this information is easily accessible to the cache.
>>
>> One of the interesting benefits of this optimization is that it will
>> decrease direct memory pressure. When we do not deduplicate reads, the
>> same entry will be duplicated in the bookie's netty direct memory
>> pool. With this design, an entry will be materialized once with many
>> different ByteBuffer's objects pointing to it.
>>
>> Thanks,
>> Michael
>>
>> On Wed, Aug 24, 2022 at 11:00 AM PengHui Li <pe...@apache.org> wrote:
>> >
>> > +1 for we can have a short-term solution first.
>> >
>> > Just one point, please make sure the change will not introduce too much
>> > heap memory overhead.
>> >
>> > Thanks,
>> > Penghui
>> >
>> > On Wed, Aug 24, 2022 at 11:54 PM Enrico Olivelli <eo...@gmail.com>
>> > wrote:
>> >
>> > > Hello folks,
>> > > These days I am working on a use case in which I have many
>> > > subscriptions on the same topic (~100).
>> > > If you get to a situation with a big backlog, the consumers are no
>> > > more able to leverage the broker cache then the broker starts to read
>> > > from BookKeeper.
>> > >
>> > > In 2.11 we have a first mechanism of catch-up reads cache for
>> > > backlogged consumers (when you have a few backlogged cursors we start
>> > > putting in the cache the entries that you read).
>> > >
>> > > That is not enough to deal with my case because if you have many
>> > > consumers (one per partition) that connect all together, for instance
>> > > after a major problem in the application, the cache is empty and the
>> > > broker starts to overwhelm the bookies with reads.
>> > >
>> > > As a short term solution I have created this patch
>> > > https://github.com/apache/pulsar/pull/17241
>> > >
>> > > The patch basically saves reads from BK by preventing concurrent reads
>> > > for the same entries (actually "ranges of entries").
>> > > If the broker starts a read for a range and then comes another read
>> > > for the same range before the read completes, then we skip sending the
>> > > new read and we use the result of the BK request.
>> > >
>> > > This simple patch is really effective, in some tests it reduces a lot
>> > > (from 100K reads/s to 250 reads/s) the concurrent reads to BK.
>> > > You may say that it is unlikely that the broker sends concurrent
>> > > requests for exactly the same range of entries but empirically I have
>> > > observed that it happens quite often, at least in the scenario I drew
>> > > above.
>> > >
>> > > My proposal is to commit my patch as a short term solution.
>> > > I have already talked with PengHui and a better solution would be to
>> > > introduce explicitly a "Catch Up reads cache", but that is a mid term
>> > > implementation.
>> > >
>> > > WDYT ?
>> > >
>> > >
>> > > Enrico
>> > >
>>
>

Re: [DISCUSS] Catch up reads with many subscriptions - saving reads from BK

Posted by Haiting Jiang <ji...@gmail.com>.
+1 for this optimization for the specific scenario.

This simple patch is really effective, in some tests it reduces a lot
> (from 100K reads/s to 250 reads/s) the concurrent reads to BK.
> You may say that it is unlikely that the broker sends concurrent
> requests for exactly the same range of entries but empirically I have
> observed that it happens quite often, at least in the scenario I drew
> above.


This is quite an interesting test result. My guess is that all the
subscriptions
have exactly the same backlog and all the messages are consumed instantly
and successfully. So that they can read the same range of entries.
Once there are some consuming delays or retries due to failures, I believe
there will be read position shifts and the BK read requests will go up.

Thanks,
Haiting

On Thu, Aug 25, 2022 at 12:12 AM Michael Marshall <mm...@apache.org>
wrote:

> +1 I support merging the temporary solution.
>
> > Just one point, please make sure the change will not introduce too much
> > heap memory overhead.
>
> If we see that there is extra pressure, I wonder if we can dynamically
> determine when to deduplicate requests. A broker only needs this
> feature when a topic has multiple subscriptions. I haven't double
> checked if this information is easily accessible to the cache.
>
> One of the interesting benefits of this optimization is that it will
> decrease direct memory pressure. When we do not deduplicate reads, the
> same entry will be duplicated in the bookie's netty direct memory
> pool. With this design, an entry will be materialized once with many
> different ByteBuffer's objects pointing to it.
>
> Thanks,
> Michael
>
> On Wed, Aug 24, 2022 at 11:00 AM PengHui Li <pe...@apache.org> wrote:
> >
> > +1 for we can have a short-term solution first.
> >
> > Just one point, please make sure the change will not introduce too much
> > heap memory overhead.
> >
> > Thanks,
> > Penghui
> >
> > On Wed, Aug 24, 2022 at 11:54 PM Enrico Olivelli <eo...@gmail.com>
> > wrote:
> >
> > > Hello folks,
> > > These days I am working on a use case in which I have many
> > > subscriptions on the same topic (~100).
> > > If you get to a situation with a big backlog, the consumers are no
> > > more able to leverage the broker cache then the broker starts to read
> > > from BookKeeper.
> > >
> > > In 2.11 we have a first mechanism of catch-up reads cache for
> > > backlogged consumers (when you have a few backlogged cursors we start
> > > putting in the cache the entries that you read).
> > >
> > > That is not enough to deal with my case because if you have many
> > > consumers (one per partition) that connect all together, for instance
> > > after a major problem in the application, the cache is empty and the
> > > broker starts to overwhelm the bookies with reads.
> > >
> > > As a short term solution I have created this patch
> > > https://github.com/apache/pulsar/pull/17241
> > >
> > > The patch basically saves reads from BK by preventing concurrent reads
> > > for the same entries (actually "ranges of entries").
> > > If the broker starts a read for a range and then comes another read
> > > for the same range before the read completes, then we skip sending the
> > > new read and we use the result of the BK request.
> > >
> > > This simple patch is really effective, in some tests it reduces a lot
> > > (from 100K reads/s to 250 reads/s) the concurrent reads to BK.
> > > You may say that it is unlikely that the broker sends concurrent
> > > requests for exactly the same range of entries but empirically I have
> > > observed that it happens quite often, at least in the scenario I drew
> > > above.
> > >
> > > My proposal is to commit my patch as a short term solution.
> > > I have already talked with PengHui and a better solution would be to
> > > introduce explicitly a "Catch Up reads cache", but that is a mid term
> > > implementation.
> > >
> > > WDYT ?
> > >
> > >
> > > Enrico
> > >
>

Re: [DISCUSS] Catch up reads with many subscriptions - saving reads from BK

Posted by Michael Marshall <mm...@apache.org>.
+1 I support merging the temporary solution.

> Just one point, please make sure the change will not introduce too much
> heap memory overhead.

If we see that there is extra pressure, I wonder if we can dynamically
determine when to deduplicate requests. A broker only needs this
feature when a topic has multiple subscriptions. I haven't double
checked if this information is easily accessible to the cache.

One of the interesting benefits of this optimization is that it will
decrease direct memory pressure. When we do not deduplicate reads, the
same entry will be duplicated in the bookie's netty direct memory
pool. With this design, an entry will be materialized once with many
different ByteBuffer's objects pointing to it.

Thanks,
Michael

On Wed, Aug 24, 2022 at 11:00 AM PengHui Li <pe...@apache.org> wrote:
>
> +1 for we can have a short-term solution first.
>
> Just one point, please make sure the change will not introduce too much
> heap memory overhead.
>
> Thanks,
> Penghui
>
> On Wed, Aug 24, 2022 at 11:54 PM Enrico Olivelli <eo...@gmail.com>
> wrote:
>
> > Hello folks,
> > These days I am working on a use case in which I have many
> > subscriptions on the same topic (~100).
> > If you get to a situation with a big backlog, the consumers are no
> > more able to leverage the broker cache then the broker starts to read
> > from BookKeeper.
> >
> > In 2.11 we have a first mechanism of catch-up reads cache for
> > backlogged consumers (when you have a few backlogged cursors we start
> > putting in the cache the entries that you read).
> >
> > That is not enough to deal with my case because if you have many
> > consumers (one per partition) that connect all together, for instance
> > after a major problem in the application, the cache is empty and the
> > broker starts to overwhelm the bookies with reads.
> >
> > As a short term solution I have created this patch
> > https://github.com/apache/pulsar/pull/17241
> >
> > The patch basically saves reads from BK by preventing concurrent reads
> > for the same entries (actually "ranges of entries").
> > If the broker starts a read for a range and then comes another read
> > for the same range before the read completes, then we skip sending the
> > new read and we use the result of the BK request.
> >
> > This simple patch is really effective, in some tests it reduces a lot
> > (from 100K reads/s to 250 reads/s) the concurrent reads to BK.
> > You may say that it is unlikely that the broker sends concurrent
> > requests for exactly the same range of entries but empirically I have
> > observed that it happens quite often, at least in the scenario I drew
> > above.
> >
> > My proposal is to commit my patch as a short term solution.
> > I have already talked with PengHui and a better solution would be to
> > introduce explicitly a "Catch Up reads cache", but that is a mid term
> > implementation.
> >
> > WDYT ?
> >
> >
> > Enrico
> >

Re: [DISCUSS] Catch up reads with many subscriptions - saving reads from BK

Posted by PengHui Li <pe...@apache.org>.
+1 for we can have a short-term solution first.

Just one point, please make sure the change will not introduce too much
heap memory overhead.

Thanks,
Penghui

On Wed, Aug 24, 2022 at 11:54 PM Enrico Olivelli <eo...@gmail.com>
wrote:

> Hello folks,
> These days I am working on a use case in which I have many
> subscriptions on the same topic (~100).
> If you get to a situation with a big backlog, the consumers are no
> more able to leverage the broker cache then the broker starts to read
> from BookKeeper.
>
> In 2.11 we have a first mechanism of catch-up reads cache for
> backlogged consumers (when you have a few backlogged cursors we start
> putting in the cache the entries that you read).
>
> That is not enough to deal with my case because if you have many
> consumers (one per partition) that connect all together, for instance
> after a major problem in the application, the cache is empty and the
> broker starts to overwhelm the bookies with reads.
>
> As a short term solution I have created this patch
> https://github.com/apache/pulsar/pull/17241
>
> The patch basically saves reads from BK by preventing concurrent reads
> for the same entries (actually "ranges of entries").
> If the broker starts a read for a range and then comes another read
> for the same range before the read completes, then we skip sending the
> new read and we use the result of the BK request.
>
> This simple patch is really effective, in some tests it reduces a lot
> (from 100K reads/s to 250 reads/s) the concurrent reads to BK.
> You may say that it is unlikely that the broker sends concurrent
> requests for exactly the same range of entries but empirically I have
> observed that it happens quite often, at least in the scenario I drew
> above.
>
> My proposal is to commit my patch as a short term solution.
> I have already talked with PengHui and a better solution would be to
> introduce explicitly a "Catch Up reads cache", but that is a mid term
> implementation.
>
> WDYT ?
>
>
> Enrico
>