You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Russell Teabeault <rt...@twitter.com.INVALID> on 2018/02/07 18:37:34 UTC

Kafka Streams balancing of tasks across nodes

We are using Kafka Streams for a project and had some questions about how
stream tasks are assigned.

streamBuilder
  .stream("inbound-topic", Consumed.`with`(keySerde, valueSerde))
  ... // Do some stuff here

  .through("intermediate-topic")
  ... // Do some other stuff here

In this example we are streaming from "inbound-topic" and then doing some
work before writing the results back out to "intermediate-topic".
Then we are reading in from "intermediate-topic" and doing some more work.
If both of these topics contain 100 partitions (200 partitions total) and I
create 10 instances of my application then
what I observe is that there are a total of 20 partitions assigned to each
instance. But the distribution of these partitions across the two topics is
not even. For example, one
instance may have 7 partitions from "inbound-topic" and 13 partitions from
"intermediate-topic". I would have hoped that each instance would have 10
partitions from each
topic. Because of this uneven distribution it can make the resource
characteristics from instance to instance very different.

In a more concrete example we are reading from an input topic, then using
an in-memory store to do some filtering, followed by a groupBy, and finally
doing an aggregate.
This results in two topics; the input topic and then the internally created
intermediate topic written to by the groupBy and read from by the
aggregation. What we see is that some
instances are assigned far more partitions/tasks that are using the
in-memory store and some instances that have very few and sometimes no
tasks that use the in-memory store. This leads to wildly
different memory usage patterns across the instances. In turn this leads us
to set our memory much higher than needed if the partitions from each topic
were equally distributed across the instances.

The two ways we have figured out how to deal with this problem are:
1. Use a new StreamBuilder anytime an intermediate topic is being read from
in the application.
2. Break the topology into separate applications across the boundary of an
intermediate topic.

Neither of these seem like great solutions. So I would like to know:

1. Is this expected behavior?
2. Is there some technique to get equal distribution of task/partition
assignments across instances?

Thanks for the help.

--
Russell Teabeault | Senior Software Engineer | Twitter | @rusticules

Re: Kafka Streams balancing of tasks across nodes

Posted by Russell Teabeault <rt...@twitter.com.INVALID>.
Matthias,

Disregard the exception I mentioned. I think that was a transient error
caused by our broker cluster re-spinning.

-russ

On Wed, Feb 7, 2018 at 3:45 PM, Russell Teabeault <rt...@twitter.com>
wrote:

> Hi Matthias,
>
> Thanks for the prompt reply. We have built the kafka-streams jar from the
> 1.1 branch and deployed our instances. We are only able to upgrade the
> Kafka Streams to 1.1
> and can not upgrade to 1.1 for the brokers. I don't think that should
> matter though. Yes?
>
> It does not seem to have helped. We currently have 25 instances with 4
> threads/instance. Our topology has two topics in it, each having 100
> partitions. The input topic feeds into a filtering step that uses an
> in-memory store and that is output via groupBy to an intermediate topic.
> The intermediate topic then feeds into an aggregation step which uses the
> rocksDB store. So we can see that we have 200 tasks total. After switching
> to 1.1 the task assignments are still wildly uneven. Some instances only
> have tasks from one of the topics. Furthermore, the instances keep dying
> due to org.apache.kafka.common.errors.NotLeaderForPartitionException:
> This server is not the leader for that topic-partition.
>
> Is there something else we need to do to make this updated task assignment
> work?
>
> Thanks!
> -russ
>
>
>
> On Wed, Feb 7, 2018 at 12:33 PM, Matthias J. Sax <ma...@confluent.io>
> wrote:
>
>> It's a know issue and we addressed it already via
>> https://issues.apache.org/jira/browse/KAFKA-4969
>>
>> The fix will be part of upcoming 1.1 release, but you could try it out
>> immediately running from trunk or 1.0 branch. (If you do, feedback would
>> be very welcome :))
>>
>> Your proposed workarounds should work. I cannot come up with anything
>> else you could do, because the task assignment cannot be influenced.
>>
>>
>> -Matthias
>>
>> On 2/7/18 10:37 AM, Russell Teabeault wrote:
>> > We are using Kafka Streams for a project and had some questions about
>> how
>> > stream tasks are assigned.
>> >
>> > streamBuilder
>> >   .stream("inbound-topic", Consumed.`with`(keySerde, valueSerde))
>> >   ... // Do some stuff here
>> >
>> >   .through("intermediate-topic")
>> >   ... // Do some other stuff here
>> >
>> > In this example we are streaming from "inbound-topic" and then doing
>> some
>> > work before writing the results back out to "intermediate-topic".
>> > Then we are reading in from "intermediate-topic" and doing some more
>> work.
>> > If both of these topics contain 100 partitions (200 partitions total)
>> and I
>> > create 10 instances of my application then
>> > what I observe is that there are a total of 20 partitions assigned to
>> each
>> > instance. But the distribution of these partitions across the two
>> topics is
>> > not even. For example, one
>> > instance may have 7 partitions from "inbound-topic" and 13 partitions
>> from
>> > "intermediate-topic". I would have hoped that each instance would have
>> 10
>> > partitions from each
>> > topic. Because of this uneven distribution it can make the resource
>> > characteristics from instance to instance very different.
>> >
>> > In a more concrete example we are reading from an input topic, then
>> using
>> > an in-memory store to do some filtering, followed by a groupBy, and
>> finally
>> > doing an aggregate.
>> > This results in two topics; the input topic and then the internally
>> created
>> > intermediate topic written to by the groupBy and read from by the
>> > aggregation. What we see is that some
>> > instances are assigned far more partitions/tasks that are using the
>> > in-memory store and some instances that have very few and sometimes no
>> > tasks that use the in-memory store. This leads to wildly
>> > different memory usage patterns across the instances. In turn this
>> leads us
>> > to set our memory much higher than needed if the partitions from each
>> topic
>> > were equally distributed across the instances.
>> >
>> > The two ways we have figured out how to deal with this problem are:
>> > 1. Use a new StreamBuilder anytime an intermediate topic is being read
>> from
>> > in the application.
>> > 2. Break the topology into separate applications across the boundary of
>> an
>> > intermediate topic.
>> >
>> > Neither of these seem like great solutions. So I would like to know:
>> >
>> > 1. Is this expected behavior?
>> > 2. Is there some technique to get equal distribution of task/partition
>> > assignments across instances?
>> >
>> > Thanks for the help.
>> >
>> > --
>> > Russell Teabeault | Senior Software Engineer | Twitter | @rusticules
>> >
>>
>>
>
>
> --
> --
> Russell Teabeault | Senior Software Engineer | Twitter | @rusticules
>



-- 
-- 
Russell Teabeault | Senior Software Engineer | Twitter | @rusticules

Re: Kafka Streams balancing of tasks across nodes

Posted by Russell Teabeault <rt...@twitter.com.INVALID>.
Matthias,

Yes. We used the reset tool before deploying with 1.1.

-russ

On Wed, Feb 7, 2018 at 4:23 PM, Matthias J. Sax <ma...@confluent.io>
wrote:

