You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by kant kodali <ka...@gmail.com> on 2018/03/19 20:35:53 UTC

select count * doesnt seem to respect update mode in Kafka Structured Streaming?

Hi All,

I have 10 million records in my Kafka and I am just trying to
spark.sql(select count(*) from kafka_view). I am reading from kafka and
writing to kafka.

My writeStream is set to "update" mode and trigger interval of one second (
Trigger.ProcessingTime(1000)). I expect the counts to be printed every
second but looks like it would print after going through all 10M. why?

Also, it seems to take forever whereas Linux wc of 10M rows would take 30
seconds.

Thanks!

Re: select count * doesnt seem to respect update mode in Kafka Structured Streaming?

Posted by kant kodali <ka...@gmail.com>.
Thanks Michael! that works!

On Tue, Mar 20, 2018 at 5:00 PM, Michael Armbrust <mi...@databricks.com>
wrote:

> Those options will not affect structured streaming.  You are looking for
>
> .option("maxOffsetsPerTrigger", "1000")
>
> We are working on improving this by building a generic mechanism into the
> Streaming DataSource V2 so that the engine can do admission control on the
> amount of data returned in a source independent way.
>
> On Tue, Mar 20, 2018 at 2:58 PM, kant kodali <ka...@gmail.com> wrote:
>
>> I am using spark 2.3.0 and Kafka 0.10.2.0 so I assume structured
>> streaming using Direct API's although I am not sure? If it is direct API's
>> the only parameters that are relevant are below according to this
>> <https://www.linkedin.com/pulse/enable-back-pressure-make-your-spark-streaming-production-lan-jiang>
>> article
>>
>>    - spark.conf("spark.streaming.backpressure.enabled", "true")
>>    - spark.conf("spark.streaming.kafka.maxRatePerPartition", "10000")
>>
>> I set both of these and I run select count * on my 10M records I still
>> don't see any output until it finishes the initial batch of 10M and this
>> takes a while. so I am wondering if I miss something here?
>>
>> On Tue, Mar 20, 2018 at 6:09 AM, Geoff Von Allmen <geoff@ibleducation.com
>> > wrote:
>>
>>> The following
>>> <http://spark.apache.org/docs/latest/configuration.html#spark-streaming> settings
>>> may be what you’re looking for:
>>>
>>>    - spark.streaming.backpressure.enabled
>>>    - spark.streaming.backpressure.initialRate
>>>    - spark.streaming.receiver.maxRate
>>>    - spark.streaming.kafka.maxRatePerPartition
>>>
>>> ​
>>>
>>> On Mon, Mar 19, 2018 at 5:27 PM, kant kodali <ka...@gmail.com> wrote:
>>>
>>>> Yes it indeed makes sense! Is there a way to get incremental counts
>>>> when I start from 0 and go through 10M records? perhaps count for every
>>>> micro batch or something?
>>>>
>>>> On Mon, Mar 19, 2018 at 1:57 PM, Geoff Von Allmen <
>>>> geoff@ibleducation.com> wrote:
>>>>
>>>>> Trigger does not mean report the current solution every 'trigger
>>>>> seconds'. It means it will attempt to fetch new data and process it no
>>>>> faster than trigger seconds intervals.
>>>>>
>>>>> If you're reading from the beginning and you've got 10M entries in
>>>>> kafka, it's likely pulling everything down then processing it completely
>>>>> and giving you an initial output. From here on out, it will check kafka
>>>>> every 1 second for new data and process it, showing you only the updated
>>>>> rows. So the initial read will give you the entire output since there is
>>>>> nothing to be 'updating' from. If you add data to kafka now that the
>>>>> streaming job has completed it's first batch (and leave it running), it
>>>>> will then show you the new/updated rows since the last batch every 1 second
>>>>> (assuming it can fetch + process in that time span).
>>>>>
>>>>> If the combined fetch + processing time is > the trigger time, you
>>>>> will notice warnings that it is 'falling behind' (I forget the exact
>>>>> verbiage, but something to the effect of the calculation took XX time and
>>>>> is falling behind). In that case, it will immediately check kafka for new
>>>>> messages and begin processing the next batch (if new messages exist).
>>>>>
>>>>> Hope that makes sense -
>>>>>
>>>>>
>>>>> On Mon, Mar 19, 2018 at 13:36 kant kodali <ka...@gmail.com> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I have 10 million records in my Kafka and I am just trying to
>>>>>> spark.sql(select count(*) from kafka_view). I am reading from kafka and
>>>>>> writing to kafka.
>>>>>>
>>>>>> My writeStream is set to "update" mode and trigger interval of one
>>>>>> second (Trigger.ProcessingTime(1000)). I expect the counts to be
>>>>>> printed every second but looks like it would print after going through all
>>>>>> 10M. why?
>>>>>>
>>>>>> Also, it seems to take forever whereas Linux wc of 10M rows would
>>>>>> take 30 seconds.
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: select count * doesnt seem to respect update mode in Kafka Structured Streaming?

