Posted to issues@spark.apache.org by "Sidhavratha Kumar (JIRA)" <ji...@apache.org> on 2018/08/25 11:54:00 UTC
[jira] [Created] (SPARK-25239) Spark Streaming for Kafka should allow uniform batch size per partition for streaming RDD
Sidhavratha Kumar created SPARK-25239:
-----------------------------------------
Summary: Spark Streaming for Kafka should allow uniform batch size per partition for streaming RDD
Key: SPARK-25239
URL: https://issues.apache.org/jira/browse/SPARK-25239
Project: Spark
Issue Type: Improvement
Components: DStreams
Affects Versions: 2.2.0, 2.1.0, 2.4.0
Reporter: Sidhavratha Kumar
The current logic for determining maxMessagesPerPartition gives each partition a different number of messages per batch, in proportion to that partition's lag.
{code:java}
val backpressureRate = Math.round(lag / totalLag.toFloat * rate)
{code}
{code:java}
if (effectiveRateLimitPerPartition.values.sum > 0) {
  val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
  Some(effectiveRateLimitPerPartition.map {
    case (tp, limit) => tp -> (secsPerBatch * limit).toLong
  })
}
{code}
This wastes resources: cores that have fewer messages to process sit idle until the other cores have finished their tasks.
Consider a topic t with 2 partitions:
||Topic||Partition||Start Offset||End Offset||Current Offset||
|t|0|0|10000|0|
|t|1|0|100|0|
with maxRatePerPartition = 1000 and batch duration = 10 sec.
As per the calculation:
maxMessage for part-0 -> (10000/10100) * 1000 / (batchDuration = 10) ≈ 99
maxMessage for part-1 -> (100/10100) * 1000 / (batchDuration = 10) ≈ 1
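To make the arithmetic above easy to check, here is a minimal sketch in Scala. It follows the example exactly as written (share of total lag, times the rate, divided by the batch duration in seconds) and is not the Spark code path itself. The object ProportionalSplit, the method maxMessages and its parameter names are hypothetical and used only for illustration.

```scala
object ProportionalSplit {
  // Hypothetical helper, not part of Spark: mirrors the example's arithmetic,
  // i.e. (lag / totalLag) * rate / batchSeconds, rounded to the nearest whole message.
  def maxMessages(lags: Map[Int, Long], rate: Long, batchSeconds: Long): Map[Int, Long] = {
    val totalLag = lags.values.sum
    if (totalLag == 0L) {
      // No lag anywhere, so there is nothing to distribute.
      lags.map { case (partition, _) => partition -> 0L }
    } else {
      lags.map { case (partition, lag) =>
        partition -> Math.round(lag.toDouble / totalLag * rate / batchSeconds)
      }
    }
  }
}
```

With the values from the table, ProportionalSplit.maxMessages(Map(0 -> 10000L, 1 -> 100L), 1000L, 10L) gives 99 for partition 0 and 1 for partition 1.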
If the application is running on 2 cores, the core that processes the 1 record of partition 1 will then wait until the other core has processed the 99 records of partition 0 before the next RDD is picked up.
Enforcing a uniform batch size across the partitions of each RDD would avoid this waste.
In the case above, we can set the batch size for each partition to max(batch size of all partitions), i.e. 99:
maxMessage for part-0 = 99
maxMessage for part-1 = 99
So we can process 98 more records of partition 1 in the same time, without wasting any resources.
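The proposal can be sketched as follows. This is only an illustration under the assumption that the per-partition limits have already been computed. The object UniformBatch and the method uniform are hypothetical names, not existing Spark APIs.

```scala
object UniformBatch {
  // Hypothetical helper, not part of Spark: assigns every partition the
  // largest limit found among all partitions.
  def uniform[K](limits: Map[K, Long]): Map[K, Long] =
    if (limits.isEmpty) {
      limits
    } else {
      val largest = limits.values.max
      limits.map { case (partition, _) => partition -> largest }
    }
}
```

For the example above, UniformBatch.uniform(Map(0 -> 99L, 1 -> 1L)) returns 99 for both partitions. The sketch does not cap the value at the number of messages actually available in a partition; that is assumed to be handled elsewhere.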