You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by "guoliubin85@foxmail.com" <gu...@foxmail.com> on 2020/12/14 10:18:15 UTC

upsert-kafka to do temporal table joins

I just try to follow the example list in the page
https://flink.apache.org/news/2020/12/10/release-1.12.0.html#table-apisql-support-for-temporal-table-joins-in-sql 

Unfortunately when I try to upload the python file to flink cluster, the error occured:

py4j.protocol.Py4JJavaError: An error occurred while calling o2.sqlQuery.
: org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "ON" at line 8, column 5.
Was expecting one of:
    <EOF>
    "EXCEPT" ...
    "FETCH" ...
    "GROUP" ...

 Maybe I miss something?


guoliubin85@foxmail.com

Re: upsert-kafka to do temporal table joins

Posted by Leonard Xu <xb...@gmail.com>.
Hi, guoliubin
Please ignore my previous answer, I mixed your question with an another one, I post the right temporal join sql syntax here.

SELECT [column_list]
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS <alias2>]
ON table1.column-name1 = table2.column-name1
The query in announcement missed the JOIN key word which is necessary, and the AS keyword is optional.
The wright query in example should be:
SELECT 
    o.order_id,
    o.order_time, 
    o.amount * r.currency_rate AS amount,
    r.currency
FROM orders AS o JOIN latest_rates FOR SYSTEM_TIME AS OF o.order_time AS r —- the AS keyword is necessary
ON o.currency = r.currency;
I’ll raise up a PR to fix the example in release note.

Best,
Leonard


Re: upsert-kafka to do temporal table joins

Posted by Leonard Xu <xb...@gmail.com>.
Hi, guoliubin
Sorry for the late reply, I think the example in release note has a minor typo error which missed the ‘AS’  keyword.

SELECT 
    o.order_id,
    o.order_time, 
    o.amount * r.currency_rate AS amount,
    r.currency
FROM orders AS o, latest_rates FOR SYSTEM_TIME AS OF o.order_time AS r
ON o.currency = r.currency;
The ‘AS’ keyword is necessary if we alias the correlation name here, I know it’s easy to be ignored(just like the release note o(╯□╰)o ) , but the syntax follows SQL standard 2011 7.6 table reference:

<table primary> ::=
    <table or query name> [ <query system time period specification> ]
        [ [ AS ] <correlation name>
          [ <left paren> <derived column list> <right paren> ] ]
 
Best,
Leonard


> 在 2020年12月14日,18:18,guoliubin85@foxmail.com 写道:
> 
> I just try to follow the example list in the page
> https://flink.apache.org/news/2020/12/10/release-1.12.0.html#table-apisql-support-for-temporal-table-joins-in-sql <https://flink.apache.org/news/2020/12/10/release-1.12.0.html#table-apisql-support-for-temporal-table-joins-in-sql> 
> 
> Unfortunately when I try to upload the python file to flink cluster, the error occured:
> 
> py4j.protocol.Py4JJavaError: An error occurred while calling o2.sqlQuery.
> : org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "ON" at line 8, column 5.
> Was expecting one of:
>     <EOF>
>     "EXCEPT" ...
>     "FETCH" ...
>     "GROUP" ...
> 
>  Maybe I miss something?
> guoliubin85@foxmail.com <ma...@foxmail.com>