Posted to users@kafka.apache.org by Sean Glover <se...@lightbend.com> on 2019/10/15 19:53:16 UTC

Questions about Producer per Task/Partition in Streams EoS Impl.

Hi,

I would like to understand better how a `KafkaProducer` is used within
Kafka Streams EoS use cases.  In the Streams Exactly Once Design [1]
document it states that there is one producer per StreamThread:

> Each thread contains one producer client and two consumer clients (one for
> normal fetching from input topic partitions, and one for fetching from
> changelog topics for state restoration only). Tasks assigned to the same
> thread will then share these clients for fetching and producing messages.


But when I look at the 2.3.0 implementation I see that the `threadProducer`
is only defined when not using EoS [2], otherwise it is `null`. Then in
`TaskCreator` a `createProducer` method is defined which will create a new
`KafkaProducer` per task id when the `threadProducer` is null [3].  This
behaviour seems to be confirmed with the
`shouldInjectProducerPerTaskUsingClientSupplierOnCreateIfEosEnable` test
[4].
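
The behaviour described above can be modelled with a minimal, self-contained sketch. It is not the actual `StreamThread` code: `ProducerAssignmentSketch`, `ProducerStub`, `producerFor` and `producerCount` are hypothetical names used only for illustration, and the stub stands in for a real `KafkaProducer`.

```java
import java.util.HashMap;
import java.util.Map;

public class ProducerAssignmentSketch {

    /** Hypothetical stand-in for a KafkaProducer; it only remembers its client id. */
    public static final class ProducerStub {
        public final String clientId;

        ProducerStub(String clientId) {
            this.clientId = clientId;
        }
    }

    // Shared producer; left null when EoS is enabled, mirroring the behaviour described above.
    private final ProducerStub threadProducer;
    // Per-task producers, only populated when there is no shared producer.
    private final Map<String, ProducerStub> taskProducers = new HashMap<>();

    public ProducerAssignmentSketch(boolean eosEnabled) {
        this.threadProducer = eosEnabled ? null : new ProducerStub("thread-producer");
    }

    /** Returns the shared producer if present, otherwise one producer per task id. */
    public ProducerStub producerFor(String taskId) {
        if (threadProducer != null) {
            return threadProducer;
        }
        return taskProducers.computeIfAbsent(taskId, id -> new ProducerStub(id + "-producer"));
    }

    /** Number of producer instances this (modelled) thread currently holds. */
    public int producerCount() {
        return threadProducer != null ? 1 : taskProducers.size();
    }

    public static void main(String[] args) {
        ProducerAssignmentSketch eos = new ProducerAssignmentSketch(true);
        ProducerAssignmentSketch nonEos = new ProducerAssignmentSketch(false);
        for (String taskId : new String[] {"0_0", "0_1", "0_2"}) {
            eos.producerFor(taskId);
            nonEos.producerFor(taskId);
        }
        System.out.println("EoS producers: " + eos.producerCount());
        System.out.println("non-EoS producers: " + nonEos.producerCount());
    }
}
```

In this model, three tasks on one thread yield three producers with EoS and a single shared producer without it.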

I assume this is just a case of the design doc falling out of sync with the
impl., or maybe I misunderstood its original meaning.  Either way, I still
have some questions:

1. It's my understanding that a `TaskId` is 1:1 with a partition, so is it
the case that there is 1 `KafkaProducer` created per partition?  Is this to
make it easier to support transactions across apps that have multiple group
members, to make rebalancing easier?
2. Are there any performance implications to running so many
`KafkaProducer`s?  I assume this impl. could lead to hundreds or thousands of
producer instances created across all Kafka Streams app instances.

[1] Streams Exactly Once Design -
https://docs.google.com/document/d/1pGZ8xtOOyGwDYgH5vA6h19zOMMaduFK1DAB8_gBYA2c/edit#heading=h.mki3gltx1zw
[2] StreamThread.java, creating a `threadProducer` -
https://github.com/apache/kafka/blob/c55277cd79f103d9c686b9698f9f63208fdee272/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L501..L507
[3] StreamThread.java, `TaskCreator.producerProvider` -
https://github.com/apache/kafka/blob/c55277cd79f103d9c686b9698f9f63208fdee272/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L367..L373
[4] StreamThreadTest.java,
`shouldInjectProducerPerTaskUsingClientSupplierOnCreateIfEosEnable` -
https://github.com/apache/kafka/blob/e3c2148b207a6ca98c89211d12cb47abdfaa70b3/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java#L583

Regards,
Sean

PS. I wasn't sure if this would be more appropriate for the dev@ list, but
I thought I would ask here first.

Re: Questions about Producer per Task/Partition in Streams EoS Impl.

Posted by Sean Glover <se...@lightbend.com>.
Hi Guozhang,

Thanks a lot for pointing me to this KIP and providing background
information on the current performance implications.  I'll track progress
on the KIP.

Regards,
Sean

On Thu, Oct 17, 2019 at 12:29 PM Guozhang Wang <wa...@gmail.com> wrote:


Re: Questions about Producer per Task/Partition in Streams EoS Impl.

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Sean,

Yes, at the moment we have one producer per task when EOS is turned on,
compared to one producer per thread without EOS. There is an ongoing
proposal, KIP-447, which aims to bring this back to one producer per thread
with EOS as well; it involves broker-side changes.
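
For context, EOS is switched on through the `processing.guarantee` setting. The following is a minimal sketch that uses plain `java.util.Properties` with string keys so that it runs without the Kafka jars on the classpath; the class name, application id and bootstrap address are placeholder assumptions, not values from this thread.

```java
import java.util.Properties;

public class EosConfigSketch {

    /** Builds a Streams-style configuration with exactly-once processing enabled. */
    static Properties eosConfig() {
        Properties props = new Properties();
        props.setProperty("application.id", "eos-demo-app");        // placeholder value
        props.setProperty("bootstrap.servers", "localhost:9092");   // placeholder value
        // The default is "at_least_once"; "exactly_once" enables EOS.
        props.setProperty("processing.guarantee", "exactly_once");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(eosConfig().getProperty("processing.guarantee"));
    }
}
```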

To answer your questions:

1. Assuming each task has only one input topic, then yes, there is a 1:1
mapping between tasks and partitions. If a task involves, e.g., joining
topic A and topic B, then task 1 would have partitions A-1 and B-1 as
input. The reason for having one producer per task with EOS, and the
proposal to get rid of this for better scalability, can be found in this
meetup talk:
https://www.youtube.com/watch?v=j0l_zUhQaTc&list=PLa7VYi0yPIH3kDYOrar5B26WkslCGfDy_&index=7
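
The task composition described in point 1 can be sketched as follows. This is a simplified model that assumes a single sub-topology whose input topics are co-partitioned; `TaskPartitionSketch` and `assign` are hypothetical names, not part of the Streams API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TaskPartitionSketch {

    /** Groups partition p of every co-partitioned input topic into task p. */
    static Map<Integer, List<String>> assign(List<String> topics, int partitionsPerTopic) {
        Map<Integer, List<String>> tasks = new TreeMap<>();
        for (int p = 0; p < partitionsPerTopic; p++) {
            List<String> inputs = new ArrayList<>();
            for (String topic : topics) {
                inputs.add(topic + "-" + p);
            }
            tasks.put(p, inputs);
        }
        return tasks;
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> tasks = assign(Arrays.asList("A", "B"), 3);
        // prints {0=[A-0, B-0], 1=[A-1, B-1], 2=[A-2, B-2]}
        System.out.println(tasks);
        // With one producer per task, this model has 3 producers rather than 6.
        System.out.println("producers under EOS: " + tasks.size());
    }
}
```

In this model the number of producers under EOS follows the number of tasks, not the total number of input partitions.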

2. The major performance implications are twofold: 1) given a fixed total
traffic, the more producers are created, the weaker the batching effect and
hence the lower the throughput; 2) on the broker side, the increased number
of connections and the higher request rate (i.e. more small requests rather
than fewer large ones) also add load.
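
A rough back-of-the-envelope illustration of both effects, as a toy model only: it assumes every producer sends exactly one request per linger interval and ignores batch size limits, partition counts and compression. The class name, method names and all numbers are made up for illustration.

```java
public class BatchingSketch {

    /** Records carried by each request when a fixed total rate is split evenly across producers. */
    static long recordsPerRequest(long totalRecordsPerSec, long lingerMs, long producers) {
        return totalRecordsPerSec * lingerMs / 1000 / producers;
    }

    /** Total requests per second reaching the brokers in this toy model. */
    static long requestsPerSecond(long lingerMs, long producers) {
        return producers * 1000 / lingerMs;
    }

    public static void main(String[] args) {
        long totalRate = 100_000; // records per second, assumed fixed
        long lingerMs = 100;      // assumed send interval per producer
        for (long producers : new long[] {4, 200}) {
            System.out.println(producers + " producers: "
                + recordsPerRequest(totalRate, lingerMs, producers) + " records/request, "
                + requestsPerSecond(lingerMs, producers) + " requests/s");
        }
    }
}
```

With these assumed numbers, moving from 4 producers to 200 shrinks each request from 2500 records to 50 while the request rate rises from 40 to 2000 per second.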

We plan to address KIP-447 in the near term so that we can lift these
performance hurdles.

Guozhang



-- 
-- Guozhang