Posted to user@flink.apache.org by Henri Heiskanen <he...@gmail.com> on 2018/02/06 10:11:33 UTC

deduplication with streaming sql

Hi,

I have a use case where I would like to find distinct rows over a certain
period of time. The requirement is that a new row is emitted as soon as
possible; otherwise the goal is mainly to filter the data down to a smaller
dataset for downstream consumers. I noticed that SELECT DISTINCT with a
state retention time of 12 hours should in theory do the trick. You can
find the code below. A few questions:

1) Why is SELECT DISTINCT creating a retract stream? In which scenarios
would we get update/delete rows?

2) If I run the code below with the example data (also below) but without
the state retention config, I get two append rows (expected). If I run
exactly the code below (with the retention config), I get two appends and
one delete for AN1234 and then one append for AN5555. What is going on?

// imports for Flink 1.4
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sources.CsvTableSource;

StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();

StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

StreamQueryConfig qConfig = tableEnv.queryConfig();
// set idle state retention time: min = max = 12 hours
qConfig.withIdleStateRetentionTime(Time.hours(12));

// create a TableSource for the CSV input
CsvTableSource csvSource = CsvTableSource
    .builder()
    .path("data.csv")
    .field("ts", Types.SQL_TIMESTAMP())
    .field("aid1", Types.STRING())
    .field("aid2", Types.STRING())
    .field("advertiser_id", Types.STRING())
    .field("platform_id", Types.STRING())
    .fieldDelimiter(",")
    .build();

tableEnv.registerTableSource("CsvTable", csvSource);

Table result = tableEnv.sqlQuery(
    "SELECT DISTINCT aid1, aid2, advertiser_id, platform_id FROM CsvTable");

// StdOutRetractStreamTableSink is a custom sink that prints the retract
// stream to stdout.
StdOutRetractStreamTableSink out = new StdOutRetractStreamTableSink(
    new String[] {"aid1", "aid2", "advertiser_id", "platform_id"},
    new TypeInformation[] {Types.STRING(), Types.STRING(), Types.STRING(),
        Types.STRING()});

result.writeToSink(out, qConfig);

env.execute();


Here is a simple CSV dataset of three rows:

2018-01-31 12:00:00,AN1234,RC1234,0000-0000-0000-00000,1234-1234-1234-1234,1234567890
2018-01-31 12:00:02,AN1234,RC1234,0000-0000-0000-00000,1234-1234-1234-1234,1234567890
2018-01-31 12:00:02,AN5555,RC5555,0000-0000-0000-00001,1234-1234-1234-1234,1234567891
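
For reference, the insert/delete flags can also be inspected by converting
the result into a retract stream of Tuple2<Boolean, Row> (true = insert,
false = delete). A minimal sketch, assuming the tableEnv, result and
qConfig from the program above:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;

// print each change message together with its insert/delete flag
DataStream<Tuple2<Boolean, Row>> changes =
    tableEnv.toRetractStream(result, Row.class, qConfig);
changes.print();

// With the retention config, the behavior described in question 2 would
// show up as:
//   (true,AN1234,RC1234,...)    first row: insert
//   (false,AN1234,RC1234,...)   duplicate arrives: delete ...
//   (true,AN1234,RC1234,...)    ... immediately followed by a re-insert
//   (true,AN5555,RC5555,...)    new key: insert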

Re: deduplication with streaming sql

Posted by Henri Heiskanen <he...@gmail.com>.
Hi,

Oh, right... got it. Thanks!

Br,
Henkka


Re: deduplication with streaming sql

Posted by Fabian Hueske <fh...@apache.org>.
Hi Henkka,

This should be fairly easy to implement in a ProcessFunction.
You're making a good call to worry about the number of timers. If you
register a timer multiple times for the same time, the timer is
deduplicated ;-) and will only fire once for that time.
That's why the state retention time lets you set a min and a max time.
With that, you only have to set a timer once every (max - min) interval.
For example, if the application should keep the state for at least 12
hours but at most 14 hours, you only need to register a new timer every
2 hours.
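
To make that concrete, here is a minimal sketch of such a deduplicating
ProcessFunction on a keyed stream. The Event type and the retention
constants are illustrative assumptions, not code from this thread:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class DedupFunction extends ProcessFunction<Event, Event> {

    private static final long MIN_RETENTION = 12 * 60 * 60 * 1000L; // 12 h
    private static final long MAX_RETENTION = 14 * 60 * 60 * 1000L; // 14 h

    private ValueState<Long> lastSeen;     // last time this key was seen
    private ValueState<Long> cleanupTimer; // currently registered timer, if any

    @Override
    public void open(Configuration parameters) {
        lastSeen = getRuntimeContext().getState(
            new ValueStateDescriptor<>("lastSeen", Long.class));
        cleanupTimer = getRuntimeContext().getState(
            new ValueStateDescriptor<>("cleanupTimer", Long.class));
    }

    @Override
    public void processElement(Event value, Context ctx, Collector<Event> out)
            throws Exception {
        if (lastSeen.value() == null) {
            // first occurrence of this key within the retention window -> emit
            out.collect(value);
        }
        long now = ctx.timerService().currentProcessingTime();
        lastSeen.update(now);
        // register at most one timer per key instead of one per event
        if (cleanupTimer.value() == null) {
            long cleanupTime = now + MAX_RETENTION;
            ctx.timerService().registerProcessingTimeTimer(cleanupTime);
            cleanupTimer.update(cleanupTime);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out)
            throws Exception {
        if (timestamp - lastSeen.value() >= MIN_RETENTION) {
            // key was idle long enough -> drop its state
            lastSeen.clear();
            cleanupTimer.clear();
        } else {
            // key was active in the meantime: push the timer forward; this
            // fires at most once every (max - min) interval per key
            long cleanupTime = lastSeen.value() + MAX_RETENTION;
            ctx.timerService().registerProcessingTimeTimer(cleanupTime);
            cleanupTimer.update(cleanupTime);
        }
    }
}

It would be wired up with something like
stream.keyBy(/* key fields */).process(new DedupFunction()).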

Hope this helps,
Fabian


Re: deduplication with streaming sql

Posted by Henri Heiskanen <he...@gmail.com>.
Hi,

Thanks.

Doing this deduplication would be easy just by using the vanilla Flink API
and state (check whether this is a new key and then emit), but the issue
has been automatic state cleanup. However, it looks like this streaming
SQL retention time implementation uses a process function and timers. I
was a bit reluctant to use that because I was worried that the approach
would be overkill for our volumes, but maybe it will work just fine. Can
you help me a bit with how to implement it efficiently?

Basically we get an estimated 20M distinct keys and roughly 300 events per
key during one day. What I would like to do is to clear the state for a
specific key if I have not seen that key for the last 12 hours. I think
it's very close to the example here:
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/process_function.html.
Instead of emitting the data in onTimer() I would just clear the state. In
that example each tuple invokes registerEventTimeTimer(). Is this the
correct pattern? E.g. in our case we could get hundreds of events with the
same key within a few minutes, so would we then register hundreds of timer
instances?

Br,
Henkka


Re: deduplication with streaming sql

Posted by Fabian Hueske <fh...@apache.org>.
Hi Henri,

thanks for reaching out and providing code and data to reproduce the issue.

I think you are right, a "SELECT DISTINCT a, b, c FROM X" should not
result in a retraction stream.

However, with the current implementation we internally need a retraction
stream if a state retention time is configured.
The reason lies in how the state retention time is defined: it removes the
state for a key if that key hasn't been seen for x time.
This means that an operator resets the state clean-up timer of a key
whenever a new record with that key is received. This is also true for
retraction/insertion messages of the same record.
If we implemented the GroupBy that performs the DISTINCT as an operator
that emits an append stream, downstream operators wouldn't see any
updates, because the GroupBy only emits the first record and filters out
all duplicates. Hence, downstream operators would perform their clean-up
too early.

I see that these are internals that users should not need to worry about,
but right now there is no easy solution to this.
Eventually, the clean-up timer reset should be implemented differently
than by retracting and re-inserting the same record. However, this would
be a more involved change and requires careful planning.

I'll file a JIRA for that.

Thanks again for bringing the issue to our attention.

Best, Fabian



Re: deduplication with streaming sql

Posted by Timo Walther <tw...@apache.org>.
Hi Henri,

I just noticed that I had a tiny mistake in my little test program, so
SELECT DISTINCT is officially supported after all. But the question of
whether this is a valid append stream is still up for discussion. I will
loop in Fabian (in CC).

For the general behavior you can also look into the code and especially
the comments there [1].

Regards,
Timo

[1] https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala


Re: deduplication with streaming sql

Posted by Timo Walther <tw...@apache.org>.
Hi Henri,

I'll try to answer your questions:

1) You are right, SELECT DISTINCT should not need a retract stream.
Internally, this is translated into an aggregation without an aggregate
function call. So this definitely needs improvement.

2) The problem is that SELECT DISTINCT is neither officially supported nor
tested. I opened an issue for this [1].

Until this issue is fixed, I would recommend implementing a custom
aggregate function that keeps track of the values seen so far [2].

Regards,
Timo

[1] https://issues.apache.org/jira/browse/FLINK-8564
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/udfs.html#aggregation-functions
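
For illustration, such an aggregate function might look like the following
minimal sketch (Flink 1.4 Table API); the FirstTs name, its accumulator
and the query below are assumptions, not code from [2]:

import java.sql.Timestamp;
import org.apache.flink.table.functions.AggregateFunction;

// accumulator POJO: remembers the first timestamp seen for a group
public class FirstTsAcc {
    public Timestamp first;
}

// returns the timestamp of the first row per group; later duplicates
// leave the accumulator (and thus the emitted value) unchanged
public class FirstTs extends AggregateFunction<Timestamp, FirstTsAcc> {

    @Override
    public FirstTsAcc createAccumulator() {
        return new FirstTsAcc();
    }

    @Override
    public Timestamp getValue(FirstTsAcc acc) {
        return acc.first;
    }

    // called by the runtime for each input row of the group
    public void accumulate(FirstTsAcc acc, Timestamp ts) {
        if (acc.first == null) {
            acc.first = ts;
        }
    }
}

It would replace the DISTINCT query roughly like this:

tableEnv.registerFunction("firstTs", new FirstTs());
Table result = tableEnv.sqlQuery(
    "SELECT aid1, aid2, advertiser_id, platform_id, firstTs(ts) " +
    "FROM CsvTable " +
    "GROUP BY aid1, aid2, advertiser_id, platform_id");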

