You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by Ryanne Dolan <ry...@gmail.com> on 2021/06/04 19:23:06 UTC

Re: [DISCUSS] KIP-731: Record Rate Limiting for Kafka Connect

Hey Tom, thanks for taking a look.

> It's a bit weird that there's a separate start(Time) method

Good call, I think we can use a second constructor instead.

> No metrics for batch rates?

Good call. TBH I assumed there would already be put/poll rates, but looking
again I don't see them. Will add to the KIP.

> I think it might be nicer to have a consistent configuration mechanism

I had previously implemented this as you propose (same as SMTs), but found
it to be a little heavy for the common use-cases. I didn't like how users
needed to specify the classnames in order to use the built-in rate limiters.

But thinking again about this, if we include default values for
rate.limiters, rate.limiter.record.type, and rate.limiter.batch.type, we'd
get the same effect. Namely, most users would just need to
specify rate.limiter.record.limit or rate.limiter.batch.limit.

So I think you're right -- the common use-cases don't necessarily suffer,
and custom rate limiters would definitely benefit. I'll fix.

> hard.rate.limiters [..vs..] rate.limiters

I think the difference may be immaterial. As implemented currently,
RecordRateLimiter and RecordBatchRateLimiter are very "soft" in that they
don't define a window of time in which a max number of records or batches
can be processed. Instead, they just tap the breaks when the instantaneous
rate is observed to be too high. But a "hard" rate limiter could be
implemented with the same interface, e.g. by sleeping until the end of the
current window.

Ryanne

On Fri, May 21, 2021 at 7:10 AM Tom Bentley <tb...@redhat.com> wrote:

> Hi Ryanne,
>
> Thanks for the KIP. I can see this would be useful.
>
> 1. Can you elaborate on the life cycle of the RateLimiter interface (in the
> Javadoc)? In particular it's not clear to me how calls to accumulate() and
> throttleTime() can be interleaved (I assume arbitrarily).
>
> 2. It's a bit weird that there's a separate start(Time) method in addition
> to the configure() inherited from Configurable. Perhaps passing the Time to
> accumulate() would be simpler than needing a two stage configuration step,
> even if it would be the same instance on every call. If start() really is
> needed you should document that it's called after configure().
>
> 3. Maybe including the unit in the method name, i.e. throttleTimeMs(), to
> avoid any ambiguity about how the result is interpreted?
>
> 4. The metrics: Are they windowed over some time period, if so, what?
>
> 5. No metrics for batch rates?
>
> 6. It doesn't seem to be stated, but I assume the throttle time used is the
> maximum of the throttleTime() returned by all the limiters.
>
> 7. The configuration uses a different mechanism than for SMTs and also
> requires to add three common configs (with a risk of collision with any
> connector which already defines configs with these names). I think it might
> be nicer to have a consistent configuration mechanism, so for example
>   rate.limiters=record,batch
>   rate.limiter.record.type=RecordRateLimiter
>   rate.limiter.record.limit=123
>   rate.limiter.batch.type=RecordBatchRateLimiter
>   rate.limiter.batch.limit=456
> This means there's only a single new common config, as the others depend on
> the aliases used, so further collisions can be avoided.
>
> 8. A cluster where every connector has a quota could end up being
> underutilised, yet a subset of connectors could be running at their limit.
> While this makes sense for the firehose problem it seems to be problematic
> for the noisy neighbour case, where the spare capacity could be shared
> between all the throttled tasks on the worker. While I'm not suggesting you
> need to implement this as part of the KIP, maybe the API could accommodate
> it being added later. Perhaps this could be as simple as using
> hard.rate.limiters rather than just rate.limiters, so that
> soft.rate.limiters could be added later, though maybe there are use cases
> where a single limiter needs to supply both soft and hard limits.
>
> Thanks again,
>
> Tom
>
> On Fri, May 14, 2021 at 6:26 PM Ryanne Dolan <ry...@gmail.com>
> wrote:
>
> > Hey y'all, I've expanded the scope of this KIP slightly to include a
> > pluggable interface, RateLimiter.
> >
> > After implementing this a few different ways, it's clear that the
> > configuration story is actually simpler with a pluggable model.
> > Out-of-the-box, we have just two configuration properties to tweak:
> > record.rate.limit and record.batch.rate.limit (subj to change ofc). These
> > are provided by built-in RecordRateLimiter and RecordBatchRateLimiter
> > impls.
> >
> > From there, additional custom RateLimiters can be enabled with whatever
> > configuration they need. This is essentially the same pattern taken with
> > MetricsReporters and others.
> >
> > I had originally envisioned that the set of built-in limits would expand
> > over time, eg individual put/poll/commit/flush limits. However, these can
> > all be throttled adequately with the proposed API by limiting overall
> > record and batch thruput.
> >
> > Please let me know what you think. The voting thread is open.
> >
> > Ryanne
> >
> > On Fri, Apr 9, 2021, 1:41 PM Ryanne Dolan <ry...@gmail.com> wrote:
> >
> > > Hey y'all, I'd like to draw you attention to a new KIP:
> > >
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-731%3A+Record+Rate+Limiting+for+Kafka+Connect
> > >
> > > Lemme know what you think. Thanks!
> > >
> > > Ryanne
> > >
> >
>

