You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/09/03 11:17:48 UTC

[flink] branch master updated: [FLINK-13355][docs] Add documentation for Temporal Table Join in blink planner

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ca2e3e3  [FLINK-13355][docs] Add documentation for Temporal Table Join in blink planner
ca2e3e3 is described below

commit ca2e3e39e99e392514936cd2998dac22e719f19e
Author: Jark Wu <im...@gmail.com>
AuthorDate: Wed Aug 28 00:11:09 2019 +0800

    [FLINK-13355][docs] Add documentation for Temporal Table Join in blink planner
    
    This closes #9545
---
 docs/dev/table/sourceSinks.md                  |  42 ++++++++
 docs/dev/table/sourceSinks.zh.md               |  42 ++++++++
 docs/dev/table/sql.md                          |  27 ++++-
 docs/dev/table/sql.zh.md                       |  29 +++++-
 docs/dev/table/streaming/joins.md              | 133 ++++++++++++++++++++++++-
 docs/dev/table/streaming/joins.zh.md           | 133 ++++++++++++++++++++++++-
 docs/dev/table/streaming/temporal_tables.md    | 113 ++++++++++++++++++++-
 docs/dev/table/streaming/temporal_tables.zh.md | 113 ++++++++++++++++++++-
 docs/dev/table/tableApi.md                     |  12 +--
 docs/dev/table/tableApi.zh.md                  |  12 +--
 10 files changed, 627 insertions(+), 29 deletions(-)

