Posted to issues@spark.apache.org by "Sidhavratha Kumar (JIRA)" <ji...@apache.org> on 2018/08/25 11:54:00 UTC

[jira] [Created] (SPARK-25239) Spark Streaming for Kafka should allow uniform batch size per partition for streaming RDD

Sidhavratha Kumar created SPARK-25239:
-----------------------------------------

             Summary: Spark Streaming for Kafka should allow uniform batch size per partition for streaming RDD
                 Key: SPARK-25239
                 URL: https://issues.apache.org/jira/browse/SPARK-25239
             Project: Spark
          Issue Type: Improvement
          Components: DStreams
    Affects Versions: 2.2.0, 2.1.0, 2.4.0
            Reporter: Sidhavratha Kumar


The current logic for determining maxMessagesPerPartition produces a non-uniform number of messages per partition, proportional to each partition's lag:

{code:java}
val backpressureRate = Math.round(lag / totalLag.toFloat * rate)
{code}
{code:java}
if (effectiveRateLimitPerPartition.values.sum > 0) {
  val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
  Some(effectiveRateLimitPerPartition.map {
    case (tp, limit) => tp -> (secsPerBatch * limit).toLong
  })
}
{code}
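Taken together, the two fragments above amount to the following self-contained sketch. The method name and the simplified TopicPartition class are illustrative stand-ins, not Spark's actual internals:

{code:java}
// Illustrative stand-in for org.apache.kafka.common.TopicPartition
case class TopicPartition(topic: String, partition: Int)

// Sketch of the current logic: each partition's cap is proportional
// to its share of the total lag, then scaled by the batch duration.
def maxMessagesPerPartition(
    lagPerPartition: Map[TopicPartition, Long],
    rate: Long,              // overall rate budget, records/sec
    batchDurationMs: Long): Option[Map[TopicPartition, Long]] = {
  val totalLag = lagPerPartition.values.sum
  val effectiveRateLimitPerPartition = lagPerPartition.map {
    case (tp, lag) => tp -> Math.round(lag / totalLag.toFloat * rate).toLong
  }
  if (effectiveRateLimitPerPartition.values.sum > 0) {
    val secsPerBatch = batchDurationMs.toDouble / 1000
    Some(effectiveRateLimitPerPartition.map {
      case (tp, limit) => tp -> (secsPerBatch * limit).toLong
    })
  } else None
}
{code}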
This wastes resources: cores that have fewer messages to process sit idle until the other cores finish their tasks.

Consider a topic t with 2 partitions

||Topic||Partition||Start Offset||End Offset||Current Offset||
|t|0|0|10000|0|
|t|1|0|100|0|

and maxRatePerPartition = 1000

and batch duration = 10 sec

As per the calculation above:

maxMessage for part-0 -> (10000/10100) * 1000 / (batchDuration = 10) = 99
maxMessage for part-1 -> (100/10100) * 1000 / (batchDuration = 10) = 1
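The arithmetic can be reproduced with a one-liner following the ticket's formula (maxMessages is an illustrative name, not a Spark method):

{code:java}
// Per-partition cap as computed in the example above:
// (partition lag / total lag) * rate / batch duration, rounded.
def maxMessages(lag: Long, totalLag: Long, rate: Long, batchSecs: Long): Long =
  Math.round(lag.toDouble / totalLag * rate / batchSecs)
{code}

With the table's numbers, maxMessages(10000, 10100, 1000, 10) gives 99 and maxMessages(100, 10100, 1000, 10) gives 1, matching the figures above.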

If the application runs on 2 cores, one core will finish its single record from partition 1 and then wait until the other core has processed all 99 records from partition 0 before the next RDD can be picked up.

If we enforce a uniform batch size across partitions in each RDD, this waste is avoided. In the above case, we can set each partition's batch size to max(batch size of all partitions), i.e. 99:

maxMessage for part-0 = 99
maxMessage for part-1 = 99

So we can process 98 more records from partition 1 in the same time, without wasting any resources.
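The proposed change can be sketched as a small post-processing step over the per-partition limits (uniformCaps is a hypothetical helper, not an existing Spark API):

{code:java}
// Proposed behavior: raise every partition's cap to the largest cap,
// so all cores receive equally sized work per batch.
def uniformCaps[K](perPartition: Map[K, Long]): Map[K, Long] = {
  val uniform = perPartition.values.max
  perPartition.map { case (tp, _) => tp -> uniform }
}
{code}

Applied to the example, the caps (part-0 -> 99, part-1 -> 1) become 99 for both partitions.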



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