Posted by Michael Armbrust <mi...@databricks.com>.
Those options will not affect structured streaming.  You are looking for

.option("maxOffsetsPerTrigger", "1000")

We are working on improving this by building a generic mechanism into the
Streaming DataSource V2 so that the engine can do admission control on the
amount of data returned in a source independent way.

On Tue, Mar 20, 2018 at 2:58 PM, kant kodali <ka...@gmail.com> wrote:

> I am using spark 2.3.0 and Kafka 0.10.2.0 so I assume structured streaming
> using Direct API's although I am not sure? If it is direct API's the only
> parameters that are relevant are below according to this
> <https://www.linkedin.com/pulse/enable-back-pressure-make-your-spark-streaming-production-lan-jiang>
> article
>
>    - spark.conf("spark.streaming.backpressure.enabled", "true")
>    - spark.conf("spark.streaming.kafka.maxRatePerPartition", "10000")
>
> I set both of these and I run select count * on my 10M records I still
> don't see any output until it finishes the initial batch of 10M and this
> takes a while. so I am wondering if I miss something here?
>
> On Tue, Mar 20, 2018 at 6:09 AM, Geoff Von Allmen <ge...@ibleducation.com>
> wrote:
>
>> The following
>> <http://spark.apache.org/docs/latest/configuration.html#spark-streaming> settings
>> may be what you’re looking for:
>>
>>    - spark.streaming.backpressure.enabled
>>    - spark.streaming.backpressure.initialRate
>>    - spark.streaming.receiver.maxRate
>>    - spark.streaming.kafka.maxRatePerPartition
>>
>> ​
>>
>> On Mon, Mar 19, 2018 at 5:27 PM, kant kodali <ka...@gmail.com> wrote:
>>
>>> Yes it indeed makes sense! Is there a way to get incremental counts when
>>> I start from 0 and go through 10M records? perhaps count for every micro
>>> batch or something?
>>>
>>> On Mon, Mar 19, 2018 at 1:57 PM, Geoff Von Allmen <
>>> geoff@ibleducation.com> wrote:
>>>
>>>> Trigger does not mean report the current solution every 'trigger
>>>> seconds'. It means it will attempt to fetch new data and process it no
>>>> faster than trigger seconds intervals.
>>>>
>>>> If you're reading from the beginning and you've got 10M entries in
>>>> kafka, it's likely pulling everything down then processing it completely
>>>> and giving you an initial output. From here on out, it will check kafka
>>>> every 1 second for new data and process it, showing you only the updated
>>>> rows. So the initial read will give you the entire output since there is
>>>> nothing to be 'updating' from. If you add data to kafka now that the
>>>> streaming job has completed it's first batch (and leave it running), it
>>>> will then show you the new/updated rows since the last batch every 1 second
>>>> (assuming it can fetch + process in that time span).
>>>>
>>>> If the combined fetch + processing time is > the trigger time, you will
>>>> notice warnings that it is 'falling behind' (I forget the exact verbiage,
>>>> but something to the effect of the calculation took XX time and is falling
>>>> behind). In that case, it will immediately check kafka for new messages and
>>>> begin processing the next batch (if new messages exist).
>>>>
>>>> Hope that makes sense -
>>>>
>>>>
>>>> On Mon, Mar 19, 2018 at 13:36 kant kodali <ka...@gmail.com> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I have 10 million records in my Kafka and I am just trying to
>>>>> spark.sql(select count(*) from kafka_view). I am reading from kafka and
>>>>> writing to kafka.
>>>>>
>>>>> My writeStream is set to "update" mode and trigger interval of one
>>>>> second (Trigger.ProcessingTime(1000)). I expect the counts to be
>>>>> printed every second but looks like it would print after going through all
>>>>> 10M. why?
>>>>>
>>>>> Also, it seems to take forever whereas Linux wc of 10M rows would take
>>>>> 30 seconds.
>>>>>
>>>>> Thanks!
>>>>>
>>>>
>>>
>>
>

Re: select count * doesnt seem to respect update mode in Kafka Structured Streaming?

Posted by kant kodali <ka...@gmail.com>.
I am using spark 2.3.0 and Kafka 0.10.2.0 so I assume structured streaming
using Direct API's although I am not sure? If it is direct API's the only
parameters that are relevant are below according to this
<https://www.linkedin.com/pulse/enable-back-pressure-make-your-spark-streaming-production-lan-jiang>
article

   - spark.conf("spark.streaming.backpressure.enabled", "true")
   - spark.conf("spark.streaming.kafka.maxRatePerPartition", "10000")