> Did you start the app from scratch, ie, wipe out all state before you
> restarted with 1.1? If not, reusing existing stores would overrule a
> more balanced deployment.
>
> You can set a new application.id or better use the reset tool to reset
> the application completely (maybe just calling KafkaStreams#cleanup();
> would do the trick, too for your case).
>
> And yes, upgrading Streams API to 1.1 is fine -- no need to upgrade the
> brokers.
>
>
> -Matthias
>
> On 2/7/18 3:16 PM, Russell Teabeault wrote:
> > Bill,
> >
> > I may be able to.
> >
> > - What logging level?
> > - Do you need logs from all the instances?
> > - Where should I send them?
> >
> > -russ
> >
> > On Wed, Feb 7, 2018 at 4:12 PM, Bill Bejeck <bi...@confluent.io> wrote:
> >
> >> Russell,
> >>
> >> Can you share any log files?
> >>
> >> Thanks,
> >> Bill
> >>
> >>
> >>
> >> On Wed, Feb 7, 2018 at 5:45 PM, Russell Teabeault <
> >> rteabeault@twitter.com.invalid> wrote:
> >>
> >>> Hi Matthias,
> >>>
> >>> Thanks for the prompt reply. We have built the kafka-streams jar from
> the
> >>> 1.1 branch and deployed our instances. We are only able to upgrade the
> >>> Kafka Streams to 1.1
> >>> and can not upgrade to 1.1 for the brokers. I don't think that should
> >>> matter though. Yes?
> >>>
> >>> It does not seem to have helped. We currently have 25 instances with 4
> >>> threads/instance. Our topology has two topics in it, each having 100
> >>> partitions. The input topic feeds into a filtering step that uses an
> >>> in-memory store and that is output via groupBy to an intermediate
> topic.
> >>> The intermediate topic then feeds into an aggregation step which uses
> the
> >>> rocksDB store. So we can see that we have 200 tasks total. After
> >> switching
> >>> to 1.1 the task assignments are still wildly uneven. Some instances
> only
> >>> have tasks from one of the topics. Furthermore, the instances keep
> dying
> >>> due to org.apache.kafka.common.errors.NotLeaderForPartitionException:
> >> This
> >>> server is not the leader for that topic-partition.
> >>>
> >>> Is there something else we need to do to make this updated task
> >> assignment
> >>> work?
> >>>
> >>> Thanks!
> >>> -russ
> >>>
> >>>
> >>>
> >>> On Wed, Feb 7, 2018 at 12:33 PM, Matthias J. Sax <
> matthias@confluent.io>
> >>> wrote:
> >>>
> >>>> It's a know issue and we addressed it already via
> >>>> https://issues.apache.org/jira/browse/KAFKA-4969
> >>>>
> >>>> The fix will be part of upcoming 1.1 release, but you could try it out
> >>>> immediately running from trunk or 1.0 branch. (If you do, feedback
> >> would
> >>>> be very welcome :))
> >>>>
> >>>> Your proposed workarounds should work. I cannot come up with anything
> >>>> else you could do, because the task assignment cannot be influenced.
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 2/7/18 10:37 AM, Russell Teabeault wrote:
> >>>>> We are using Kafka Streams for a project and had some questions about
> >>> how
> >>>>> stream tasks are assigned.
> >>>>>
> >>>>> streamBuilder
> >>>>>   .stream("inbound-topic", Consumed.`with`(keySerde, valueSerde))
> >>>>>   ... // Do some stuff here
> >>>>>
> >>>>>   .through("intermediate-topic")
> >>>>>   ... // Do some other stuff here
> >>>>>
> >>>>> In this example we are streaming from "inbound-topic" and then doing
> >>> some
> >>>>> work before writing the results back out to "intermediate-topic".
> >>>>> Then we are reading in from "intermediate-topic" and doing some more
> >>>> work.
> >>>>> If both of these topics contain 100 partitions (200 partitions total)
> >>>> and I
> >>>>> create 10 instances of my application then
> >>>>> what I observe is that there are a total of 20 partitions assigned to
> >>>> each
> >>>>> instance. But the distribution of these partitions across the two
> >>> topics
> >>>> is
> >>>>> not even. For example, one
> >>>>> instance may have 7 partitions from "inbound-topic" and 13 partitions
> >>>> from
> >>>>> "intermediate-topic". I would have hoped that each instance would
> >> have
> >>> 10
> >>>>> partitions from each
> >>>>> topic. Because of this uneven distribution it can make the resource
> >>>>> characteristics from instance to instance very different.
> >>>>>
> >>>>> In a more concrete example we are reading from an input topic, then
> >>> using
> >>>>> an in-memory store to do some filtering, followed by a groupBy, and
> >>>> finally
> >>>>> doing an aggregate.
> >>>>> This results in two topics; the input topic and then the internally
> >>>> created
> >>>>> intermediate topic written to by the groupBy and read from by the
> >>>>> aggregation. What we see is that some
> >>>>> instances are assigned far more partitions/tasks that are using the
> >>>>> in-memory store and some instances that have very few and sometimes
> >> no
> >>>>> tasks that use the in-memory store. This leads to wildly
> >>>>> different memory usage patterns across the instances. In turn this
> >>> leads
> >>>> us
> >>>>> to set our memory much higher than needed if the partitions from each
> >>>> topic
> >>>>> were equally distributed across the instances.
> >>>>>
> >>>>> The two ways we have figured out how to deal with this problem are:
> >>>>> 1. Use a new StreamBuilder anytime an intermediate topic is being
> >> read
> >>>> from
> >>>>> in the application.
> >>>>> 2. Break the topology into separate applications across the boundary
> >> of
> >>>> an
> >>>>> intermediate topic.
> >>>>>
> >>>>> Neither of these seem like great solutions. So I would like to know:
> >>>>>
> >>>>> 1. Is this expected behavior?
> >>>>> 2. Is there some technique to get equal distribution of
> >> task/partition
> >>>>> assignments across instances?
> >>>>>
> >>>>> Thanks for the help.
> >>>>>
> >>>>> --
> >>>>> Russell Teabeault | Senior Software Engineer | Twitter | @rusticules
> >>>>>
> >>>>
> >>>>
> >>>
> >>>
> >>> --
> >>> --
> >>> Russell Teabeault | Senior Software Engineer | Twitter | @rusticules
> >>>
> >>
> >
> >
> >
>
>


-- 
-- 
Russell Teabeault | Senior Software Engineer | Twitter | @rusticules

Re: Kafka Streams balancing of tasks across nodes

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Did you start the app from scratch, ie, wipe out all state before you
restarted with 1.1? If not, reusing existing stores would overrule a
more balanced deployment.

You can set a new application.id or better use the reset tool to reset
the application completely (maybe just calling KafkaStreams#cleanup();
would do the trick, too for your case).

And yes, upgrading Streams API to 1.1 is fine -- no need to upgrade the
brokers.


-Matthias