Re: [DISCUSS] KIP-731: Record Rate Limiting for Kafka Connect

Posted by Ryanne Dolan <ry...@gmail.com>.
Thanks Mickael.


> 1) if you set rate.limiters to MyRateLimiter you would
> not have to set these 2 configs.
>

True. That may be motivation to adopt Tom's suggestion re config.


> 2) Should we have a
> ByteRateLimiter?
>

That would be ideal, but it is difficult in practice, since ConnectRecord
doesn't have a concept of size. And what would the size be, anyway? The
true size over the wire is different for each source/sink system. We could
use the size of the Kafka record as a universal approximation, but that is
only indirectly related to the true size, esp if there are SMTs in play.

My thinking is that a good solution here would need to wait for a way for
Connectors to report custom metrics, including bytes in/out. Even so, we've
found that Connectors/Tasks often have no idea how many bytes are going
over the wire. So we'd need a solution that provides a default
approximation (e.g. Kafka record size) but let's connectors refine that, if
they are able.


> 3) what happens if multiple RateLimiters are enabled?
>

The impl waits for the max throttle time. We were using custom SMTs to
implement rate limiting, but delay from multiple such SMTs would be
additive. The idea behind the KIP is to have an SMT-like plug-in that would
not be additive.

4) reusing the transformation/config
> provider syntax for configurations may be nicer for consistency.
>

I have both approaches impl'd and had slight preference for the one
presented. Happy to defer to consensus.


> 6) Is the rate applied before transformations?
>

For source connectors, we should apply the limits before SMTs, and for sink
connectors, after. This way we get as close to the external systems as
possible, which is ultimately what we're trying to protect.

I don't recall how my POC handles this, but we'd need to get this right for
it to make sense.

Ryanne


