Posted to dev@pulsar.apache.org by Tim Corbett <tc...@mparticle.com> on 2023/01/12 01:58:52 UTC

[DISCUSS] KeyShared routing additions/configuration

Greetings,

I was advised to ask this here, but it was unclear to me whether this
should be a PIP or not.  Please advise if I should follow a different
process.  Thanks in advance.

The motivation is as follows: today we have KeyShared subscriptions in
which the consumers are rapidly autoscaling up and down based on real-time
demand.  We've observed that for some data shapes/scaling conditions, some
consumers will end up receiving considerably more traffic than the rest,
slowing down consumption across the topic.  We require all messages with
any given routing key to only be outstanding to one consumer at a time, but
otherwise have no preference for any range of keys to remain sticky to any
given consumer.  I've also seen reference to KeyShared routing performance
concerns elsewhere, e.g.: https://github.com/apache/pulsar/issues/15705

The proposal is two-fold: first, we would like to implement a new KeyShared
routing mechanism which tracks outstanding messages and their consumers,
routing messages with outstanding keys to the current consumer handling
that key, and any messages with new keys to any arbitrary consumer with
available permits (or perhaps the consumer with the most permits).
Basically a "first available consumer" routing strategy.  As far as my
naive first attempt goes, I initially decided to modify
the StickyKeyConsumerSelector::select call to accept an mledger.Position in
addition to the hash.  I also added a release call to notify the selector
of positions to consider no longer outstanding.  I could see this being
implemented any number of other ways as well (such as an entirely new
dispatcher), and would appreciate guidance should this proposal move
forward.
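
To make the shape of that change concrete, here is a rough sketch of what
the modified selector surface might look like.  This is illustrative only;
the real StickyKeyConsumerSelector has more methods, and the final shape
may well differ:

    // Sketch only: select() additionally receives the position of the entry
    // being dispatched, and release() tells the selector that a position is
    // no longer outstanding (acked or scheduled for redelivery), so its key
    // is free to move to another consumer.  Consumer is the broker-side
    // consumer, Position is the mledger position.
    public interface StickyKeyConsumerSelector {
        Consumer select(int stickyKeyHash, Position position);
        void release(Position position);
    }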

Second, I believe the ability to choose a KeyShared routing scheme and
perhaps the settings for that scheme should be configurable as a
namespace/topic policy and not just in the broker config.  I have not begun
work on implementing that at all yet, but would assume it is not too
complicated to do so (though the settings construct may be more freeform
than expected).
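
Purely as an illustration of what such a policy could carry (these names
are invented for this sketch and are not an existing API), I imagine
something along the lines of:

    import java.util.Map;

    // Hypothetical namespace/topic-level policy object, sketch only: which
    // KeyShared selector to use plus freeform, selector-specific settings.
    public class KeySharedRoutingPolicy {
        private String selectorType;                  // e.g. "consistentHashing"
        private Map<String, String> selectorSettings; // selector-specific knobs
    }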

In the referenced issue above, Penghui cited the cost of tracking all
outstanding messages as the reason this was not implemented in the first
place: "the current implementation doesn't need to maintain the state for
each key since a topic might have a huge number of keys."  I would like to
counter that most topics do not have hundreds of thousands of messages
outstanding simultaneously in the first place, that there is already a
policy to limit the number of outstanding messages per subscription, and
that you don't actually need an entry for each key: if performance/memory
constraints demanded it, some sort of reference count by (shortened) hash
could be attempted.  Regardless, I have not performance tested my idea just
yet; if it will be rejected for other reasons, the effort is not well
spent.
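
As a very rough sketch of that reference-counting idea (names invented,
and assuming calls happen on the single dispatcher thread, so no locking
is shown):

    import java.util.HashMap;
    import java.util.Map;

    // Tracks which consumer currently "owns" each outstanding key hash and
    // how many in-flight messages reference it; once the count drops to zero
    // the hash is free to be routed to a different consumer.
    class OutstandingHashTracker {
        private static final class Owner {
            final Consumer consumer; // broker-side consumer
            int refCount;
            Owner(Consumer consumer) { this.consumer = consumer; }
        }

        private final Map<Integer, Owner> owners = new HashMap<>();

        // On dispatch: return the existing owner for this hash, or record the
        // candidate (e.g. the consumer with the most available permits).
        Consumer acquire(int hash, Consumer candidate) {
            Owner owner = owners.computeIfAbsent(hash, h -> new Owner(candidate));
            owner.refCount++;
            return owner.consumer;
        }

        // On ack/redelivery of a message with this hash: drop one reference
        // and forget the hash once nothing is outstanding for it.
        void release(int hash) {
            Owner owner = owners.get(hash);
            if (owner != null && --owner.refCount == 0) {
                owners.remove(hash);
            }
        }
    }

Memory then stays proportional to the number of distinct hashes currently
outstanding, not to the total number of keys on the topic.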

To give some idea of our stats: we host multiple clusters today, but the
busiest one has maybe 36 topics which each handle roughly 8k msgs/s,
between 100-500 producers, a similar number of consumers, and an average of
500 unacked messages at a time, with a peak of 15k or so, occasionally
bouncing off our configured policy.  Consumers tend to autoscale every 3-5
minutes, but can cycle even faster than that depending on the rest of our
platform.  This functionality is what led us to using Pulsar in the first
place, and in that respect it has served decently enough, but ongoing cost
optimization would benefit from better consumer performance.

I look forward to hearing your opinions on this proposal and any next steps
I can perform.  Thank you for your time!

-Tim Corbett
tcorbett@mparticle.com
Principal Platform Engineer - mParticle

Re: [DISCUSS] KeyShared routing additions/configuration

Posted by Tim Corbett <tc...@mparticle.com>.
To start, sorry if this breaks threading and/or ends up looking hideous;
I am new to using these mailing lists and was not subscribed, so I can't
seem to resume the thread from the latest message.

> > The proposal is two-fold: first, we would like to implement a new KeyShared
> > routing mechanism which tracks outstanding messages and their consumers,
> > routing messages with outstanding keys to the current consumer handling
> > that key, and any messages with new keys to any arbitrary consumer with
> > available permits (or perhaps the consumer with the most permits).
> > Basically a "first available consumer" routing strategy.  As far as my
> > naive first attempt goes, I initially decided to modify
> > the StickyKeyConsumerSelector::select call to accept an mledger.Position in
> > addition to the hash.  I also added a release call to notify the selector
> > of positions to consider no longer outstanding.  I could see this being
> > implemented any number of other ways as well (such as an entirely new
> > dispatcher), and would appreciate guidance should this proposal move
> > forward.
>
> If you are using the consistent hash consumer selector, you can try to
> add more replica points.  But it also depends on your keys: whether the
> number of messages for each key is roughly the same and how many keys you
> have.
>
We are using the consistent hash selector today, and have tried several
different values for the replica point count, up to 2000, but none have
been satisfactory.  As you can imagine, at our consumer count and rate of
consumer churn, increasing the point count beyond that also hurts
performance a decent bit.
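
For reference, the knobs we have been tuning, to the best of my
understanding, are these broker.conf settings:

    # Use the consistent hashing based selector for KeyShared subscriptions.
    subscriptionKeySharedUseConsistentHashing=true
    # Points placed on the hash ring per consumer; we have tried values up
    # to 2000 here.
    subscriptionKeySharedConsistentHashingReplicaPoints=2000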

As for our key distribution, I wouldn't call it perfectly even, but it is
even enough.  Internal metrics in our consumers show that the issue is not
strictly a "hot key"; the hot consumers are still receiving a high
cardinality of keys, but perhaps that swath of keys is harder to work on or
more numerous at that time for some reason.  Hence the requirement to be
able to route work away from those busy workers ad hoc.

> I just thought about it (an available-permits-based selector) roughly.
> The available permits are unstable.  But after the key is assigned to a
> consumer, the relationship will not change, right?
>
I'm not sure what you mean by "unstable"; hopefully you can clarify a bit?
In implementing the POC, though, I have run into the issue that a large
group of messages is routed to consumers before any are sent/deducted from
the available permit counts.  A two-step process of grouping messages by
hash and then selecting a consumer during iteration/sending would suffice,
but perhaps this leans more towards this being a new dispatcher?  Not sure
yet.
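
Roughly, that two-step flow would look like this inside the dispatcher
(sketch only; stickyKeyHash, selector and sendToConsumer stand in for
whatever the dispatcher actually does at that point):

    // Step 1: group the entries of a read batch by sticky key hash.
    Map<Integer, List<Entry>> byHash = new LinkedHashMap<>();
    for (Entry entry : entries) {
        byHash.computeIfAbsent(stickyKeyHash(entry), h -> new ArrayList<>())
              .add(entry);
    }
    // Step 2: pick a consumer per group only when it is about to be sent, so
    // permit counts already reflect what was handed out earlier in the batch.
    for (Map.Entry<Integer, List<Entry>> group : byHash.entrySet()) {
        Consumer consumer = selector.select(group.getKey());
        sendToConsumer(consumer, group.getValue());
    }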

In response to your question about keys being assigned to consumers not
changing, no, they will be free to change as soon as that consumer no longer
has any unacknowledged messages with that key/hash.  This allows keys to
reroute to less busy consumers dynamically while still maintaining
serial/in-order message processing by key.

I've also considered another potential issue, though I believe it exists
today as well: topic unloading.  Once the topic is reloaded, any messages
in flight will be redelivered to new consumers, yet without strict
transactions the existing consumers will likely continue to work on the
messages they had in progress at the time.  Perhaps this isn't worth
worrying about, but I did consider it and I don't have any good solutions
in mind.

> > Second, I believe the ability to choose a KeyShared routing scheme and
> > perhaps the settings for that scheme should be configurable as a
> > namespace/topic policy and not just in the broker config.  I have not begun
> > work on implementing that at all yet, but would assume it is not too
> > complicated to do so (though the settings construct may be more freeform
> > than expected).
>
> Yes, that looks good.
>
> Thanks,
> Penghui

Thanks for your help so far,
-Tim Corbett

Re: [DISCUSS] KeyShared routing additions/configuration

Posted by PengHui Li <pe...@apache.org>.
Hi Tim,

> The proposal is two-fold: first, we would like to implement a new
KeyShared
routing mechanism which tracks outstanding messages and their consumers,
routing messages with outstanding keys to the current consumer handling
that key, and any messages with new keys to any arbitrary consumer with
available permits (or perhaps the consumer with the most permits).
Basically a "first available consumer" routing strategy.  As far as my
naive first attempt goes, I initially decided to modify
the StickyKeyConsumerSelector::select call to accept an mledger.Position in
addition to the hash.  I also added a release call to notify the selector
of positions to consider no longer outstanding.  I could see this being
implemented any number of other ways as well (such as an entirely new
dispatcher), and would appreciate guidance should this proposal move
forward.

If you are using the consistent hash consumer selector, you can try to
add more replica points.  But it also depends on your keys: whether the
number of messages for each key is roughly the same and how many keys you
have.

I just thought about it (an available-permits-based selector) roughly.
The available permits are unstable.  But after the key is assigned to a
consumer, the relationship will not change, right?

> Second, I believe the ability to choose a KeyShared routing scheme and
perhaps the settings for that scheme should be configurable as a
namespace/topic policy and not just in the broker config.  I have not begun
work on implementing that at all yet, but would assume it is not too
complicated to do so (though the settings construct may be more freeform
than expected).

Yes, that looks good.

Thanks,
Penghui

On Fri, Jan 13, 2023 at 4:09 PM Enrico Olivelli <eo...@gmail.com> wrote:

> Tim,
>
> On Thu, Jan 12, 2023 at 04:31 Tim Corbett
> <tc...@mparticle.com> wrote:
> >
> > Greetings,
> >
> > I was informed to ask this here but was unclear yet if this should be a
> PIP
> > or not.  Please advise if I should follow a different process.  Thanks in
> > advance.
> >
> > The motivation is as follows: today we have KeyShared subscriptions in
> > which the consumers are rapidly autoscaling up and down based on
> real-time
> > demand.  We've observed that for some data shapes/scaling conditions,
> some
> > consumers will end up receiving considerably more traffic than the rest,
> > slowing down consumption across the topic.  We require all messages with
> > any given routing key to only be outstanding to one consumer at a time,
> but
> > otherwise have no preference for any range of keys to remain sticky to
> any
> > given consumer.  I've also seen reference to KeyShared routing
> performance
> > concerns elsewhere, e.g.: https://github.com/apache/pulsar/issues/15705
> >
> > The proposal is two-fold: first, we would like to implement a new
> KeyShared
> > routing mechanism which tracks outstanding messages and their consumers,
> > routing messages with outstanding keys to the current consumer handling
> > that key, and any messages with new keys to any arbitrary consumer with
> > available permits (or perhaps the consumer with the most permits).
> > Basically a "first available consumer" routing strategy.  As far as my
> > naive first attempt goes, I initially decided to modify
> > the StickyKeyConsumerSelector::select call to accept an mledger.Position
> in
> > addition to the hash.  I also added a release call to notify the selector
> > of positions to consider no longer outstanding.  I could see this being
> > implemented any number of other ways as well (such as an entirely new
> > dispatcher), and would appreciate guidance should this proposal move
> > forward.
>
> Sounds good
>
> >
> > Second, I believe the ability to choose a KeyShared routing scheme and
> > perhaps the settings for that scheme should be configurable as a
> > namespace/topic policy and not just in the broker config.  I have not
> begun
> > work on implementing that at all yet, but would assume it is not too
> > complicated to do so (though the settings construct may be more freeform
> > than expected).
>
> This is a good idea as well
>
> >
> > In the referenced PR above, Penghui mentioned that the cost of tracking
> all
> > outstanding messages would be too costly as a reason for not implementing
> > this in the first place, as "the current implementation doesn't need to
> > maintain the state for each key since a topic might have a huge number of
> > keys."  I would like to counter that with most topics do not have in
> excess
> > of hundreds of thousands of messages outstanding simultaneously in the
> > first place, there is already a policy to limit the number of outstanding
> > messages by subscription, and you don't actually need an entry for each
> key
> > necessarily, if performance/memory constraints demanded, some sort of
> > reference count by (shortened) hash could be attempted.  Regardless, I
> have
> > not performance tested my idea just yet as if it will be rejected for
> other
> > reasons the effort is not well spent.
>
> I think that your use case is valid. Then it is a matter of finding
> the best solution.
>
> >
> > To give some idea on our stats, we host multiple clusters today but the
> > busiest one has maybe 36 topics which each handle roughly 8k msgs/s,
> > between 100-500 producers, a similar number of consumers, and an average
> of
> > 500 unacked messages at a time, with a peak of 15k or so, occasionally
> > bouncing off our configured policy.  Consumers tend to autoscale every
> 3-5
> > minutes, but can cycle even faster than that depending on the rest of our
> > platform.  This functionality is what lead us to using Pulsar in the
> first
> > place, and in that respect it has served decently enough, but ongoing
> cost
> > optimizations would prefer better consumer performance.
> >
> > I look forward to hearing your opinions on this proposal and any next
> steps
> > I can perform.  Thank you for your time!
>
> Next steps:
> - I would start with the first idea; implementing the second feature
> is an independent topic, and probably less urgent (given that you can
> isolate namespaces)
> - draft a proof-of-concept PR for early review
> - draft a PIP about the design
> - follow the discussion, have the PIP approved
> - move the PR (or set of PRs) from POC to ready-for-review
>
> Best regards
>
> Enrico
>
>
> >
> > -Tim Corbett
> > tcorbett@mparticle.com
> > Principal Platform Engineer - mParticle
>

Re: [DISCUSS] KeyShared routing additions/configuration

Posted by Enrico Olivelli <eo...@gmail.com>.
Tim,

On Thu, Jan 12, 2023 at 04:31 Tim Corbett
<tc...@mparticle.com> wrote:
>
> Greetings,
>
> I was informed to ask this here but was unclear yet if this should be a PIP
> or not.  Please advise if I should follow a different process.  Thanks in
> advance.
>
> The motivation is as follows: today we have KeyShared subscriptions in
> which the consumers are rapidly autoscaling up and down based on real-time
> demand.  We've observed that for some data shapes/scaling conditions, some
> consumers will end up receiving considerably more traffic than the rest,
> slowing down consumption across the topic.  We require all messages with
> any given routing key to only be outstanding to one consumer at a time, but
> otherwise have no preference for any range of keys to remain sticky to any
> given consumer.  I've also seen reference to KeyShared routing performance
> concerns elsewhere, e.g.: https://github.com/apache/pulsar/issues/15705
>
> The proposal is two-fold: first, we would like to implement a new KeyShared
> routing mechanism which tracks outstanding messages and their consumers,
> routing messages with outstanding keys to the current consumer handling
> that key, and any messages with new keys to any arbitrary consumer with
> available permits (or perhaps the consumer with the most permits).
> Basically a "first available consumer" routing strategy.  As far as my
> naive first attempt goes, I initially decided to modify
> the StickyKeyConsumerSelector::select call to accept an mledger.Position in
> addition to the hash.  I also added a release call to notify the selector
> of positions to consider no longer outstanding.  I could see this being
> implemented any number of other ways as well (such as an entirely new
> dispatcher), and would appreciate guidance should this proposal move
> forward.

Sounds good

>
> Second, I believe the ability to choose a KeyShared routing scheme and
> perhaps the settings for that scheme should be configurable as a
> namespace/topic policy and not just in the broker config.  I have not begun
> work on implementing that at all yet, but would assume it is not too
> complicated to do so (though the settings construct may be more freeform
> than expected).

This is a good idea as well

>
> In the referenced PR above, Penghui mentioned that the cost of tracking all
> outstanding messages would be too costly as a reason for not implementing
> this in the first place, as "the current implementation doesn't need to
> maintain the state for each key since a topic might have a huge number of
> keys."  I would like to counter that with most topics do not have in excess
> of hundreds of thousands of messages outstanding simultaneously in the
> first place, there is already a policy to limit the number of outstanding
> messages by subscription, and you don't actually need an entry for each key
> necessarily, if performance/memory constraints demanded, some sort of
> reference count by (shortened) hash could be attempted.  Regardless, I have
> not performance tested my idea just yet as if it will be rejected for other
> reasons the effort is not well spent.

I think that your use case is valid. Then it is a matter of finding
the best solution.

>
> To give some idea on our stats, we host multiple clusters today but the
> busiest one has maybe 36 topics which each handle roughly 8k msgs/s,
> between 100-500 producers, a similar number of consumers, and an average of
> 500 unacked messages at a time, with a peak of 15k or so, occasionally
> bouncing off our configured policy.  Consumers tend to autoscale every 3-5
> minutes, but can cycle even faster than that depending on the rest of our
> platform.  This functionality is what lead us to using Pulsar in the first
> place, and in that respect it has served decently enough, but ongoing cost
> optimizations would prefer better consumer performance.
>
> I look forward to hearing your opinions on this proposal and any next steps
> I can perform.  Thank you for your time!

Next steps:
- I would start with the first idea; implementing the second feature
is an independent topic, and probably less urgent (given that you can
isolate namespaces)
- draft a proof-of-concept PR for early review
- draft a PIP about the design
- follow the discussion, have the PIP approved
- move the PR (or set of PRs) from POC to ready-for-review

Best regards

Enrico


>
> -Tim Corbett
> tcorbett@mparticle.com
> Principal Platform Engineer - mParticle