Posted to issues@spark.apache.org by "Dongjoon Hyun (Jira)" <ji...@apache.org> on 2020/03/16 22:54:10 UTC
[jira] [Updated] (SPARK-25239) Spark Streaming for Kafka should
allow uniform batch size per partition for streaming RDD
[ https://issues.apache.org/jira/browse/SPARK-25239?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dongjoon Hyun updated SPARK-25239:
----------------------------------
Affects Version/s: (was: 3.0.0)
3.1.0
> Spark Streaming for Kafka should allow uniform batch size per partition for streaming RDD
> -----------------------------------------------------------------------------------------
>
> Key: SPARK-25239
> URL: https://issues.apache.org/jira/browse/SPARK-25239
> Project: Spark
> Issue Type: Improvement
> Components: DStreams
> Affects Versions: 3.1.0
> Reporter: Sidhavratha Kumar
> Priority: Minor
>
>
>
> The current logic for determining maxMessagesPerPartition produces a non-uniform number of messages per partition, proportional to each partition's lag.
>
> {code:java}
> val backpressureRate = Math.round(lag / totalLag.toFloat * rate)
> {code}
> {code:java}
> if (effectiveRateLimitPerPartition.values.sum > 0) {
>   val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
>   Some(effectiveRateLimitPerPartition.map {
>     case (tp, limit) => tp -> (secsPerBatch * limit).toLong
>   })
> }
> {code}
>
>
>
> This wastes resources, since cores that have fewer messages to process sit idle until the other cores finish their tasks.
> Let us consider a topic t with 2 partitions:
>
>
> ||Topic||Partition||Start Offset||End Offset||Current Offset||
> |t|0|0|10000|0|
> |t|1|0|100|0|
> with maxRatePerPartition = 1000
> and batch duration = 10 sec.
> As per the calculation above:
> maxMessage for part-0 -> (10000/10100) * 1000 / (batchDuration = 10) = 99
> maxMessage for part-1 -> (100/10100) * 1000 / (batchDuration = 10) = 1
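The imbalance above can be reproduced with a small standalone sketch of the same proportional-lag allocation (the object name and helper below are hypothetical; only the arithmetic mirrors the quoted code and the reporter's example):

```scala
object LagProportionalAllocation {
  // Mirrors: backpressureRate = Math.round(lag / totalLag.toFloat * rate),
  // followed by the reporter's division by the batch duration.
  def maxMessagesPerPartition(lags: Map[Int, Long],
                              rate: Long,
                              batchSecs: Double): Map[Int, Long] = {
    val totalLag = lags.values.sum
    lags.map { case (partition, lag) =>
      val backpressureRate = Math.round(lag / totalLag.toFloat * rate)
      partition -> (backpressureRate / batchSecs).toLong
    }
  }

  def main(args: Array[String]): Unit = {
    // Partition 0 has lag 10000, partition 1 has lag 100 (the example table).
    val caps = maxMessagesPerPartition(Map(0 -> 10000L, 1 -> 100L),
                                       rate = 1000L, batchSecs = 10.0)
    // Partition 0 is capped at ~99 messages, partition 1 at ~1.
    println(caps)
  }
}
```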
> If the application runs on 2 cores, one core will finish processing the single record of partition 1 and then sit idle while the other core processes the 99 records of partition 0, before the next RDD is picked up.
> If we enforce a uniform batch size across partitions within each RDD, this waste is avoided.
> In the above case, we can set the batch size for each partition to max(batch size over all partitions), i.e. 99:
> maxMessage for part-0 = 99
> maxMessage for part-1 = 99
> So we can process up to 98 more records of partition 1 in the same time without wasting any resources.
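A minimal sketch of the proposed levelling step (the object and function names are hypothetical, not Spark API): after the per-partition caps are computed, raise every partition's cap to the maximum across partitions.

```scala
object UniformBatchSize {
  // Proposed fix: give every partition the largest computed cap, so no core
  // idles on a small partition while another core works through a large one.
  def uniform(caps: Map[Int, Long]): Map[Int, Long] =
    if (caps.isEmpty) caps
    else {
      val maxCap = caps.values.max
      caps.map { case (partition, _) => partition -> maxCap }
    }
}
```

For the example above, `uniform(Map(0 -> 99L, 1 -> 1L))` yields `Map(0 -> 99L, 1 -> 99L)`, letting partition 1 catch up by 98 records within the same batch.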
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org