Posted to commits@hudi.apache.org by "Vinoth Chandar (Jira)" <ji...@apache.org> on 2023/04/04 16:16:00 UTC

[jira] [Assigned] (HUDI-6019) Kafka source support split by count

     [ https://issues.apache.org/jira/browse/HUDI-6019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Vinoth Chandar reassigned HUDI-6019:
------------------------------------

    Assignee: Vinoth Chandar

> Kafka source support split by count
> -----------------------------------
>
>                 Key: HUDI-6019
>                 URL: https://issues.apache.org/jira/browse/HUDI-6019
>             Project: Apache Hudi
>          Issue Type: New Feature
>          Components: deltastreamer, hudi-utilities
>            Reporter: Kong Wei
>            Assignee: Vinoth Chandar
>            Priority: Major
>              Labels: pull-request-available
>
> For the Kafka source, the default parallelism when pulling data from Kafka is the number of Kafka partitions, and the only way to increase the parallelism (to speed up the pull) is to add more Kafka partitions.
> There are two problem cases:
>  # When pulling a large amount of data from Kafka (e.g. maxEvents=100000000) with too few Kafka partitions, the pull takes too long and, even worse, can cause executor OOM
>  # When there is heavy data skew between Kafka partitions, the pull is blocked by the slowest partition
> To solve these cases, I want to add a parameter {{*hoodie.deltastreamer.source.kafka.per.partition.maxEvents*}} to control the maxEvents for one Kafka partition. The default, Long.MAX_VALUE, means the feature is not turned on.
>  
> For example, given hoodie.deltastreamer.kafka.source.maxEvents=10000000 and 2 Kafka partitions:
> the best case is pulling 5000000 events from each Kafka partition, which may take minutes to finish;
> a worse case is pulling 9000000 events from one partition and 1000000 events from the other, which takes longer to finish due to data skew.
>  
> In this example, if we set {{hoodie.deltastreamer.source.kafka.per.partition.maxEvents=1000000}}, the Kafka source is split into at least 10 parts and each executor pulls at most 1000000 events from Kafka, which takes advantage of the parallelism.
>
> 3 benefits of this feature:
>  # Avoid a single executor pulling a large amount of data and taking too long ({*}avoid data skew{*})
>  # Avoid a single executor pulling a large amount of data and using too much memory or even hitting OOM ({*}avoid OOM{*})
>  # Each executor pulls a small amount of data, which makes full use of the available cores to improve concurrency and reduces the time of the pull ({*}increase parallelism{*})
>  
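
The split described in the example above can be sketched as follows. This is only an illustration in Python, not the Hudi implementation: `OffsetRange` and `split_by_count` are hypothetical names, and the sketch assumes offsets within a partition are contiguous, so the number of events in a range is `until_offset - from_offset`.

```python
from typing import List, NamedTuple


class OffsetRange(NamedTuple):
    """Hypothetical stand-in for a per-partition Kafka offset range."""
    partition: int
    from_offset: int   # inclusive
    until_offset: int  # exclusive

    @property
    def count(self) -> int:
        # Assumes contiguous offsets, so the span equals the event count.
        return self.until_offset - self.from_offset


def split_by_count(ranges: List[OffsetRange],
                   max_events_per_split: int) -> List[OffsetRange]:
    """Cut every range into consecutive pieces of at most max_events_per_split events."""
    if max_events_per_split <= 0:
        raise ValueError("max_events_per_split must be positive")
    splits: List[OffsetRange] = []
    for r in ranges:
        start = r.from_offset
        while start < r.until_offset:
            end = min(start + max_events_per_split, r.until_offset)
            splits.append(OffsetRange(r.partition, start, end))
            start = end
    return splits


# Skewed case from the description: 9000000 events in one partition, 1000000 in the other.
skewed = [OffsetRange(0, 0, 9_000_000), OffsetRange(1, 0, 1_000_000)]
splits = split_by_count(skewed, 1_000_000)
print(len(splits))                    # 10
print(max(s.count for s in splits))   # 1000000
```

With a very large cap (the proposed default, Long.MAX_VALUE), each partition stays as a single range, which matches the current behaviour of one task per Kafka partition.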



--
This message was sent by Atlassian Jira
(v8.20.10#820010)