Posted to user@spark.apache.org by Samy Dindane <sa...@dindane.com> on 2016/10/11 15:57:29 UTC

Limit Kafka batches size with Spark Streaming

Hi,

Is it possible to limit the size of the batches returned by the Kafka consumer for Spark Streaming?
I am asking because the first batch I get has hundreds of millions of records, and it takes ages to process and checkpoint them.

Thank you.

Samy

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: Limit Kafka batches size with Spark Streaming

Posted by Samy Dindane <sa...@dindane.com>.
Hi,

On 10/13/2016 04:35 PM, Cody Koeninger wrote:
> So I see in the logs that PIDRateEstimator is choosing a new rate, and
> the rate it's choosing is 100.
But it's always choosing 100, while all the other variables (processing time, latestRate, etc.) change.
Also, the number of records per batch is always the same, despite the rate being 100.
>
> That happens to be the default minimum of an (apparently undocumented) setting,
>
> spark.streaming.backpressure.pid.minRate
>
> Try setting that to 1 and see if there's different behavior.
Same behavior: it always chooses the same rate, and the number of records per batch does not change.
>
> BTW, how many kafka partitions are you using, and how many actually
> have data for a given batch?
3 partitions.
All of them have more than maxRatePerPartition records (my topic has hundreds of millions of records).
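
For a sense of scale: spark.streaming.kafka.maxRatePerPartition is documented as a per-partition, per-second limit, so the largest batch it allows grows with the partition count and the batch duration. The sketch below only does that arithmetic with the figures mentioned in this thread (3 partitions, a limit of 100000, a 1-second batch). The object and method names are hypothetical, and the result is an upper bound, not a prediction of what a given job will actually read.

```scala
object BatchCap {
  // Upper bound on the records in one batch, assuming the limit is applied
  // per partition and per second of batch duration.
  def maxRecordsPerBatch(maxRatePerPartition: Long, partitions: Int, batchSeconds: Long): Long =
    maxRatePerPartition * partitions * batchSeconds

  def main(args: Array[String]): Unit = {
    val cap = maxRecordsPerBatch(maxRatePerPartition = 100000L, partitions = 3, batchSeconds = 1L)
    println(s"at most $cap records per batch")
  }
}
```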


Re: Limit Kafka batches size with Spark Streaming

Posted by Cody Koeninger <co...@koeninger.org>.
So I see in the logs that PIDRateEstimator is choosing a new rate, and
the rate it's choosing is 100.

That happens to be the default minimum of an (apparently undocumented) setting,

spark.streaming.backpressure.pid.minRate

Try setting that to 1 and see if there's different behavior.
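
As a rough illustration, the suggestion above could be applied next to the settings already used in this thread. This is a configuration sketch only: it needs Spark on the classpath, the object name and application name are placeholders, and the values are simply the ones discussed here, not recommendations.

```scala
import org.apache.spark.SparkConf

object BackpressureConf {
  // Builds a SparkConf with the backpressure-related settings from this thread.
  def build(): SparkConf =
    new SparkConf()
      .setAppName("kafka-backpressure-example") // placeholder name
      .set("spark.streaming.backpressure.enabled", "true")
      .set("spark.streaming.kafka.maxRatePerPartition", "100000")
      // Lower the PID estimator's floor from its default of 100 to 1.
      .set("spark.streaming.backpressure.pid.minRate", "1")
}
```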

BTW, how many Kafka partitions are you using, and how many actually
have data for a given batch?


On Thu, Oct 13, 2016 at 4:33 AM, Samy Dindane <sa...@dindane.com> wrote:
> Hey Cody,
>
> Thanks for the reply. Really helpful.
>
> Following your suggestion, I set spark.streaming.backpressure.enabled to
> true and maxRatePerPartition to 100000.
> I know I can handle 100k records at the same time, but definitely not in 1
> second (the batchDuration), so I expect the backpressure to lower that
> number.
>
> Unfortunately the backpressure doesn't work and I keep getting 100k records
> per batch.
>
> Here is my output log:
> https://gist.github.com/Dinduks/d9fa67fc8a036d3cad8e859c508acdba
> And this is my conf:
>
>     conf.set("spark.streaming.kafka.consumer.poll.ms", "30000")
>     conf.set("spark.streaming.kafka.maxRatePerPartition", "100000")
>     conf.set("spark.streaming.backpressure.enabled", "true")
>
> That's not normal, is it? Do you notice anything odd in my logs?
>
> Thanks a lot.


Re: Limit Kafka batches size with Spark Streaming

Posted by Samy Dindane <sa...@dindane.com>.
Hey Cody,

Thanks for the reply. Really helpful.

Following your suggestion, I set spark.streaming.backpressure.enabled to true and maxRatePerPartition to 100000.
I know I can handle 100k records at the same time, but definitely not in 1 second (the batchDuration), so I expect the backpressure to lower that number.

Unfortunately the backpressure doesn't work and I keep getting 100k records per batch.

Here is my output log: https://gist.github.com/Dinduks/d9fa67fc8a036d3cad8e859c508acdba
And this is my conf:

     conf.set("spark.streaming.kafka.consumer.poll.ms", "30000")
     conf.set("spark.streaming.kafka.maxRatePerPartition", "100000")
     conf.set("spark.streaming.backpressure.enabled", "true")

That's not normal, is it? Do you notice anything odd in my logs?

Thanks a lot.


On 10/12/2016 07:31 PM, Cody Koeninger wrote:
> Cool, just wanted to make sure.
>
> To answer your question about
>
>> Isn't "spark.streaming.backpressure.initialRate" supposed to do this?
>
> that configuration was added well after the integration of the direct
> stream with the backpressure code, and was added only to the receiver
> code, which the direct stream doesn't share since it isn't a receiver.
> Not making excuses about it being confusing, just explaining how
> things ended up that way :(  So yeah, maxRatePerPartition is the
> closest thing you have on the direct stream side to being able to
> limit before the backpressure estimator has something to work with.
>
> So to try and debug what you're seeing, if you add a line like this to
> your log4j.properties
>
> log4j.logger.org.apache.spark.streaming.scheduler.rate=TRACE
>
> you should start seeing log lines like
>
> 16/10/12 12:18:01 TRACE PIDRateEstimator:
> time = 1476292681092, # records = 20, processing time = 20949,
> scheduling delay = 6
> 16/10/12 12:18:01 TRACE PIDRateEstimator:
> latestRate = -1.0, error = -1.9546995083297531
> latestError = -1.0, historicalError = 0.001145639409995704
> delaySinceUpdate = 1.476292681093E9, dError = -6.466871512381435E-10
>
> and then once it updates, lines like
>
> 16/10/12 12:18:32 TRACE PIDRateEstimator: New rate = 1.0
>
> For a really artificially constrained example where
> maxRatePerPartition is set such that it limits to 20 per batch but the
> system can really only handle 5 per batch, the streaming UI will look
> something like this:
>
> https://i.imgsafe.org/e730492453.png
>
> notice the cutover point


Re: Limit Kafka batches size with Spark Streaming

Posted by Cody Koeninger <co...@koeninger.org>.
Cool, just wanted to make sure.

To answer your question about

> Isn't "spark.streaming.backpressure.initialRate" supposed to do this?

that configuration was added well after the integration of the direct
stream with the backpressure code, and was added only to the receiver
code, which the direct stream doesn't share since it isn't a receiver.
Not making excuses about it being confusing, just explaining how
things ended up that way :(  So yeah, maxRatePerPartition is the
closest thing you have on the direct stream side to being able to
limit before the backpressure estimator has something to work with.

So to try and debug what you're seeing, if you add a line like this to
your log4j.properties

log4j.logger.org.apache.spark.streaming.scheduler.rate=TRACE

you should start seeing log lines like

16/10/12 12:18:01 TRACE PIDRateEstimator:
time = 1476292681092, # records = 20, processing time = 20949,
scheduling delay = 6
16/10/12 12:18:01 TRACE PIDRateEstimator:
latestRate = -1.0, error = -1.9546995083297531
latestError = -1.0, historicalError = 0.001145639409995704
delaySinceUpdate = 1.476292681093E9, dError = -6.466871512381435E-10

and then once it updates, lines like

16/10/12 12:18:32 TRACE PIDRateEstimator: New rate = 1.0
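
To help read those TRACE lines, here is a sketch of a single PID update step. It is a reconstruction from the logged quantities, not Spark's own code: the gains (1.0, 0.2, 0.0) are assumed defaults, the object, method and field names are made up for illustration, and the 5000 ms batch interval is inferred from the historicalError value in the sample log above.

```scala
object PidSketch {
  // Quantities printed in the TRACE output, plus the resulting rate.
  final case class PidStep(error: Double, historicalError: Double, dError: Double, newRate: Double)

  def pidStep(
      numRecords: Long,
      processingMs: Long,
      schedulingDelayMs: Long,
      batchIntervalMs: Long,
      latestRate: Double,
      latestError: Double,
      delaySinceUpdateSec: Double,
      kp: Double = 1.0,        // assumed proportional gain
      ki: Double = 0.2,        // assumed integral gain
      kd: Double = 0.0,        // assumed derivative gain
      minRate: Double = 100.0  // default floor mentioned in this thread
  ): PidStep = {
    // Records per second that the last batch actually achieved.
    val processingRate = numRecords.toDouble / processingMs * 1000
    val error = latestRate - processingRate
    // Backlog implied by the scheduling delay, expressed as a rate.
    val historicalError = schedulingDelayMs.toDouble * processingRate / batchIntervalMs
    val dError = (error - latestError) / delaySinceUpdateSec
    // The candidate rate is clamped from below by minRate.
    val candidate = latestRate - kp * error - ki * historicalError - kd * dError
    PidStep(error, historicalError, dError, math.max(candidate, minRate))
  }

  def main(args: Array[String]): Unit = {
    // Inputs taken from the sample log lines above.
    println(pidStep(20L, 20949L, 6L, 5000L, -1.0, -1.0, 1.476292681093e9))
  }
}
```

With these inputs the candidate rate is below 1 record per second, so under this sketch the floor decides the result: any candidate below minRate comes out as exactly minRate.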

For a really artificially constrained example where
maxRatePerPartition is set such that it limits to 20 per batch but the
system can really only handle 5 per batch, the streaming UI will look
something like this:

https://i.imgsafe.org/e730492453.png

Notice the cutover point.


On Wed, Oct 12, 2016 at 11:00 AM, Samy Dindane <sa...@dindane.com> wrote:
> I am 100% sure.
>
> println(conf.get("spark.streaming.backpressure.enabled")) prints true.


Re: Limit Kafka batches size with Spark Streaming

Posted by Samy Dindane <sa...@dindane.com>.
I am 100% sure.

println(conf.get("spark.streaming.backpressure.enabled")) prints true.

On 10/12/2016 05:48 PM, Cody Koeninger wrote:
> Just to make 100% sure, did you set
>
> spark.streaming.backpressure.enabled
>
> to true?
>
> On Wed, Oct 12, 2016 at 10:09 AM, Samy Dindane <sa...@dindane.com> wrote:
>>
>>
>> On 10/12/2016 04:40 PM, Cody Koeninger wrote:
>>>
>>> How would backpressure know anything about the capacity of your system
>>> on the very first batch?
>>
>> Isn't "spark.streaming.backpressure.initialRate" supposed to do this?
>>>
>>>
>>> You should be able to set maxRatePerPartition at a value that makes
>>> sure your first batch doesn't blow things up, and let backpressure
>>> scale from there.
>>
>> Backpressure doesn't scale even when using maxRatePerPartition: when I
>> enable backpressure and set maxRatePerPartition to n, I always get n
>> records, even if my batch takes longer than batchDuration to finish.
>>
>> Example:
>> * I set batchDuration to 1 sec: `val ssc = new StreamingContext(conf,
>> Durations.seconds(1))`
>> * I set backpressure.initialRate and/or maxRatePerPartition to 100,000 and
>> enable backpressure
>> * Since I can't handle 100,000 records in 1 second, I expect the
>> backpressure to kick in in the second batch, and get less than 100,000; but
>> this does not happen
>>
>> What am I missing here?


Re: Limit Kafka batches size with Spark Streaming

Posted by Cody Koeninger <co...@koeninger.org>.
How would backpressure know anything about the capacity of your system
on the very first batch?

You should be able to set maxRatePerPartition at a value that makes
sure your first batch doesn't blow things up, and let backpressure
scale from there.

On Wed, Oct 12, 2016 at 8:53 AM, Samy Dindane <sa...@dindane.com> wrote:
> That's what I was looking for, thank you.
>
> Unfortunately, none of
>
> * spark.streaming.backpressure.initialRate
> * spark.streaming.backpressure.enabled
> * spark.streaming.receiver.maxRate
> * spark.streaming.receiver.initialRate
>
> change how many records I get (I tried many different combinations).
>
> The only configuration that works is
> "spark.streaming.kafka.maxRatePerPartition".
> That's better than nothing, but it'd be useful to have backpressure enabled
> for automatic scaling.
>
> Do you have any idea why backpressure isn't working, or how to debug
> this?


Re: Limit Kafka batches size with Spark Streaming

Posted by Samy Dindane <sa...@dindane.com>.
That's what I was looking for, thank you.

Unfortunately, none of

* spark.streaming.backpressure.initialRate
* spark.streaming.backpressure.enabled
* spark.streaming.receiver.maxRate
* spark.streaming.receiver.initialRate

change how many records I get (I tried many different combinations).

The only configuration that works is "spark.streaming.kafka.maxRatePerPartition".
That's better than nothing, but it'd be useful to have backpressure enabled for automatic scaling.

Do you have any idea why backpressure isn't working, or how to debug this?

On 10/11/2016 06:08 PM, Cody Koeninger wrote:
> http://spark.apache.org/docs/latest/configuration.html
>
> "This rate is upper bounded by the values
> spark.streaming.receiver.maxRate and
> spark.streaming.kafka.maxRatePerPartition if they are set (see
> below)."


Re: Limit Kafka batches size with Spark Streaming

Posted by Cody Koeninger <co...@koeninger.org>.
http://spark.apache.org/docs/latest/configuration.html

"This rate is upper bounded by the values
spark.streaming.receiver.maxRate and
spark.streaming.kafka.maxRatePerPartition if they are set (see
below)."
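
One way to read the quoted sentence is as a simple bound: whatever rate backpressure estimates, the configured maximum still caps it, and before any estimate exists only the cap applies. The sketch below illustrates that reading. It is not Spark's implementation, and the object and method names are hypothetical.

```scala
object EffectiveRate {
  // Rate that would apply: the estimate when one exists and is positive,
  // never more than the configured cap. A cap <= 0 is treated as "no cap".
  def effectiveRate(estimated: Option[Double], maxRate: Long): Option[Long] = {
    val est = estimated.filter(_ > 0).map(e => math.round(e))
    (est, maxRate > 0) match {
      case (Some(e), true)  => Some(math.min(e, maxRate))
      case (Some(e), false) => Some(e)
      case (None, true)     => Some(maxRate) // e.g. the very first batch
      case (None, false)    => None          // nothing limits the rate
    }
  }

  def main(args: Array[String]): Unit = {
    println(effectiveRate(None, 100000L))        // no estimate yet
    println(effectiveRate(Some(100.0), 100000L)) // estimate below the cap
  }
}
```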

On Tue, Oct 11, 2016 at 10:57 AM, Samy Dindane <sa...@dindane.com> wrote:
> Hi,
>
> Is it possible to limit the size of the batches returned by the Kafka
> consumer for Spark Streaming?
> I am asking because the first batch I get has hundreds of millions of records
> and it takes ages to process and checkpoint them.
>
> Thank you.
>
> Samy