Posted to user@flink.apache.org by jeremyji <18...@163.com> on 2020/01/02 13:16:06 UTC
Flink group with time-windowed join
I have two streams, table1 and table2. We know that grouping on the result of a regular join won't work,
so we have to use a time-windowed join. Here is what my Flink SQL looks like:
SELECT
  a.account account,
  SUM(a.value) + SUM(b.value),
  UNIX_TIMESTAMP(TUMBLE_START(a.producer_timestamp, INTERVAL '3' MINUTE))
FROM
  (SELECT
     account,
     value,
     producer_timestamp
   FROM
     table1) a,
  (SELECT
     account,
     value,
     producer_timestamp
   FROM
     table2) b
WHERE
  a.account = b.account AND
  a.producer_timestamp BETWEEN b.producer_timestamp - INTERVAL '3' MINUTE AND b.producer_timestamp)
group by
  a.account,
  TUMBLE(a.producer_timestamp, INTERVAL '3' MINUTE)
But I still get an error from Flink:
Rowtime attributes must not be in the input rows of a regular join. As a
workaround you can cast the time attributes of input tables to TIMESTAMP
before.
Please check the documentation for the set of currently supported SQL
features.
    at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:450)
    at org.apache.flink.table.api.TableEnvironment.optimizePhysicalPlan(TableEnvironment.scala:369)
    at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:814)
    at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
    at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:344)
    at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:1048)
    at org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:962)
    at org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:922)
    ....
I think I'm using a time-windowed join exactly as this doc describes:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/joins.html#time-windowed-joins
But Flink tells me it's a regular join. Is there anything wrong that I haven't
noticed?
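The query combines two things: a time-windowed (interval) join, in which each table1 row matches the table2 rows of the same account whose producer_timestamp lies between 0 and 3 minutes after its own, and a 3-minute tumbling-window aggregation keyed on table1's timestamp. The sketch below is a plain-Python model of what the query is meant to compute, not Flink code. It assumes producer_timestamp is epoch milliseconds, represents the window start as milliseconds rather than a TIMESTAMP, and ignores watermarks, late data, NULLs and state cleanup; the names `Row`, `tumble_start`, `interval_join` and `windowed_sum` are hypothetical.

```python
from collections import defaultdict
from typing import Dict, Iterable, Iterator, NamedTuple, Tuple

MINUTE_MS = 60_000
THREE_MINUTES_MS = 3 * MINUTE_MS  # INTERVAL '3' MINUTE in milliseconds


class Row(NamedTuple):
    account: str
    value: int
    producer_timestamp: int  # event time as epoch milliseconds (assumption)


def tumble_start(ts: int, size: int) -> int:
    """Start of the tumbling window of length `size` containing `ts`
    (no window offset; assumes ts >= 0)."""
    return ts - ts % size


def interval_join(left: Iterable[Row], right: Iterable[Row],
                  bound: int) -> Iterator[Tuple[Row, Row]]:
    """Yield (a, b) pairs where a.account = b.account AND
    a.producer_timestamp BETWEEN b.producer_timestamp - bound
    AND b.producer_timestamp (inclusive on both ends)."""
    right_rows = list(right)  # materialized so it can be scanned per left row
    for a in left:
        for b in right_rows:
            if (a.account == b.account
                    and b.producer_timestamp - bound
                    <= a.producer_timestamp
                    <= b.producer_timestamp):
                yield a, b


def windowed_sum(left: Iterable[Row],
                 right: Iterable[Row]) -> Dict[Tuple[str, int], int]:
    """GROUP BY a.account, TUMBLE(a.producer_timestamp, 3 minutes) over the
    join result; returns SUM(a.value) + SUM(b.value) keyed by
    (account, window start)."""
    totals: Dict[Tuple[str, int], int] = defaultdict(int)
    for a, b in interval_join(left, right, THREE_MINUTES_MS):
        window = tumble_start(a.producer_timestamp, THREE_MINUTES_MS)
        # Adding a.value + b.value per joined pair equals SUM(a.value) + SUM(b.value)
        totals[(a.account, window)] += a.value + b.value
    return dict(totals)


table1 = [Row("acc1", 10, 60_000), Row("acc1", 1, 200_000)]
table2 = [Row("acc1", 5, 120_000), Row("acc1", 7, 300_000)]
print(windowed_sum(table1, table2))  # → {('acc1', 0): 15, ('acc1', 180000): 8}
```

Because the SUMs run over joined pairs, a table1 row that matches two table2 rows contributes its value twice; whether that is intended depends on the use case.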
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: Flink group with time-windowed join
Posted by jeremyji <18...@163.com>.
Hi Dawid,
I simplified my SQL; the original SQL is more complex and has an UNNEST
select, like this:
SELECT
  a.account,
  (SUM(a.value) + SUM(b.value)) as result,
  TUMBLE_START(a.producer_timestamp, INTERVAL '3' MINUTE)
FROM
  (SELECT
     account,
     value,
     producer_timestamp
   FROM
     table1) a,
  (SELECT
     account,
     value,
     producer_timestamp
   FROM
     table2,
     unnest(table2.row_array) as T(account, value) b
WHERE
  a.account = b.account AND
  a.producer_timestamp BETWEEN b.producer_timestamp - INTERVAL '3' MINUTE AND b.producer_timestamp
group by
  a.account,
  TUMBLE(a.producer_timestamp, INTERVAL '3' MINUTE)
table2 has a column row_array, which is an array of rows; each row has two
fields: account and value.
producer_timestamp is a time attribute and a column of table2.
BTW, my Flink version is 1.7.
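As described, `FROM table2, unnest(table2.row_array) as T(account, value)` flattens the array so that the b side of the join sees one row per array element, each carrying its parent row's producer_timestamp. The sketch below models that flattening in plain Python under standard SQL semantics (the comma join is an inner cross join, so a parent row with an empty array produces no output). It is not Flink code; `Table2Row` and `unnest` are hypothetical names, and timestamps are assumed to be epoch milliseconds.

```python
from typing import List, NamedTuple, Tuple


class Table2Row(NamedTuple):
    producer_timestamp: int           # event time as epoch milliseconds (assumption)
    row_array: List[Tuple[str, int]]  # each element is an (account, value) row


def unnest(rows: List[Table2Row]) -> List[Tuple[str, int, int]]:
    """Model of FROM table2, UNNEST(table2.row_array) AS T(account, value):
    one (account, value, producer_timestamp) row per array element.
    A parent row with an empty array yields nothing (inner cross join)."""
    result = []
    for parent in rows:
        for account, value in parent.row_array:
            result.append((account, value, parent.producer_timestamp))
    return result


table2 = [
    Table2Row(120_000, [("acc1", 5), ("acc2", 3)]),
    Table2Row(300_000, []),
]
print(unnest(table2))  # → [('acc1', 5, 120000), ('acc2', 3, 120000)]
```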
Re: Flink group with time-windowed join
Posted by Dawid Wysakowicz <dw...@apache.org>.
Hi,
Could you check your query again? I could not reproduce your issue on the
latest master. I had to adjust your query slightly, though:
SELECT
  a.account,
  (SUM(a.`value`) + SUM(b.`value`)) as `result`,
  TUMBLE_START(a.producer_timestamp, INTERVAL '3' MINUTE)
FROM
  (SELECT
     account,
     `value`,
     producer_timestamp
   FROM
     table1) a,
  (SELECT
     account,
     `value`,
     producer_timestamp
   FROM
     table2) b
WHERE
  a.account = b.account AND
  a.producer_timestamp BETWEEN b.producer_timestamp - INTERVAL '3' MINUTE AND b.producer_timestamp
group by
  a.account,
  TUMBLE(a.producer_timestamp, INTERVAL '3' MINUTE)
Best,
Dawid
On 04/01/2020 04:06, Kurt Young wrote:
> Looks like a bug to me. Could you file an issue for this?
>
> Best,
> Kurt
Re: Flink group with time-windowed join
Posted by Kurt Young <yk...@gmail.com>.
Looks like a bug to me. Could you file an issue for this?
Best,
Kurt
On Thu, Jan 2, 2020 at 9:06 PM jeremyji <18...@163.com> wrote:
> [...]