Posted to user@flink.apache.org by Salva Alcántara <sa...@gmail.com> on 2022/10/04 17:18:20 UTC

Can temporal table functions only be registered using the table API?

Based on this:

https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/temporal_table_function/

It seems that the only way of registering temporal table functions is via
the Table API.

If that is the case, is there a way to make this example work

```
SELECT
  SUM(amount * rate) AS amount
FROM
  orders,
  LATERAL TABLE (rates(order_time))
WHERE
  rates.currency = orders.currency
```

without the Table API, just using SQL? E.g., is it possible to deploy the
temporal table function to the cluster (by packaging it in a jar file) and
then run the above query from the Flink SQL CLI?

Thanks in advance,

Salva

Re: Can temporal table functions only be registered using the table API?

Posted by David Anderson <da...@apache.org>.
As for your original question, the documentation states that a temporal
table function can only be registered via the Table API, and I believe this
is true.

David

On Thu, Oct 6, 2022 at 9:59 AM David Anderson <da...@apache.org> wrote:

> Salva,
>
> Have you tried doing an AS OF style processing time temporal join? I know
> the documentation leads one to believe this isn't supported, but I think it
> actually works. I'm basing this on this comment [1] in the code for
> the TemporalProcessTimeJoinOperator:
>
>> The operator to temporal join a stream on processing time.
>>
>> For temporal TableFunction join (LATERAL TemporalTableFunction(o.proctime))
>> and temporal table join (FOR SYSTEM_TIME AS OF), they can reuse same
>> processing-time operator implementation, the differences between them are:
>> (1) The temporal TableFunction join only supports single column in primary
>> key but temporal table join supports arbitrary columns in primary key. (2)
>> The temporal TableFunction join only supports inner join, temporal table
>> join supports both inner join and left outer join.
>
>
> [1]
> https://github.com/apache/flink/blob/release-1.15/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperator.java#L38
>
> Regards,
> David
>
> On Wed, Oct 5, 2022 at 6:39 AM Salva Alcántara <sa...@gmail.com>
> wrote:
>
>> I've found more examples here:
>>
>> https://www.ververica.com/blog/a-journey-to-beating-flinks-sql-performance
>>
>> where a fact table is enriched using several dimension tables, but again
>> the temporal table functions are registered using Table API like so:
>>
>> ```java
>>     tEnv.registerFunction(
>>         "dimension_table1",
>>         tEnv.from("dim_table1").createTemporalTableFunction("r_proctime", "id"));
>> ```
>>
>> It's not exactly the same application, since this example covers a lookup
>> join, but the SQL query is also relying on the LATERAL TABLE + temporal
>> table functions:
>>
>> ```
>> SELECT
>>     D1.col1 AS A,
>>     D1.col2 AS B
>> FROM
>>     fact_table,
>>     LATERAL TABLE (dimension_table1(f_proctime)) AS D1
>> WHERE
>>     fact_table.dim1 = D1.id
>> ```
>>
>> In particular, this produces a job which is equivalent to
>>
>> ```
>>   private abstract static class AbstractFactDimTableJoin<IN1, OUT>
>>       extends CoProcessFunction<IN1, Dimension, OUT> {
>>     private static final long serialVersionUID = 1L;
>>
>>     protected transient ValueState<Dimension> dimState;
>>
>>     @Override
>>     public void processElement1(IN1 value, Context ctx, Collector<OUT> out)
>>         throws Exception {
>>       Dimension dim = dimState.value();
>>       if (dim == null) {
>>         return;
>>       }
>>       out.collect(join(value, dim));
>>     }
>>
>>     abstract OUT join(IN1 value, Dimension dim);
>>
>>     @Override
>>     public void processElement2(Dimension value, Context ctx, Collector<OUT> out)
>>         throws Exception {
>>       dimState.update(value);
>>     }
>>
>>     @Override
>>     public void open(Configuration parameters) throws Exception {
>>       super.open(parameters);
>>       ValueStateDescriptor<Dimension> dimStateDesc =
>>           new ValueStateDescriptor<>("dimstate", Dimension.class);
>>       this.dimState = getRuntimeContext().getState(dimStateDesc);
>>     }
>>   }
>> ```
>>
>> I'm basically interested in rewriting these types of DIY joins (based on
>> CoProcessFunction or CoFlatMapFunction) from DataStream to pure SQL if
>> possible, otherwise I would like to know which limitations there are.
>>
>> Regards,
>>
>> Salva
>>
>> On Tue, Oct 4, 2022 at 9:09 PM Salva Alcántara <sa...@gmail.com>
>> wrote:
>>
>>> By looking at the docs for older versions of Flink, e.g.,
>>>
>>>
>>> https://nightlies.apache.org/flink/flink-docs-release-1.8/dev/table/streaming/joins.html
>>>
>>> it seems that it's possible to rewrite this query
>>>
>>> ```
>>> SELECT
>>>   o.amount * r.rate AS amount
>>> FROM
>>>   Orders AS o,
>>>   LATERAL TABLE (Rates(o.rowtime)) AS r
>>> WHERE r.currency = o.currency
>>> ```
>>>
>>> as
>>>
>>> ```
>>> SELECT
>>>   SUM(o.amount * r.rate) AS amount
>>> FROM Orders AS o,
>>>   RatesHistory AS r
>>> WHERE r.currency = o.currency
>>> AND r.rowtime = (
>>>   SELECT MAX(rowtime)
>>>   FROM RatesHistory AS r2
>>>   WHERE r2.currency = o.currency
>>>   AND r2.rowtime <= o.rowtime);
>>> ```
>>>
>>> This would be a way to accomplish this task in SQL without using a
>>> temporal table function.
>>>
>>> Would this rewrite be equivalent in terms of the final generated job?
>>> Obviously I very much prefer the LATERAL TABLE query but this requires
>>> using a temporal table function which can only be registered using the
>>> Table API (apparently).
>>>
>>> Regards,
>>>
>>> Salva
>>>
>>> On Tue, Oct 4, 2022 at 8:39 PM Salva Alcántara <sa...@gmail.com>
>>> wrote:
>>>
>>>> It doesn't seem the case with processing time unless I'm mistaken:
>>>>
>>>>
>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#processing-time-temporal-join
>>>>
>>>> This case seems to require a different syntax based on LATERAL TABLE
>>>> and a temporal table function (FOR SYSTEM_TIME is not supported). From the
>>>> docs too, it seems that temporal table functions can only be registered via
>>>> the table API. Am I missing/misunderstanding something?
>>>>
>>>> Salva
>>>>
>>>> On Tue, Oct 4, 2022, 19:26 Martijn Visser <ma...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Salva,
>>>>>
>>>>> The examples for temporal table joins can be found at
>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#temporal-joins.
>>>>> Your example is definitely possible with just using SQL.
>>>>>
>>>>> Best regards,
>>>>>
>>>>> Martijn
>>>>>
>>>>> On Tue, Oct 4, 2022 at 12:20 PM Salva Alcántara <
>>>>> salcantaraphd@gmail.com> wrote:
>>>>>
>>>>>> Based on this:
>>>>>>
>>>>>>
>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/temporal_table_function/
>>>>>>
>>>>>> It seems that the only way of registering temporal table functions is
>>>>>> via the Table API.
>>>>>>
>>>>>> If that is the case, is there a way to make this example work
>>>>>>
>>>>>> ```
>>>>>> SELECT
>>>>>>   SUM(amount * rate) AS amount
>>>>>> FROM
>>>>>>   orders,
>>>>>>   LATERAL TABLE (rates(order_time))
>>>>>> WHERE
>>>>>>   rates.currency = orders.currency
>>>>>> ```
>>>>>>
>>>>>> without the Table API, just using SQL? E.g., is it possible to deploy
>>>>>> the temporal table function to the cluster (by packaging it in a jar file)
>>>>>> and then run the above query from the Flink SQL CLI?
>>>>>>
>>>>>> Thanks in advance,
>>>>>>
>>>>>> Salva
>>>>>>
>>>>>>

Re: Can temporal table functions only be registered using the table API?

Posted by David Anderson <da...@apache.org>.
I was wrong about this. The AS OF style processing-time temporal join has been
disabled at a higher level, in
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTemporalJoin#createJoinOperator.

David


Re: Can temporal table functions only be registered using the table API?

Posted by David Anderson <da...@apache.org>.
Salva,

Have you tried doing an AS OF style processing time temporal join? I know
the documentation leads one to believe this isn't supported, but I think it
actually works. I'm basing this on this comment [1] in the code for
the TemporalProcessTimeJoinOperator:

> The operator to temporal join a stream on processing time.
>
> For temporal TableFunction join (LATERAL TemporalTableFunction(o.proctime))
> and temporal table join (FOR SYSTEM_TIME AS OF), they can reuse same
> processing-time operator implementation, the differences between them are:
> (1) The temporal TableFunction join only supports single column in primary
> key but temporal table join supports arbitrary columns in primary key. (2)
> The temporal TableFunction join only supports inner join, temporal table
> join supports both inner join and left outer join.
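
To make the two differences named in that comment concrete, here is a minimal plain-Java sketch. It is not Flink code: the class `InnerVsLeftOuterSketch`, the `join` method and the sample data are all made up for illustration. It only models a multi-column key and how an inner join and a left outer join treat a fact row that has no matching dimension row.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java illustration only; none of these names exist in Flink. */
public class InnerVsLeftOuterSketch {

    /**
     * Joins each fact against the current dimension row for its key.
     * A key is a list of column values, so it may span several columns.
     */
    public static List<String> join(List<String[]> facts,
                                    Map<List<String>, String> dims,
                                    boolean leftOuter) {
        List<String> out = new ArrayList<>();
        for (String[] fact : facts) {
            // fact = {keyColumn1, keyColumn2, payload}
            List<String> key = Arrays.asList(fact[0], fact[1]);
            String dim = dims.get(key);
            if (dim != null) {
                out.add(fact[2] + "|" + dim);
            } else if (leftOuter) {
                // Left outer join: keep the unmatched fact, padded with null.
                out.add(fact[2] + "|null");
            }
            // Inner join: an unmatched fact produces no output.
        }
        return out;
    }

    public static void main(String[] args) {
        Map<List<String>, String> dims = new HashMap<>();
        dims.put(Arrays.asList("EUR", "DE"), "dimA");

        List<String[]> facts = new ArrayList<>();
        facts.add(new String[] {"EUR", "DE", "order1"});
        facts.add(new String[] {"USD", "US", "order2"});

        System.out.println(join(facts, dims, false)); // [order1|dimA]
        System.out.println(join(facts, dims, true));  // [order1|dimA, order2|null]
    }
}
```

With a single-column key the list would simply hold one element; the left outer variant is the behaviour the comment attributes only to the FOR SYSTEM_TIME AS OF form.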


[1]
https://github.com/apache/flink/blob/release-1.15/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperator.java#L38

Regards,
David


Re: Can temporal table functions only be registered using the table API?

Posted by Salva Alcántara <sa...@gmail.com>.
I've found more examples here:

https://www.ververica.com/blog/a-journey-to-beating-flinks-sql-performance

where a fact table is enriched using several dimension tables, but again
the temporal table functions are registered using Table API like so:

```java
    tEnv.registerFunction(
        "dimension_table1",
        tEnv.from("dim_table1").createTemporalTableFunction("r_proctime", "id"));
```

It's not exactly the same application, since this example covers a lookup
join, but the SQL query is also relying on the LATERAL TABLE + temporal
table functions:

```
SELECT
    D1.col1 AS A,
    D1.col2 AS B
FROM
    fact_table,
    LATERAL TABLE (dimension_table1(f_proctime)) AS D1
WHERE
    fact_table.dim1 = D1.id
```

In particular, this produces a job which is equivalent to

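```java
// Imports required by the enclosing file.
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

  // Nested in the job class; Dimension is the dimension-row type (defined elsewhere).
  private abstract static class AbstractFactDimTableJoin<IN1, OUT>
      extends CoProcessFunction<IN1, Dimension, OUT> {
    private static final long serialVersionUID = 1L;

    // Latest dimension row seen for the current key.
    protected transient ValueState<Dimension> dimState;

    // Fact side: join against the stored dimension row, if there is one.
    @Override
    public void processElement1(IN1 value, Context ctx, Collector<OUT> out)
        throws Exception {
      Dimension dim = dimState.value();
      if (dim == null) {
        // No dimension row yet for this key: the fact is dropped, not buffered.
        return;
      }
      out.collect(join(value, dim));
    }

    abstract OUT join(IN1 value, Dimension dim);

    // Dimension side: overwrite the stored row with the newest one.
    @Override
    public void processElement2(Dimension value, Context ctx, Collector<OUT> out)
        throws Exception {
      dimState.update(value);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
      super.open(parameters);
      ValueStateDescriptor<Dimension> dimStateDesc =
          new ValueStateDescriptor<>("dimstate", Dimension.class);
      this.dimState = getRuntimeContext().getState(dimStateDesc);
    }
  }
```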
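
The behaviour of the function above can be tried out without a Flink cluster. The following is a minimal plain-Java model of it, under stated assumptions: a `HashMap` stands in for the keyed `ValueState`, strings stand in for the fact and dimension rows, and the names `LatestDimensionJoin`, `onFact` and `onDimension` are hypothetical. It is a sketch of the semantics, not of what the planner generates.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of a keyed "latest dimension wins" join; not Flink code. */
public class LatestDimensionJoin {
    // Stand-in for the keyed ValueState: one latest dimension value per key.
    private final Map<String, String> latestDimByKey = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    /** Mirrors processElement2: remember the newest dimension row for the key. */
    public void onDimension(String key, String dimValue) {
        latestDimByKey.put(key, dimValue);
    }

    /** Mirrors processElement1: emit only if a dimension row is already known. */
    public void onFact(String key, String factValue) {
        String dim = latestDimByKey.get(key);
        if (dim == null) {
            return; // inner-join behaviour: the fact is dropped, not buffered
        }
        output.add(factValue + "|" + dim);
    }

    public List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        LatestDimensionJoin join = new LatestDimensionJoin();
        join.onFact("k1", "f1");       // dropped: no dimension row for k1 yet
        join.onDimension("k1", "d1");
        join.onFact("k1", "f2");       // joined with d1
        join.onDimension("k1", "d2");  // replaces d1
        join.onFact("k1", "f3");       // joined with d2
        System.out.println(join.output()); // [f2|d1, f3|d2]
    }
}
```

The result depends on arrival order, which is what makes this a processing-time join: the same events delivered in a different order can produce different output.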

I'm basically interested in rewriting these types of DIY joins (based on
CoProcessFunction or CoFlatMapFunction) from DataStream to pure SQL if
possible, otherwise I would like to know which limitations there are.

Regards,

Salva


Re: Can temporal table functions only be registered using the table API?

Posted by Salva Alcántara <sa...@gmail.com>.
By looking at the docs for older versions of Flink, e.g.,

https://nightlies.apache.org/flink/flink-docs-release-1.8/dev/table/streaming/joins.html

it seems that it's possible to rewrite this query

```
SELECT
  o.amount * r.rate AS amount
FROM
  Orders AS o,
  LATERAL TABLE (Rates(o.rowtime)) AS r
WHERE r.currency = o.currency
```

as

```
SELECT
  SUM(o.amount * r.rate) AS amount
FROM Orders AS o,
  RatesHistory AS r
WHERE r.currency = o.currency
AND r.rowtime = (
  SELECT MAX(rowtime)
  FROM RatesHistory AS r2
  WHERE r2.currency = o.currency
  AND r2.rowtime <= o.rowtime);
```

This would be a way to accomplish this task in SQL without using a temporal
table function.
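
The selection rule in the correlated subquery (for each order, take the rate row with the greatest rowtime that is not later than the order's rowtime) can be sketched in plain Java. This only illustrates the rule, not the job Flink would generate; the class `AsOfRateLookup`, its methods and the sample rates are made up for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java illustration of an "as of" lookup; not Flink code. */
public class AsOfRateLookup {
    // currency -> (rowtime -> rate), with versions kept sorted by rowtime
    private final Map<String, TreeMap<Long, Double>> history = new HashMap<>();

    public void addRate(String currency, long rowtime, double rate) {
        history.computeIfAbsent(currency, c -> new TreeMap<>()).put(rowtime, rate);
    }

    /** Returns the rate with the greatest rowtime <= orderTime, or null if none. */
    public Double rateAsOf(String currency, long orderTime) {
        TreeMap<Long, Double> versions = history.get(currency);
        if (versions == null) {
            return null;
        }
        Map.Entry<Long, Double> entry = versions.floorEntry(orderTime);
        return entry == null ? null : entry.getValue();
    }

    /** Converts an order amount, or returns null when no rate version applies. */
    public Double convert(String currency, double amount, long orderTime) {
        Double rate = rateAsOf(currency, orderTime);
        return rate == null ? null : amount * rate;
    }

    public static void main(String[] args) {
        AsOfRateLookup lookup = new AsOfRateLookup();
        lookup.addRate("EUR", 1L, 2.0);
        lookup.addRate("EUR", 5L, 3.0);

        System.out.println(lookup.convert("EUR", 10.0, 4L)); // 20.0
        System.out.println(lookup.convert("EUR", 10.0, 5L)); // 30.0
        System.out.println(lookup.convert("EUR", 10.0, 0L)); // null
    }
}
```

An order that precedes every rate version finds no match, which corresponds to the row being absent from the result of the SQL query.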

Would this rewrite be equivalent in terms of the final generated job?
Obviously I very much prefer the LATERAL TABLE query but this requires
using a temporal table function which can only be registered using the
Table API (apparently).

Regards,

Salva


Re: Can temporal table functions only be registered using the table API?

Posted by Salva Alcántara <sa...@gmail.com>.
That doesn't seem to be the case with processing time, unless I'm mistaken:

https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#processing-time-temporal-join

This case seems to require a different syntax based on LATERAL TABLE and a
temporal table function (FOR SYSTEM_TIME is not supported). The docs also
suggest that temporal table functions can only be registered via the
Table API. Am I missing/misunderstanding something?

Salva


Re: Can temporal table functions only be registered using the table API?

Posted by Martijn Visser <ma...@apache.org>.
Hi Salva,

The examples for temporal table joins can be found at
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#temporal-joins.
Your example is definitely possible with just using SQL.

Best regards,

Martijn
