Posted to user@flink.apache.org by Salva Alcántara <sa...@gmail.com> on 2022/10/04 17:18:20 UTC
Can temporal table functions only be registered using the table API?
Based on this:
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/temporal_table_function/
It seems that the only way of registering temporal table functions is via
the Table API.
If that is the case, is there a way to make this example work
```
SELECT
  SUM(amount * rate) AS amount
FROM
  orders,
  LATERAL TABLE (rates(order_time))
WHERE
  rates.currency = orders.currency
```
without the Table API, just using SQL? E.g., is it possible to deploy the
temporal table function to the cluster (by packaging it in a jar file) and
then run the above query from the Flink SQL CLI?
Thanks in advance,
Salva
Re: Can temporal table functions only be registered using the table API?
Posted by David Anderson <da...@apache.org>.
As for your original question, the documentation states that a temporal
table function can only be registered via the Table API, and I believe this
is true.
David
On Thu, Oct 6, 2022 at 9:59 AM David Anderson <da...@apache.org> wrote:
> Salva,
>
> Have you tried doing an AS OF style processing time temporal join? I know
> the documentation leads one to believe this isn't supported, but I think it
> actually works. I'm basing this on this comment [1] in the code for
> the TemporalProcessTimeJoinOperator:
>
>> The operator to temporal join a stream on processing time.
>>
>> For temporal TableFunction join (LATERAL
>> TemporalTableFunction(o.proctime)) and temporal table join (FOR SYSTEM_TIME
>> AS OF), they can reuse same processing-time operator implementation, the
>> differences between them are: (1) The temporal TableFunction join only
>> supports single column in primary key but temporal table join supports
>> arbitrary columns in primary key. (2) The temporal TableFunction join only
>> supports inner join, temporal table join supports both inner join and left
>> outer join.
>
>
> [1]
> https://github.com/apache/flink/blob/release-1.15/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperator.java#L38
>
> Regards,
> David
Re: Can temporal table functions only be registered using the table API?
Posted by David Anderson <da...@apache.org>.
I was wrong about this. The AS OF style processing-time temporal join has been
disabled at a higher level, in
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTemporalJoin#createJoinOperator.
David
On Thu, Oct 6, 2022 at 9:59 AM David Anderson <da...@apache.org> wrote:
> Salva,
>
> Have you tried doing an AS OF style processing time temporal join? I know
> the documentation leads one to believe this isn't supported, but I think it
> actually works. I'm basing this on this comment [1] in the code for
> the TemporalProcessTimeJoinOperator:
>
>> The operator to temporal join a stream on processing time.
>>
>> For temporal TableFunction join (LATERAL
>> TemporalTableFunction(o.proctime)) and temporal table join (FOR SYSTEM_TIME
>> AS OF), they can reuse same processing-time operator implementation, the
>> differences between them are: (1) The temporal TableFunction join only
>> supports single column in primary key but temporal table join supports
>> arbitrary columns in primary key. (2) The temporal TableFunction join only
>> supports inner join, temporal table join supports both inner join and left
>> outer join.
>
>
> [1]
> https://github.com/apache/flink/blob/release-1.15/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperator.java#L38
>
> Regards,
> David
Re: Can temporal table functions only be registered using the table API?
Posted by David Anderson <da...@apache.org>.
Salva,
Have you tried doing an AS OF style processing time temporal join? I know
the documentation leads one to believe this isn't supported, but I think it
actually works. I'm basing this on this comment [1] in the code for
the TemporalProcessTimeJoinOperator:
> The operator to temporal join a stream on processing time.
>
> For temporal TableFunction join (LATERAL TemporalTableFunction(o.proctime))
> and temporal table join (FOR SYSTEM_TIME AS OF), they can reuse same
> processing-time operator implementation, the differences between them are:
> (1) The temporal TableFunction join only supports single column in primary
> key but temporal table join supports arbitrary columns in primary key. (2)
> The temporal TableFunction join only supports inner join, temporal table
> join supports both inner join and left outer join.
[1]
https://github.com/apache/flink/blob/release-1.15/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperator.java#L38
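To make the semantics described in that comment a bit more concrete, here is a toy model in plain Java. It is not Flink code and does not reflect how the operator is actually implemented. The class name `ProcTimeJoinSketch` and its methods are made up for illustration, and rows are simplified to strings. It only shows the idea of joining each probe-side row against whatever build-side version is current when the row arrives, in inner and left outer flavors.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a processing-time temporal join (hypothetical, not Flink code). */
public class ProcTimeJoinSketch {
    // Latest build-side value seen so far for each join key.
    private final Map<String, String> latestByKey = new HashMap<>();
    // Joined rows, in the order they were emitted.
    private final List<String> output = new ArrayList<>();
    private final boolean leftOuter;

    public ProcTimeJoinSketch(boolean leftOuter) {
        this.leftOuter = leftOuter;
    }

    /** Build side: a new version replaces the previous one for that key. */
    public void onDimension(String key, String value) {
        latestByKey.put(key, value);
    }

    /** Probe side: join against the version that is current on arrival. */
    public void onFact(String key, String fact) {
        String dim = latestByKey.get(key);
        if (dim != null) {
            output.add(fact + "|" + dim);
        } else if (leftOuter) {
            // Left outer join: keep the probe row, padded with null.
            output.add(fact + "|null");
        }
        // Inner join: a probe row without a current version is dropped.
    }

    public List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        ProcTimeJoinSketch join = new ProcTimeJoinSketch(false);
        join.onFact("k1", "f1");       // no version yet, dropped by the inner join
        join.onDimension("k1", "d1");
        join.onFact("k1", "f2");       // joins with d1
        join.onDimension("k1", "d2");  // replaces d1
        join.onFact("k1", "f3");       // joins with d2
        System.out.println(join.output());
    }
}
```

Because the result depends on arrival order rather than on timestamps carried by the rows, replaying the same input in a different interleaving can give different output. That is the usual caveat with processing-time joins.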
Regards,
David
On Wed, Oct 5, 2022 at 6:39 AM Salva Alcántara <sa...@gmail.com>
wrote:
> I've found more examples here:
>
> https://www.ververica.com/blog/a-journey-to-beating-flinks-sql-performance
>
> where a fact table is enriched using several dimension tables, but again
> the temporal table functions are registered using Table API like so:
>
> ```java
> tEnv.registerFunction(
>     "dimension_table1",
>     tEnv.from("dim_table1").createTemporalTableFunction("r_proctime", "id"));
> ```
>
> It's not exactly the same application, since this example covers a lookup
> join, but the SQL query is also relying on the LATERAL TABLE + temporal
> table functions:
>
> ```
> SELECT
>   D1.col1 AS A,
>   D1.col2 AS B
> FROM
>   fact_table,
>   LATERAL TABLE (dimension_table1(f_proctime)) AS D1
> WHERE
>   fact_table.dim1 = D1.id
> ```
>
> In particular, this produces a job which is equivalent to
>
> ```
> private abstract static class AbstractFactDimTableJoin<IN1, OUT>
> extends CoProcessFunction<IN1, Dimension, OUT> {
> private static final long serialVersionUID = 1L;
>
> protected transient ValueState<Dimension> dimState;
>
> @Override
> public void processElement1(IN1 value, Context ctx, Collector<OUT>
> out) throws Exception {
> Dimension dim = dimState.value();
> if (dim == null) {
> return;
> }
> out.collect(join(value, dim));
> }
>
> abstract OUT join(IN1 value, Dimension dim);
>
> @Override
> public void processElement2(Dimension value, Context ctx,
> Collector<OUT> out) throws Exception {
> dimState.update(value);
> }
>
> @Override
> public void open(Configuration parameters) throws Exception {
> super.open(parameters);
> ValueStateDescriptor<Dimension> dimStateDesc =
> new ValueStateDescriptor<>("dimstate", Dimension.class);
> this.dimState = getRuntimeContext().getState(dimStateDesc);
> }
> }
> ```
>
> I'm basically interested in rewriting these types of DIY joins (based on
> CoProcessFunction or CoFlatMapFunction) from DataStream to pure SQL if
> possible, otherwise I would like to know which limitations there are.
>
> Regards,
>
> Salva
Re: Can temporal table functions only be registered using the table API?
Posted by Salva Alcántara <sa...@gmail.com>.
I've found more examples here:
https://www.ververica.com/blog/a-journey-to-beating-flinks-sql-performance
where a fact table is enriched using several dimension tables, but again
the temporal table functions are registered using Table API like so:
```java
tEnv.registerFunction(
    "dimension_table1",
    tEnv.from("dim_table1").createTemporalTableFunction("r_proctime", "id"));
```
It's not exactly the same application, since this example covers a lookup
join, but the SQL query also relies on LATERAL TABLE plus a temporal table
function:
```
SELECT
  D1.col1 AS A,
  D1.col2 AS B
FROM
  fact_table,
  LATERAL TABLE (dimension_table1(f_proctime)) AS D1
WHERE
  fact_table.dim1 = D1.id
```
In particular, this produces a job which is equivalent to
```java
private abstract static class AbstractFactDimTableJoin<IN1, OUT>
        extends CoProcessFunction<IN1, Dimension, OUT> {
    private static final long serialVersionUID = 1L;

    protected transient ValueState<Dimension> dimState;

    @Override
    public void processElement1(IN1 value, Context ctx, Collector<OUT> out)
            throws Exception {
        Dimension dim = dimState.value();
        if (dim == null) {
            return;
        }
        out.collect(join(value, dim));
    }

    abstract OUT join(IN1 value, Dimension dim);

    @Override
    public void processElement2(Dimension value, Context ctx, Collector<OUT> out)
            throws Exception {
        dimState.update(value);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ValueStateDescriptor<Dimension> dimStateDesc =
                new ValueStateDescriptor<>("dimstate", Dimension.class);
        this.dimState = getRuntimeContext().getState(dimStateDesc);
    }
}
```
I'm basically interested in rewriting these types of DIY joins (based on
CoProcessFunction or CoFlatMapFunction) from DataStream to pure SQL if
possible, otherwise I would like to know which limitations there are.
Regards,
Salva
On Tue, Oct 4, 2022 at 9:09 PM Salva Alcántara <sa...@gmail.com>
wrote:
> By looking at the docs for older versions of Flink, e.g.,
>
>
> https://nightlies.apache.org/flink/flink-docs-release-1.8/dev/table/streaming/joins.html
>
> it seems that it's possible to rewrite this query
>
> ```
> SELECT
> o.amount * r.rate AS amount
> FROM
> Orders AS o,
> LATERAL TABLE (Rates(o.rowtime)) AS r
> WHERE r.currency = o.currency
> ```
>
> as
>
> ```
> SELECT
> SUM(o.amount * r.rate) AS amount
> FROM Orders AS o,
> RatesHistory AS r
> WHERE r.currency = o.currency
> AND r.rowtime = (
> SELECT MAX(rowtime)
> FROM RatesHistory AS r2
> WHERE r2.currency = o.currency
> AND r2.rowtime <= o.rowtime);
> ```
>
> This would be a way to accomplish this task in SQL without using a
> temporal table function.
>
> Would this rewrite be equivalent in terms of the final generated job?
> Obviously I very much prefer the LATERAL TABLE query but this requires
> using a temporal table function which can only be registered using the
> Table API (apparently).
>
> Regards,
>
> Salva
Re: Can temporal table functions only be registered using the table API?
Posted by Salva Alcántara <sa...@gmail.com>.
By looking at the docs for older versions of Flink, e.g.,
https://nightlies.apache.org/flink/flink-docs-release-1.8/dev/table/streaming/joins.html
it seems that it's possible to rewrite this query
```
SELECT
  o.amount * r.rate AS amount
FROM
  Orders AS o,
  LATERAL TABLE (Rates(o.rowtime)) AS r
WHERE r.currency = o.currency
```
as
```
SELECT
  SUM(o.amount * r.rate) AS amount
FROM Orders AS o,
  RatesHistory AS r
WHERE r.currency = o.currency
  AND r.rowtime = (
    SELECT MAX(rowtime)
    FROM RatesHistory AS r2
    WHERE r2.currency = o.currency
      AND r2.rowtime <= o.rowtime);
```
This would be a way to accomplish this task in SQL without using a temporal
table function.
Would this rewrite be equivalent in terms of the final generated job?
Obviously I very much prefer the LATERAL TABLE query but this requires
using a temporal table function which can only be registered using the
Table API (apparently).
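For reference, the lookup that both queries express (for each order, take the rate with the greatest rowtime that is not later than the order's rowtime) can be sketched in plain Java. This is only a model of the intended result, not of the job Flink would generate for either query. The class `VersionedRatesSketch` and its methods are hypothetical, and timestamps are simplified to `long` values.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of an "as of" lookup over a rates history (hypothetical, not Flink code). */
public class VersionedRatesSketch {
    // currency -> (rowtime -> rate), with versions kept sorted by rowtime.
    private final Map<String, TreeMap<Long, Double>> history = new HashMap<>();

    /** Records a new version of the rate for a currency. */
    public void addRate(String currency, long rowtime, double rate) {
        history.computeIfAbsent(currency, c -> new TreeMap<>()).put(rowtime, rate);
    }

    /**
     * Returns the rate whose rowtime is the greatest one that is
     * less than or equal to the given time, or null if there is none.
     */
    public Double rateAsOf(String currency, long time) {
        TreeMap<Long, Double> versions = history.get(currency);
        if (versions == null) {
            return null;
        }
        Map.Entry<Long, Double> entry = versions.floorEntry(time);
        return entry == null ? null : entry.getValue();
    }

    public static void main(String[] args) {
        VersionedRatesSketch rates = new VersionedRatesSketch();
        rates.addRate("EUR", 10L, 1.10);
        rates.addRate("EUR", 20L, 1.20);

        // An order at time 15 sees the version written at time 10.
        System.out.println(rates.rateAsOf("EUR", 15L));
        // An order before the first version finds no rate at all.
        System.out.println(rates.rateAsOf("EUR", 5L));
    }
}
```

`TreeMap.floorEntry` plays the role of the `MAX(rowtime) ... <= o.rowtime` subquery here. An order that arrives before any rate exists gets no match, which corresponds to the row being dropped by the inner join.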
Regards,
Salva
On Tue, Oct 4, 2022 at 8:39 PM Salva Alcántara <sa...@gmail.com>
wrote:
> It doesn't seem the case with processing time unless I'm mistaken:
>
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#processing-time-temporal-join
>
> This case seems to require a different syntax based on LATERAL TABLE and a
> temporal table function (FOR SYSTEM_TIME is not supported). From the docs
> too, it seems that temporal table functions can only be registered via the
> table API. Am I missing/misunderstanding something?
>
> Salva
Re: Can temporal table functions only be registered using the table API?
Posted by Salva Alcántara <sa...@gmail.com>.
That doesn't seem to be the case with processing time, unless I'm mistaken:
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#processing-time-temporal-join
This case seems to require a different syntax based on LATERAL TABLE and a
temporal table function (FOR SYSTEM_TIME is not supported). From the docs
too, it seems that temporal table functions can only be registered via the
Table API. Am I missing/misunderstanding something?
Salva
On Tue, Oct 4, 2022, 19:26 Martijn Visser <ma...@apache.org> wrote:
> Hi Salva,
>
> The examples for temporal table joins can be found at
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#temporal-joins.
> Your example is definitely possible with just using SQL.
>
> Best regards,
>
> Martijn
Re: Can temporal table functions only be registered using the table API?
Posted by Martijn Visser <ma...@apache.org>.
Hi Salva,
The examples for temporal table joins can be found at
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#temporal-joins.
Your example is definitely possible using just SQL.
Best regards,
Martijn
On Tue, Oct 4, 2022 at 12:20 PM Salva Alcántara <sa...@gmail.com>
wrote:
> Based on this:
>
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/temporal_table_function/
>
> It seems that the only way of registering temporal table functions is via
> the Table API.
>
> If that is the case, is there a way to make this example work
>
> ```
> SELECT
> SUM(amount * rate) AS amount
> FROM
> orders,
> LATERAL TABLE (rates(order_time))
> WHERE
> rates.currency = orders.currency
> ```
>
> without the Table API, just using SQL? E.g., is it possible to deploy the
> temporal table function to the cluster (by packaging it in a jar file) and
> then run the above query from the Flink SQL CLI?
>
> Thanks in advance,
>
> Salva
>
>