Posted to issues@spark.apache.org by "Dongjoon Hyun (JIRA)" <ji...@apache.org> on 2019/07/16 16:42:14 UTC

[jira] [Updated] (SPARK-25239) Spark Streaming for Kafka should allow uniform batch size per partition for streaming RDD

     [ https://issues.apache.org/jira/browse/SPARK-25239?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongjoon Hyun updated SPARK-25239:
----------------------------------
    Affects Version/s:     (was: 2.4.0)
                           (was: 2.2.0)
                           (was: 2.1.0)
                       3.0.0

> Spark Streaming for Kafka should allow uniform batch size per partition for streaming RDD
> -----------------------------------------------------------------------------------------
>
>                 Key: SPARK-25239
>                 URL: https://issues.apache.org/jira/browse/SPARK-25239
>             Project: Spark
>          Issue Type: Improvement
>          Components: DStreams
>    Affects Versions: 3.0.0
>            Reporter: Sidhavratha Kumar
>            Priority: Minor
>
>  
>  
> The current logic for determining maxMessagesPerPartition produces a non-uniform number of messages per partition, proportional to the lag of each partition.
>  
> {code:java}
> val backpressureRate = Math.round(lag / totalLag.toFloat * rate)
> {code}
> {code:java}
>  if (effectiveRateLimitPerPartition.values.sum > 0) { 
>     val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000 
>     Some(effectiveRateLimitPerPartition.map { 
>         case (tp, limit) => tp -> (secsPerBatch * limit).toLong 
>     }) 
> }
> {code}
>  
>  
>  
> This wastes resources, since cores that have fewer messages to process sit idle until the other cores finish their tasks.
> Consider a topic t with 2 partitions:
>  
>  
> ||Topic||Partition||Start Offset||End Offset||Current Offset||
> |t|0|0|10000|0|
> |t|1|0|100|0|
> with maxRatePerPartition = 1000
> and batch duration = 10 sec.
> As per the calculation:
> maxMessage for part-0 -> (10000/10100) * 1000 / (batchDuration = 10) = 99
> maxMessage for part-1 -> (100/10100) * 1000 / (batchDuration = 10) = 1
> If the application is running on 2 cores, one core will sit idle after processing the 1 record of partition 1 until the 99 records of partition 0 have been processed on the other core, and only then will the next RDD be picked up.
> If we enforce a uniform batch size across partitions in each RDD, this waste is avoided.
> In the above case, we can set the batch size for each partition = max(batch size of all partitions), i.e. 99.
> maxMessage for part-0 = 99
> maxMessage for part-1 = 99
> So we can process 98 more records of partition 1 in the same time without wasting any resources.
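
The worked example in the ticket can be reproduced with a minimal Scala sketch. It is not Spark's implementation: `UniformBatchSketch`, `lagProportional`, and `uniform` are hypothetical names introduced only for illustration. The sketch follows the ticket's own arithmetic, in which 1000 / 10 = 100 messages are distributed across partitions in proportion to lag, and it reuses the rounding expression quoted above. It does not clamp the result to the offsets actually available in each partition.

```scala
// Illustrative sketch only; none of these names exist in Spark.
object UniformBatchSketch {

  // Distribute `rate` across partitions in proportion to each partition's lag,
  // using the same rounding expression as the snippet quoted in the ticket.
  def lagProportional(lags: Map[Int, Long], rate: Double): Map[Int, Long] = {
    val totalLag = lags.values.sum
    if (totalLag <= 0) lags.map { case (p, _) => p -> 0L }
    else lags.map { case (p, lag) =>
      p -> Math.round(lag / totalLag.toFloat * rate)
    }
  }

  // Proposed change: give every partition the largest per-partition limit,
  // so that all tasks in a batch receive the same message budget.
  def uniform(limits: Map[Int, Long]): Map[Int, Long] =
    if (limits.isEmpty) limits
    else {
      val max = limits.values.max
      limits.map { case (p, _) => p -> max }
    }
}
```

With the lags from the table (10000 for partition 0 and 100 for partition 1) and a budget of 100, `lagProportional` yields 99 and 1, and `uniform` raises both limits to 99, matching the figures in the ticket.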



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org