diff --git a/docs/dev/table/sourceSinks.md b/docs/dev/table/sourceSinks.md
index b27d42d..12b4ca7 100644
--- a/docs/dev/table/sourceSinks.md
+++ b/docs/dev/table/sourceSinks.md
@@ -324,6 +324,48 @@ FilterableTableSource[T] {
 
 {% top %}
 
+### Defining a TableSource for Lookups
+
+<span class="label label-danger">Attention</span> This is an experimental feature. The interface may be changed in future versions. It's only supported in Blink planner.
+
+The `LookupableTableSource` interface adds support for the table to be accessed via key column(s) in a lookup fashion. This is very useful when used to join with a dimension table to enrich some information. If you want to use the `TableSource` in lookup mode, you should use the source in [temporal table join syntax](streaming/joins.html).
+
+The interface looks as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+LookupableTableSource<T> implements TableSource<T> {
+
+  public TableFunction<T> getLookupFunction(String[] lookupkeys);
+
+  public AsyncTableFunction<T> getAsyncLookupFunction(String[] lookupkeys);
+
+  public boolean isAsyncEnabled();
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+LookupableTableSource[T] extends TableSource[T] {
+
+  def getLookupFunction(lookupKeys: Array[String]): TableFunction[T]
+
+  def getAsyncLookupFunction(lookupKeys: Array[String]): AsyncTableFunction[T]
+
+  def isAsyncEnabled: Boolean
+}
+{% endhighlight %}
+</div>
+</div>
+
+* `getLookupFunction(lookupkeys)`: Returns a `TableFunction` which used to lookup the matched row(s) via lookup keys. The lookupkeys are the field names of `LookupableTableSource` in the join equal conditions. The eval method parameters of the returned `TableFunction`'s should be in the order which `lookupkeys` defined. It is recommended to define the parameters in varargs (e.g. `eval(Object... lookupkeys)` to match all the cases). The return type of the `TableFunction` must be identical [...]
+* `getAsyncLookupFunction(lookupkeys)`: Optional. Similar to `getLookupFunction`, but the `AsyncLookupFunction` lookups the matched row(s) asynchronously. The underlying of `AsyncLookupFunction` will be called via [Async I/O](/dev/stream/operators/asyncio.html). The first argument of the eval method of the returned `AsyncTableFunction` should be defined as `java.util.concurrent.CompletableFuture` to collect results asynchronously (e.g. `eval(CompletableFuture<Collection<String>> result,  [...]
+* `isAsyncEnabled()`: Returns true if async lookup is enabled. It requires `getAsyncLookupFunction(lookupkeys)` is implemented if `isAsyncEnabled` returns true.
+
+{% top %}
+
 Define a TableSink
 ------------------
 
diff --git a/docs/dev/table/sourceSinks.zh.md b/docs/dev/table/sourceSinks.zh.md
index b34967f..e37e7be 100644
--- a/docs/dev/table/sourceSinks.zh.md
+++ b/docs/dev/table/sourceSinks.zh.md
@@ -324,6 +324,48 @@ FilterableTableSource[T] {
 
 {% top %}
 
+### Defining a TableSource for Lookups
+
+<span class="label label-danger">Attention</span> This is an experimental feature. The interface may be changed in future versions. It's only supported in Blink planner.
+
+The `LookupableTableSource` interface adds support for the table to be accessed via key column(s) in a lookup fashion. This is very useful when used to join with a dimension table to enrich some information. If you want to use the `TableSource` in lookup mode, you should use the source in [temporal table join syntax](streaming/joins.html).
+
+The interface looks as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+LookupableTableSource<T> implements TableSource<T> {
+
+  public TableFunction<T> getLookupFunction(String[] lookupkeys);
+
+  public AsyncTableFunction<T> getAsyncLookupFunction(String[] lookupkeys);
+
+  public boolean isAsyncEnabled();
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+LookupableTableSource[T] extends TableSource[T] {
+
+  def getLookupFunction(lookupKeys: Array[String]): TableFunction[T]
+
+  def getAsyncLookupFunction(lookupKeys: Array[String]): AsyncTableFunction[T]
+
+  def isAsyncEnabled: Boolean
+}
+{% endhighlight %}
+</div>
+</div>
+
+* `getLookupFunction(lookupkeys)`: Returns a `TableFunction` which used to lookup the matched row(s) via lookup keys. The lookupkeys are the field names of `LookupableTableSource` in the join equal conditions. The eval method parameters of the returned `TableFunction`'s should be in the order which `lookupkeys` defined. It is recommended to define the parameters in varargs (e.g. `eval(Object... lookupkeys)` to match all the cases). The return type of the `TableFunction` must be identical [...]
+* `getAsyncLookupFunction(lookupkeys)`: Optional. Similar to `getLookupFunction`, but the `AsyncLookupFunction` lookups the matched row(s) asynchronously. The underlying of `AsyncLookupFunction` will be called via [Async I/O](/dev/stream/operators/asyncio.html). The first argument of the eval method of the returned `AsyncTableFunction` should be defined as `java.util.concurrent.CompletableFuture` to collect results asynchronously (e.g. `eval(CompletableFuture<Collection<String>> result,  [...]
+* `isAsyncEnabled()`: Returns true if async lookup is enabled. It requires `getAsyncLookupFunction(lookupkeys)` is implemented if `isAsyncEnabled` returns true.
+
+{% top %}
+
 Define a TableSink
 ------------------
 
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 79f0b41..0cfbdda 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -598,7 +598,7 @@ FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)
     </tr>
     <tr>
     	<td>
-        <strong>Join with Table Function</strong><br>
+        <strong>Join with Table Function (UDTF)</strong><br>
         <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
       </td>
     	<td>
@@ -624,7 +624,7 @@ FROM Orders LEFT JOIN LATERAL TABLE(unnest_udtf(tags)) t AS tag ON TRUE
     </tr>
     <tr>
       <td>
-        <strong>Join with Temporal Table</strong><br>
+        <strong>Join with Temporal Table Function</strong><br>
         <span class="label label-primary">Streaming</span>
       </td>
       <td>
@@ -647,6 +647,29 @@ WHERE
         <p>For more information please check the more detailed <a href="streaming/temporal_tables.html">temporal tables concept description</a>.</p>
       </td>
     </tr>
+    <tr>
+      <td>
+        <strong>Join with Temporal Table</strong><br>
+        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
+      </td>
+      <td>
+        <p><a href="streaming/temporal_tables.html">Temporal Tables</a> are tables that track changes over time.
+        A <a href="streaming/temporal_tables.html#temporal-table">Temporal Table</a> provides access to the versions of a temporal table at a specific point in time.</p>
+
+        <p>Only inner and left joins with processing-time temporal tables are supported.</p>
+        <p>The following example assumes that <strong>LatestRates</strong> is a <a href="streaming/temporal_tables.html#temporal-table">Temporal Table</a> which is materialized with the latest rate.</p>
+{% highlight sql %}
+SELECT
+  o.amout, o.currency, r.rate, o.amount * r.rate
+FROM
+  Orders AS o
+  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
+  ON r.currency = o.currency
+{% endhighlight %}
+        <p>For more information please check the more detailed <a href="streaming/temporal_tables.html">Temporal Tables</a> concept description.</p>
+        <p>Only supported in Blink planner.</p>
+      </td>
+    </tr>
 
   </tbody>
 </table>
diff --git a/docs/dev/table/sql.zh.md b/docs/dev/table/sql.zh.md
index 676eb1b..dd3869e 100644
--- a/docs/dev/table/sql.zh.md
+++ b/docs/dev/table/sql.zh.md
@@ -622,9 +622,9 @@ FROM Orders LEFT JOIN LATERAL TABLE(unnest_udtf(tags)) t AS tag ON TRUE
         <p><b>Note:</b> Currently, only literal <code>TRUE</code> is supported as predicate for a left outer join against a lateral table.</p>
       </td>
     </tr>
-    <tr>
+   <tr>
       <td>
-        <strong>Join with Temporal Table</strong><br>
+        <strong>Join with Temporal Table Function</strong><br>
         <span class="label label-primary">Streaming</span>
       </td>
       <td>
@@ -647,6 +647,29 @@ WHERE
         <p>For more information please check the more detailed <a href="streaming/temporal_tables.html">temporal tables concept description</a>.</p>
       </td>
     </tr>
+    <tr>
+      <td>
+        <strong>Join with Temporal Table</strong><br>
+        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
+      </td>
+      <td>
+        <p><a href="streaming/temporal_tables.html">Temporal Tables</a> are tables that track changes over time.
+        A <a href="streaming/temporal_tables.html#temporal-table">Temporal Table</a> provides access to the versions of a temporal table at a specific point in time.</p>
+
+        <p>Only inner and left joins with processing-time temporal tables are supported.</p>
+        <p>The following example assumes that <strong>LatestRates</strong> is a <a href="streaming/temporal_tables.html#temporal-table">Temporal Table</a> which is materialized with the latest rate.</p>
+    {% highlight sql %}
+    SELECT
+    o.amout, o.currency, r.rate, o.amount * r.rate
+    FROM
+    Orders AS o
+    JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
+    ON r.currency = o.currency
+    {% endhighlight %}
+        <p>For more information please check the more detailed <a href="streaming/temporal_tables.html">Temporal Tables</a> concept description.</p>
+        <p>Only supported in Blink planner.</p>
+      </td>
+    </tr>
 
   </tbody>
 </table>
@@ -796,7 +819,7 @@ ORDER BY orderTime
         <span class="label label-primary">Batch</span>
       </td>
       <td>
-<b>Note:</b> The LIMIT clause requires an ORDER BY clause. 
+<b>Note:</b> The LIMIT clause requires an ORDER BY clause.
 {% highlight sql %}
 SELECT *
 FROM Orders
diff --git a/docs/dev/table/streaming/joins.md b/docs/dev/table/streaming/joins.md
index 0e5e0c3..476e121 100644
--- a/docs/dev/table/streaming/joins.md
+++ b/docs/dev/table/streaming/joins.md
@@ -65,10 +65,10 @@ WHERE o.id = s.orderId AND
 
 Compared to a regular join operation, this kind of join only supports append-only tables with time attributes. Since time attributes are quasi-monontic increasing, Flink can remove old values from its state without affecting the correctness of the result.
 
-Join with a Temporal Table
+Join with a Temporal Table Function
 --------------------------
 
-A join with a temporal table joins an append-only table (left input/probe side) with a temporal table (right input/build side),
+A join with a temporal table function joins an append-only table (left input/probe side) with a temporal table (right input/build side),
 i.e., a table that changes over time and tracks its changes. Please check the corresponding page for more information about [temporal tables](temporal_tables.html).
 
 The following example shows an append-only table `Orders` that should be joined with the continuously changing currency rates table `RatesHistory`.
@@ -222,4 +222,133 @@ By definition of event time, [watermarks]({{ site.baseurl }}/dev/event_time.html
 forward in time and discard versions of the build table that are no longer necessary because no incoming row with
 lower or equal timestamp is expected.
 
+Join with a Temporal Table
+--------------------------
+
+A join with a temporal table joins an arbitrary table (left input/probe side) with a temporal table (right input/build side),
+i.e., an external dimension table that changes over time. Please check the corresponding page for more information about [temporal tables](temporal_tables.html#temporal-table).
+
+<span class="label label-danger">Attention</span> Users can not use arbitrary tables as a temporal table, but need to use a table backed by a `LookupableTableSource`. A `LookupableTableSource` can only be used for temporal join as a temporal table. See the page for more details about [how to define LookupableTableSource](../sourceSinks.html#defining-a-tablesource-with-lookupable).
+
+The following example shows an `Orders` stream that should be joined with the continuously changing currency rates table `LatestRates`.
+
+`LatestRates` is a dimension table that is materialized with the latest rate. At time `10:15`, `10:30`, `10:52`, the content of `LatestRates` looks as follows:
+
+{% highlight sql %}
+10:15> SELECT * FROM LatestRates;
+
+currency   rate
+======== ======
+US Dollar   102
+Euro        114
+Yen           1
+
+10:30> SELECT * FROM LatestRates;
+
+currency   rate
+======== ======
+US Dollar   102
+Euro        114
+Yen           1
+
+
+10:52> SELECT * FROM LatestRates;
+
+currency   rate
+======== ======
+US Dollar   102
+Euro        116     <==== changed from 114 to 116
+Yen           1
+{% endhighlight %}
+
+The content of `LastestRates` at time `10:15` and `10:30` are equal. The Euro rate has changed from 114 to 116 at `10:52`.
+
+`Orders` is an append-only table that represents payments for the given `amount` and the given `currency`.
+For example at `10:15` there was an order for an amount of `2 Euro`.
+
+{% highlight sql %}
+SELECT * FROM Orders;
+
+amount currency
+====== =========
+     2 Euro             <== arrived at time 10:15
+     1 US Dollar        <== arrived at time 10:30
+     2 Euro             <== arrived at time 10:52
+{% endhighlight %}
+
+Given that we would like to calculate the amount of all `Orders` converted to a common currency (`Yen`).
+
+For example, we would like to convert the following orders using the latest rate in `LatestRates`. The result would be:
+
+{% highlight text %}
+amount currency     rate   amout*rate
+====== ========= ======= ============
+     2 Euro          114          228    <== arrived at time 10:15
+     1 US Dollar     102          102    <== arrived at time 10:30
+     2 Euro          116          232    <== arrived at time 10:52
+{% endhighlight %}
+
+
+With the help of temporal table join, we can express such a query in SQL as:
+
+{% highlight sql %}
+SELECT
+  o.amout, o.currency, r.rate, o.amount * r.rate
+FROM
+  Orders AS o
+  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
+  ON r.currency = o.currency
+{% endhighlight %}
+
+Each record from the probe side will be joined with the current version of the build side table. In our example, the query is using the processing-time notion, so a newly appended order would always be joined with the most recent version of `LatestRates` when executing the operation. Note that the result is not deterministic for processing-time.
+
+In contrast to [regular joins](#regular-joins), the previous results of the temporal table join will not be affected despite the changes on the build side. Also, the temporal table join operator is very lightweight and does not keep any state.
+
+Compared to [time-windowed joins](#time-windowed-joins), temporal table joins do not define a time window within which the records will be joined.
+Records from the probe side are always joined with the build side's latest version at processing time. Thus, records on the build side might be arbitrarily old.
+
+Both [temporal table function join](#join-with-a-temporal-table-function) and temporal table join come from the same motivation but have different SQL syntax and runtime implementations:
+* The SQL syntax of the temporal table function join is a join UDTF, while the temporal table join uses the standard temporal table syntax introduced in SQL:2011.
+* The implementation of temporal table function joins actually joins two streams and keeps them in state, while temporal table joins just receive the only input stream and look up the external database according to the key in the record.
+* The temporal table function join is usually used to join a changelog stream, while the temporal table join is usually used to join an external table (i.e. dimension table).
+
+Such behaviour makes a temporal table join a good candidate to express stream enrichment in relational terms.
+
+In the future, the temporal table join will support the features of temporal table function joins, i.e. support to temporal join a changelog stream.
+
+### Usage
+
+The syntax of temporal table join is as follows:
+
+{% highlight sql %}
+SELECT [column_list]
+FROM table1 [AS <alias1>]
+[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.proctime [AS <alias2>]
+ON table1.column-name1 = table2.column-name1
+{% endhighlight %}
+
+Currently, only support INNER JOIN and LEFT JOIN. The `FOR SYSTEM_TIME AS OF table1.proctime` should be followed after temporal table. `proctime` is a [processing time attribute](time_attributes.html#processing-time) of `table1`.
+This means that it takes a snapshot of the temporal table at processing time when joining every record from left table.
+
+For example, after [defining temporal table](temporal_tables.html#defining-temporal-table), we can use it as following.
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+SELECT
+  SUM(o_amount * r_rate) AS amount
+FROM
+  Orders
+  JOIN LatestRates FOR SYSTEM_TIME AS OF o_proctime
+  ON r_currency = o_currency
+{% endhighlight %}
+</div>
+</div>
+
+<span class="label label-danger">Attention</span> It is only supported in Blink planner.
+
+<span class="label label-danger">Attention</span> It is only supported in SQL, and not supported in Table API yet.
+
+<span class="label label-danger">Attention</span> Flink does not support event time temporal table joins currently.
+
 {% top %}
diff --git a/docs/dev/table/streaming/joins.zh.md b/docs/dev/table/streaming/joins.zh.md
index 0e5e0c3..476e121 100644
--- a/docs/dev/table/streaming/joins.zh.md
+++ b/docs/dev/table/streaming/joins.zh.md
@@ -65,10 +65,10 @@ WHERE o.id = s.orderId AND
 
 Compared to a regular join operation, this kind of join only supports append-only tables with time attributes. Since time attributes are quasi-monontic increasing, Flink can remove old values from its state without affecting the correctness of the result.
 
-Join with a Temporal Table
+Join with a Temporal Table Function
 --------------------------
 
-A join with a temporal table joins an append-only table (left input/probe side) with a temporal table (right input/build side),
+A join with a temporal table function joins an append-only table (left input/probe side) with a temporal table (right input/build side),
 i.e., a table that changes over time and tracks its changes. Please check the corresponding page for more information about [temporal tables](temporal_tables.html).
 
 The following example shows an append-only table `Orders` that should be joined with the continuously changing currency rates table `RatesHistory`.
@@ -222,4 +222,133 @@ By definition of event time, [watermarks]({{ site.baseurl }}/dev/event_time.html
 forward in time and discard versions of the build table that are no longer necessary because no incoming row with
 lower or equal timestamp is expected.
 
+Join with a Temporal Table
+--------------------------
+
+A join with a temporal table joins an arbitrary table (left input/probe side) with a temporal table (right input/build side),
+i.e., an external dimension table that changes over time. Please check the corresponding page for more information about [temporal tables](temporal_tables.html#temporal-table).
+
+<span class="label label-danger">Attention</span> Users can not use arbitrary tables as a temporal table, but need to use a table backed by a `LookupableTableSource`. A `LookupableTableSource` can only be used for temporal join as a temporal table. See the page for more details about [how to define LookupableTableSource](../sourceSinks.html#defining-a-tablesource-with-lookupable).
+
+The following example shows an `Orders` stream that should be joined with the continuously changing currency rates table `LatestRates`.
+
+`LatestRates` is a dimension table that is materialized with the latest rate. At time `10:15`, `10:30`, `10:52`, the content of `LatestRates` looks as follows:
+
+{% highlight sql %}
+10:15> SELECT * FROM LatestRates;
+
+currency   rate
+======== ======
+US Dollar   102
+Euro        114
+Yen           1
+
+10:30> SELECT * FROM LatestRates;
+
+currency   rate
+======== ======
+US Dollar   102
+Euro        114
+Yen           1
+
+
+10:52> SELECT * FROM LatestRates;
+
+currency   rate
+======== ======
+US Dollar   102
+Euro        116     <==== changed from 114 to 116
+Yen           1
+{% endhighlight %}
+
+The content of `LastestRates` at time `10:15` and `10:30` are equal. The Euro rate has changed from 114 to 116 at `10:52`.
+
+`Orders` is an append-only table that represents payments for the given `amount` and the given `currency`.
+For example at `10:15` there was an order for an amount of `2 Euro`.
+
+{% highlight sql %}
+SELECT * FROM Orders;
+
+amount currency
+====== =========
+     2 Euro             <== arrived at time 10:15
+     1 US Dollar        <== arrived at time 10:30
+     2 Euro             <== arrived at time 10:52
+{% endhighlight %}
+
+Given that we would like to calculate the amount of all `Orders` converted to a common currency (`Yen`).
+
+For example, we would like to convert the following orders using the latest rate in `LatestRates`. The result would be:
+
+{% highlight text %}
+amount currency     rate   amout*rate
+====== ========= ======= ============
+     2 Euro          114          228    <== arrived at time 10:15
+     1 US Dollar     102          102    <== arrived at time 10:30
+     2 Euro          116          232    <== arrived at time 10:52
+{% endhighlight %}
+
+
+With the help of temporal table join, we can express such a query in SQL as:
+
+{% highlight sql %}
+SELECT
+  o.amout, o.currency, r.rate, o.amount * r.rate
+FROM
+  Orders AS o
+  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
+  ON r.currency = o.currency
+{% endhighlight %}
+
+Each record from the probe side will be joined with the current version of the build side table. In our example, the query is using the processing-time notion, so a newly appended order would always be joined with the most recent version of `LatestRates` when executing the operation. Note that the result is not deterministic for processing-time.
+
+In contrast to [regular joins](#regular-joins), the previous results of the temporal table join will not be affected despite the changes on the build side. Also, the temporal table join operator is very lightweight and does not keep any state.
+
+Compared to [time-windowed joins](#time-windowed-joins), temporal table joins do not define a time window within which the records will be joined.
+Records from the probe side are always joined with the build side's latest version at processing time. Thus, records on the build side might be arbitrarily old.
+
+Both [temporal table function join](#join-with-a-temporal-table-function) and temporal table join come from the same motivation but have different SQL syntax and runtime implementations:
+* The SQL syntax of the temporal table function join is a join UDTF, while the temporal table join uses the standard temporal table syntax introduced in SQL:2011.
+* The implementation of temporal table function joins actually joins two streams and keeps them in state, while temporal table joins just receive the only input stream and look up the external database according to the key in the record.
+* The temporal table function join is usually used to join a changelog stream, while the temporal table join is usually used to join an external table (i.e. dimension table).
+
+Such behaviour makes a temporal table join a good candidate to express stream enrichment in relational terms.
+
+In the future, the temporal table join will support the features of temporal table function joins, i.e. support to temporal join a changelog stream.
+
+### Usage
+
+The syntax of temporal table join is as follows:
+
+{% highlight sql %}
+SELECT [column_list]
+FROM table1 [AS <alias1>]
+[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.proctime [AS <alias2>]
+ON table1.column-name1 = table2.column-name1
+{% endhighlight %}
+
+Currently, only support INNER JOIN and LEFT JOIN. The `FOR SYSTEM_TIME AS OF table1.proctime` should be followed after temporal table. `proctime` is a [processing time attribute](time_attributes.html#processing-time) of `table1`.
+This means that it takes a snapshot of the temporal table at processing time when joining every record from left table.
+
+For example, after [defining temporal table](temporal_tables.html#defining-temporal-table), we can use it as following.
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+SELECT
+  SUM(o_amount * r_rate) AS amount
+FROM
+  Orders
+  JOIN LatestRates FOR SYSTEM_TIME AS OF o_proctime
+  ON r_currency = o_currency
+{% endhighlight %}
+</div>
+</div>
+
+<span class="label label-danger">Attention</span> It is only supported in Blink planner.
+
+<span class="label label-danger">Attention</span> It is only supported in SQL, and not supported in Table API yet.
+
+<span class="label label-danger">Attention</span> Flink does not support event time temporal table joins currently.
+
 {% top %}
diff --git a/docs/dev/table/streaming/temporal_tables.md b/docs/dev/table/streaming/temporal_tables.md
index 5d69197..baecbd0 100644
--- a/docs/dev/table/streaming/temporal_tables.md
+++ b/docs/dev/table/streaming/temporal_tables.md
@@ -22,9 +22,13 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-Temporal Tables represent a concept of a (parameterized) view on a changing history table that returns the content of a table at a specific point in time.
+Temporal Tables represent a concept of a (parameterized) view on a changing table that returns the content of a table at a specific point in time.
 
-Flink can keep track of the changes applied to an underlying append-only table and allows for accessing the table's content at a certain point in time within a query.
+The changing table can either be a changing history table which tracks the changes (e.g. database changelogs) or a changing dimension table which materializes the changes (e.g. database tables).
+
+For the changing history table, Flink can keep track of the changes and allows for accessing the content of the table at a certain point in time within a query. In Flink, this kind of table is represented by a *Temporal Table Function*.
+
+For the changing dimension table, Flink allows for accessing the content of the table at processing time within a query. In Flink, this kind of table is represented by a *Temporal Table*.
 
 * This will be replaced by the TOC
 {:toc}
@@ -32,6 +36,8 @@ Flink can keep track of the changes applied to an underlying append-only table a
 Motivation
 ----------
 
+### Correlate with a changing history table
+
 Let's assume that we have the following table `RatesHistory`.
 
 {% highlight sql %}
@@ -78,9 +84,38 @@ The concept of *Temporal Tables* aims to simplify such queries, speed up their e
 
 In the above example `currency` would be a primary key for `RatesHistory` table and `rowtime` would be the timestamp attribute.
 
-In Flink, a temporal table is represented by a *Temporal Table Function*.
+In Flink, this is represented by a [*Temporal Table Function*](#temporal-table-function).
+
+### Correlate with a changing dimension table
+
+On the other hand, some use cases require to join a changing dimension table which is an external database table.
+
+Let's assume that `LatestRates` is a table (e.g. stored in) which is materialized with the latest rate. The `LatestRates` is the materialized history `RatesHistory`. Then the content of `LatestRates` table at time `10:58` will be:
+
+{% highlight text %}
+10:58> SELECT * FROM LatestRates;
+currency   rate
+======== ======
+US Dollar   102
+Yen           1
+Euro        116
+{% endhighlight %}
+
+The content of `LatestRates` table at time `12:00` will be:
+
+{% highlight text %}
+12:00> SELECT * FROM LatestRates;
+currency   rate
+======== ======
+US Dollar   102
+Yen           1
+Euro        119
+Pounds      108
+{% endhighlight %}
+
+In Flink, this is represented by a [*Temporal Table*](#temporal-table).
 
-Temporal Table Functions
+Temporal Table Function
 ------------------------
 
 In order to access the data in a temporal table, one must pass a [time attribute](time_attributes.html) that determines the version of the table that will be returned.
@@ -187,4 +222,74 @@ which allows us to use the function `rates` in the [Table API](../tableApi.html#
 Line `(2)` registers this function under the name `Rates` in our table environment,
 which allows us to use the `Rates` function in [SQL](../sql.html#joins).
 
+## Temporal Table
+
+<span class="label label-danger">Attention</span> This is only supported in Blink planner.
+
+In order to access data in temporal table, currently one must define a `TableSource` with `LookupableTableSource`. Flink uses the SQL syntax of `FOR SYSTEM_TIME AS OF` to query temporal table, which is proposed in SQL:2011.
+
+Assuming that we defined a temporal table called `LatestRates`, we can query such a table in the following way:
+
+{% highlight sql %}
+SELECT * FROM LatestRates FOR SYSTEM_TIME AS OF TIME '10:15';
+
+currency   rate
+======== ======
+US Dollar   102
+Euro        114
+Yen           1
+
+SELECT * FROM LatestRates FOR SYSTEM_TIME AS OF TIME '11:00';
+
+currency   rate
+======== ======
+US Dollar   102
+Euro        116
+Yen           1
+{% endhighlight %}
+
+**Note**: Currently, Flink doesn't support directly querying the temporal table with a constant time. At the moment, temporal table can only be used in joins. The example above is used to provide an intuition about what the temporal table `LatestRates` returns.
+
+See also the page about [joins for continuous queries](joins.html) for more information about how to join with a temporal table.
+
+### Defining Temporal Table
+
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// Get the stream and table environments.
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
+
+// Create an HBaseTableSource as a temporal table which implements LookableTableSource
+// In the real setup, you should replace this with your own table.
+HBaseTableSource rates = new HBaseTableSource(conf, "Rates");
+rates.setRowKey("currency", String.class);   // currency as the primary key
+rates.addColumn("fam1", "rate", Double.class);
+
+// register the temporal table into environment, then we can query it in sql
+tEnv.registerTableSource("Rates", rates);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// Get the stream and table environments.
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+
+// Create an HBaseTableSource as a temporal table which implements LookableTableSource
+// In the real setup, you should replace this with your own table.
+val rates = new HBaseTableSource(conf, "Rates")
+rates.setRowKey("currency", String.class)   // currency as the primary key
+rates.addColumn("fam1", "rate", Double.class)
+
+// register the temporal table into environment, then we can query it in sql
+tEnv.registerTableSource("Rates", rates)
+{% endhighlight %}
+</div>
+</div>
+
+See also the page about [how to define LookupableTableSource](../sourceSinks.html#defining-a-tablesource-for-lookups).
+
 {% top %}
diff --git a/docs/dev/table/streaming/temporal_tables.zh.md b/docs/dev/table/streaming/temporal_tables.zh.md
index 5d69197..baecbd0 100644
--- a/docs/dev/table/streaming/temporal_tables.zh.md
+++ b/docs/dev/table/streaming/temporal_tables.zh.md
@@ -22,9 +22,13 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-Temporal Tables represent a concept of a (parameterized) view on a changing history table that returns the content of a table at a specific point in time.
+Temporal Tables represent a concept of a (parameterized) view on a changing table that returns the content of a table at a specific point in time.
 
-Flink can keep track of the changes applied to an underlying append-only table and allows for accessing the table's content at a certain point in time within a query.
+The changing table can either be a changing history table which tracks the changes (e.g. database changelogs) or a changing dimension table which materializes the changes (e.g. database tables).
+
+For the changing history table, Flink can keep track of the changes and allows for accessing the content of the table at a certain point in time within a query. In Flink, this kind of table is represented by a *Temporal Table Function*.
+
+For the changing dimension table, Flink allows for accessing the content of the table at processing time within a query. In Flink, this kind of table is represented by a *Temporal Table*.
 
 * This will be replaced by the TOC
 {:toc}
@@ -32,6 +36,8 @@ Flink can keep track of the changes applied to an underlying append-only table a
 Motivation
 ----------
 
+### Correlate with a changing history table
+
 Let's assume that we have the following table `RatesHistory`.
 
 {% highlight sql %}
@@ -78,9 +84,38 @@ The concept of *Temporal Tables* aims to simplify such queries, speed up their e
 
 In the above example `currency` would be a primary key for `RatesHistory` table and `rowtime` would be the timestamp attribute.
 
-In Flink, a temporal table is represented by a *Temporal Table Function*.
+In Flink, this is represented by a [*Temporal Table Function*](#temporal-table-function).
+
+### Correlate with a changing dimension table
+
+On the other hand, some use cases require to join a changing dimension table which is an external database table.
+
+Let's assume that `LatestRates` is a table (e.g. stored in) which is materialized with the latest rate. The `LatestRates` is the materialized history `RatesHistory`. Then the content of `LatestRates` table at time `10:58` will be:
+
+{% highlight text %}
+10:58> SELECT * FROM LatestRates;
+currency   rate
+======== ======
+US Dollar   102
+Yen           1
+Euro        116
+{% endhighlight %}
+
+The content of `LatestRates` table at time `12:00` will be:
+
+{% highlight text %}
+12:00> SELECT * FROM LatestRates;
+currency   rate
+======== ======
+US Dollar   102
+Yen           1
+Euro        119
+Pounds      108
+{% endhighlight %}
+
+In Flink, this is represented by a [*Temporal Table*](#temporal-table).
 
-Temporal Table Functions
+Temporal Table Function
 ------------------------
 
 In order to access the data in a temporal table, one must pass a [time attribute](time_attributes.html) that determines the version of the table that will be returned.
@@ -187,4 +222,74 @@ which allows us to use the function `rates` in the [Table API](../tableApi.html#
 Line `(2)` registers this function under the name `Rates` in our table environment,
 which allows us to use the `Rates` function in [SQL](../sql.html#joins).
 
+## Temporal Table
+
+<span class="label label-danger">Attention</span> This is only supported in Blink planner.
+
+In order to access data in temporal table, currently one must define a `TableSource` with `LookupableTableSource`. Flink uses the SQL syntax of `FOR SYSTEM_TIME AS OF` to query temporal table, which is proposed in SQL:2011.
+
+Assuming that we defined a temporal table called `LatestRates`, we can query such a table in the following way:
+
+{% highlight sql %}
+SELECT * FROM LatestRates FOR SYSTEM_TIME AS OF TIME '10:15';
+
+currency   rate
+======== ======
+US Dollar   102
+Euro        114
+Yen           1
+
+SELECT * FROM LatestRates FOR SYSTEM_TIME AS OF TIME '11:00';
+
+currency   rate
+======== ======
+US Dollar   102
+Euro        116
+Yen           1
+{% endhighlight %}
+
+**Note**: Currently, Flink doesn't support directly querying the temporal table with a constant time. At the moment, temporal table can only be used in joins. The example above is used to provide an intuition about what the temporal table `LatestRates` returns.
+
+See also the page about [joins for continuous queries](joins.html) for more information about how to join with a temporal table.
+
+### Defining Temporal Table
+
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// Get the stream and table environments.
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
+
+// Create an HBaseTableSource as a temporal table which implements LookableTableSource
+// In the real setup, you should replace this with your own table.
+HBaseTableSource rates = new HBaseTableSource(conf, "Rates");
+rates.setRowKey("currency", String.class);   // currency as the primary key
+rates.addColumn("fam1", "rate", Double.class);
+
+// register the temporal table into environment, then we can query it in sql
+tEnv.registerTableSource("Rates", rates);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// Get the stream and table environments.
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+
+// Create an HBaseTableSource as a temporal table which implements LookableTableSource
+// In the real setup, you should replace this with your own table.
+val rates = new HBaseTableSource(conf, "Rates")
+rates.setRowKey("currency", String.class)   // currency as the primary key
+rates.addColumn("fam1", "rate", Double.class)
+
+// register the temporal table into environment, then we can query it in sql
+tEnv.registerTableSource("Rates", rates)
+{% endhighlight %}
+</div>
+</div>
+
+See also the page about [how to define LookupableTableSource](../sourceSinks.html#defining-a-tablesource-for-lookups).
+
 {% top %}
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index 36a9942..c677de6 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -1061,7 +1061,7 @@ Table result = left.join(right)
     </tr>
     <tr>
     	<td>
-        <strong>Inner Join with Table Function</strong><br>
+        <strong>Inner Join with Table Function (UDTF)</strong><br>
         <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
       </td>
     	<td>
@@ -1082,7 +1082,7 @@ Table result = orders
     </tr>
     <tr>
     	<td>
-        <strong>Left Outer Join with Table Function</strong><br>
+        <strong>Left Outer Join with Table Function (UDTF)</strong><br>
         <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
       </td>
       <td>
@@ -1209,7 +1209,7 @@ val result = left.join(right)
     </tr>
     <tr>
     	<td>
-        <strong>Inner Join with Table Function</strong><br>
+        <strong>Inner Join with Table Function (UDTF)</strong><br>
         <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
       </td>
     	<td>
@@ -1228,7 +1228,7 @@ val result: Table = table
     </tr>
     <tr>
     	<td>
-        <strong>Left Outer Join with Table Function</strong><br>
+        <strong>Left Outer Join with Table Function (UDTF)</strong><br>
         <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
     	<td>
         <p>Joins a table with the results of a table function. Each row of the left (outer) table is joined with all rows produced by the corresponding call of the table function. If a table function call returns an empty result, the corresponding outer row is preserved and the result padded with null values.</p>
@@ -1332,7 +1332,7 @@ full_outer_result = left.full_outer_join(right, "a = d").select("a, b, e")
     </tr>
     <tr>
     	<td>
-        <strong>Inner Join with Table Function</strong><br>
+        <strong>Inner Join with Table Function (UDTF)</strong><br>
         <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
       </td>
     	<td>
@@ -1350,7 +1350,7 @@ result = orders.join_lateral("split(c).as(s, t, v)").select("a, b, s, t, v")
     </tr>
     <tr>
     	<td>
-        <strong>Left Outer Join with Table Function</strong><br>
+        <strong>Left Outer Join with Table Function (UDTF)</strong><br>
         <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
       </td>
       <td>
diff --git a/docs/dev/table/tableApi.zh.md b/docs/dev/table/tableApi.zh.md
index d99523b..c8cf4d5 100644
--- a/docs/dev/table/tableApi.zh.md
+++ b/docs/dev/table/tableApi.zh.md
@@ -1060,7 +1060,7 @@ Table result = left.join(right)
     </tr>
     <tr>
     	<td>
-        <strong>Inner Join with Table Function</strong><br>
+        <strong>Inner Join with Table Function (UDTF)</strong><br>
         <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
       </td>
     	<td>
@@ -1081,7 +1081,7 @@ Table result = orders
     </tr>
     <tr>
     	<td>
-        <strong>Left Outer Join with Table Function</strong><br>
+        <strong>Left Outer Join with Table Function (UDTF)</strong><br>
         <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
       </td>
       <td>
@@ -1208,7 +1208,7 @@ val result = left.join(right)
     </tr>
     <tr>
     	<td>
-        <strong>Inner Join with Table Function</strong><br>
+        <strong>Inner Join with Table Function (UDTF)</strong><br>
         <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
       </td>
     	<td>
@@ -1227,7 +1227,7 @@ val result: Table = table
     </tr>
     <tr>
     	<td>
-        <strong>Left Outer Join with Table Function</strong><br>
+        <strong>Left Outer Join with Table Function (UDTF)</strong><br>
         <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
     	<td>
         <p>Joins a table with the results of a table function. Each row of the left (outer) table is joined with all rows produced by the corresponding call of the table function. If a table function call returns an empty result, the corresponding outer row is preserved and the result padded with null values.</p>
@@ -1331,7 +1331,7 @@ full_outer_result = left.full_outer_join(right, "a = d").select("a, b, e")
     </tr>
     <tr>
     	<td>
-        <strong>Inner Join with Table Function</strong><br>
+        <strong>Inner Join with Table Function (UDTF)</strong><br>
         <span class="label label-primary">批处理</span> <span class="label label-primary">流处理</span>
       </td>
     	<td>
@@ -1349,7 +1349,7 @@ result = orders.join_lateral("split(c).as(s, t, v)").select("a, b, s, t, v")
     </tr>
     <tr>
     	<td>
-        <strong>Left Outer Join with Table Function</strong><br>
+        <strong>Left Outer Join with Table Function (UDTF)</strong><br>
         <span class="label label-primary">批处理</span> <span class="label label-primary">流处理</span>
       </td>
       <td>