> Thanks,
> Mickael
>
> On Fri, Jun 4, 2021 at 9:23 PM Ryanne Dolan <ry...@gmail.com> wrote:
> >
> > Hey Tom, thanks for taking a look.
> >
> > > It's a bit weird that there's a separate start(Time) method
> >
> > Good call, I think we can use a second constructor instead.
> >
> > > No metrics for batch rates?
> >
> > Good call. TBH I assumed there would already be put/poll rates, but
> looking
> > again I don't see them. Will add to the KIP.
> >
> > > I think it might be nicer to have a consistent configuration mechanism
> >
> > I had previously implemented this as you propose (same as SMTs), but
> found
> > it to be a little heavy for the common use-cases. I didn't like how users
> > needed to specify the classnames in order to use the built-in rate
> limiters.
> >
> > But thinking again about this, if we include default values for
> > rate.limiters, rate.limiter.record.type, and rate.limiter.batch.type,
> we'd
> > get the same effect. Namely, most users would just need to
> > specify rate.limiter.record.limit or rate.limiter.batch.limit.
> >
> > So I think you're right -- the common use-cases don't necessarily suffer,
> > and custom rate limiters would definitely benefit. I'll fix.
> >
> > > hard.rate.limiters [..vs..] rate.limiters
> >
> > I think the difference may be immaterial. As implemented currently,
> > RecordRateLimiter and RecordBatchRateLimiter are very "soft" in that they
> > don't define a window of time in which a max number of records or batches
> > can be processed. Instead, they just tap the breaks when the
> instantaneous
> > rate is observed to be too high. But a "hard" rate limiter could be
> > implemented with the same interface, e.g. by sleeping until the end of
> the
> > current window.
> >
> > Ryanne
> >
> > On Fri, May 21, 2021 at 7:10 AM Tom Bentley <tb...@redhat.com> wrote:
> >
> > > Hi Ryanne,
> > >
> > > Thanks for the KIP. I can see this would be useful.
> > >
> > > 1. Can you elaborate on the life cycle of the RateLimiter interface
> (in the
> > > Javadoc)? In particular it's not clear to me how calls to accumulate()
> and
> > > throttleTime() can be interleaved (I assume arbitrarily).
> > >
> > > 2. It's a bit weird that there's a separate start(Time) method in
> addition
> > > to the configure() inherited from Configurable. Perhaps passing the
> Time to
> > > accumulate() would be simpler than needing a two stage configuration
> step,
> > > even if it would be the same instance on every call. If start() really
> is
> > > needed you should document that it's called after configure().
> > >
> > > 3. Maybe including the unit in the method name, i.e. throttleTimeMs(),
> to
> > > avoid any ambiguity about how the result is interpreted?
> > >
> > > 4. The metrics: Are they windowed over some time period, if so, what?
> > >
> > > 5. No metrics for batch rates?
> > >
> > > 6. It doesn't seem to be stated, but I assume the throttle time used
> is the
> > > maximum of the throttleTime() returned by all the limiters.
> > >
> > > 7. The configuration uses a different mechanism than for SMTs and also
> > > requires to add three common configs (with a risk of collision with any
> > > connector which already defines configs with these names). I think it
> might
> > > be nicer to have a consistent configuration mechanism, so for example
> > >   rate.limiters=record,batch
> > >   rate.limiter.record.type=RecordRateLimiter
> > >   rate.limiter.record.limit=123
> > >   rate.limiter.batch.type=RecordBatchRateLimiter
> > >   rate.limiter.batch.limit=456
> > > This means there's only a single new common config, as the others
> depend on
> > > the aliases used, so further collisions can be avoided.
> > >
> > > 8. A cluster where every connector has a quota could end up being
> > > underutilised, yet a subset of connectors could be running at their
> limit.
> > > While this makes sense for the firehose problem it seems to be
> problematic
> > > for the noisy neighbour case, where the spare capacity could be shared
> > > between all the throttled tasks on the worker. While I'm not
> suggesting you
> > > need to implement this as part of the KIP, maybe the API could
> accommodate
> > > it being added later. Perhaps this could be as simple as using
> > > hard.rate.limiters rather than just rate.limiters, so that
> > > soft.rate.limiters could be added later, though maybe there are use
> cases
> > > where a single limiter needs to supply both soft and hard limits.
> > >
> > > Thanks again,
> > >
> > > Tom
> > >
> > > On Fri, May 14, 2021 at 6:26 PM Ryanne Dolan <ry...@gmail.com>
> > > wrote:
> > >
> > > > Hey y'all, I've expanded the scope of this KIP slightly to include a
> > > > pluggable interface, RateLimiter.
> > > >
> > > > After implementing this a few different ways, it's clear that the
> > > > configuration story is actually simpler with a pluggable model.
> > > > Out-of-the-box, we have just two configuration properties to tweak:
> > > > record.rate.limit and record.batch.rate.limit (subj to change ofc).
> These
> > > > are provided by built-in RecordRateLimiter and RecordBatchRateLimiter
> > > > impls.
> > > >
> > > > From there, additional custom RateLimiters can be enabled with
> whatever
> > > > configuration they need. This is essentially the same pattern taken
> with
> > > > MetricsReporters and others.
> > > >
> > > > I had originally envisioned that the set of built-in limits would
> expand
> > > > over time, eg individual put/poll/commit/flush limits. However,
> these can
> > > > all be throttled adequately with the proposed API by limiting overall
> > > > record and batch thruput.
> > > >
> > > > Please let me know what you think. The voting thread is open.
> > > >
> > > > Ryanne
> > > >
> > > > On Fri, Apr 9, 2021, 1:41 PM Ryanne Dolan <ry...@gmail.com>
> wrote:
> > > >
> > > > > Hey y'all, I'd like to draw you attention to a new KIP:
> > > > >
> > > > >
> > > > >
> > > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-731%3A+Record+Rate+Limiting+for+Kafka+Connect
> > > > >
> > > > > Lemme know what you think. Thanks!
> > > > >
> > > > > Ryanne
> > > > >
> > > >
> > >
>

