Posted to user@spark.apache.org by kant kodali <ka...@gmail.com> on 2018/05/03 08:24:33 UTC

question on collect_list or say aggregations in general in structured streaming 2.3.0

Hi All,

I was under the assumption that one needs to run groupBy(window(...)) to run
any stateful operation, but that does not seem to be the case, since any
aggregation query such as

"select count(*) from some_view" is also stateful: it carries over the
result of the count from the previous batch. Likewise, if I run

"select collect_list(*) from some_view" with, say, maxOffsetsPerTrigger set
to 1, I can see the rows from the previous batches at every trigger.

So is it fair to say that aggregations are stateful by default?

I am looking for something more like the DStream approach (stateless), where
I collect a bunch of records in each batch, run an aggregation such as count,
emit the result, and have the next batch count only its own records, not
those from the previous batches.

So if I run "select collect_list(*) from some_view", I want to collect
whatever rows are available at each batch/trigger, but none from the previous
batches. How do I do that?

Thanks!
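
A minimal plain-Python sketch of the difference being asked about. This is a model for illustration only, not Spark code: each inner list stands for one micro-batch, and all function names are hypothetical. The "stateful" versions carry results across triggers, which matches the behaviour described above; the "per-batch" versions give the DStream-like result the question is after.

```python
from typing import List, Sequence


def stateful_counts(batches: Sequence[Sequence[str]]) -> List[int]:
    """Running count: each trigger reports the total over all batches so far."""
    total = 0
    results = []
    for batch in batches:
        total += len(batch)  # state (the previous total) survives the trigger
        results.append(total)
    return results


def per_batch_counts(batches: Sequence[Sequence[str]]) -> List[int]:
    """Stateless count: each trigger reports only its own batch."""
    return [len(batch) for batch in batches]


def stateful_collect(batches: Sequence[Sequence[str]]) -> List[List[str]]:
    """Cumulative collect: each trigger sees the rows of all earlier batches too."""
    seen: List[str] = []
    results = []
    for batch in batches:
        seen.extend(batch)
        results.append(list(seen))  # copy, so later batches do not mutate it
    return results


def per_batch_collect(batches: Sequence[Sequence[str]]) -> List[List[str]]:
    """Stateless collect: each trigger sees only the rows of its own batch."""
    return [list(batch) for batch in batches]


# One record per trigger, as with a per-trigger limit of 1 in the question.
batches = [["a"], ["b"], ["c"]]

print(stateful_counts(batches))
print(per_batch_counts(batches))
print(stateful_collect(batches))
print(per_batch_collect(batches))
```

The first and third calls model what the queries in this message produce; the second and fourth model the desired per-trigger output.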

Re: question on collect_list or say aggregations in general in structured streaming 2.3.0

Posted by kant kodali <ka...@gmail.com>.
1) I get an error when I set the watermark to 0.
2) I set the window and slide interval to 1 second with no watermark. It still
aggregates messages from the previous batch that fall in the same 1-second
window.

So is it fair to say there is no declarative way to do stateless
aggregations?


On Thu, May 3, 2018 at 9:55 AM, Arun Mahadevan <ar...@apache.org> wrote:

> I think you need to group by a window (tumbling) and define watermarks
> (put a very low watermark or even 0) to discard the state. Here the window
> duration becomes your logical batch.
>
> - Arun

Re: question on collect_list or say aggregations in general in structured streaming 2.3.0

Posted by Arun Mahadevan <ar...@apache.org>.
I think you need to group by a window (tumbling) and define watermarks (put a very low watermark or even 0) to discard the state. Here the window duration becomes your logical batch.

- Arun
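
The idea of the window acting as the logical batch can be sketched in plain Python. This is a simplified model under stated assumptions, not Spark's implementation: timestamps are event times in whole seconds, the watermark is taken as the maximum event time seen so far minus a delay and is updated at the end of each micro-batch, and a window is emitted and its state dropped once the watermark reaches the window's end. The function name and parameters are hypothetical. In Spark itself the corresponding pieces are the `window` function and `withWatermark`.

```python
from typing import Dict, List, Sequence, Tuple


def tumbling_window_counts(
    batches: Sequence[Sequence[int]],
    window_secs: int,
    delay_secs: int,
) -> Tuple[List[Tuple[int, int]], Dict[int, int]]:
    """Count events per tumbling window, discarding state behind the watermark.

    Returns (emitted, remaining_state), where emitted is a list of
    (window_start, count) pairs in the order they were finalized.
    """
    state: Dict[int, int] = {}      # window_start -> running count
    watermark = float("-inf")       # nothing is late before any data arrives
    max_event_time = float("-inf")
    emitted: List[Tuple[int, int]] = []

    for batch in batches:
        for ts in batch:
            start = (ts // window_secs) * window_secs
            if start + window_secs <= watermark:
                continue  # too late: this window's state is already gone
            state[start] = state.get(start, 0) + 1
            max_event_time = max(max_event_time, ts)

        # Advance the watermark once per batch, then finalize closed windows.
        watermark = max_event_time - delay_secs
        for start in sorted(state):
            if start + window_secs <= watermark:
                emitted.append((start, state.pop(start)))

    return emitted, state


# Three triggers, 2-second windows, watermark delay of 0.
emitted, remaining = tumbling_window_counts([[0, 1], [2, 3], [5, 6]], 2, 0)
print(emitted)
print(remaining)
```

In this model the unit of aggregation is the window rather than the trigger: records that arrive in different micro-batches but fall in the same window are still counted together, and state is released only once the watermark moves past the window.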


Re: question on collect_list or say aggregations in general in structured streaming 2.3.0

Posted by kant kodali <ka...@gmail.com>.
After some more research on Google, it is clear that aggregations are
stateful by default in Structured Streaming. So the question now is how to
do stateless aggregations (not storing the result from previous batches)
using Structured Streaming 2.3.0. I am trying to do it in raw Spark SQL, so
without flatMapGroupsWithState. If that is not available, is it fair to say
there is no declarative way to do stateless aggregations?