On 2/7/18 3:16 PM, Russell Teabeault wrote:
> Bill,
> 
> I may be able to.
> 
> - What logging level?
> - Do you need logs from all the instances?
> - Where should I send them?
> 
> -russ
> 
> On Wed, Feb 7, 2018 at 4:12 PM, Bill Bejeck <bi...@confluent.io> wrote:
> 
>> Russell,
>>
>> Can you share any log files?
>>
>> Thanks,
>> Bill
>>
>>
>>
>> On Wed, Feb 7, 2018 at 5:45 PM, Russell Teabeault <
>> rteabeault@twitter.com.invalid> wrote:
>>
>>> Hi Matthias,
>>>
>>> Thanks for the prompt reply. We have built the kafka-streams jar from the
>>> 1.1 branch and deployed our instances. We are only able to upgrade the
>>> Kafka Streams to 1.1
>>> and can not upgrade to 1.1 for the brokers. I don't think that should
>>> matter though. Yes?
>>>
>>> It does not seem to have helped. We currently have 25 instances with 4
>>> threads/instance. Our topology has two topics in it, each having 100
>>> partitions. The input topic feeds into a filtering step that uses an
>>> in-memory store and that is output via groupBy to an intermediate topic.
>>> The intermediate topic then feeds into an aggregation step which uses the
>>> rocksDB store. So we can see that we have 200 tasks total. After
>> switching
>>> to 1.1 the task assignments are still wildly uneven. Some instances only
>>> have tasks from one of the topics. Furthermore, the instances keep dying
>>> due to org.apache.kafka.common.errors.NotLeaderForPartitionException:
>> This
>>> server is not the leader for that topic-partition.
>>>
>>> Is there something else we need to do to make this updated task
>> assignment
>>> work?
>>>
>>> Thanks!
>>> -russ
>>>
>>>
>>>
>>> On Wed, Feb 7, 2018 at 12:33 PM, Matthias J. Sax <ma...@confluent.io>
>>> wrote:
>>>
>>>> It's a know issue and we addressed it already via
>>>> https://issues.apache.org/jira/browse/KAFKA-4969
>>>>
>>>> The fix will be part of upcoming 1.1 release, but you could try it out
>>>> immediately running from trunk or 1.0 branch. (If you do, feedback
>> would
>>>> be very welcome :))
>>>>
>>>> Your proposed workarounds should work. I cannot come up with anything
>>>> else you could do, because the task assignment cannot be influenced.
>>>>
>>>>
>>>> -Matthias
>>>>
>>>> On 2/7/18 10:37 AM, Russell Teabeault wrote:
>>>>> We are using Kafka Streams for a project and had some questions about
>>> how
>>>>> stream tasks are assigned.
>>>>>
>>>>> streamBuilder
>>>>>   .stream("inbound-topic", Consumed.`with`(keySerde, valueSerde))
>>>>>   ... // Do some stuff here
>>>>>
>>>>>   .through("intermediate-topic")
>>>>>   ... // Do some other stuff here
>>>>>
>>>>> In this example we are streaming from "inbound-topic" and then doing
>>> some
>>>>> work before writing the results back out to "intermediate-topic".
>>>>> Then we are reading in from "intermediate-topic" and doing some more
>>>> work.
>>>>> If both of these topics contain 100 partitions (200 partitions total)
>>>> and I
>>>>> create 10 instances of my application then
>>>>> what I observe is that there are a total of 20 partitions assigned to
>>>> each
>>>>> instance. But the distribution of these partitions across the two
>>> topics
>>>> is
>>>>> not even. For example, one
>>>>> instance may have 7 partitions from "inbound-topic" and 13 partitions
>>>> from
>>>>> "intermediate-topic". I would have hoped that each instance would
>> have
>>> 10
>>>>> partitions from each
>>>>> topic. Because of this uneven distribution it can make the resource
>>>>> characteristics from instance to instance very different.
>>>>>
>>>>> In a more concrete example we are reading from an input topic, then
>>> using
>>>>> an in-memory store to do some filtering, followed by a groupBy, and
>>>> finally
>>>>> doing an aggregate.
>>>>> This results in two topics; the input topic and then the internally
>>>> created
>>>>> intermediate topic written to by the groupBy and read from by the
>>>>> aggregation. What we see is that some
>>>>> instances are assigned far more partitions/tasks that are using the
>>>>> in-memory store and some instances that have very few and sometimes
>> no
>>>>> tasks that use the in-memory store. This leads to wildly
>>>>> different memory usage patterns across the instances. In turn this
>>> leads
>>>> us
>>>>> to set our memory much higher than needed if the partitions from each
>>>> topic
>>>>> were equally distributed across the instances.
>>>>>
>>>>> The two ways we have figured out how to deal with this problem are:
>>>>> 1. Use a new StreamBuilder anytime an intermediate topic is being
>> read
>>>> from
>>>>> in the application.
>>>>> 2. Break the topology into separate applications across the boundary
>> of
>>>> an
>>>>> intermediate topic.
>>>>>
>>>>> Neither of these seem like great solutions. So I would like to know:
>>>>>
>>>>> 1. Is this expected behavior?
>>>>> 2. Is there some technique to get equal distribution of
>> task/partition
>>>>> assignments across instances?
>>>>>
>>>>> Thanks for the help.
>>>>>
>>>>> --
>>>>> Russell Teabeault | Senior Software Engineer | Twitter | @rusticules
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> --
>>> Russell Teabeault | Senior Software Engineer | Twitter | @rusticules
>>>
>>
> 
> 
> 


Re: Kafka Streams balancing of tasks across nodes

Posted by Russell Teabeault <rt...@twitter.com.INVALID>.
Matthias,

The instances are transient (Mesos) so when we roll them we get a brand new
instance.

-russ

On Wed, Feb 7, 2018 at 4:53 PM, Matthias J. Sax <ma...@confluent.io>
wrote:

> Russel,
>
> > Yes. We used the reset tool before deploying with 1.1.
>
> Did you also clean up local state? The tool only takes care of "broker
> side" cleanup. You would need to delete local state by calling
> KafkaStreams#cleanup() before restart or by deleting the corresponding
> local state directory manually.
>
>
> -Matthias
>
> On 2/7/18 3:38 PM, Bill Bejeck wrote:
> > Russell,
> >
> > INFO level is fine and it could be just the portion of the logs right
> after
> > streams has finished rebalancing.
> >
> > You can tar them up and attach to this mailing list unless you'd prefer
> not
> > to do so, in which case I can send you my email address directly.
> >
> > Thanks,
> > Bill
> >
> > On Wed, Feb 7, 2018 at 6:16 PM, Russell Teabeault <
> > rteabeault@twitter.com.invalid> wrote:
> >
> >> Bill,
> >>
> >> I may be able to.
> >>
> >> - What logging level?
> >> - Do you need logs from all the instances?
> >> - Where should I send them?
> >>
> >> -russ
> >>
> >> On Wed, Feb 7, 2018 at 4:12 PM, Bill Bejeck <bi...@confluent.io> wrote:
> >>
> >>> Russell,
> >>>
> >>> Can you share any log files?
> >>>
> >>> Thanks,
> >>> Bill
> >>>
> >>>
> >>>
> >>> On Wed, Feb 7, 2018 at 5:45 PM, Russell Teabeault <
> >>> rteabeault@twitter.com.invalid> wrote:
> >>>
> >>>> Hi Matthias,
> >>>>
> >>>> Thanks for the prompt reply. We have built the kafka-streams jar from
> >> the
> >>>> 1.1 branch and deployed our instances. We are only able to upgrade the
> >>>> Kafka Streams to 1.1
> >>>> and can not upgrade to 1.1 for the brokers. I don't think that should
> >>>> matter though. Yes?
> >>>>
> >>>> It does not seem to have helped. We currently have 25 instances with 4
> >>>> threads/instance. Our topology has two topics in it, each having 100
> >>>> partitions. The input topic feeds into a filtering step that uses an
> >>>> in-memory store and that is output via groupBy to an intermediate
> >> topic.
> >>>> The intermediate topic then feeds into an aggregation step which uses
> >> the
> >>>> rocksDB store. So we can see that we have 200 tasks total. After
> >>> switching
> >>>> to 1.1 the task assignments are still wildly uneven. Some instances
> >> only
> >>>> have tasks from one of the topics. Furthermore, the instances keep
> >> dying
> >>>> due to org.apache.kafka.common.errors.NotLeaderForPartitionException:
> >>> This
> >>>> server is not the leader for that topic-partition.
> >>>>
> >>>> Is there something else we need to do to make this updated task
> >>> assignment
> >>>> work?
> >>>>
> >>>> Thanks!
> >>>> -russ
> >>>>
> >>>>
> >>>>
> >>>> On Wed, Feb 7, 2018 at 12:33 PM, Matthias J. Sax <
> >> matthias@confluent.io>
> >>>> wrote:
> >>>>
> >>>>> It's a know issue and we addressed it already via
> >>>>> https://issues.apache.org/jira/browse/KAFKA-4969
> >>>>>
> >>>>> The fix will be part of upcoming 1.1 release, but you could try it
> >> out
> >>>>> immediately running from trunk or 1.0 branch. (If you do, feedback
> >>> would
> >>>>> be very welcome :))
> >>>>>
> >>>>> Your proposed workarounds should work. I cannot come up with anything
> >>>>> else you could do, because the task assignment cannot be influenced.
> >>>>>
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>> On 2/7/18 10:37 AM, Russell Teabeault wrote:
> >>>>>> We are using Kafka Streams for a project and had some questions
> >> about
> >>>> how
> >>>>>> stream tasks are assigned.
> >>>>>>
> >>>>>> streamBuilder
> >>>>>>   .stream("inbound-topic", Consumed.`with`(keySerde, valueSerde))
> >>>>>>   ... // Do some stuff here
> >>>>>>
> >>>>>>   .through("intermediate-topic")
> >>>>>>   ... // Do some other stuff here
> >>>>>>
> >>>>>> In this example we are streaming from "inbound-topic" and then
> >> doing
> >>>> some
> >>>>>> work before writing the results back out to "intermediate-topic".
> >>>>>> Then we are reading in from "intermediate-topic" and doing some
> >> more
> >>>>> work.
> >>>>>> If both of these topics contain 100 partitions (200 partitions
> >> total)
> >>>>> and I
> >>>>>> create 10 instances of my application then
> >>>>>> what I observe is that there are a total of 20 partitions assigned
> >> to
> >>>>> each
> >>>>>> instance. But the distribution of these partitions across the two
> >>>> topics
> >>>>> is
> >>>>>> not even. For example, one
> >>>>>> instance may have 7 partitions from "inbound-topic" and 13
> >> partitions
> >>>>> from
> >>>>>> "intermediate-topic". I would have hoped that each instance would
> >>> have
> >>>> 10
> >>>>>> partitions from each
> >>>>>> topic. Because of this uneven distribution it can make the resource
> >>>>>> characteristics from instance to instance very different.
> >>>>>>
> >>>>>> In a more concrete example we are reading from an input topic, then
> >>>> using
> >>>>>> an in-memory store to do some filtering, followed by a groupBy, and
> >>>>> finally
> >>>>>> doing an aggregate.
> >>>>>> This results in two topics; the input topic and then the internally
> >>>>> created
> >>>>>> intermediate topic written to by the groupBy and read from by the
> >>>>>> aggregation. What we see is that some
> >>>>>> instances are assigned far more partitions/tasks that are using the
> >>>>>> in-memory store and some instances that have very few and sometimes
> >>> no
> >>>>>> tasks that use the in-memory store. This leads to wildly
> >>>>>> different memory usage patterns across the instances. In turn this
> >>>> leads
> >>>>> us
> >>>>>> to set our memory much higher than needed if the partitions from
> >> each
> >>>>> topic
> >>>>>> were equally distributed across the instances.
> >>>>>>
> >>>>>> The two ways we have figured out how to deal with this problem are:
> >>>>>> 1. Use a new StreamBuilder anytime an intermediate topic is being
> >>> read
> >>>>> from
> >>>>>> in the application.
> >>>>>> 2. Break the topology into separate applications across the
> >> boundary
> >>> of
> >>>>> an
> >>>>>> intermediate topic.
> >>>>>>
> >>>>>> Neither of these seem like great solutions. So I would like to
> >> know:
> >>>>>>
> >>>>>> 1. Is this expected behavior?
> >>>>>> 2. Is there some technique to get equal distribution of
> >>> task/partition
> >>>>>> assignments across instances?
> >>>>>>
> >>>>>> Thanks for the help.
> >>>>>>
> >>>>>> --
> >>>>>> Russell Teabeault | Senior Software Engineer | Twitter |
> >> @rusticules
> >>>>>>
> >>>>>
> >>>>>
> >>>>
> >>>>
> >>>> --
> >>>> --
> >>>> Russell Teabeault | Senior Software Engineer | Twitter | @rusticules
> >>>>
> >>>
> >>
> >>
> >>
> >> --
> >> --
> >> Russell Teabeault | Senior Software Engineer | Twitter | @rusticules
> >>
> >
>
>


-- 
-- 
Russell Teabeault | Senior Software Engineer | Twitter | @rusticules

Re: Kafka Streams balancing of tasks across nodes

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Russel,

> Yes. We used the reset tool before deploying with 1.1.

Did you also clean up local state? The tool only takes care of "broker
side" cleanup. You would need to delete local state by calling
KafkaStreams#cleanup() before restart or by deleting the corresponding
local state directory manually.


-Matthias

