Posted to users@kafka.apache.org by Sam Lendle <sl...@pandora.com> on 2018/06/28 19:47:26 UTC

Streams task assignment

Please correct me if I’m wrong, but I’m under the impression that the task_id in streams metrics is formatted as <topicGroupId>_<partitionId>, and topicGroupId corresponds to a particular subtopology in the streams topology. I assume that’s true for the rest of this message.
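Assuming the task_id format described above holds, splitting a task id into its subtopology and partition is straightforward. The sketch below groups a list of task ids by subtopology, which is handy when slicing Streams metrics per subtopology. The helper names `parse_task_id` and `group_by_subtopology` are hypothetical and are not part of the Kafka Streams API.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def parse_task_id(task_id: str) -> Tuple[int, int]:
    """Split a task id of the assumed form '<topicGroupId>_<partitionId>'."""
    group, partition = task_id.split("_", 1)
    return int(group), int(partition)


def group_by_subtopology(task_ids: Iterable[str]) -> Dict[int, List[int]]:
    """Map each topicGroupId (subtopology) to its sorted partition ids."""
    groups: Dict[int, List[int]] = defaultdict(list)
    for task_id in task_ids:
        group, partition = parse_task_id(task_id)
        groups[group].append(partition)
    return {g: sorted(p) for g, p in groups.items()}


print(group_by_subtopology(["1_3", "0_7", "1_0", "2_5"]))
```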

I have a Streams app with multiple subtopologies. Two subtopologies are pretty simple, but one is more complex, with an aggregation, a join, and a transform. All input topics to subtopology 1 have 48 partitions, and I have 12 app instances running Kafka Streams with 4 threads each. The distribution of tasks across threads and application instances is balanced. But when I look at just the complex subtopology, I see that the number of partitions assigned to each thread varies between 0 and 2, and if I aggregate to the app level, it varies between 2 and 6.
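For reference, the even split here is simple arithmetic: 48 tasks of subtopology 1 over 12 × 4 = 48 threads would be exactly 1 task per thread and 48 / 12 = 4 tasks per instance, versus the observed 0 to 2 per thread and 2 to 6 per instance. A minimal sketch for measuring that spread from an assignment snapshot follows, assuming a hypothetical dict keyed by `(instance, thread)` that lists each thread's task ids (however you collect them, e.g. from metrics). `subtopology_spread` is a hypothetical helper.

```python
from collections import Counter


def subtopology_spread(assignment, subtopology):
    """Return ((min, max) per thread, (min, max) per instance) task counts
    for one subtopology.

    `assignment` maps (instance, thread) -> list of task ids such as "1_17".
    Threads with zero tasks of the subtopology are counted as 0.
    """
    per_thread = Counter()
    per_instance = Counter()
    for (instance, thread), task_ids in assignment.items():
        n = sum(1 for t in task_ids if int(t.split("_", 1)[0]) == subtopology)
        # Adding 0 still stores the key, so empty threads show up as 0.
        per_thread[(instance, thread)] += n
        per_instance[instance] += n
    return (
        (min(per_thread.values()), max(per_thread.values())),
        (min(per_instance.values()), max(per_instance.values())),
    )


# Ideal case from the post: 48 tasks of subtopology 1 on 12 instances x 4 threads.
ideal = {(i, t): [f"1_{i * 4 + t}"] for i in range(12) for t in range(4)}
print(subtopology_spread(ideal, 1))  # ((1, 1), (4, 4))
```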

Based on this, it looks to me like the subtopology/topic group is not considered when balancing the tasks assigned to threads. It also looks like the KafkaStreams instance may not be considered either, though that may just be an artifact of the subtopology not being considered. Is that the case? Either way, this is not ideal, because my complex subtopology is much more CPU- and memory-intensive.

So my question is: is it possible to take the subtopology into account when tasks are assigned to threads/instances, short of modifying StreamPartitionAssignor? I’m thinking I might need to split that subtopology out into a separate Streams app, but was wondering if there was a simpler way.
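One way to picture "taking subtopology into account" is to deal each subtopology's tasks out round-robin across all threads separately, ordering the thread slots so that consecutive slots fall on different instances. The sketch below is a toy illustration under that assumption (`subtopology_aware_assign` is a hypothetical name); it ignores stickiness to previous assignments and standby replicas, which a real assignor also has to handle, so it is not a drop-in replacement for StreamPartitionAssignor.

```python
from collections import defaultdict


def subtopology_aware_assign(task_ids, instances, threads_per_instance):
    """Deal each subtopology's tasks round-robin over all thread slots.

    Slots are ordered (inst0, t0), (inst1, t0), ..., (inst0, t1), ... so
    consecutive tasks land on different instances. The cursor carries over
    between subtopologies so the total load per thread stays balanced too.
    """
    slots = [(inst, t) for t in range(threads_per_instance) for inst in instances]
    by_group = defaultdict(list)
    for task_id in task_ids:
        group, partition = (int(x) for x in task_id.split("_", 1))
        by_group[group].append((partition, task_id))

    assignment = {slot: [] for slot in slots}
    cursor = 0
    for group in sorted(by_group):
        # Assign this subtopology's tasks in partition order.
        for _, task_id in sorted(by_group[group]):
            assignment[slots[cursor % len(slots)]].append(task_id)
            cursor += 1
    return assignment


tasks = [f"{g}_{p}" for g in range(3) for p in range(48)]
result = subtopology_aware_assign(tasks, list(range(12)), 4)
print(result[(0, 0)])  # ['0_0', '1_0', '2_0']
```

With three 48-partition subtopologies on 12 instances × 4 threads, this gives every thread exactly one task from each subtopology, and every instance four tasks from each.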

Best,
Sam

Re: Streams task assignment

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Sam,

Since 1.1.0, the task assignor does take load balancing across
sub-topologies into account, as summarized in this PR:
https://github.com/apache/kafka/pull/4624

Note that in general we want to optimize the task assignor further so that
it is state-store aware (https://issues.apache.org/jira/browse/KAFKA-6039),
but what you described, balancing at least on a per-sub-topology basis,
should already be in place.

Guozhang

-- 
-- Guozhang