Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/10/01 07:37:00 UTC

[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API

    [ https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633674#comment-16633674 ] 

ASF GitHub Bot commented on FLINK-9712:
---------------------------------------

twalthr commented on a change in pull request #6741: [FLINK-9712][table,docs] Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r221515643
 
 

 ##########
 File path: docs/dev/table/streaming/temporal_tables.md
 ##########
 @@ -0,0 +1,263 @@
+---
+title: "Temporal Tables"
+nav-parent_id: streaming_tableapi
+nav-pos: 4
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Temporal Tables represent a concept of a table that changes over time
+and for which Flink keeps track of those changes.
+
+* This will be replaced by the TOC
+{:toc}
+
+Motivation
+----------
+
+Let's assume that we have the following two tables.
+
+{% highlight sql %}
+SELECT * FROM Orders;
+
+rowtime amount currency
+======= ====== =========
+10:15        2 Euro
+10:30        1 US Dollar
+10:32       50 Yen
+10:52        3 Euro
+11:04        5 US Dollar
+{% endhighlight %}
+
+`Orders` represents payments of a given `amount` in a given `currency`.
+For example, at `10:15` there was an order for an amount of `2 Euro`.
+
+{% highlight sql %}
+SELECT * FROM RatesHistory;
+
+rowtime currency   rate
+======= ======== ======
+09:00   US Dollar   102
+09:00   Euro        114
+09:00   Yen           1
+10:45   Euro        116
+11:15   Euro        119
+{% endhighlight %}
+
+`RatesHistory` represents an ever-changing, append-only stream of currency exchange rates with respect to `Yen` (which has a rate of `1`).
+For example, the exchange rate of `Euro` to `Yen` for the period from `09:00` to `10:45` was `114`.
+From `10:45` to `11:15` it was `116`.
+
+The task is now to calculate the value of all `Orders` converted to a common currency (`Yen`).
+For example, we would like to convert the order
+{% highlight sql %}
+rowtime amount currency
+======= ====== =========
+10:15        2 Euro
+{% endhighlight %}
+using the appropriate conversion rate for the given `rowtime` (`114`).
+Without Temporal Tables, one would need to write a query such as:
+{% highlight sql %}
+SELECT
+  SUM(o.amount * r.rate) AS amount
+FROM Orders AS o,
+  RatesHistory AS r
+WHERE r.currency = o.currency
+AND r.rowtime = (
+  SELECT MAX(rowtime)
+  FROM RatesHistory AS r2
+  WHERE r2.currency = o.currency
+  AND r2.rowtime <= o.rowtime);
+{% endhighlight %}
+Temporal Tables are a concept that aims to simplify this query.
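
As a rough illustration of what the correlated subquery above computes, the following plain-Java sketch replays the sample data: for every order it picks the most recent rate whose `rowtime` is not later than the order's `rowtime`, then sums the converted amounts. This is not Flink API. The class and method names (`EnrichmentJoinSketch`, `rateAt`, `convertedTotal`) are hypothetical, and the sketch assumes zero-padded `HH:mm` strings so that string order equals time order.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java illustration only; none of these names are part of Flink. */
final class EnrichmentJoinSketch {

    /** Sample RatesHistory rows, grouped per currency and sorted by rowtime. */
    static Map<String, TreeMap<String, Long>> ratesHistory() {
        Map<String, TreeMap<String, Long>> history = new HashMap<>();
        addRate(history, "09:00", "US Dollar", 102L);
        addRate(history, "09:00", "Euro", 114L);
        addRate(history, "09:00", "Yen", 1L);
        addRate(history, "10:45", "Euro", 116L);
        addRate(history, "11:15", "Euro", 119L);
        return history;
    }

    static void addRate(Map<String, TreeMap<String, Long>> history,
                        String rowtime, String currency, long rate) {
        history.computeIfAbsent(currency, k -> new TreeMap<>()).put(rowtime, rate);
    }

    /**
     * Latest rate with rowtime <= the given time, mirroring the MAX(rowtime) subquery.
     * Assumes at least one such rate exists for the currency.
     */
    static long rateAt(Map<String, TreeMap<String, Long>> history,
                       String currency, String rowtime) {
        return history.get(currency).floorEntry(rowtime).getValue();
    }

    /** Sum of all sample orders converted to Yen. */
    static long convertedTotal() {
        Map<String, TreeMap<String, Long>> history = ratesHistory();
        String[] times = {"10:15", "10:30", "10:32", "10:52", "11:04"};
        long[] amounts = {2L, 1L, 50L, 3L, 5L};
        String[] currencies = {"Euro", "US Dollar", "Yen", "Euro", "US Dollar"};

        long total = 0L;
        for (int i = 0; i < times.length; i++) {
            total += amounts[i] * rateAt(history, currencies[i], times[i]);
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(convertedTotal());
    }
}
```

With the sample rows, the `10:52` Euro order is converted at `116` (the `10:45` rate) while the `10:15` Euro order still uses `114`.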
+
+In order to define a Temporal Table, we must define its primary key.
+The primary key allows us to overwrite older values in the Temporal Table.
+In the above example, `currency` would be the primary key of the `RatesHistory` table.
+Secondly, a [time attribute](time_attributes.html) is required,
+which determines which row is newer and which one is older.
+
+Temporal Table Functions
+------------------------
+
+In order to access the data in a Temporal Table, one must pass a time attribute that determines which version of the table is returned.
+Flink uses the SQL syntax of Table Functions to express this.
+Once defined, a Temporal Table Function takes a single argument `timeAttribute` and returns a set of rows.
+This set contains the latest version of the row for each existing primary key with respect to the given `timeAttribute`.
+
+Assuming that we have defined a `Rates(timeAttribute)` Temporal Table Function based on the `RatesHistory` table,
+we could query it in the following way:
+
+{% highlight sql %}
+SELECT * FROM Rates('10:15');
+
+rowtime currency   rate
+======= ======== ======
+09:00   US Dollar   102
+09:00   Euro        114
+09:00   Yen           1
+
+SELECT * FROM Rates('11:00');
+
+rowtime currency   rate
+======= ======== ======
+09:00   US Dollar   102
+10:45   Euro        116
+09:00   Yen           1
+{% endhighlight %}
+
+Each query to `Rates(timeAttribute)` would return the state of `Rates` for the given `timeAttribute`.
+
+**Note**: Currently Flink doesn't support directly querying Temporal Table Functions with a constant `timeAttribute`.
+At the moment, Temporal Table Functions can only be used in joins.
+The above example is meant only to provide an intuition about what the function `Rates(timeAttribute)` returns.
+
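
The semantics of `Rates(timeAttribute)` described above can be sketched in plain Java: scan the append-only history, ignore rows newer than `timeAttribute`, and keep the latest remaining row per primary key. This is only an illustration of the described behaviour, not Flink's implementation. The names `TemporalTableSnapshotSketch`, `RateRow` and `ratesAt` are hypothetical, and times are assumed to be zero-padded `HH:mm` strings.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Plain-Java illustration only; none of these names are part of Flink. */
final class TemporalTableSnapshotSketch {

    /** One row of the append-only RatesHistory table. */
    static final class RateRow {
        final String rowtime;
        final String currency;
        final long rate;

        RateRow(String rowtime, String currency, long rate) {
            this.rowtime = rowtime;
            this.currency = currency;
            this.rate = rate;
        }
    }

    static final RateRow[] RATES_HISTORY = {
        new RateRow("09:00", "US Dollar", 102L),
        new RateRow("09:00", "Euro", 114L),
        new RateRow("09:00", "Yen", 1L),
        new RateRow("10:45", "Euro", 116L),
        new RateRow("11:15", "Euro", 119L),
    };

    /** Latest version of each key (currency) as of the given time. */
    static Map<String, RateRow> ratesAt(String timeAttribute) {
        Map<String, RateRow> latest = new LinkedHashMap<>();
        for (RateRow row : RATES_HISTORY) {
            if (row.rowtime.compareTo(timeAttribute) > 0) {
                continue; // row became valid after the requested time
            }
            RateRow current = latest.get(row.currency);
            if (current == null || current.rowtime.compareTo(row.rowtime) <= 0) {
                latest.put(row.currency, row); // newer version overwrites older one
            }
        }
        return latest;
    }

    public static void main(String[] args) {
        for (RateRow row : ratesAt("11:00").values()) {
            System.out.println(row.rowtime + " " + row.currency + " " + row.rate);
        }
    }
}
```

Calling `ratesAt("10:15")` and `ratesAt("11:00")` on the sample rows yields the same two snapshots as the tables shown earlier; a time before `09:00` yields an empty map.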
+Processing time
+---------------
+
+### Defining Temporal Table Function
+
+In order to define a processing time Temporal Table:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
+
+List<Tuple2<Long, String>> ordersData = new ArrayList<>();
+ordersData.add(Tuple2.of(2L, "Euro"));
+ordersData.add(Tuple2.of(1L, "US Dollar"));
+ordersData.add(Tuple2.of(50L, "Yen"));
+ordersData.add(Tuple2.of(3L, "Euro"));
+ordersData.add(Tuple2.of(5L, "US Dollar"));
+
+List<Tuple2<String, Long>> ratesHistoryData = new ArrayList<>();
+ratesHistoryData.add(Tuple2.of("US Dollar", 102L));
+ratesHistoryData.add(Tuple2.of("Euro", 114L));
+ratesHistoryData.add(Tuple2.of("Yen", 1L));
+ratesHistoryData.add(Tuple2.of("Euro", 116L));
+ratesHistoryData.add(Tuple2.of("Euro", 119L));
+
+DataStreamSource<Tuple2<Long, String>> ordersStream = env.fromCollection(ordersData);
+Table orders = tEnv.fromDataStream(ordersStream, "o_amount, o_currency, o_proctime.proctime");
+
+DataStreamSource<Tuple2<String, Long>> ratesHistoryStream = env.fromCollection(ratesHistoryData);
+Table ratesHistory = tEnv.fromDataStream(ratesHistoryStream, "r_currency, r_rate, r_proctime.proctime");
+
+tEnv.registerTable("Orders", orders);
+tEnv.registerTable("RatesHistory", ratesHistory);
+
+TemporalTableFunction rates = ratesHistory.createTemporalTableFunction("r_proctime", "r_currency"); // <==== (1)
+tEnv.registerFunction("Rates", rates);                                                              // <==== (2)
+
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+
+val ordersData = new mutable.MutableList[(Long, String)]
+ordersData.+=((2L, "Euro"))
+ordersData.+=((1L, "US Dollar"))
+ordersData.+=((50L, "Yen"))
+ordersData.+=((3L, "Euro"))
+ordersData.+=((5L, "US Dollar"))
+
+val ratesHistoryData = new mutable.MutableList[(String, Long)]
+ratesHistoryData.+=(("US Dollar", 102L))
+ratesHistoryData.+=(("Euro", 114L))
+ratesHistoryData.+=(("Yen", 1L))
+ratesHistoryData.+=(("Euro", 116L))
+ratesHistoryData.+=(("Euro", 119L))
+
+val orders = env
+  .fromCollection(ordersData)
+  .toTable(tEnv, 'o_amount, 'o_currency, 'o_proctime.proctime)
+val ratesHistory = env
+  .fromCollection(ratesHistoryData)
+  .toTable(tEnv, 'r_currency, 'r_rate, 'r_proctime.proctime)
+
+tEnv.registerTable("Orders", orders)
+tEnv.registerTable("RatesHistory", ratesHistory)
+
+val rates = ratesHistory.createTemporalTableFunction('r_proctime, 'r_currency) // <==== (1)
+tEnv.registerFunction("Rates", rates)                                          // <==== (2)
+{% endhighlight %}
+</div>
+</div>
+
+In line `(1)` we create a `rates` [Temporal Table Function](#temporal-table-functions).
+This allows us to use the `rates` function in the Table API.
+Line `(2)` registers this function under the name `Rates` in our table environment,
+which allows us to use the `Rates` function in SQL.
+
+### Joining with Temporal Table Function
+
+After [defining a Temporal Table Function](#defining-temporal-table-function), we can start using it.
+Temporal Table Functions can be used in the same way as normal Table Functions.
+For example, to solve our motivating problem of converting currencies from the `Orders` table,
+we could:
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+SELECT
 
 Review comment:
   True, the problem with the current document is that the example for SQL users is more incomplete. But we can fix that once we have support for temporal tables in SQL Client.



> Support enrichment joins in Flink SQL/Table API
> -----------------------------------------------
>
>                 Key: FLINK-9712
>                 URL: https://issues.apache.org/jira/browse/FLINK-9712
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>    Affects Versions: 1.5.0
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
>            Priority: Major
>              Labels: pull-request-available
>
> As described here:
> https://docs.google.com/document/d/1KaAkPZjWFeu-ffrC9FhYuxE6CIKsatHTTxyrxSBR8Sk/edit?usp=sharing


