You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by ayan guha <gu...@gmail.com> on 2018/01/29 07:25:44 UTC

mapGroupsWithState in Python

Hi

I want to write something in Structured streaming:

1. I have a dataset which has 3 columns: id, last_update_timestamp,
attribute
2. I am receiving the data through Kinesis

I want to deduplicate records based on last_updated. In batch, it looks
like:

spark.sql("select * from (Select *, row_number() OVER(Partition by id order
by last_updated desc) rank  from table1) tmp where rank =1")

But now I would like to do it in Structured Stream. I need to maintain the
state of id as per the highest last_updated, across the triggers, for a
certain period (24 hours).

Questions:

1. Should I use mapGroupsWithState or is there any other (SQL?) solution?
Can anyone help me to write it?
2. Is mapGroupsWithState supported in Python?

 Just to ensure we cover bases, I have already tried using dropDuplicates,
but it is keeping the 1st record encountered for an Id, not updating the
state:

unpackedDF = kinesisDF.selectExpr("cast (data as STRING) jsonData")
dataDF = unpackedDF.select(get_json_object(unpackedDF.jsonData, '$.header.id
').alias('id'),
                          get_json_object(unpackedDF.jsonData,
'$.header.last_updated').cast('timestamp').alias('last_updated'),
                          unpackedDF.jsonData)

dedupDF =
dataDF.dropDuplicates(subset=['id']).withWatermark('last_updated','24
hours')


So it is not working. Any help is appreciated.

-- 
Best Regards,
Ayan Guha

Re: mapGroupsWithState in Python

Posted by ayan guha <gu...@gmail.com>.
Thanks a lot TD, exactly what I was looking for. And I have seen most of
your talks, really great stuff you guys are doing :)

On Thu, Feb 1, 2018 at 10:38 AM, Tathagata Das <ta...@gmail.com>
wrote:

> Hello Ayan,
>
> From what I understand, mapGroupsWithState (probably the more general
> flatMapGroupsWithState) is the best way forward (not available in python).
> However, you need to figure out your desired semantics of when you want to
> output the deduplicated data from the stremaing query. For example, if
> there is the following sequence of events
>
> (id, last_update_timestamp, attribute)
> 1, 12:00, A      <---- do you want to output this immediately or wait for
> sometime to see if there are new data?
> 1, 11:59, B      <---- ignored as duplicate
> 1, 12:01, C     <---- do you want to output this?
> 1, 12:02, D
>
> If you want to output something every time there is a newer last_update_timestamp,
> then thats not really a strict "deduplication". Its more like aggregation
> with keeping the latest. In that case, you can try using UDAFs as well.
> However, with UDAFs you wont get any state cleanup. So the
> flatMapGroupsWithState is the best solution as you can do whatever tracking
> you want, output whenever you want, and get state cleanup using timeouts.
>
> FYI: i have elaborated on flatMapGroupsWithState and timeouts in my talk -
> https://databricks.com/session/deep-dive-into-
> stateful-stream-processing-in-structured-streaming
>
>
>
>
>
>
>
> On Tue, Jan 30, 2018 at 5:14 AM, ayan guha <gu...@gmail.com> wrote:
>
>> Any help would be much appreciated :)
>>
>> On Mon, Jan 29, 2018 at 6:25 PM, ayan guha <gu...@gmail.com> wrote:
>>
>>> Hi
>>>
>>> I want to write something in Structured streaming:
>>>
>>> 1. I have a dataset which has 3 columns: id, last_update_timestamp,
>>> attribute
>>> 2. I am receiving the data through Kinesis
>>>
>>> I want to deduplicate records based on last_updated. In batch, it looks
>>> like:
>>>
>>> spark.sql("select * from (Select *, row_number() OVER(Partition by id
>>> order by last_updated desc) rank  from table1) tmp where rank =1")
>>>
>>> But now I would like to do it in Structured Stream. I need to maintain
>>> the state of id as per the highest last_updated, across the triggers, for a
>>> certain period (24 hours).
>>>
>>> Questions:
>>>
>>> 1. Should I use mapGroupsWithState or is there any other (SQL?)
>>> solution? Can anyone help me to write it?
>>> 2. Is mapGroupsWithState supported in Python?
>>>
>>>  Just to ensure we cover bases, I have already tried using
>>> dropDuplicates, but it is keeping the 1st record encountered for an Id, not
>>> updating the state:
>>>
>>> unpackedDF = kinesisDF.selectExpr("cast (data as STRING) jsonData")
>>> dataDF = unpackedDF.select(get_json_object(unpackedDF.jsonData, '$.
>>> header.id').alias('id'),
>>>                           get_json_object(unpackedDF.jsonData,
>>> '$.header.last_updated').cast('timestamp').alias('last_updated'),
>>>                           unpackedDF.jsonData)
>>>
>>> dedupDF = dataDF.dropDuplicates(subset=['id']).withWatermark('last_updated','24
>>> hours')
>>>
>>>
>>> So it is not working. Any help is appreciated.
>>>
>>> --
>>> Best Regards,
>>> Ayan Guha
>>>
>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>


-- 
Best Regards,
Ayan Guha

Re: mapGroupsWithState in Python

Posted by Tathagata Das <ta...@gmail.com>.
Hello Ayan,

From what I understand, mapGroupsWithState (probably the more general
flatMapGroupsWithState) is the best way forward (not available in python).
However, you need to figure out your desired semantics of when you want to
output the deduplicated data from the stremaing query. For example, if
there is the following sequence of events

(id, last_update_timestamp, attribute)
1, 12:00, A      <---- do you want to output this immediately or wait for
sometime to see if there are new data?
1, 11:59, B      <---- ignored as duplicate
1, 12:01, C     <---- do you want to output this?
1, 12:02, D

If you want to output something every time there is a newer
last_update_timestamp,
then thats not really a strict "deduplication". Its more like aggregation
with keeping the latest. In that case, you can try using UDAFs as well.
However, with UDAFs you wont get any state cleanup. So the
flatMapGroupsWithState is the best solution as you can do whatever tracking
you want, output whenever you want, and get state cleanup using timeouts.

FYI: i have elaborated on flatMapGroupsWithState and timeouts in my talk -
https://databricks.com/session/deep-dive-into-stateful-stream-processing-in-structured-streaming







On Tue, Jan 30, 2018 at 5:14 AM, ayan guha <gu...@gmail.com> wrote:

> Any help would be much appreciated :)
>
> On Mon, Jan 29, 2018 at 6:25 PM, ayan guha <gu...@gmail.com> wrote:
>
>> Hi
>>
>> I want to write something in Structured streaming:
>>
>> 1. I have a dataset which has 3 columns: id, last_update_timestamp,
>> attribute
>> 2. I am receiving the data through Kinesis
>>
>> I want to deduplicate records based on last_updated. In batch, it looks
>> like:
>>
>> spark.sql("select * from (Select *, row_number() OVER(Partition by id
>> order by last_updated desc) rank  from table1) tmp where rank =1")
>>
>> But now I would like to do it in Structured Stream. I need to maintain
>> the state of id as per the highest last_updated, across the triggers, for a
>> certain period (24 hours).
>>
>> Questions:
>>
>> 1. Should I use mapGroupsWithState or is there any other (SQL?)
>> solution? Can anyone help me to write it?
>> 2. Is mapGroupsWithState supported in Python?
>>
>>  Just to ensure we cover bases, I have already tried using
>> dropDuplicates, but it is keeping the 1st record encountered for an Id, not
>> updating the state:
>>
>> unpackedDF = kinesisDF.selectExpr("cast (data as STRING) jsonData")
>> dataDF = unpackedDF.select(get_json_object(unpackedDF.jsonData, '$.
>> header.id').alias('id'),
>>                           get_json_object(unpackedDF.jsonData,
>> '$.header.last_updated').cast('timestamp').alias('last_updated'),
>>                           unpackedDF.jsonData)
>>
>> dedupDF = dataDF.dropDuplicates(subset=['id']).withWatermark('last_updated','24
>> hours')
>>
>>
>> So it is not working. Any help is appreciated.
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>
>
> --
> Best Regards,
> Ayan Guha
>

Re: mapGroupsWithState in Python

Posted by ayan guha <gu...@gmail.com>.
Any help would be much appreciated :)

On Mon, Jan 29, 2018 at 6:25 PM, ayan guha <gu...@gmail.com> wrote:

> Hi
>
> I want to write something in Structured streaming:
>
> 1. I have a dataset which has 3 columns: id, last_update_timestamp,
> attribute
> 2. I am receiving the data through Kinesis
>
> I want to deduplicate records based on last_updated. In batch, it looks
> like:
>
> spark.sql("select * from (Select *, row_number() OVER(Partition by id
> order by last_updated desc) rank  from table1) tmp where rank =1")
>
> But now I would like to do it in Structured Stream. I need to maintain the
> state of id as per the highest last_updated, across the triggers, for a
> certain period (24 hours).
>
> Questions:
>
> 1. Should I use mapGroupsWithState or is there any other (SQL?) solution?
> Can anyone help me to write it?
> 2. Is mapGroupsWithState supported in Python?
>
>  Just to ensure we cover bases, I have already tried using dropDuplicates,
> but it is keeping the 1st record encountered for an Id, not updating the
> state:
>
> unpackedDF = kinesisDF.selectExpr("cast (data as STRING) jsonData")
> dataDF = unpackedDF.select(get_json_object(unpackedDF.jsonData, '$.
> header.id').alias('id'),
>                           get_json_object(unpackedDF.jsonData,
> '$.header.last_updated').cast('timestamp').alias('last_updated'),
>                           unpackedDF.jsonData)
>
> dedupDF = dataDF.dropDuplicates(subset=['id']).withWatermark('last_updated','24
> hours')
>
>
> So it is not working. Any help is appreciated.
>
> --
> Best Regards,
> Ayan Guha
>



-- 
Best Regards,
Ayan Guha