Posted to users@kafka.apache.org by Pushkar Deole <pd...@gmail.com> on 2020/10/06 10:57:53 UTC

Re: Kafka streams parallelism - why not separate stream task per partition per input topic

Matthias,

I am wondering how tasks are spread across threads when I have fewer
threads than partitions. In my use case, I have 3 input topics with 8
partitions each and I can configure 12 threads, so how will the partitions
of the topics below be distributed among the 12 threads?
Note that topic C is generally idle and carries traffic only occasionally,
so I would want its partitions to be spread evenly, so that they don't all
get assigned to only some of the threads.

Topic A - 8 partitions
Topic B - 8 partitions
Topic C - 8 partitions

On Fri, Sep 4, 2020 at 9:56 PM Matthias J. Sax <mj...@apache.org> wrote:

> That is correct.
>
> If topicA has 5 partitions and topicB has 6 partitions, you get 5 tasks
> for the first sub-topology and 6 tasks for the second sub-topology and
> you can run up to 11 threads, each executing one task.
>
>
> -Matthias
>
> On 9/4/20 1:30 AM, Pushkar Deole wrote:
> > Matthias,
> >
> > Let's say we have independent sub-topologies like the ones below. In this
> > case, will Streams create as many tasks as the total number of partitions
> > of topicA and topicB, and can we set the stream thread count to the sum of
> > the partition counts of the two topics?
> >
> > builder.stream("topicA").filter().to();
> > builder.stream("topicB").filter().to();
> >
> > On Thu, Sep 3, 2020 at 8:28 PM Matthias J. Sax <mj...@apache.org> wrote:
> >
> >> Well, it depends on your program.
> >>
> >> The reason for the current task creation strategy is joins: if you have
> >> two input topics that you want to join, the join happens on a
> >> per-partition basis, i.e., topicA-p0 is joined to topicB-p0 etc., and thus
> >> both partitions must be assigned to the same task (to get co-partitioned
> >> data processed together).
> >>
> >> Note that the following program would create independent tasks, as it
> >> consists of two independent sub-topologies:
> >>
> >> builder.stream("topicA").filter().to();
> >> builder.stream("topicB").filter().to();
> >>
> >> However, the next program would be one sub-topology and thus we apply
> >> the "join" rule (as we don't really know if you actually execute a join
> >> or not when we create tasks):
> >>
> >> KStream s1 = builder.stream("topicA");
> >> builder.stream("topicB").merge(s1).filter().to();
> >>
> >>
> >> Having said that, I agree that it would be a nice improvement to be more
> >> clever about it. However, it is not easy to do. There is actually a related
> >> ticket: https://issues.apache.org/jira/browse/KAFKA-9282
> >>
> >>
> >> Hope this helps.
> >>   -Matthias
> >>
> >> On 9/2/20 11:09 PM, Pushkar Deole wrote:
> >>> Hi,
> >>>
> >>> I came across articles that explain how parallelism is handled in
> >>> Kafka Streams. This is what I gathered:
> >>> When the Streams application reads from multiple topics, the topic
> >>> with the maximum number of partitions determines how many stream tasks
> >>> are instantiated, so 1 task is instantiated per partition.
> >>> If the stream tasks read from multiple topics, the partitions of those
> >>> topics are shared among the stream tasks.
> >>>
> >>> For example, if topics A and B have 5 partitions each, then 5 tasks are
> >>> instantiated and assigned to 5 stream threads, where each task is
> >>> assigned 1 partition from topic A and 1 from topic B.
> >>>
> >>> The question is: if I want 1 task to be created for each partition of
> >>> each input topic, is this possible? E.g. I would want to have 5 tasks
> >>> for topic A and 5 for topic B, and then 10 threads to handle them. How
> >>> can this be achieved?
> >>>
> >>
> >>
> >
>
>
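
The task-count rule discussed in the quoted messages above can be sketched
in a few lines of plain Java. This is a simplified model, not Kafka Streams
code: a sub-topology is represented only by the partition counts of its
source topics, and the class and method names (TaskCountSketch,
tasksForSubTopology, totalTasks) are made up for illustration. It assumes,
as described in the thread, that a sub-topology gets one task per partition
of its largest source topic.

```java
import java.util.List;
import java.util.Map;

// Simplified model (assumption): a sub-topology is just a map from
// source topic name to partition count. Not part of the Kafka Streams API.
final class TaskCountSketch {

    // One task per partition of the largest source topic in the sub-topology.
    static int tasksForSubTopology(Map<String, Integer> sourceTopicPartitions) {
        return sourceTopicPartitions.values().stream()
                .mapToInt(Integer::intValue)
                .max()
                .orElse(0);
    }

    // Total tasks is the sum over all independent sub-topologies.
    static int totalTasks(List<Map<String, Integer>> subTopologies) {
        return subTopologies.stream()
                .mapToInt(TaskCountSketch::tasksForSubTopology)
                .sum();
    }

    public static void main(String[] args) {
        // Two independent sub-topologies: topicA (5 partitions), topicB (6 partitions).
        List<Map<String, Integer>> independent =
                List.of(Map.of("topicA", 5), Map.of("topicB", 6));

        // One sub-topology reading both topics (e.g. after a merge), 5 partitions each.
        List<Map<String, Integer>> merged =
                List.of(Map.of("topicA", 5, "topicB", 5));

        System.out.println("independent sub-topologies: " + totalTasks(independent) + " tasks");
        System.out.println("single merged sub-topology: " + totalTasks(merged) + " tasks");
    }
}
```

With the numbers from the thread, the independent case yields 5 + 6 = 11
tasks, while the merged case with 5 partitions per topic yields 5 tasks.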

Re: Kafka streams parallelism - why not separate stream task per partition per input topic

Posted by Pushkar Deole <pd...@gmail.com>.
Matthias,

Any recommendations?

Also, while running a performance test, I observed that the partitions
assigned to stream threads are changing. Why would this happen when the
instances are not going down?

E.g. I see partitions moving between stream thread consumers. The values
marked in bold (between asterisks) below are partition numbers of the input
topics; when I checked a few hours ago, StreamThread-12 showed different
partition numbers and even different topics.

analytics-event-filter  analytics-engagement           *13*  308973  308980  7
  analytics-event-filter-StreamThread-12-consumer-965a5a1c-e3f0-4d4e-bc54-1e4e375d2ac8
  /10.200.27.207  analytics-event-filter-StreamThread-12-consumer
analytics-event-filter  analytics-agent-account-state  *14*  88053   88057   4
  analytics-event-filter-StreamThread-12-consumer-965a5a1c-e3f0-4d4e-bc54-1e4e375d2ac8
  /10.200.27.207  analytics-event-filter-StreamThread-12-consumer

On Fri, Oct 9, 2020 at 8:20 AM Pushkar Deole <pd...@gmail.com> wrote:

> I looked at the task assignment and it looked random for some threads.
> E.g. I have 3 topics with 24 partitions each and 3 instances of the
> application, so each instance is assigned 8 partitions per topic, i.e. 24
> partitions in total across the 3 topics.
>
> When I set 8 stream threads, I expected each thread to be assigned 1
> partition from each topic; however, some of the threads were assigned
> partitions from only 2 of the topics.
> Since topic C is not carrying traffic, the threads that did not get a
> partition from topic C were more heavily loaded than the others.
>
> Topic A
> Topic
>

Re: Kafka streams parallelism - why not separate stream task per partition per input topic

Posted by Pushkar Deole <pd...@gmail.com>.
I looked at the task assignment and it looked random for some threads.
E.g. I have 3 topics with 24 partitions each and 3 instances of the
application, so each instance is assigned 8 partitions per topic, i.e. 24
partitions in total across the 3 topics.

When I set 8 stream threads, I expected each thread to be assigned 1
partition from each topic; however, some of the threads were assigned
partitions from only 2 of the topics.
Since topic C is not carrying traffic, the threads that did not get a
partition from topic C were more heavily loaded than the others.

Topic A
Topic

On Wed, Oct 7, 2020 at 11:45 PM Matthias J. Sax <mj...@apache.org> wrote:

> Well, there are many what-ifs and I am not sure if there is general advice.
>
> Maybe a more generic response: do you actually observe a concrete issue
> with the task assignment that measurably impacts your app? Or might this
> be a case of premature optimization?
>
> -Matthias
>

Re: Kafka streams parallelism - why not separate stream task per partition per input topic

Posted by "Matthias J. Sax" <mj...@apache.org>.
Well, there are many what-ifs and I am not sure if there is general advice.

Maybe a more generic response: do you actually observe a concrete issue
with the task assignment that measurably impacts your app? Or might this
be a case of premature optimization?

-Matthias

On 10/6/20 10:13 AM, Pushkar Deole wrote:
> So, what do you suggest to address topic C, which has less traffic? Should
> we create a separate StreamsBuilder and build a separate topology for topic
> C, so we can configure the number of threads as required for that topic?
> 

Re: Kafka streams parallelism - why not separate stream task per partition per input topic

Posted by Pushkar Deole <pd...@gmail.com>.
So, what do you suggest to address topic C, which has less traffic? Should
we create a separate StreamsBuilder and build a separate topology for topic
C, so we can configure the number of threads as required for that topic?
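
To make the question concrete, here is a minimal sketch of the
configuration side of that idea, assuming two separate Streams applications
each get their own thread count. Only the config keys ("application.id",
"bootstrap.servers", "num.stream.threads") are real Kafka Streams settings;
the application ids, the broker address, the thread counts, and the class
name SeparateAppConfigSketch are hypothetical. Whether splitting the
application is advisable is not settled in this thread.

```java
import java.util.Properties;

// Sketch only: builds plain java.util.Properties, no Kafka classes involved.
final class SeparateAppConfigSketch {

    // Each application needs its own application.id and may set its own
    // number of stream threads.
    static Properties streamsConfig(String applicationId, int streamThreads) {
        Properties props = new Properties();
        props.setProperty("application.id", applicationId);
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        props.setProperty("num.stream.threads", Integer.toString(streamThreads));
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical split: topics A and B in one application, topic C in another.
        Properties busyTopics = streamsConfig("app-topics-a-b", 8);
        Properties idleTopic = streamsConfig("app-topic-c", 4);

        // In a real application each Properties object would be passed to its
        // own KafkaStreams instance together with that application's topology.
        System.out.println(busyTopics.getProperty("application.id")
                + " threads=" + busyTopics.getProperty("num.stream.threads"));
        System.out.println(idleTopic.getProperty("application.id")
                + " threads=" + idleTopic.getProperty("num.stream.threads"));
    }
}
```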

On Tue, Oct 6, 2020 at 10:27 PM Matthias J. Sax <mj...@apache.org> wrote:

> The current assignment would be "round robin", i.e., after all tasks are
> created, we just take them task by task and assign them to the threads
> one by one.
>
> Note though that the assignment algorithm might change at any point, so
> you should not rely on it.
>
> We are also not able to know if one topic has less traffic than others
> and thus must blindly assume (which is of course a simplification) that
> all topics have the same traffic. We only consider the difference
> between stateless and stateful tasks at the moment.
>
> -Matthias
>

Re: Kafka streams parallelism - why not separate stream task per partition per input topic

Posted by "Matthias J. Sax" <mj...@apache.org>.
The current assignment would be "round robin", i.e., after all tasks are
created, we just take them task by task and assign them to the threads
one by one.

Note though that the assignment algorithm might change at any point, so
you should not rely on it.

We are also not able to know if one topic has less traffic than others
and thus must blindly assume (which is of course a simplification) that
all topics have the same traffic. We only consider the difference
between stateless and stateful tasks at the moment.

-Matthias
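
A toy model of the round-robin idea, applied to the 3 topics x 8 partitions
and 12 threads from the question, might look like the sketch below. It is
not the actual Kafka Streams assignor: it assumes each topic forms its own
sub-topology (24 tasks in total) and that tasks are ordered topic by topic,
and the names RoundRobinSketch, buildTasks and assign are made up for
illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of round-robin task assignment; not Kafka Streams code.
final class RoundRobinSketch {

    // Builds task labels "A-0" ... "C-7", ordered topic by topic
    // (this ordering is an assumption of the sketch).
    static List<String> buildTasks(List<String> topics, int partitionsPerTopic) {
        List<String> tasks = new ArrayList<>();
        for (String topic : topics) {
            for (int p = 0; p < partitionsPerTopic; p++) {
                tasks.add(topic + "-" + p);
            }
        }
        return tasks;
    }

    // Deals tasks to threads one by one, wrapping around after the last thread.
    static List<List<String>> assign(List<String> tasks, int threadCount) {
        List<List<String>> perThread = new ArrayList<>();
        for (int t = 0; t < threadCount; t++) {
            perThread.add(new ArrayList<>());
        }
        for (int i = 0; i < tasks.size(); i++) {
            perThread.get(i % threadCount).add(tasks.get(i));
        }
        return perThread;
    }

    public static void main(String[] args) {
        List<String> tasks = buildTasks(List.of("A", "B", "C"), 8);
        List<List<String>> perThread = assign(tasks, 12);
        for (int t = 0; t < perThread.size(); t++) {
            System.out.println("thread-" + t + " -> " + perThread.get(t));
        }
    }
}
```

Under these assumptions every thread gets 2 tasks, but threads 0 to 3 get
no topic C task while threads 4 to 11 get one each, so an idle topic C would
leave the load uneven even though the task counts are balanced.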

On 10/6/20 3:57 AM, Pushkar Deole wrote:
> Matthias,
> 
> I am wondering how tasks are spread across threads when I have fewer
> threads than partitions. In my use case, I have 3 input topics with 8
> partitions each and I can configure 12 threads, so how will the partitions
> of the topics below be distributed among the 12 threads?
> Note that topic C is generally idle and carries traffic only occasionally,
> so I would want its partitions to be spread evenly, so that they don't all
> get assigned to only some of the threads.
> 
> Topic A - 8 partitions
> Topic B - 8 partitions
> Topic C - 8 partitions
> 