On 2/7/18 3:38 PM, Bill Bejeck wrote:
> Russell,
> 
> INFO level is fine and it could be just the portion of the logs right after
> streams has finished rebalancing.
> 
> You can tar them up and attach to this mailing list unless you'd prefer not
> to do so, in which case I can send you my email address directly.
> 
> Thanks,
> Bill
> 
> On Wed, Feb 7, 2018 at 6:16 PM, Russell Teabeault <
> rteabeault@twitter.com.invalid> wrote:
> 
>> Bill,
>>
>> I may be able to.
>>
>> - What logging level?
>> - Do you need logs from all the instances?
>> - Where should I send them?
>>
>> -russ
>>
>> On Wed, Feb 7, 2018 at 4:12 PM, Bill Bejeck <bi...@confluent.io> wrote:
>>
>>> Russell,
>>>
>>> Can you share any log files?
>>>
>>> Thanks,
>>> Bill
>>>
>>>
>>>
>>> On Wed, Feb 7, 2018 at 5:45 PM, Russell Teabeault <
>>> rteabeault@twitter.com.invalid> wrote:
>>>
>>>> Hi Matthias,
>>>>
>>>> Thanks for the prompt reply. We have built the kafka-streams jar from
>> the
>>>> 1.1 branch and deployed our instances. We are only able to upgrade the
>>>> Kafka Streams to 1.1
>>>> and can not upgrade to 1.1 for the brokers. I don't think that should
>>>> matter though. Yes?
>>>>
>>>> It does not seem to have helped. We currently have 25 instances with 4
>>>> threads/instance. Our topology has two topics in it, each having 100
>>>> partitions. The input topic feeds into a filtering step that uses an
>>>> in-memory store and that is output via groupBy to an intermediate
>> topic.
>>>> The intermediate topic then feeds into an aggregation step which uses
>> the
>>>> rocksDB store. So we can see that we have 200 tasks total. After
>>> switching
>>>> to 1.1 the task assignments are still wildly uneven. Some instances
>> only
>>>> have tasks from one of the topics. Furthermore, the instances keep
>> dying
>>>> due to org.apache.kafka.common.errors.NotLeaderForPartitionException:
>>> This
>>>> server is not the leader for that topic-partition.
>>>>
>>>> Is there something else we need to do to make this updated task
>>> assignment
>>>> work?
>>>>
>>>> Thanks!
>>>> -russ
>>>>
>>>>
>>>>
>>>> On Wed, Feb 7, 2018 at 12:33 PM, Matthias J. Sax <
>> matthias@confluent.io>
>>>> wrote:
>>>>
>>>>> It's a know issue and we addressed it already via
>>>>> https://issues.apache.org/jira/browse/KAFKA-4969
>>>>>
>>>>> The fix will be part of upcoming 1.1 release, but you could try it
>> out
>>>>> immediately running from trunk or 1.0 branch. (If you do, feedback
>>> would
>>>>> be very welcome :))
>>>>>
>>>>> Your proposed workarounds should work. I cannot come up with anything
>>>>> else you could do, because the task assignment cannot be influenced.
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 2/7/18 10:37 AM, Russell Teabeault wrote:
>>>>>> We are using Kafka Streams for a project and had some questions
>> about
>>>> how
>>>>>> stream tasks are assigned.
>>>>>>
>>>>>> streamBuilder
>>>>>>   .stream("inbound-topic", Consumed.`with`(keySerde, valueSerde))
>>>>>>   ... // Do some stuff here
>>>>>>
>>>>>>   .through("intermediate-topic")
>>>>>>   ... // Do some other stuff here
>>>>>>
>>>>>> In this example we are streaming from "inbound-topic" and then
>> doing
>>>> some
>>>>>> work before writing the results back out to "intermediate-topic".
>>>>>> Then we are reading in from "intermediate-topic" and doing some
>> more
>>>>> work.
>>>>>> If both of these topics contain 100 partitions (200 partitions
>> total)
>>>>> and I
>>>>>> create 10 instances of my application then
>>>>>> what I observe is that there are a total of 20 partitions assigned
>> to
>>>>> each
>>>>>> instance. But the distribution of these partitions across the two
>>>> topics
>>>>> is
>>>>>> not even. For example, one
>>>>>> instance may have 7 partitions from "inbound-topic" and 13
>> partitions
>>>>> from
>>>>>> "intermediate-topic". I would have hoped that each instance would
>>> have
>>>> 10
>>>>>> partitions from each
>>>>>> topic. Because of this uneven distribution it can make the resource
>>>>>> characteristics from instance to instance very different.
>>>>>>
>>>>>> In a more concrete example we are reading from an input topic, then
>>>> using
>>>>>> an in-memory store to do some filtering, followed by a groupBy, and
>>>>> finally
>>>>>> doing an aggregate.
>>>>>> This results in two topics; the input topic and then the internally
>>>>> created
>>>>>> intermediate topic written to by the groupBy and read from by the
>>>>>> aggregation. What we see is that some
>>>>>> instances are assigned far more partitions/tasks that are using the
>>>>>> in-memory store and some instances that have very few and sometimes
>>> no
>>>>>> tasks that use the in-memory store. This leads to wildly
>>>>>> different memory usage patterns across the instances. In turn this
>>>> leads
>>>>> us
>>>>>> to set our memory much higher than needed if the partitions from
>> each
>>>>> topic
>>>>>> were equally distributed across the instances.
>>>>>>
>>>>>> The two ways we have figured out how to deal with this problem are:
>>>>>> 1. Use a new StreamBuilder anytime an intermediate topic is being
>>> read
>>>>> from
>>>>>> in the application.
>>>>>> 2. Break the topology into separate applications across the
>> boundary
>>> of
>>>>> an
>>>>>> intermediate topic.
>>>>>>
>>>>>> Neither of these seem like great solutions. So I would like to
>> know:
>>>>>>
>>>>>> 1. Is this expected behavior?
>>>>>> 2. Is there some technique to get equal distribution of
>>> task/partition
>>>>>> assignments across instances?
>>>>>>
>>>>>> Thanks for the help.
>>>>>>
>>>>>> --
>>>>>> Russell Teabeault | Senior Software Engineer | Twitter |
>> @rusticules
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> --
>>>> Russell Teabeault | Senior Software Engineer | Twitter | @rusticules
>>>>
>>>
>>
>>
>>
>> --
>> --
>> Russell Teabeault | Senior Software Engineer | Twitter | @rusticules
>>
> 


Re: Kafka Streams balancing of tasks across nodes

Posted by Bill Bejeck <bi...@confluent.io>.
Russell,

INFO level is fine and it could be just the portion of the logs right after
streams has finished rebalancing.

You can tar them up and attach to this mailing list unless you'd prefer not
to do so, in which case I can send you my email address directly.

Thanks,
Bill

On Wed, Feb 7, 2018 at 6:16 PM, Russell Teabeault <
rteabeault@twitter.com.invalid> wrote:

> Bill,
>
> I may be able to.
>
> - What logging level?
> - Do you need logs from all the instances?
> - Where should I send them?
>
> -russ
>
> On Wed, Feb 7, 2018 at 4:12 PM, Bill Bejeck <bi...@confluent.io> wrote:
>
> > Russell,
> >
> > Can you share any log files?
> >
> > Thanks,
> > Bill
> >
> >
> >
> > On Wed, Feb 7, 2018 at 5:45 PM, Russell Teabeault <
> > rteabeault@twitter.com.invalid> wrote:
> >
> > > Hi Matthias,
> > >
> > > Thanks for the prompt reply. We have built the kafka-streams jar from
> the
> > > 1.1 branch and deployed our instances. We are only able to upgrade the
> > > Kafka Streams to 1.1
> > > and can not upgrade to 1.1 for the brokers. I don't think that should
> > > matter though. Yes?
> > >
> > > It does not seem to have helped. We currently have 25 instances with 4
> > > threads/instance. Our topology has two topics in it, each having 100
> > > partitions. The input topic feeds into a filtering step that uses an
> > > in-memory store and that is output via groupBy to an intermediate
> topic.
> > > The intermediate topic then feeds into an aggregation step which uses
> the
> > > rocksDB store. So we can see that we have 200 tasks total. After
> > switching
> > > to 1.1 the task assignments are still wildly uneven. Some instances
> only
> > > have tasks from one of the topics. Furthermore, the instances keep
> dying
> > > due to org.apache.kafka.common.errors.NotLeaderForPartitionException:
> > This
> > > server is not the leader for that topic-partition.
> > >
> > > Is there something else we need to do to make this updated task
> > assignment
> > > work?
> > >
> > > Thanks!
> > > -russ
> > >
> > >
> > >
> > > On Wed, Feb 7, 2018 at 12:33 PM, Matthias J. Sax <
> matthias@confluent.io>
> > > wrote:
> > >
> > > > It's a know issue and we addressed it already via
> > > > https://issues.apache.org/jira/browse/KAFKA-4969
> > > >
> > > > The fix will be part of upcoming 1.1 release, but you could try it
> out
> > > > immediately running from trunk or 1.0 branch. (If you do, feedback
> > would
> > > > be very welcome :))
> > > >
> > > > Your proposed workarounds should work. I cannot come up with anything
> > > > else you could do, because the task assignment cannot be influenced.
> > > >
> > > >
> > > > -Matthias
> > > >
> > > > On 2/7/18 10:37 AM, Russell Teabeault wrote:
> > > > > We are using Kafka Streams for a project and had some questions
> about
> > > how
> > > > > stream tasks are assigned.
> > > > >
> > > > > streamBuilder
> > > > >   .stream("inbound-topic", Consumed.`with`(keySerde, valueSerde))
> > > > >   ... // Do some stuff here
> > > > >
> > > > >   .through("intermediate-topic")
> > > > >   ... // Do some other stuff here
> > > > >
> > > > > In this example we are streaming from "inbound-topic" and then
> doing
> > > some
> > > > > work before writing the results back out to "intermediate-topic".
> > > > > Then we are reading in from "intermediate-topic" and doing some
> more
> > > > work.
> > > > > If both of these topics contain 100 partitions (200 partitions
> total)
> > > > and I
> > > > > create 10 instances of my application then
> > > > > what I observe is that there are a total of 20 partitions assigned
> to
> > > > each
> > > > > instance. But the distribution of these partitions across the two
> > > topics
> > > > is
> > > > > not even. For example, one
> > > > > instance may have 7 partitions from "inbound-topic" and 13
> partitions
> > > > from
> > > > > "intermediate-topic". I would have hoped that each instance would
> > have
> > > 10
> > > > > partitions from each
> > > > > topic. Because of this uneven distribution it can make the resource
> > > > > characteristics from instance to instance very different.
> > > > >
> > > > > In a more concrete example we are reading from an input topic, then
> > > using
> > > > > an in-memory store to do some filtering, followed by a groupBy, and
> > > > finally
> > > > > doing an aggregate.
> > > > > This results in two topics; the input topic and then the internally
> > > > created
> > > > > intermediate topic written to by the groupBy and read from by the
> > > > > aggregation. What we see is that some
> > > > > instances are assigned far more partitions/tasks that are using the
> > > > > in-memory store and some instances that have very few and sometimes
> > no
> > > > > tasks that use the in-memory store. This leads to wildly
> > > > > different memory usage patterns across the instances. In turn this
> > > leads
> > > > us
> > > > > to set our memory much higher than needed if the partitions from
> each
> > > > topic
> > > > > were equally distributed across the instances.
> > > > >
> > > > > The two ways we have figured out how to deal with this problem are:
> > > > > 1. Use a new StreamBuilder anytime an intermediate topic is being
> > read
> > > > from
> > > > > in the application.
> > > > > 2. Break the topology into separate applications across the
> boundary
> > of
> > > > an
> > > > > intermediate topic.
> > > > >
> > > > > Neither of these seem like great solutions. So I would like to
> know:
> > > > >
> > > > > 1. Is this expected behavior?
> > > > > 2. Is there some technique to get equal distribution of
> > task/partition
> > > > > assignments across instances?
> > > > >
> > > > > Thanks for the help.
> > > > >
> > > > > --
> > > > > Russell Teabeault | Senior Software Engineer | Twitter |
> @rusticules
> > > > >
> > > >
> > > >
> > >
> > >
> > > --
> > > --
> > > Russell Teabeault | Senior Software Engineer | Twitter | @rusticules
> > >
> >
>
>
>
> --
> --
> Russell Teabeault | Senior Software Engineer | Twitter | @rusticules
>

Re: Kafka Streams balancing of tasks across nodes

Posted by Russell Teabeault <rt...@twitter.com.INVALID>.
Bill,

I may be able to.

- What logging level?
- Do you need logs from all the instances?
- Where should I send them?

-russ

On Wed, Feb 7, 2018 at 4:12 PM, Bill Bejeck <bi...@confluent.io> wrote:

> Russell,
>
> Can you share any log files?
>
> Thanks,
> Bill
>
>
>
> On Wed, Feb 7, 2018 at 5:45 PM, Russell Teabeault <
> rteabeault@twitter.com.invalid> wrote:
>
> > Hi Matthias,
> >
> > Thanks for the prompt reply. We have built the kafka-streams jar from the
> > 1.1 branch and deployed our instances. We are only able to upgrade the
> > Kafka Streams to 1.1
> > and can not upgrade to 1.1 for the brokers. I don't think that should
> > matter though. Yes?
> >
> > It does not seem to have helped. We currently have 25 instances with 4
> > threads/instance. Our topology has two topics in it, each having 100
> > partitions. The input topic feeds into a filtering step that uses an
> > in-memory store and that is output via groupBy to an intermediate topic.
> > The intermediate topic then feeds into an aggregation step which uses the
> > rocksDB store. So we can see that we have 200 tasks total. After
> switching
> > to 1.1 the task assignments are still wildly uneven. Some instances only
> > have tasks from one of the topics. Furthermore, the instances keep dying
> > due to org.apache.kafka.common.errors.NotLeaderForPartitionException:
> This
> > server is not the leader for that topic-partition.
> >
> > Is there something else we need to do to make this updated task
> assignment
> > work?
> >
> > Thanks!
> > -russ
> >
> >
> >
> > On Wed, Feb 7, 2018 at 12:33 PM, Matthias J. Sax <ma...@confluent.io>
> > wrote:
> >
> > > It's a know issue and we addressed it already via
> > > https://issues.apache.org/jira/browse/KAFKA-4969
> > >
> > > The fix will be part of upcoming 1.1 release, but you could try it out
> > > immediately running from trunk or 1.0 branch. (If you do, feedback
> would
> > > be very welcome :))
> > >
> > > Your proposed workarounds should work. I cannot come up with anything
> > > else you could do, because the task assignment cannot be influenced.
> > >
> > >
> > > -Matthias
> > >
> > > On 2/7/18 10:37 AM, Russell Teabeault wrote:
> > > > We are using Kafka Streams for a project and had some questions about
> > how
> > > > stream tasks are assigned.
> > > >
> > > > streamBuilder
> > > >   .stream("inbound-topic", Consumed.`with`(keySerde, valueSerde))
> > > >   ... // Do some stuff here
> > > >
> > > >   .through("intermediate-topic")
> > > >   ... // Do some other stuff here
> > > >
> > > > In this example we are streaming from "inbound-topic" and then doing
> > some
> > > > work before writing the results back out to "intermediate-topic".
> > > > Then we are reading in from "intermediate-topic" and doing some more
> > > work.
> > > > If both of these topics contain 100 partitions (200 partitions total)
> > > and I
> > > > create 10 instances of my application then
> > > > what I observe is that there are a total of 20 partitions assigned to
> > > each
> > > > instance. But the distribution of these partitions across the two
> > topics
> > > is
> > > > not even. For example, one
> > > > instance may have 7 partitions from "inbound-topic" and 13 partitions
> > > from
> > > > "intermediate-topic". I would have hoped that each instance would
> have
> > 10
> > > > partitions from each
> > > > topic. Because of this uneven distribution it can make the resource
> > > > characteristics from instance to instance very different.
> > > >
> > > > In a more concrete example we are reading from an input topic, then
> > using
> > > > an in-memory store to do some filtering, followed by a groupBy, and
> > > finally
> > > > doing an aggregate.
> > > > This results in two topics; the input topic and then the internally
> > > created
> > > > intermediate topic written to by the groupBy and read from by the
> > > > aggregation. What we see is that some
> > > > instances are assigned far more partitions/tasks that are using the
> > > > in-memory store and some instances that have very few and sometimes
> no
> > > > tasks that use the in-memory store. This leads to wildly
> > > > different memory usage patterns across the instances. In turn this
> > leads
> > > us
> > > > to set our memory much higher than needed if the partitions from each
> > > topic
> > > > were equally distributed across the instances.
> > > >
> > > > The two ways we have figured out how to deal with this problem are:
> > > > 1. Use a new StreamBuilder anytime an intermediate topic is being
> read
> > > from
> > > > in the application.
> > > > 2. Break the topology into separate applications across the boundary
> of
> > > an
> > > > intermediate topic.
> > > >
> > > > Neither of these seem like great solutions. So I would like to know:
> > > >
> > > > 1. Is this expected behavior?
> > > > 2. Is there some technique to get equal distribution of
> task/partition
> > > > assignments across instances?
> > > >
> > > > Thanks for the help.
> > > >
> > > > --
> > > > Russell Teabeault | Senior Software Engineer | Twitter | @rusticules
> > > >
> > >
> > >
> >
> >
> > --
> > --
> > Russell Teabeault | Senior Software Engineer | Twitter | @rusticules
> >
>



-- 
-- 
Russell Teabeault | Senior Software Engineer | Twitter | @rusticules

Re: Kafka Streams balancing of tasks across nodes

Posted by Bill Bejeck <bi...@confluent.io>.
Russell,

Can you share any log files?

Thanks,
Bill



On Wed, Feb 7, 2018 at 5:45 PM, Russell Teabeault <
rteabeault@twitter.com.invalid> wrote:

> Hi Matthias,
>
> Thanks for the prompt reply. We have built the kafka-streams jar from the
> 1.1 branch and deployed our instances. We are only able to upgrade the
> Kafka Streams to 1.1
> and can not upgrade to 1.1 for the brokers. I don't think that should
> matter though. Yes?
>
> It does not seem to have helped. We currently have 25 instances with 4
> threads/instance. Our topology has two topics in it, each having 100
> partitions. The input topic feeds into a filtering step that uses an
> in-memory store and that is output via groupBy to an intermediate topic.
> The intermediate topic then feeds into an aggregation step which uses the
> rocksDB store. So we can see that we have 200 tasks total. After switching
> to 1.1 the task assignments are still wildly uneven. Some instances only
> have tasks from one of the topics. Furthermore, the instances keep dying
> due to org.apache.kafka.common.errors.NotLeaderForPartitionException: This
> server is not the leader for that topic-partition.
>
> Is there something else we need to do to make this updated task assignment
> work?
>
> Thanks!
> -russ
>
>
>
> On Wed, Feb 7, 2018 at 12:33 PM, Matthias J. Sax <ma...@confluent.io>
> wrote:
>
> > It's a know issue and we addressed it already via
> > https://issues.apache.org/jira/browse/KAFKA-4969
> >
> > The fix will be part of upcoming 1.1 release, but you could try it out
> > immediately running from trunk or 1.0 branch. (If you do, feedback would
> > be very welcome :))
> >
> > Your proposed workarounds should work. I cannot come up with anything
> > else you could do, because the task assignment cannot be influenced.
> >
> >
> > -Matthias
> >
> > On 2/7/18 10:37 AM, Russell Teabeault wrote:
> > > We are using Kafka Streams for a project and had some questions about
> how
> > > stream tasks are assigned.
> > >
> > > streamBuilder
> > >   .stream("inbound-topic", Consumed.`with`(keySerde, valueSerde))
> > >   ... // Do some stuff here
> > >
> > >   .through("intermediate-topic")
> > >   ... // Do some other stuff here
> > >
> > > In this example we are streaming from "inbound-topic" and then doing
> some
> > > work before writing the results back out to "intermediate-topic".
> > > Then we are reading in from "intermediate-topic" and doing some more
> > work.
> > > If both of these topics contain 100 partitions (200 partitions total)
> > and I
> > > create 10 instances of my application then
> > > what I observe is that there are a total of 20 partitions assigned to
> > each
> > > instance. But the distribution of these partitions across the two
> topics
> > is
> > > not even. For example, one
> > > instance may have 7 partitions from "inbound-topic" and 13 partitions
> > from
> > > "intermediate-topic". I would have hoped that each instance would have
> 10
> > > partitions from each
> > > topic. Because of this uneven distribution it can make the resource
> > > characteristics from instance to instance very different.
> > >
> > > In a more concrete example we are reading from an input topic, then
> using
> > > an in-memory store to do some filtering, followed by a groupBy, and
> > finally
> > > doing an aggregate.
> > > This results in two topics; the input topic and then the internally
> > created
> > > intermediate topic written to by the groupBy and read from by the
> > > aggregation. What we see is that some
> > > instances are assigned far more partitions/tasks that are using the
> > > in-memory store and some instances that have very few and sometimes no
> > > tasks that use the in-memory store. This leads to wildly
> > > different memory usage patterns across the instances. In turn this
> leads
> > us
> > > to set our memory much higher than needed if the partitions from each
> > topic
> > > were equally distributed across the instances.
> > >
> > > The two ways we have figured out how to deal with this problem are:
> > > 1. Use a new StreamBuilder anytime an intermediate topic is being read
> > from
> > > in the application.
> > > 2. Break the topology into separate applications across the boundary of
> > an
> > > intermediate topic.
> > >
> > > Neither of these seem like great solutions. So I would like to know:
> > >
> > > 1. Is this expected behavior?
> > > 2. Is there some technique to get equal distribution of task/partition
> > > assignments across instances?
> > >
> > > Thanks for the help.
> > >
> > > --
> > > Russell Teabeault | Senior Software Engineer | Twitter | @rusticules
> > >
> >
> >
>
>
> --
> --
> Russell Teabeault | Senior Software Engineer | Twitter | @rusticules
>