I set both of these and I run select count * on my 10M records I still
don't see any output until it finishes the initial batch of 10M and this
takes a while. so I am wondering if I miss something here?

On Tue, Mar 20, 2018 at 6:09 AM, Geoff Von Allmen <ge...@ibleducation.com>
wrote:

> The following
> <http://spark.apache.org/docs/latest/configuration.html#spark-streaming> settings
> may be what you’re looking for:
>
>    - spark.streaming.backpressure.enabled
>    - spark.streaming.backpressure.initialRate
>    - spark.streaming.receiver.maxRate
>    - spark.streaming.kafka.maxRatePerPartition
>
> ​
>
> On Mon, Mar 19, 2018 at 5:27 PM, kant kodali <ka...@gmail.com> wrote:
>
>> Yes it indeed makes sense! Is there a way to get incremental counts when
>> I start from 0 and go through 10M records? perhaps count for every micro
>> batch or something?
>>
>> On Mon, Mar 19, 2018 at 1:57 PM, Geoff Von Allmen <geoff@ibleducation.com
>> > wrote:
>>
>>> Trigger does not mean report the current solution every 'trigger
>>> seconds'. It means it will attempt to fetch new data and process it no
>>> faster than trigger seconds intervals.
>>>
>>> If you're reading from the beginning and you've got 10M entries in
>>> kafka, it's likely pulling everything down then processing it completely
>>> and giving you an initial output. From here on out, it will check kafka
>>> every 1 second for new data and process it, showing you only the updated
>>> rows. So the initial read will give you the entire output since there is
>>> nothing to be 'updating' from. If you add data to kafka now that the
>>> streaming job has completed it's first batch (and leave it running), it
>>> will then show you the new/updated rows since the last batch every 1 second
>>> (assuming it can fetch + process in that time span).
>>>
>>> If the combined fetch + processing time is > the trigger time, you will
>>> notice warnings that it is 'falling behind' (I forget the exact verbiage,
>>> but something to the effect of the calculation took XX time and is falling
>>> behind). In that case, it will immediately check kafka for new messages and
>>> begin processing the next batch (if new messages exist).
>>>
>>> Hope that makes sense -
>>>
>>>
>>> On Mon, Mar 19, 2018 at 13:36 kant kodali <ka...@gmail.com> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I have 10 million records in my Kafka and I am just trying to
>>>> spark.sql(select count(*) from kafka_view). I am reading from kafka and
>>>> writing to kafka.
>>>>
>>>> My writeStream is set to "update" mode and trigger interval of one
>>>> second (Trigger.ProcessingTime(1000)). I expect the counts to be
>>>> printed every second but looks like it would print after going through all
>>>> 10M. why?
>>>>
>>>> Also, it seems to take forever whereas Linux wc of 10M rows would take
>>>> 30 seconds.
>>>>
>>>> Thanks!
>>>>
>>>
>>
>

Re: select count * doesnt seem to respect update mode in Kafka Structured Streaming?

Posted by Geoff Von Allmen <ge...@ibleducation.com>.
The following
<http://spark.apache.org/docs/latest/configuration.html#spark-streaming>
settings
may be what you’re looking for:

   - spark.streaming.backpressure.enabled
   - spark.streaming.backpressure.initialRate
   - spark.streaming.receiver.maxRate
   - spark.streaming.kafka.maxRatePerPartition

​

On Mon, Mar 19, 2018 at 5:27 PM, kant kodali <ka...@gmail.com> wrote:

> Yes it indeed makes sense! Is there a way to get incremental counts when I
> start from 0 and go through 10M records? perhaps count for every micro
> batch or something?
>
> On Mon, Mar 19, 2018 at 1:57 PM, Geoff Von Allmen <ge...@ibleducation.com>
> wrote:
>
>> Trigger does not mean report the current solution every 'trigger
>> seconds'. It means it will attempt to fetch new data and process it no
>> faster than trigger seconds intervals.
>>
>> If you're reading from the beginning and you've got 10M entries in kafka,
>> it's likely pulling everything down then processing it completely and
>> giving you an initial output. From here on out, it will check kafka every 1
>> second for new data and process it, showing you only the updated rows. So
>> the initial read will give you the entire output since there is nothing to
>> be 'updating' from. If you add data to kafka now that the streaming job has
>> completed it's first batch (and leave it running), it will then show you
>> the new/updated rows since the last batch every 1 second (assuming it can
>> fetch + process in that time span).
>>
>> If the combined fetch + processing time is > the trigger time, you will
>> notice warnings that it is 'falling behind' (I forget the exact verbiage,
>> but something to the effect of the calculation took XX time and is falling
>> behind). In that case, it will immediately check kafka for new messages and
>> begin processing the next batch (if new messages exist).
>>
>> Hope that makes sense -
>>
>>
>> On Mon, Mar 19, 2018 at 13:36 kant kodali <ka...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> I have 10 million records in my Kafka and I am just trying to
>>> spark.sql(select count(*) from kafka_view). I am reading from kafka and
>>> writing to kafka.
>>>
>>> My writeStream is set to "update" mode and trigger interval of one
>>> second (Trigger.ProcessingTime(1000)). I expect the counts to be
>>> printed every second but looks like it would print after going through all
>>> 10M. why?
>>>
>>> Also, it seems to take forever whereas Linux wc of 10M rows would take
>>> 30 seconds.
>>>
>>> Thanks!
>>>
>>
>

