Posted to user@spark.apache.org by lec ssmi <sh...@gmail.com> on 2020/02/27 10:30:10 UTC

dropDuplicates and watermark in structured streaming

Hi,

I'm new to Structured Streaming. Because the built-in API cannot perform a
count-distinct over a window, I want to call dropDuplicates first and then
do the windowed count. I have run into two questions:

1. Because this is a streaming computation, the deduplication state has to
   be cleared in time, which requires a watermark. Assuming my event-time
   field increases consistently and I set the watermark to 1 hour, does that
   mean data at 10 o'clock is compared only against data from 9 o'clock to
   10 o'clock, and that data before 9 o'clock is cleared?

2. Because this is per-window deduplication, I set the watermark before
   deduplication to the window size. After deduplication, I need to call
   withWatermark() again to set the watermark to the real watermark. Will
   setting the watermark again take effect?

Thanks a lot!

Re: dropDuplicates and watermark in structured streaming

Posted by lec ssmi <sh...@gmail.com>.
For example:

df.withWatermark("time", "window size")
  .dropDuplicates("id")
  .withWatermark("time", "real watermark")
  .groupBy(window("time", "window size", "window size"))
  .agg(count("id"))....

Can this make count(distinct id) work?



Re: dropDuplicates and watermark in structured streaming

Posted by Tathagata Das <ta...@gmail.com>.
Why do you have two watermarks? Once you apply a watermark to a column
(here, "time"), it can be used in all later operations as long as the
column is preserved. So the above code should be equivalent to:

df.withWatermark("time", "window size")
  .dropDuplicates("id")
  .groupBy(window("time", "window size", "window size"))
  .agg(count("id"))
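
Whether deduplicating first and then counting per window matches a true
per-window count(distinct id) depends on whether the same id can appear in
more than one window. The sketch below is a plain-Python toy model, not
Spark code. It assumes deduplication state is never evicted, and the names
WINDOW, window_start, count_after_global_dedup and distinct_per_window are
made up for illustration. With a watermark, evicted state could let an id
through again, so real results would also depend on the threshold.

```python
from collections import defaultdict

WINDOW = 10  # hypothetical tumbling-window size, in minutes


def window_start(minute):
    """Start of the tumbling window that contains the given minute."""
    return minute - minute % WINDOW


def count_after_global_dedup(events):
    """Drop repeated ids across the whole stream, then count rows per window."""
    seen = set()
    counts = defaultdict(int)
    for minute, record_id in events:
        if record_id in seen:
            continue  # duplicate of an id seen earlier, in any window
        seen.add(record_id)
        counts[window_start(minute)] += 1
    return dict(counts)


def distinct_per_window(events):
    """Count distinct ids independently inside each window."""
    ids = defaultdict(set)
    for minute, record_id in events:
        ids[window_start(minute)].add(record_id)
    return {start: len(members) for start, members in ids.items()}


# (event-time minute, id); "a" appears in both the 0-10 and 10-20 windows.
events = [(1, "a"), (2, "a"), (3, "b"), (12, "a"), (15, "c")]
dedup_counts = count_after_global_dedup(events)
distinct_counts = distinct_per_window(events)
```

In this model the second window gets 1 from the dedup-then-count approach
but 2 from the per-window distinct count, because "a" was already consumed
by the first window.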

The right way to think about the watermark threshold is "how late and out
of order can my data be?" The answer may be completely different from the
window size. You may want to compute 10-minute windows while your data
arrives up to 5 hours late. In that case you should define the watermark as
5 hours, not 10 minutes.
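
To make the threshold idea concrete, here is a small plain-Python sketch
under simplifying assumptions. It models the watermark as "largest event
time seen so far minus the threshold" and updates it after every record,
whereas Spark advances the watermark between micro-batches. The function
name accepted_events and the sample data are hypothetical.

```python
from datetime import datetime, timedelta


def accepted_events(events, threshold):
    """Return the events that are not older than the running watermark.

    events: iterable of (event_time, payload) in arrival order.
    """
    max_event_time = None
    kept = []
    for event_time, payload in events:
        if max_event_time is not None and event_time < max_event_time - threshold:
            continue  # older than the watermark: treated as too late
        kept.append((event_time, payload))
        if max_event_time is None or event_time > max_event_time:
            max_event_time = event_time
    return kept


base = datetime(2020, 2, 27, 10, 0)
events = [
    (base, "a"),
    (base + timedelta(hours=4), "b"),
    (base + timedelta(minutes=5), "c"),  # arrives roughly 4 hours late
]

kept_10min = [p for _, p in accepted_events(events, timedelta(minutes=10))]
kept_5h = [p for _, p in accepted_events(events, timedelta(hours=5))]
```

With a 10-minute threshold the late record "c" is dropped, while a 5-hour
threshold keeps it, independent of the window size used later.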

On a side note, you can use "approx_count_distinct" if you are okay with
some approximation.




Re: dropDuplicates and watermark in structured streaming

Posted by Tathagata Das <ta...@gmail.com>.
1. Yes. All times are in event time, not processing time. So you may receive
   data with a 10 AM event time at 11 AM processing time, but it will still
   be compared against all data with event times between 9 and 10 AM.

2. Show us your code.
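
The behavior described in point 1 can be illustrated with a plain-Python
toy model. This is a sketch of the idea only, not Spark's implementation:
the function dedup_with_watermark and the sample records are hypothetical,
arrival order stands in for processing time, and the watermark is updated
per record rather than per micro-batch.

```python
from datetime import datetime, timedelta


def dedup_with_watermark(records, threshold):
    """Toy event-time deduplication with watermark-based state cleanup.

    records: iterable of (event_time, record_id) in arrival order.
    Returns the records emitted as "first seen" within the retained state.
    """
    state = {}  # record_id -> event time of the retained occurrence
    max_event_time = None
    emitted = []
    for event_time, record_id in records:
        if max_event_time is None or event_time > max_event_time:
            max_event_time = event_time
        watermark = max_event_time - threshold
        # Clear state whose event time has fallen behind the watermark.
        state = {k: t for k, t in state.items() if t >= watermark}
        if event_time < watermark:
            continue  # record itself is too late
        if record_id in state:
            continue  # duplicate within the retained event-time range
        state[record_id] = event_time
        emitted.append((event_time, record_id))
    return emitted


def at(hour, minute=0):
    return datetime(2020, 2, 27, hour, minute)


records = [
    (at(9, 30), "x"),
    (at(10), "x"),  # duplicate: "x" from 9:30 is still in state
    (at(10), "y"),
    (at(11), "x"),  # watermark is now 10:00, so the 9:30 "x" was cleared
]
emitted_ids = [rid for _, rid in dedup_with_watermark(records, timedelta(hours=1))]
```

In this model the last "x" is emitted again because the earlier occurrence
was cleared once the watermark passed it. The deduplication example in the
Structured Streaming guide passes both the id and the event-time column to
dropDuplicates, so it is worth checking how state cleanup behaves for
dropDuplicates("id") alone on your Spark version.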