Re: Kafka Streams balancing of tasks across nodes

Posted by Russell Teabeault <rt...@twitter.com.INVALID>.
Hi Matthias,

Thanks for the prompt reply. We have built the kafka-streams jar from the
1.1 branch and deployed our instances. We are only able to upgrade the
Kafka Streams to 1.1
and can not upgrade to 1.1 for the brokers. I don't think that should
matter though. Yes?

It does not seem to have helped. We currently have 25 instances with 4
threads/instance. Our topology has two topics in it, each having 100
partitions. The input topic feeds into a filtering step that uses an
in-memory store and that is output via groupBy to an intermediate topic.
The intermediate topic then feeds into an aggregation step which uses the
rocksDB store. So we can see that we have 200 tasks total. After switching
to 1.1 the task assignments are still wildly uneven. Some instances only
have tasks from one of the topics. Furthermore, the instances keep dying
due to org.apache.kafka.common.errors.NotLeaderForPartitionException: This
server is not the leader for that topic-partition.

Is there something else we need to do to make this updated task assignment
work?

Thanks!
-russ



On Wed, Feb 7, 2018 at 12:33 PM, Matthias J. Sax <ma...@confluent.io>
wrote:

> It's a know issue and we addressed it already via
> https://issues.apache.org/jira/browse/KAFKA-4969
>
> The fix will be part of upcoming 1.1 release, but you could try it out
> immediately running from trunk or 1.0 branch. (If you do, feedback would
> be very welcome :))
>
> Your proposed workarounds should work. I cannot come up with anything
> else you could do, because the task assignment cannot be influenced.
>
>
> -Matthias
>
> On 2/7/18 10:37 AM, Russell Teabeault wrote:
> > We are using Kafka Streams for a project and had some questions about how
> > stream tasks are assigned.
> >
> > streamBuilder
> >   .stream("inbound-topic", Consumed.`with`(keySerde, valueSerde))
> >   ... // Do some stuff here
> >
> >   .through("intermediate-topic")
> >   ... // Do some other stuff here
> >
> > In this example we are streaming from "inbound-topic" and then doing some
> > work before writing the results back out to "intermediate-topic".
> > Then we are reading in from "intermediate-topic" and doing some more
> work.
> > If both of these topics contain 100 partitions (200 partitions total)
> and I
> > create 10 instances of my application then
> > what I observe is that there are a total of 20 partitions assigned to
> each
> > instance. But the distribution of these partitions across the two topics
> is
> > not even. For example, one
> > instance may have 7 partitions from "inbound-topic" and 13 partitions
> from
> > "intermediate-topic". I would have hoped that each instance would have 10
> > partitions from each
> > topic. Because of this uneven distribution it can make the resource
> > characteristics from instance to instance very different.
> >
> > In a more concrete example we are reading from an input topic, then using
> > an in-memory store to do some filtering, followed by a groupBy, and
> finally
> > doing an aggregate.
> > This results in two topics; the input topic and then the internally
> created
> > intermediate topic written to by the groupBy and read from by the
> > aggregation. What we see is that some
> > instances are assigned far more partitions/tasks that are using the
> > in-memory store and some instances that have very few and sometimes no
> > tasks that use the in-memory store. This leads to wildly
> > different memory usage patterns across the instances. In turn this leads
> us
> > to set our memory much higher than needed if the partitions from each
> topic
> > were equally distributed across the instances.
> >
> > The two ways we have figured out how to deal with this problem are:
> > 1. Use a new StreamBuilder anytime an intermediate topic is being read
> from
> > in the application.
> > 2. Break the topology into separate applications across the boundary of
> an
> > intermediate topic.
> >
> > Neither of these seem like great solutions. So I would like to know:
> >
> > 1. Is this expected behavior?
> > 2. Is there some technique to get equal distribution of task/partition
> > assignments across instances?
> >
> > Thanks for the help.
> >
> > --
> > Russell Teabeault | Senior Software Engineer | Twitter | @rusticules
> >
>
>


-- 
-- 
Russell Teabeault | Senior Software Engineer | Twitter | @rusticules

Re: Kafka Streams balancing of tasks across nodes

Posted by "Matthias J. Sax" <ma...@confluent.io>.
It's a know issue and we addressed it already via
https://issues.apache.org/jira/browse/KAFKA-4969

The fix will be part of upcoming 1.1 release, but you could try it out
immediately running from trunk or 1.0 branch. (If you do, feedback would
be very welcome :))

Your proposed workarounds should work. I cannot come up with anything
else you could do, because the task assignment cannot be influenced.


-Matthias

On 2/7/18 10:37 AM, Russell Teabeault wrote:
> We are using Kafka Streams for a project and had some questions about how
> stream tasks are assigned.
> 
> streamBuilder
>   .stream("inbound-topic", Consumed.`with`(keySerde, valueSerde))
>   ... // Do some stuff here
> 
>   .through("intermediate-topic")
>   ... // Do some other stuff here
> 
> In this example we are streaming from "inbound-topic" and then doing some
> work before writing the results back out to "intermediate-topic".
> Then we are reading in from "intermediate-topic" and doing some more work.
> If both of these topics contain 100 partitions (200 partitions total) and I
> create 10 instances of my application then
> what I observe is that there are a total of 20 partitions assigned to each
> instance. But the distribution of these partitions across the two topics is
> not even. For example, one
> instance may have 7 partitions from "inbound-topic" and 13 partitions from
> "intermediate-topic". I would have hoped that each instance would have 10
> partitions from each
> topic. Because of this uneven distribution it can make the resource
> characteristics from instance to instance very different.
> 
> In a more concrete example we are reading from an input topic, then using
> an in-memory store to do some filtering, followed by a groupBy, and finally
> doing an aggregate.
> This results in two topics; the input topic and then the internally created
> intermediate topic written to by the groupBy and read from by the
> aggregation. What we see is that some
> instances are assigned far more partitions/tasks that are using the
> in-memory store and some instances that have very few and sometimes no
> tasks that use the in-memory store. This leads to wildly
> different memory usage patterns across the instances. In turn this leads us
> to set our memory much higher than needed if the partitions from each topic
> were equally distributed across the instances.
> 
> The two ways we have figured out how to deal with this problem are:
> 1. Use a new StreamBuilder anytime an intermediate topic is being read from
> in the application.
> 2. Break the topology into separate applications across the boundary of an
> intermediate topic.
> 
> Neither of these seem like great solutions. So I would like to know:
> 
> 1. Is this expected behavior?
> 2. Is there some technique to get equal distribution of task/partition
> assignments across instances?
> 
> Thanks for the help.
> 
> --
> Russell Teabeault | Senior Software Engineer | Twitter | @rusticules
>