Re: select count * doesnt seem to respect update mode in Kafka Structured Streaming?

Posted by kant kodali <ka...@gmail.com>.
Yes it indeed makes sense! Is there a way to get incremental counts when I
start from 0 and go through 10M records? perhaps count for every micro
batch or something?

On Mon, Mar 19, 2018 at 1:57 PM, Geoff Von Allmen <ge...@ibleducation.com>
wrote:

> Trigger does not mean report the current solution every 'trigger seconds'.
> It means it will attempt to fetch new data and process it no faster than
> trigger seconds intervals.
>
> If you're reading from the beginning and you've got 10M entries in kafka,
> it's likely pulling everything down then processing it completely and
> giving you an initial output. From here on out, it will check kafka every 1
> second for new data and process it, showing you only the updated rows. So
> the initial read will give you the entire output since there is nothing to
> be 'updating' from. If you add data to kafka now that the streaming job has
> completed it's first batch (and leave it running), it will then show you
> the new/updated rows since the last batch every 1 second (assuming it can
> fetch + process in that time span).
>
> If the combined fetch + processing time is > the trigger time, you will
> notice warnings that it is 'falling behind' (I forget the exact verbiage,
> but something to the effect of the calculation took XX time and is falling
> behind). In that case, it will immediately check kafka for new messages and
> begin processing the next batch (if new messages exist).
>
> Hope that makes sense -
>
>
> On Mon, Mar 19, 2018 at 13:36 kant kodali <ka...@gmail.com> wrote:
>
>> Hi All,
>>
>> I have 10 million records in my Kafka and I am just trying to
>> spark.sql(select count(*) from kafka_view). I am reading from kafka and
>> writing to kafka.
>>
>> My writeStream is set to "update" mode and trigger interval of one
>> second (Trigger.ProcessingTime(1000)). I expect the counts to be printed
>> every second but looks like it would print after going through all 10M.
>> why?
>>
>> Also, it seems to take forever whereas Linux wc of 10M rows would take 30
>> seconds.
>>
>> Thanks!
>>
>

Re: select count * doesnt seem to respect update mode in Kafka Structured Streaming?

Posted by Geoff Von Allmen <ge...@ibleducation.com>.
Trigger does not mean report the current solution every 'trigger seconds'.
It means it will attempt to fetch new data and process it no faster than
trigger seconds intervals.

If you're reading from the beginning and you've got 10M entries in kafka,
it's likely pulling everything down then processing it completely and
giving you an initial output. From here on out, it will check kafka every 1
second for new data and process it, showing you only the updated rows. So
the initial read will give you the entire output since there is nothing to
be 'updating' from. If you add data to kafka now that the streaming job has
completed it's first batch (and leave it running), it will then show you
the new/updated rows since the last batch every 1 second (assuming it can
fetch + process in that time span).

If the combined fetch + processing time is > the trigger time, you will
notice warnings that it is 'falling behind' (I forget the exact verbiage,
but something to the effect of the calculation took XX time and is falling
behind). In that case, it will immediately check kafka for new messages and
begin processing the next batch (if new messages exist).

Hope that makes sense -


On Mon, Mar 19, 2018 at 13:36 kant kodali <ka...@gmail.com> wrote:

> Hi All,
>
> I have 10 million records in my Kafka and I am just trying to
> spark.sql(select count(*) from kafka_view). I am reading from kafka and
> writing to kafka.
>
> My writeStream is set to "update" mode and trigger interval of one second (
> Trigger.ProcessingTime(1000)). I expect the counts to be printed every
> second but looks like it would print after going through all 10M. why?
>
> Also, it seems to take forever whereas Linux wc of 10M rows would take 30
> seconds.
>
> Thanks!
>