Posted to user@flink.apache.org by "guoliubin85@foxmail.com" <gu...@foxmail.com> on 2020/12/14 10:18:15 UTC
upsert-kafka to do temporal table joins
I just tried to follow the example listed on this page:
https://flink.apache.org/news/2020/12/10/release-1.12.0.html#table-apisql-support-for-temporal-table-joins-in-sql
Unfortunately, when I tried to submit the Python file to the Flink cluster, the following error occurred:
py4j.protocol.Py4JJavaError: An error occurred while calling o2.sqlQuery.
: org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "ON" at line 8, column 5.
Was expecting one of:
<EOF>
"EXCEPT" ...
"FETCH" ...
"GROUP" ...
Maybe I am missing something?
guoliubin85@foxmail.com
Re: upsert-kafka to do temporal table joins
Posted by Leonard Xu <xb...@gmail.com>.
Hi, guoliubin
Please ignore my previous answer; I mixed up your question with another one. Here is the correct temporal join SQL syntax:
SELECT [column_list]
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS <alias2>]
ON table1.column-name1 = table2.column-name1
The query in the announcement is missing the JOIN keyword, which is required; the AS keyword is optional.
The correct query for the example should be:
SELECT
o.order_id,
o.order_time,
o.amount * r.currency_rate AS amount,
r.currency
FROM orders AS o JOIN latest_rates FOR SYSTEM_TIME AS OF o.order_time AS r -- the AS keyword is necessary
ON o.currency = r.currency;
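To make the intent of the query concrete, here is a minimal plain-Python sketch of what an event-time temporal join computes: each order is matched with the rate version that was valid at its order_time. This only illustrates the semantics; it is not how Flink implements the join (no watermarks, state, or retractions), and the function name temporal_join, the tuple layouts, and the sample values are all hypothetical.

```python
from bisect import bisect_right
from collections import defaultdict


def temporal_join(orders, rate_updates):
    """Match each order with the latest rate whose update_time <= order_time.

    orders:       iterable of (order_id, order_time, amount, currency)
    rate_updates: iterable of (update_time, currency, currency_rate)
    Both layouts are assumptions made for this illustration.
    """
    # Build a version history per currency, sorted by update time.
    history = defaultdict(list)
    for update_time, currency, rate in sorted(rate_updates):
        history[currency].append((update_time, rate))

    result = []
    for order_id, order_time, amount, currency in orders:
        versions = history.get(currency, [])
        times = [t for t, _ in versions]
        # Index of the first version strictly newer than the order.
        idx = bisect_right(times, order_time)
        if idx == 0:
            # Inner join: no rate version existed yet, so drop the order.
            continue
        rate = versions[idx - 1][1]
        result.append((order_id, order_time, amount * rate, currency))
    return result


# Hypothetical sample data; times are plain integers for simplicity.
rate_updates = [
    (1, "EUR", 1.5),
    (5, "EUR", 2.0),
    (2, "GBP", 1.25),
]
orders = [
    (101, 3, 10, "EUR"),  # matches the EUR rate published at time 1
    (102, 6, 10, "EUR"),  # matches the EUR rate published at time 5
    (103, 1, 10, "GBP"),  # no GBP rate exists at time 1, so it is dropped
]
joined = temporal_join(orders, rate_updates)
```

With a LEFT JOIN, order 103 would be kept with NULL rate columns instead of being dropped.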
I’ll open a PR to fix the example in the release notes.
Best,
Leonard
Re: upsert-kafka to do temporal table joins
Posted by Leonard Xu <xb...@gmail.com>.
Hi, guoliubin
Sorry for the late reply. I think the example in the release notes has a minor typo: it is missing the ‘AS’ keyword.
SELECT
o.order_id,
o.order_time,
o.amount * r.currency_rate AS amount,
r.currency
FROM orders AS o, latest_rates FOR SYSTEM_TIME AS OF o.order_time AS r
ON o.currency = r.currency;
The ‘AS’ keyword is necessary if we alias the correlation name here. I know it is easy to overlook (just like in the release notes o(╯□╰)o), but the syntax follows the SQL:2011 standard, section 7.6 <table reference>:
<table primary> ::=
<table or query name> [ <query system time period specification> ]
[ [ AS ] <correlation name>
[ <left paren> <derived column list> <right paren> ] ]
Best,
Leonard