Posted to user@flink.apache.org by Vinod Mehra <vm...@lyft.com> on 2019/05/29 20:49:29 UTC

count(DISTINCT) in flink SQL

Hi!

We are using apache-flink-1.4.2. It seems this version doesn't support
count(DISTINCT). I am trying to find a way to dedup the stream. So I tried:

SELECT
    CONCAT_WS(
      '-',
      CAST(MONTH(longToDateTime(rowtime)) AS VARCHAR),
      CAST(YEAR(longToDateTime(rowtime)) AS VARCHAR),
      CAST(user_id AS VARCHAR)
    ),
    COUNT(DISTINCT(event_id)) AS event_count
FROM event_foo
GROUP BY user_id, MONTH(longToDateTime(rowtime)), YEAR(longToDateTime(rowtime))


(the duplicate events have the same 'event_id' (and user_id), the other
fields e.g. timestamps may or may not be different)


But that failed because DISTINCT is not supported. As a workaround I tried:

SELECT
    CONCAT_WS(
      '-',
      CAST(MONTH(row_datetime) AS VARCHAR),
      CAST(YEAR(row_datetime) AS VARCHAR),
      CAST(user_id AS VARCHAR)
    ),
    COUNT(event_id) AS event_count
FROM (
    SELECT
        user_id,
        event_id,
        maxtimestamp(longToDateTime(rowtime)) AS row_datetime
    FROM event_foo
    GROUP BY event_id, user_id
)
GROUP BY user_id, MONTH(row_datetime), YEAR(row_datetime)

I am hoping the inner SELECT does the deduping, because logically it is
equivalent to a DISTINCT. This works in my functional testing.

Will it also work if the duplicates span different event buckets? I was hoping
that as long as the events arrive within the state "retention time" in
Flink they would be deduped, but I am new to Flink so I am not sure about
that. Can someone please correct me if I am wrong? Is this a reasonable
workaround for the lack of DISTINCT support? Please let me know if there is a
better way.

Thanks,
Vinod
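
To make the question above concrete, here is a minimal sketch, not Flink code: it runs both query shapes in SQLite (batch, no streaming state) on made-up rows and compares the results. The `ts` column, the sample data, and the use of `strftime` and `MAX` in place of the thread's `longToDateTime` and `maxtimestamp` functions are assumptions made only for this example.

```python
import sqlite3

# COUNT(DISTINCT) per user and month, the form Flink 1.4.2 rejected in the thread.
DISTINCT_SQL = """
SELECT user_id, strftime('%Y-%m', ts) AS ym, COUNT(DISTINCT event_id)
FROM event_foo
GROUP BY user_id, strftime('%Y-%m', ts)
ORDER BY 1, 2
"""

# Nested workaround: the inner GROUP BY keeps one row per (event_id, user_id).
NESTED_SQL = """
SELECT user_id, strftime('%Y-%m', row_datetime) AS ym, COUNT(event_id)
FROM (
    SELECT user_id, event_id, MAX(ts) AS row_datetime
    FROM event_foo
    GROUP BY event_id, user_id
)
GROUP BY user_id, strftime('%Y-%m', row_datetime)
ORDER BY 1, 2
"""

def run_both(rows):
    """Load rows into an in-memory table and return (distinct result, nested result)."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE event_foo (user_id INTEGER, event_id TEXT, ts TEXT)")
    conn.executemany("INSERT INTO event_foo VALUES (?, ?, ?)", rows)
    distinct_counts = conn.execute(DISTINCT_SQL).fetchall()
    nested_counts = conn.execute(NESTED_SQL).fetchall()
    conn.close()
    return distinct_counts, nested_counts

# Hypothetical data: event "a" is delivered twice within the same month.
same_month = [
    (1, "a", "2019-05-10 12:00:00"),
    (1, "a", "2019-05-10 12:05:00"),
    (1, "b", "2019-05-11 09:00:00"),
    (2, "c", "2019-05-12 08:00:00"),
]

# Hypothetical data: the duplicate of event "a" arrives just after the month ends.
cross_month = [
    (1, "a", "2019-05-31 23:50:00"),
    (1, "a", "2019-06-01 00:10:00"),
]

for name, rows in (("same_month", same_month), ("cross_month", cross_month)):
    distinct_counts, nested_counts = run_both(rows)
    print(name, "distinct:", distinct_counts, "nested:", nested_counts)
```

On this sample data the two forms agree when all copies of an event fall in the same month. When a duplicate lands in the next month, COUNT(DISTINCT) counts the event in both months, while the nested form counts it once, in the month of its latest timestamp.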

Re: count(DISTINCT) in flink SQL

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Vinod,

Sorry for the late reply.
Your approach looks good to me.

A few things to note:
* It is not possible to set different idle state retention timers for
different parts of a query. All operators that support idle state retention
use the same configuration.
* The inner query with the SESSION window does not support (and also does
not need) idle state retention timers. SESSION window state is
automatically cleaned up based on watermarks. So, you don't have to worry
about this one.
* If you set the idle state retention time to one month (plus some safety
margin), it should not affect the correctness of the results, because the
query would maintain the counts for one more month after the last access.
However, this also means that you keep up to two months of state around (up
to two counts for each user).

Cheers,
Fabian
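
The last point in the reply above can be illustrated with a small simulation. This is a simplified sketch and not how Flink implements state cleanup: the class `IdleRetentionCounter`, the `replay` helper, the single hypothetical user and the use of event timestamps as the clock are all assumptions for illustration. It only models the idea that a key's count is dropped once the key has not been accessed for longer than the retention time.

```python
from datetime import datetime, timedelta

class IdleRetentionCounter:
    """Per-key counter whose state is dropped once a key has been idle too long."""

    def __init__(self, retention):
        self.retention = retention
        self.state = {}  # key -> (count, time of last access)

    def expire(self, now):
        """Drop every key that has not been accessed within the retention time."""
        stale = [k for k, (_, last) in self.state.items() if now - last > self.retention]
        for key in stale:
            del self.state[key]

    def add(self, key, now):
        """Count one more event for key and return the updated count."""
        self.expire(now)
        count, _ = self.state.get(key, (0, now))
        self.state[key] = (count + 1, now)
        return count + 1

def replay(retention_days, event_times):
    """Return (count for the event's month, number of keys held) after each event."""
    counter = IdleRetentionCounter(timedelta(days=retention_days))
    results = []
    for ts in event_times:
        key = (1, ts.year, ts.month)  # (user_id, year, month) for one hypothetical user
        results.append((counter.add(key, ts), len(counter.state)))
    return results

# Hypothetical event times: two in May, one in June, one in July.
events = [
    datetime(2019, 5, 5),
    datetime(2019, 5, 20),
    datetime(2019, 6, 3),
    datetime(2019, 7, 20),
]

print(replay(35, events))  # retention of one month plus a safety margin
print(replay(7, events))   # retention shorter than the counting period
```

With the 35-day retention the May count reaches 2 and the May state is still held while June is being counted, which is the "up to two counts for each user" case. With the 7-day retention the May state is dropped between the two May events, so the count restarts and the result is no longer correct.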

On Wed, Jun 5, 2019 at 03:07, Vinod Mehra <vm...@lyft.com> wrote:

> To be clear, I want the outer grouping to have a longer retention time (on
> the order of a week or a month, for which we are using 'idle state retention
> time') and the inner grouping to have a shorter retention period (1 hour max).
> So I am hoping the session window will do the right thing.
>
> Thanks,
> Vinod

Re: count(DISTINCT) in flink SQL

Posted by Vinod Mehra <vm...@lyft.com>.
To be clear, I want the outer grouping to have a longer retention time (on
the order of a week or a month, for which we are using 'idle state retention
time') and the inner grouping to have a shorter retention period (1 hour max).
So I am hoping the session window will do the right thing.

Thanks,
Vinod

On Tue, Jun 4, 2019 at 5:14 PM Vinod Mehra <vm...@lyft.com> wrote:

> Thanks a lot Fabian for the detailed response. I know all the duplicates
> are going to arrive within an hour max of the actual event. So using a 1
> hour running session window should be fine for me.

Re: count(DISTINCT) in flink SQL

Posted by Vinod Mehra <vm...@lyft.com>.
Thanks a lot Fabian for the detailed response. I know all the duplicates
are going to arrive within an hour max of the actual event. So using a 1
hour running session window should be fine for me.

Is the following the right way to do it in apache-flink-1.4.2?

SELECT
    CONCAT_WS(
      '-',
      CAST(event_month AS VARCHAR),
      CAST(event_year AS VARCHAR),
      CAST(user_id AS VARCHAR)
    ),
    COUNT(event_id) AS event_count
FROM (
    SELECT
        user_id,
        event_id,
        MAX(MONTH(longToDateTime(rowtime))) AS event_month,
        MAX(YEAR(longToDateTime(rowtime))) AS event_year
    FROM event_foo
    -- 1 hour running session window
    GROUP BY event_id, user_id, SESSION(rowtime, INTERVAL '1' HOUR)
)
GROUP BY user_id, event_month, event_year



We are also using idle state retention time to clean up unused state, but
that is much longer (a week or a month depending on the use case). We will
switch to count(DISTINCT) as soon as we move to a newer Flink version, so the
above nested select is going to be a stopgap until then.

Thanks,
Vinod
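
The session-window idea in the message above can be sketched in plain Python. This is an illustrative batch model under stated assumptions, not Flink's windowing code: the helper names `session_dedup` and `count_per_user_month`, the sample events, and the handling of the exact gap boundary are all hypothetical. It collapses copies of an event that arrive within the gap into one row, then counts rows per user and month.

```python
from collections import defaultdict
from datetime import datetime, timedelta

def session_dedup(events, gap):
    """Return one (user_id, event_id, max_timestamp) row per session.

    events is an iterable of (user_id, event_id, timestamp). Timestamps of the
    same (event_id, user_id) belong to one session as long as consecutive
    timestamps are no more than gap apart (boundary handling is an assumption).
    """
    by_key = defaultdict(list)
    for user_id, event_id, ts in events:
        by_key[(event_id, user_id)].append(ts)
    rows = []
    for (event_id, user_id), stamps in by_key.items():
        stamps.sort()
        session_end = stamps[0]
        for ts in stamps[1:]:
            if ts - session_end > gap:
                # Inactivity gap exceeded: close the current session.
                rows.append((user_id, event_id, session_end))
            session_end = ts
        rows.append((user_id, event_id, session_end))
    return sorted(rows)

def count_per_user_month(rows):
    """Count deduplicated rows per (user_id, year, month), like the outer query."""
    counts = defaultdict(int)
    for user_id, _event_id, ts in rows:
        counts[(user_id, ts.year, ts.month)] += 1
    return dict(counts)

# Hypothetical events: "a" is duplicated after 30 minutes, "b" after 2.5 hours.
events = [
    (1, "a", datetime(2019, 5, 10, 10, 0)),
    (1, "a", datetime(2019, 5, 10, 10, 30)),
    (1, "b", datetime(2019, 5, 10, 11, 0)),
    (1, "b", datetime(2019, 5, 10, 13, 30)),
]

deduped = session_dedup(events, timedelta(hours=1))
print(deduped)
print(count_per_user_month(deduped))
```

In this model the duplicate of "a" is absorbed, while the late duplicate of "b" opens a second session and is counted again, so the approach relies on duplicates really arriving within the gap. The sketch also derives year and month from a single maximum timestamp, as the earlier maxtimestamp query did; taking MAX(MONTH(...)) and MAX(YEAR(...)) separately could pair month 12 with the following year if a session straddles New Year, so that may be worth double-checking.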

On Mon, Jun 3, 2019 at 4:52 AM Fabian Hueske <fh...@gmail.com> wrote:

> Hi Vinod,
>
> IIRC, support for DISTINCT aggregates was added in Flink 1.6.0 (released
> August 9th, 2018) [1].
> Also note that by default, this query will accumulate more and more state,
> i.e., for each grouping key it will hold all unique event_ids.
> You could configure an idle state retention time [2] to clean up unused state.
>
> Regarding the boundaries, with the current query they are fixed to one
> month and sharply cut (as one would expect).
> You could try to use a long-running session window [3]. This would also
> remove the need for the idle state configuration because Flink would know
> when state can be discarded.
>
> Hope this helps,
> Fabian
>
> [1]
> https://flink.apache.org/news/2018/08/09/release-1.6.0.html#enhancing-sql-and-table-api
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/query_configuration.html#idle-state-retention-time
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/sql.html#group-windows

Re: count(DISTINCT) in flink SQL

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Vinod,

IIRC, support for DISTINCT aggregates was added in Flink 1.6.0 (released
August 9th, 2018) [1].
Also note that by default, this query will accumulate more and more state,
i.e., for each grouping key it will hold all unique event_ids.
You could configure an idle state retention time [2] to clean up unused state.

Regarding the boundaries, with the current query they are fixed to one
month and sharply cut (as one would expect).
You could try to use a long-running session window [3]. This would also
remove the need for the idle state configuration because Flink would know
when state can be discarded.

Hope this helps,
Fabian

[1]
https://flink.apache.org/news/2018/08/09/release-1.6.0.html#enhancing-sql-and-table-api
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/query_configuration.html#idle-state-retention-time
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/sql.html#group-windows
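
The remark above about state growth can be pictured with a toy model. This is a conceptual sketch only and says nothing about Flink's actual state layout: the class `DistinctCountState` and the sample keys are hypothetical. It keeps, for each grouping key, the set of event_ids seen so far, which is what a distinct count needs in order to recognise duplicates.

```python
from collections import defaultdict

class DistinctCountState:
    """Toy model of a distinct count: one set of seen event_ids per grouping key."""

    def __init__(self):
        self.seen = defaultdict(set)  # grouping key -> distinct event_ids seen so far

    def add(self, key, event_id):
        """Record event_id for key and return the current distinct count."""
        self.seen[key].add(event_id)
        return len(self.seen[key])

    def state_size(self):
        """Total number of event_ids held across all keys."""
        return sum(len(ids) for ids in self.seen.values())

state = DistinctCountState()
may_user_1 = (1, 2019, 5)  # hypothetical (user_id, year, month) grouping key
counts = [state.add(may_user_1, event_id) for event_id in ["a", "a", "b", "c"]]
state.add((2, 2019, 5), "a")

print(counts)
print(state.state_size())
```

Duplicates do not add to the state, but every new event_id does, and nothing in this model ever removes an entry. That is the unbounded growth that an idle state retention time or a window with a defined end is meant to limit.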

On Thu, May 30, 2019 at 02:18, Vinod Mehra <vm...@lyft.com> wrote:

> Another interesting thing is that if I add DISTINCT in the 2nd query it
> doesn't complain. But it is a no-op there, because the inner select is
> already doing the deduping:
>
> SELECT
>     CONCAT_WS(
>       '-',
>       CAST(MONTH(row_datetime) AS VARCHAR),
>       CAST(YEAR(row_datetime) AS VARCHAR),
>       CAST(user_id AS VARCHAR)
>     ),
>     -- note the DISTINCT keyword here. Flink doesn't barf for this.
>     COUNT(DISTINCT(event_id)) AS event_count
> FROM (
>     SELECT
>         user_id,
>         event_id,
>         maxtimestamp(longToDateTime(rowtime)) AS row_datetime
>     FROM event_foo
>     GROUP BY event_id, user_id
> )
> GROUP BY user_id, MONTH(row_datetime), YEAR(row_datetime)

Re: count(DISTINCT) in flink SQL

Posted by Vinod Mehra <vm...@lyft.com>.
Another interesting thing: if I add DISTINCT to the 2nd query, Flink doesn't
complain. But there it is a no-op, because the inner SELECT has already done
the deduping:

SELECT
    CONCAT_WS(
      '-',
      CAST(MONTH(row_datetime) AS VARCHAR),
      CAST(YEAR(row_datetime) AS VARCHAR),
      CAST(user_id AS VARCHAR)
    ),
    -- note the DISTINCT keyword here. Flink doesn't barf for this.
    COUNT(DISTINCT(event_id)) AS event_count
FROM (
    SELECT
        user_id,
        event_id,
        maxtimestamp(longToDateTime(rowtime)) as row_datetime
    FROM event_foo
    GROUP BY event_id, user_id
)
GROUP BY user_id, MONTH(row_datetime), YEAR(row_datetime)

On Wed, May 29, 2019 at 5:15 PM Vinod Mehra <vm...@lyft.com> wrote:

> More details on the error with query#1 that used COUNT(DISTINCT()):
>
> org.apache.flink.table.api.TableException: Cannot generate a valid
> execution plan for the given query:
>
> FlinkLogicalCalc(expr#0..8=[{inputs}], expr#9=[_UTF-16LE'-'],
> expr#10=[CAST($t1):VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE
> "ISO-8859-1$en_US$primary"], expr#11=[CAST($t2):VARCHAR(65536) CHARACTER
> SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary"],
> expr#12=[CAST($t0):VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE
> "ISO-8859-1$en_US$primary"], expr#13=[CONCAT_WS($t9, $t10, $t11, $t12)],
> EXPR$0=[$t13], mastercard_world_elite_monthly_rides_encoded=[$t8],
> lower_boundary=[$t3], latency_marker=[$t4])
>   FlinkLogicalJoin(condition=[AND(IS NOT DISTINCT FROM($0, $5), IS NOT
> DISTINCT FROM($1, $6), IS NOT DISTINCT FROM($2, $7))], joinType=[inner])
>     FlinkLogicalAggregate(group=[{0, 1, 2}],
> lower_boundary=[mintimestamp($4)], latency_marker=[maxtimestamp($4)])
>       FlinkLogicalCalc(expr#0..6=[{inputs}], expr#7=[FLAG(MONTH)],
> expr#8=[CAST($t6):TIMESTAMP(3) NOT NULL], expr#9=[longToDateTime($t8)],
> expr#10=[Reinterpret($t9)], expr#11=[86400000], expr#12=[/INT($t10, $t11)],
> expr#13=[EXTRACT_DATE($t7, $t12)], expr#14=[FLAG(YEAR)],
> expr#15=[EXTRACT_DATE($t14, $t12)], expr#16=[_UTF-16LE'USD'],
> expr#17=[=($t4, $t16)], expr#18=[_UTF-16LE'MWE'], expr#19=[=($t2, $t18)],
> expr#20=[NOT($t5)], expr#21=[AND($t17, $t19, $t20)], user_lyft_id=[$t3],
> $f1=[$t13], $f2=[$t15], ride_id=[$t1], $f4=[$t9], $condition=[$t21])
>         FlinkLogicalNativeTableScan(table=[[event_pay_passenger_payment_succeeded_for_rewards]])
>     FlinkLogicalAggregate(group=[{0, 1, 2}],
> mastercard_world_elite_monthly_rides_encoded=[COUNT($3)])
>       FlinkLogicalAggregate(group=[{0, 1, 2, 3}])
>         FlinkLogicalCalc(expr#0..6=[{inputs}], expr#7=[FLAG(MONTH)],
> expr#8=[CAST($t6):TIMESTAMP(3) NOT NULL], expr#9=[longToDateTime($t8)],
> expr#10=[Reinterpret($t9)], expr#11=[86400000], expr#12=[/INT($t10, $t11)],
> expr#13=[EXTRACT_DATE($t7, $t12)], expr#14=[FLAG(YEAR)],
> expr#15=[EXTRACT_DATE($t14, $t12)], expr#16=[_UTF-16LE'USD'],
> expr#17=[=($t4, $t16)], expr#18=[_UTF-16LE'MWE'], expr#19=[=($t2, $t18)],
> expr#20=[NOT($t5)], expr#21=[AND($t17, $t19, $t20)], user_lyft_id=[$t3],
> $f1=[$t13], $f2=[$t15], ride_id=[$t1], $condition=[$t21])
>           FlinkLogicalNativeTableScan(table=[[event_pay_passenger_payment_succeeded_for_rewards]])
>
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL
> features.
>
> at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:274)
> at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:683)
> at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:730)
> at org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:414)

Re: count(DISTINCT) in flink SQL

Posted by Vinod Mehra <vm...@lyft.com>.
More details on the error with query#1 that used COUNT(DISTINCT()):

org.apache.flink.table.api.TableException: Cannot generate a valid
execution plan for the given query:

FlinkLogicalCalc(expr#0..8=[{inputs}], expr#9=[_UTF-16LE'-'],
expr#10=[CAST($t1):VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE
"ISO-8859-1$en_US$primary"], expr#11=[CAST($t2):VARCHAR(65536) CHARACTER
SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary"],
expr#12=[CAST($t0):VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE
"ISO-8859-1$en_US$primary"], expr#13=[CONCAT_WS($t9, $t10, $t11, $t12)],
EXPR$0=[$t13], mastercard_world_elite_monthly_rides_encoded=[$t8],
lower_boundary=[$t3], latency_marker=[$t4])
  FlinkLogicalJoin(condition=[AND(IS NOT DISTINCT FROM($0, $5), IS NOT
DISTINCT FROM($1, $6), IS NOT DISTINCT FROM($2, $7))], joinType=[inner])
    FlinkLogicalAggregate(group=[{0, 1, 2}],
lower_boundary=[mintimestamp($4)], latency_marker=[maxtimestamp($4)])
      FlinkLogicalCalc(expr#0..6=[{inputs}], expr#7=[FLAG(MONTH)],
expr#8=[CAST($t6):TIMESTAMP(3) NOT NULL], expr#9=[longToDateTime($t8)],
expr#10=[Reinterpret($t9)], expr#11=[86400000], expr#12=[/INT($t10, $t11)],
expr#13=[EXTRACT_DATE($t7, $t12)], expr#14=[FLAG(YEAR)],
expr#15=[EXTRACT_DATE($t14, $t12)], expr#16=[_UTF-16LE'USD'],
expr#17=[=($t4, $t16)], expr#18=[_UTF-16LE'MWE'], expr#19=[=($t2, $t18)],
expr#20=[NOT($t5)], expr#21=[AND($t17, $t19, $t20)], user_lyft_id=[$t3],
$f1=[$t13], $f2=[$t15], ride_id=[$t1], $f4=[$t9], $condition=[$t21])
        FlinkLogicalNativeTableScan(table=[[event_pay_passenger_payment_succeeded_for_rewards]])
    FlinkLogicalAggregate(group=[{0, 1, 2}],
mastercard_world_elite_monthly_rides_encoded=[COUNT($3)])
      FlinkLogicalAggregate(group=[{0, 1, 2, 3}])
        FlinkLogicalCalc(expr#0..6=[{inputs}], expr#7=[FLAG(MONTH)],
expr#8=[CAST($t6):TIMESTAMP(3) NOT NULL], expr#9=[longToDateTime($t8)],
expr#10=[Reinterpret($t9)], expr#11=[86400000], expr#12=[/INT($t10, $t11)],
expr#13=[EXTRACT_DATE($t7, $t12)], expr#14=[FLAG(YEAR)],
expr#15=[EXTRACT_DATE($t14, $t12)], expr#16=[_UTF-16LE'USD'],
expr#17=[=($t4, $t16)], expr#18=[_UTF-16LE'MWE'], expr#19=[=($t2, $t18)],
expr#20=[NOT($t5)], expr#21=[AND($t17, $t19, $t20)], user_lyft_id=[$t3],
$f1=[$t13], $f2=[$t15], ride_id=[$t1], $condition=[$t21])
          FlinkLogicalNativeTableScan(table=[[event_pay_passenger_payment_succeeded_for_rewards]])

This exception indicates that the query uses an unsupported SQL feature.
Please check the documentation for the set of currently supported SQL
features.

at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:274)
at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:683)
at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:730)
at org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:414)



On Wed, May 29, 2019 at 1:49 PM Vinod Mehra <vm...@lyft.com> wrote:

> Hi!
>
> We are using apache-flink-1.4.2. It seems this version doesn't support
> count(DISTINCT). I am trying to find a way to dedup the stream. So I tried:
>
> SELECT
>     CONCAT_WS(
>       '-',
>       CAST(MONTH(longToDateTime(rowtime)) AS VARCHAR),
>       CAST(YEAR(longToDateTime(rowtime)) AS VARCHAR),
>       CAST(user_id AS VARCHAR)
>     ),
>     COUNT(DISTINCT(event_id)) AS event_count
> FROM event_foo
> GROUP BY user_id, MONTH(longToDateTime(rowtime)),
>     YEAR(longToDateTime(rowtime))
>
>
> (the duplicate events have the same 'event_id' (and user_id), the other
> fields e.g. timestamps may or may not be different)
>
>
> But that failed because DISTINCT is not supported. As a workaround I tried:
>
> SELECT
>     CONCAT_WS(
>       '-',
>       CAST(MONTH(row_datetime) AS VARCHAR),
>       CAST(YEAR(row_datetime) AS VARCHAR),
>       CAST(user_id AS VARCHAR)
>     ),
>     COUNT(event_id) AS event_count
> FROM (
>     SELECT
>         user_id,
>         event_id,
>         maxtimestamp(longToDateTime(rowtime)) as row_datetime
>     FROM event_foo
>     GROUP BY event_id, user_id
> )
> GROUP BY user_id, MONTH(row_datetime), YEAR(row_datetime)
>
> I am hoping the inner SELECT to do the deduping because logically it is
> equivalent to a DISTINCT. This works in my functional testing.
>
> Will it also work if the dedups span different event buckets? I was hoping
> that as long as the events arrive within the state "retention time" in
> flink they should be deduped but I am new to Flink so I am not sure about
> that. Can someone please correct me if I am wrong? Is this a reasonable
> workaround for lack of DISTINCT support? Please let me know if there is a
> better way.
>
> Thanks,
> Vinod
>
>