Posted to jira@kafka.apache.org by "Guozhang Wang (JIRA)" <ji...@apache.org> on 2018/07/11 19:02:00 UTC

[jira] [Commented] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability

    [ https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16540536#comment-16540536 ] 

Guozhang Wang commented on KAFKA-7149:
--------------------------------------

[~asurana] Thanks for providing a solution to this issue.

I agree with you that #3 would not work, but scenario #2 should still work. In fact, in KIP-268 we've improved the protocol such that:

1) if the encoded version is larger than the decoder's latest version, the leader will send back a "version probing" flag asking the encoder to choose a lower version and re-encode the data, so that the leader can decode it.

2) if the encoded version is smaller than the decoder's version, the leader should be able to decode the data using the lower-versioned decoding mechanism (both cases are sketched below).
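A minimal Java sketch of this leader-side decision; the class and method names and {{LATEST_SUPPORTED_VERSION}} are illustrative simplifications, not the actual Streams internals:

{code}
// Sketch of the KIP-268 version-probing decision on the group leader.
public class VersionProbingSketch {

    static final int LATEST_SUPPORTED_VERSION = 3;

    // Decide which version to use when answering a member's subscription.
    static int versionForAssignment(int memberEncodedVersion) {
        if (memberEncodedVersion > LATEST_SUPPORTED_VERSION) {
            // Case 1: the member encoded with a newer version than we can
            // decode; answer with our latest version as a "version probing"
            // signal so the member re-encodes with that lower version.
            return LATEST_SUPPORTED_VERSION;
        }
        // Case 2: the member used an older (or equal) version; we can decode
        // it with the lower-versioned mechanism and answer in kind.
        return memberEncodedVersion;
    }
}
{code}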


At a higher level, I think that besides using compression to reduce the assignment size (note it still trades more CPU for smaller metadata), there are some other approaches we can consider. Currently our assignment info (as of the latest version, 3) is encoded as

{code}
num.active-task-ids, [active-task-ids]
num.standby-task-ids, [standby-task-ids]
num.partitions-by-host, [partitions-by-host]
{code}

where {{partitions-by-host}} is formatted as:
{code}
host, port, num.partitions, [partitions]
{code}
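For illustration, a rough Java sketch of this version-3 layout; the flat integer encoding of task ids and partitions, and all names here, are simplified placeholders rather than the real AssignmentInfo serialization:

{code}
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.Map;

class AssignmentInfoSketch {

    static byte[] encode(List<Integer> activeTasks,
                         List<Integer> standbyTasks,
                         Map<String, Integer> portByHost,
                         Map<String, List<Integer>> partitionsByHost)
            throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);

        out.writeInt(activeTasks.size());                // num.active-task-ids
        for (int id : activeTasks) out.writeInt(id);     // [active-task-ids]

        out.writeInt(standbyTasks.size());               // num.standby-task-ids
        for (int id : standbyTasks) out.writeInt(id);    // [standby-task-ids]

        out.writeInt(partitionsByHost.size());           // num.partitions-by-host
        for (Map.Entry<String, List<Integer>> e : partitionsByHost.entrySet()) {
            out.writeUTF(e.getKey());                    // host
            out.writeInt(portByHost.get(e.getKey()));    // port
            out.writeInt(e.getValue().size());           // num.partitions
            for (int p : e.getValue()) out.writeInt(p);  // [partitions]
        }
        out.flush();
        return bytes.toByteArray();
    }
}
{code}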

I think the largest chunk of information is the {{partitions-by-host}} list.

We can, instead, reformat the last piece of information as

{code}
num.tasks-by-host, [tasks-by-host]
{code}

where {{tasks-by-host}} is formatted as:
{code}
host, port, num.tasks, [task-ids]
{code}

The observation is that everyone can infer the assigned partitions from the task ids, so we can just send the assigned task ids by host. WDYT?
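For context on why the inference works: a Streams task id is effectively (sub-topology id, partition), and every instance already knows the topology, so the topic partitions behind a task can be rebuilt locally instead of being shipped in the assignment. A minimal Java sketch, with {{topicsBySubtopology}} standing in for the metadata each instance already has:

{code}
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class TaskToPartitionsSketch {

    record TaskId(int subtopologyId, int partition) {}
    record TopicPartition(String topic, int partition) {}

    // Rebuild the topic partitions a task reads from, given only the task id
    // and locally known topology metadata.
    static List<TopicPartition> partitionsForTask(
            TaskId task, Map<Integer, List<String>> topicsBySubtopology) {
        List<TopicPartition> result = new ArrayList<>();
        for (String topic : topicsBySubtopology.get(task.subtopologyId())) {
            result.add(new TopicPartition(topic, task.partition()));
        }
        return result;
    }
}
{code}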

> Reduce assignment data size to improve kafka streams scalability
> ----------------------------------------------------------------
>
>                 Key: KAFKA-7149
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7149
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Ashish Surana
>            Assignee: Ashish Surana
>            Priority: Major
>
> We observed that with a high number of partitions, instances, or stream-threads, the assignment-data size grows too fast and we start getting a RecordTooLargeException at the kafka-broker.
> Workaround of this issue is commented at: https://issues.apache.org/jira/browse/KAFKA-6976
> Still, it limits the scalability of kafka streams, as moving around 100MBs of assignment data for each rebalance affects performance & reliability as well (timeout exceptions start appearing). It also limits kafka streams scale even with a high max.message.bytes setting, since the data size increases quickly with the number of partitions, instances, or stream-threads.
>  
> Solution:
> To address this issue in our cluster, we are sending the compressed assignment-data. We saw the assignment-data size reduced by 8X-10X. This improved kafka streams scalability drastically for us, and we can now run with more than 8,000 partitions.
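A minimal sketch of the compression workaround described in the ticket, assuming GZIP from java.util.zip; the helper name is illustrative and the reporter's actual change may differ:

{code}
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPOutputStream;

class AssignmentCompressionSketch {

    // Compress the serialized assignment bytes before sending; the receiver
    // would apply the matching GZIPInputStream decompression when decoding.
    static byte[] compress(byte[] assignmentData) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(bytes)) {
            gzip.write(assignmentData);
        }
        return bytes.toByteArray();
    }
}
{code}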



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)