Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/11/12 08:29:47 UTC

[GitHub] asfgit closed pull request #7065: [FLINK-10626] [docs] [table] Add documentation for event-time temporal table joins

asfgit closed pull request #7065: [FLINK-10626] [docs] [table] Add documentation for event-time temporal table joins
URL: https://github.com/apache/flink/pull/7065
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/docs/dev/table/streaming/joins.md b/docs/dev/table/streaming/joins.md
index 24c33b9be63..49abc022ee7 100644
--- a/docs/dev/table/streaming/joins.md
+++ b/docs/dev/table/streaming/joins.md
@@ -136,14 +136,14 @@ SELECT
   o.amount * r.rate AS amount
 FROM
   Orders AS o,
-  LATERAL TABLE (Rates(o.proctime)) AS r
+  LATERAL TABLE (Rates(o.rowtime)) AS r
 WHERE r.currency = o.currency
 {% endhighlight %}
 
-Each record from the probe side will be joined with the version of the build side table at the time of the correlated time attribute of the probe side record. 
+Each record from the probe side will be joined with the version of the build side table at the time of the correlated time attribute of the probe side record.
 In order to support updates (overwrites) of previous values on the build side table, the table must define a primary key.
 
-In our example, each record from `Orders` will be joined with the version of `Rates` at time `o.proctime`. Because the time attribute defines a processing-time notion, a newly appended order is always joined with the most recent version of `Rates` when executing the operation. The `currency` field has been defined as the primary key of `Rates` before and is used to connect both tables in our example.
+In our example, each record from `Orders` will be joined with the version of `Rates` at time `o.rowtime`. The `currency` field has been defined as the primary key of `Rates` before and is used to connect both tables in our example. If the query were using a processing-time notion, a newly appended order would always be joined with the most recent version of `Rates` when executing the operation. 
 
 In contrast to [regular joins](#regular-joins), this means that if there is a new record on the build side, it will not affect the previous results of the join.
 This again allows Flink to limit the number of elements that must be kept in the state.
@@ -189,14 +189,26 @@ val result = orders
 </div>
 </div>
 
+**Note**: State retention defined in a [query configuration](query_configuration.html) is not yet considered for temporal joins. This means that the required state to compute the query result might grow infinitely depending on the number of distinct primary keys for the history table.
+
 ### Processing-time Temporal Joins
 
 With a processing-time time attribute, it is impossible to pass _past_ time attributes as an argument to the temporal table function.
-By definition, it is always the current timestamp. Thus, processing-time temporal table function invocations will always return the latest known versions of the underlying table
+By definition, it is always the current timestamp. Thus, invocations of a processing-time temporal table function will always return the latest known versions of the underlying table
 and any updates in the underlying history table will also immediately overwrite the current values.
 
 Only the latest versions (with respect to the defined primary key) of the build side records are kept in the state. New updates will have no effect on the results previously emitted for already processed records from the probe side.
 
-One can think about processing-time temporal join as a simple `HashMap<K, V>` that stores all of the records from the build side.
+One can think about a processing-time temporal join as a simple `HashMap<K, V>` that stores all of the records from the build side.
 When a new record from the build side has the same key as some previous record, the old value is just simply overwritten.
 Every record from the probe side is always evaluated against the most recent/current state of the `HashMap`.
+
+### Event-time Temporal Joins
+
+With an event-time time attribute (i.e., a rowtime attribute), it is possible to pass _past_ time attributes to the temporal table function. This allows for joining the two tables at a common point in time.
+
+Compared to processing-time temporal joins, the temporal table does not only keep the latest version (with respect to the defined primary key) of the build side records in the state but stores multiple versions of them identified by time.
+
+For example, an incoming row with an event-time timestamp of `12:30:00` that is appended to the probe side table is joined with the version of the build side table at time `12:30:00` according to the [concept of temporal tables](temporal_tables.html). Thus, the incoming row is only joined with rows that have a timestamp lower than or equal to `12:30:00`, with updates applied according to the primary key up to this point in time.
+
+By definition of event time, [watermarks]({{ site.baseurl }}/dev/event_time.html) allow the join operation to move forward in time and discard versions of the build table that are no longer necessary because no incoming row with lower or equal timestamp is expected.
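
The event-time behaviour described in the added section can be illustrated with a small in-memory model. This is only a sketch and not Flink's actual join operator: the class `VersionedTable` and its methods `upsert`, `lookup`, `advanceWatermark`, and `versionCount` are hypothetical names introduced here, and timestamps are assumed to be plain `long` values. Each primary key keeps several versions sorted by time, a probe-side row looks up the latest version with a timestamp lower than or equal to its own, and a watermark lets the model drop versions that no future row can match.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical model of the build side of an event-time temporal join (not Flink code). */
final class VersionedTable<K, V> {
    // Per primary key: all retained versions, sorted by the event time at which they became valid.
    private final Map<K, TreeMap<Long, V>> versions = new HashMap<>();

    /** Records a build-side update: from eventTime onwards, key maps to value. */
    void upsert(K key, long eventTime, V value) {
        versions.computeIfAbsent(key, k -> new TreeMap<>()).put(eventTime, value);
    }

    /** Returns the version valid at eventTime, i.e. the latest update with timestamp <= eventTime, or null. */
    V lookup(K key, long eventTime) {
        TreeMap<Long, V> history = versions.get(key);
        if (history == null) {
            return null;
        }
        Map.Entry<Long, V> entry = history.floorEntry(eventTime);
        return entry == null ? null : entry.getValue();
    }

    /**
     * Assumes no probe-side row with timestamp <= watermark will arrive anymore.
     * Keeps the version that is valid at the watermark (later rows may still need it)
     * and discards every version older than that one.
     */
    void advanceWatermark(long watermark) {
        for (TreeMap<Long, V> history : versions.values()) {
            Long validAtWatermark = history.floorKey(watermark);
            if (validAtWatermark != null) {
                history.headMap(validAtWatermark, false).clear();
            }
        }
    }

    /** Number of versions currently retained for key. */
    int versionCount(K key) {
        TreeMap<Long, V> history = versions.get(key);
        return history == null ? 0 : history.size();
    }
}
```

In this model, a processing-time temporal join corresponds to the degenerate case in which only one version per key is kept and every lookup uses the current time, which matches the `HashMap<K, V>` analogy in the diff above.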


 
