Posted to dev@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2016/06/02 06:54:59 UTC

[jira] [Commented] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams

    [ https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15311836#comment-15311836 ] 

ASF GitHub Bot commented on KAFKA-3775:
---------------------------------------

GitHub user kawamuray opened a pull request:

    https://github.com/apache/kafka/pull/1460

    KAFKA-3775: Throttle maximum number of tasks assigned to a single KafkaStreams

    Issue: https://issues.apache.org/jira/browse/KAFKA-3775
    
    POC. Discussion in progress.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kawamuray/kafka KAFKA-3775-throttle-tasks

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/1460.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1460
    
----
commit fefe259b2c97bb1bbf14b572533ca74348651c0d
Author: Yuto Kawamura <ka...@gmail.com>
Date:   2016-06-02T03:46:51Z

    MINOR: Add toString() to ClientState for debugging

commit c4f363d32d9a496c0f4b4e66ee846429a2a2eda5
Author: Yuto Kawamura <ka...@gmail.com>
Date:   2016-06-02T03:51:34Z

    MINOR: Remove meaninglessly repeated assertions in unit test

commit 3c173fa5d029277e5d1974c104d7e66939b5cd17
Author: Yuto Kawamura <ka...@gmail.com>
Date:   2016-06-02T03:55:10Z

    KAFKA-3775: Introduce new streams configuration max.tasks.assigned
    
    This configuration limits the maximum number of tasks assigned to a single KafkaStreams instance.
    As a task consists of a single partition from each of possibly several topics, setting this value lower
    helps prevent a huge number of partitions from being assigned to the instance that starts first.

----
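
For context, the proposed setting would presumably be supplied like any other Kafka Streams configuration. A minimal sketch, assuming the key is used literally as "max.tasks.assigned" as spelled in the commit message (there is no StreamsConfig constant for it yet, and the name and semantics may change while the discussion is in progress):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");        // hypothetical application id
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // hypothetical broker list
    // Proposed by this PR: upper bound on the number of tasks this instance accepts.
    props.put("max.tasks.assigned", "8");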


> Throttle maximum number of tasks assigned to a single KafkaStreams
> ------------------------------------------------------------------
>
>                 Key: KAFKA-3775
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3775
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 0.10.0.0
>            Reporter: Yuto Kawamura
>            Assignee: Yuto Kawamura
>             Fix For: 0.10.1.0
>
>
> As of today, if I start a Kafka Streams app on a single machine consisting of a single KafkaStreams instance, that instance gets all partitions of the target topic assigned.
> As we're using it to process topics that have a huge number of partitions and heavy message traffic, it is a problem that we don't have a way of throttling the maximum number of partitions assigned to a single instance.
> In fact, when we started a Kafka Streams app consuming a topic with more than 10MB/sec of traffic per partition, we saw all partitions get assigned to the first instance, and the app soon died from an OOM error.
> I know there are some workarounds conceivable here, for example:
> - Start multiple instances at once so the partitions are distributed evenly.
>   => Maybe works, but as Kafka Streams is a library, not an execution framework, there's no predefined procedure for starting Kafka Streams apps, so some users might want the option to start a single first instance and check whether it works as expected with a smaller number of partitions (I want :p)
> - Adjust config parameters such as {{buffered.records.per.partition}}, {{max.partition.fetch.bytes}} and {{max.poll.records}} to reduce the heap pressure (see the sketch after this list).
>   => Maybe works, but it still has two problems IMO:
>   - It still leads to a traffic explosion with high-throughput processing, as the instance accepts all incoming messages from hundreds of partitions.
>   - In the first place, by distributed-system principles, it's weird that users don't have a way to control the maximum number of "partitions" assigned to a single shard (an instance of KafkaStreams here). Users should be allowed to specify the maximum number of partitions that can be processed by a single instance (or host).
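> As a rough illustration of that workaround (the values are illustrative only, not tuned recommendations), these knobs can be set on the same Properties passed to KafkaStreams; the consumer-level ones are forwarded to the embedded consumers:
>
>     import java.util.Properties;
>     import org.apache.kafka.clients.consumer.ConsumerConfig;
>     import org.apache.kafka.streams.StreamsConfig;
>
>     Properties props = new Properties();
>     // Buffer fewer records per partition before pausing consumption.
>     props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 100);
>     // Fetch less data per partition in each consumer fetch request.
>     props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 262144);
>     // Return fewer records from each poll() call.
>     props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);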
> Here, I'd like to introduce a new configuration parameter {{max.tasks.assigned}}, which limits the number of tasks (a notion built on partitions) assigned to a processId (the notion of a single KafkaStreams instance).
> At the same time we need to change StreamPartitionAssignor (TaskAssignor) to tolerate incomplete assignment. That is, Kafka Streams should continue working on the assigned subset of partitions even when some partitions are left unassigned, in order to satisfy this: "a user may want the option to start a single first instance and check whether it works as expected with a smaller number of partitions (I want :p)".
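> To illustrate the intended behavior, here is a purely hypothetical sketch (capTasksForClient is an illustrative helper, not code from this PR's StreamPartitionAssignor/TaskAssignor changes): truncate a client's candidate task list at the configured limit and leave the rest unassigned instead of failing the rebalance.
>
>     import java.util.ArrayList;
>     import java.util.List;
>
>     // Hypothetical helper: keep at most maxTasksAssigned tasks for one client;
>     // the remainder stays unassigned rather than aborting the assignment.
>     static <T> List<T> capTasksForClient(List<T> candidateTasks, int maxTasksAssigned) {
>         if (candidateTasks.size() <= maxTasksAssigned) {
>             return candidateTasks;
>         }
>         return new ArrayList<>(candidateTasks.subList(0, maxTasksAssigned));
>     }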
> I've implemented a rough POC for this. PTAL, and if it makes sense I will continue refining it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)