Re: [DISCUSS] KIP-731: Record Rate Limiting for Kafka Connect

Posted by Mickael Maison <mi...@gmail.com>.
Hi Ryanne,

Thanks for the KIP! Sorry for the delay, I finally took some time to
take a look and I have a few questions:

1) record.rate.limit, record.batch.rate.limit are listed as Connector
configurations. If I understand correctly they are actually
configurations of RecordRateLimiter, and RecordBatchRateLimiter
respectively. So if you set rate.limiters to MyRateLimiter you would
not have to set these 2 configs.

2) The motivation mentions the need to "limit total throughput". The 2
proposed RateLimiter implementations seems to work on the number of
records flowing, not on the actually byte rate. Should we have a
ByteRateLimiter?

3) Do you know what happens if multiple RateLimiters are enabled and
both hit their limit but return a different throttle time? Is the
longest throttle time selected?

4) I tend to agree with Tom that reusing the transformation/config
provider syntax for configurations may be nicer for consistency. While
the built in RateLimiter have few configurations, a custom
implementation may have several and may conflict with configurations
from a connector.

5) Can you explain how ConfigDef config(); is used? Is this method needed?

6) Is the rate applied before transformations? For example if a
transformation filters records, are they still counted?

Thanks,
Mickael

On Fri, Jun 4, 2021 at 9:23 PM Ryanne Dolan <ry...@gmail.com> wrote:
>
> Hey Tom, thanks for taking a look.
>
> > It's a bit weird that there's a separate start(Time) method
>
> Good call, I think we can use a second constructor instead.
>
> > No metrics for batch rates?
>
> Good call. TBH I assumed there would already be put/poll rates, but looking
> again I don't see them. Will add to the KIP.
>
> > I think it might be nicer to have a consistent configuration mechanism
>
> I had previously implemented this as you propose (same as SMTs), but found
> it to be a little heavy for the common use-cases. I didn't like how users
> needed to specify the classnames in order to use the built-in rate limiters.
>
> But thinking again about this, if we include default values for
> rate.limiters, rate.limiter.record.type, and rate.limiter.batch.type, we'd
> get the same effect. Namely, most users would just need to
> specify rate.limiter.record.limit or rate.limiter.batch.limit.
>
> So I think you're right -- the common use-cases don't necessarily suffer,
> and custom rate limiters would definitely benefit. I'll fix.
>
> > hard.rate.limiters [..vs..] rate.limiters
>
> I think the difference may be immaterial. As implemented currently,
> RecordRateLimiter and RecordBatchRateLimiter are very "soft" in that they
> don't define a window of time in which a max number of records or batches
> can be processed. Instead, they just tap the breaks when the instantaneous
> rate is observed to be too high. But a "hard" rate limiter could be
> implemented with the same interface, e.g. by sleeping until the end of the
> current window.
>
> Ryanne
>
> On Fri, May 21, 2021 at 7:10 AM Tom Bentley <tb...@redhat.com> wrote:
>
> > Hi Ryanne,
> >
> > Thanks for the KIP. I can see this would be useful.
> >
> > 1. Can you elaborate on the life cycle of the RateLimiter interface (in the
> > Javadoc)? In particular it's not clear to me how calls to accumulate() and
> > throttleTime() can be interleaved (I assume arbitrarily).
> >
> > 2. It's a bit weird that there's a separate start(Time) method in addition
> > to the configure() inherited from Configurable. Perhaps passing the Time to
> > accumulate() would be simpler than needing a two stage configuration step,
> > even if it would be the same instance on every call. If start() really is
> > needed you should document that it's called after configure().
> >
> > 3. Maybe including the unit in the method name, i.e. throttleTimeMs(), to
> > avoid any ambiguity about how the result is interpreted?
> >
> > 4. The metrics: Are they windowed over some time period, if so, what?
> >
> > 5. No metrics for batch rates?
> >
> > 6. It doesn't seem to be stated, but I assume the throttle time used is the
> > maximum of the throttleTime() returned by all the limiters.
> >
> > 7. The configuration uses a different mechanism than for SMTs and also
> > requires to add three common configs (with a risk of collision with any
> > connector which already defines configs with these names). I think it might
> > be nicer to have a consistent configuration mechanism, so for example
> >   rate.limiters=record,batch
> >   rate.limiter.record.type=RecordRateLimiter
> >   rate.limiter.record.limit=123
> >   rate.limiter.batch.type=RecordBatchRateLimiter
> >   rate.limiter.batch.limit=456
> > This means there's only a single new common config, as the others depend on
> > the aliases used, so further collisions can be avoided.
> >
> > 8. A cluster where every connector has a quota could end up being
> > underutilised, yet a subset of connectors could be running at their limit.
> > While this makes sense for the firehose problem it seems to be problematic
> > for the noisy neighbour case, where the spare capacity could be shared
> > between all the throttled tasks on the worker. While I'm not suggesting you
> > need to implement this as part of the KIP, maybe the API could accommodate
> > it being added later. Perhaps this could be as simple as using
> > hard.rate.limiters rather than just rate.limiters, so that
> > soft.rate.limiters could be added later, though maybe there are use cases
> > where a single limiter needs to supply both soft and hard limits.
> >
> > Thanks again,
> >
> > Tom
> >
> > On Fri, May 14, 2021 at 6:26 PM Ryanne Dolan <ry...@gmail.com>
> > wrote:
> >
> > > Hey y'all, I've expanded the scope of this KIP slightly to include a
> > > pluggable interface, RateLimiter.
> > >
> > > After implementing this a few different ways, it's clear that the
> > > configuration story is actually simpler with a pluggable model.
> > > Out-of-the-box, we have just two configuration properties to tweak:
> > > record.rate.limit and record.batch.rate.limit (subj to change ofc). These
> > > are provided by built-in RecordRateLimiter and RecordBatchRateLimiter
> > > impls.
> > >
> > > From there, additional custom RateLimiters can be enabled with whatever
> > > configuration they need. This is essentially the same pattern taken with
> > > MetricsReporters and others.
> > >
> > > I had originally envisioned that the set of built-in limits would expand
> > > over time, eg individual put/poll/commit/flush limits. However, these can
> > > all be throttled adequately with the proposed API by limiting overall
> > > record and batch thruput.
> > >
> > > Please let me know what you think. The voting thread is open.
> > >
> > > Ryanne
> > >
> > > On Fri, Apr 9, 2021, 1:41 PM Ryanne Dolan <ry...@gmail.com> wrote:
> > >
> > > > Hey y'all, I'd like to draw you attention to a new KIP:
> > > >
> > > >
> > > >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-731%3A+Record+Rate+Limiting+for+Kafka+Connect
> > > >
> > > > Lemme know what you think. Thanks!
> > > >
> > > > Ryanne
> > > >
> > >
> >