Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/05/02 13:23:04 UTC
[jira] [Commented] (FLINK-5884) Integrate time indicators for Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-5884?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15992874#comment-15992874 ]
ASF GitHub Bot commented on FLINK-5884:
---------------------------------------
GitHub user twalthr opened a pull request:
https://github.com/apache/flink/pull/3808
[FLINK-5884] [table] Integrate time indicators for Table API & SQL
This PR introduces time indicators in the Table & SQL API. It splits the current row type into a logical and a physical row type. Time indicators are part of every logical row and stay logical as long as they are only forwarded and not accessed for a calculation. If a time attribute is passed to a function or arithmetic expression, it will be materialized. A ProcessFunction can access the meta timestamp for materialization (the code generation for this has yet to be implemented). Every logical plan now includes time attributes and is consistent:
```
val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)

val windowedTable = table
  .window(Session withGap 7.milli on 'long as 'w)
  .groupBy('w, 'string)
  .select('string, 'int.count)

val expected = unaryNode(
  "DataStreamAggregate",
  streamTableNode(0),
  term("groupBy", "string"),
  term(
    "window",
    SessionGroupWindow(
      WindowReference("w"),
      'long,
      7.milli)),
  term("select", "string", "COUNT(int) AS TMP_0")
)
```
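For illustration only, the logical/physical split could be sketched roughly like this in plain Java. The class and method names (`RowSplitSketch`, `Field`, `physicalNames`) are made up for this sketch and are not Flink's actual API; the real implementation works on Calcite row types:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the logical/physical row split described in this PR.
public class RowSplitSketch {
    static class Field {
        final String name;
        final boolean isTimeIndicator;
        Field(String name, boolean isTimeIndicator) {
            this.name = name;
            this.isTimeIndicator = isTimeIndicator;
        }
    }

    // The logical row keeps time indicators as ordinary attributes.
    static List<String> logicalNames(List<Field> fields) {
        List<String> names = new ArrayList<>();
        for (Field f : fields) names.add(f.name);
        return names;
    }

    // The physical row drops time indicators: in streaming, the timestamp
    // lives in the record's metadata, not as a plain field in the record.
    static List<String> physicalNames(List<Field> fields) {
        List<String> names = new ArrayList<>();
        for (Field f : fields) {
            if (!f.isTimeIndicator) names.add(f.name);
        }
        return names;
    }

    public static void main(String[] args) {
        List<Field> row = Arrays.asList(
            new Field("long", true),   // rowtime indicator
            new Field("int", false),
            new Field("string", false));
        System.out.println(logicalNames(row));  // [long, int, string]
        System.out.println(physicalNames(row)); // [int, string]
    }
}
```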
Main changes of the current solution:
- Processing time and rowtime indicators can be named arbitrarily
- They can be defined as follows: `stream.toTable(tEnv, 'long, 'int, 'string, 'proctime.proctime)` or `stream.toTable(tEnv, 'long.rowtime, 'int, 'string)`
- In a streaming environment: if a "long" field is already defined in the record, it will not be read by the runtime; "long" always represents the timestamp of the row.
- In batch environment: "long" must be present in the record and will be read by the runtime.
- The table definition looks equivalent in both batch and streaming (better unification than the current state)
- Internally, row types are split into a logical and a physical row type.
- The logical row type contains time indicators; the physical row type never contains time indicators (the pure "long" will never be in a record)
- After validation and query decorrelation, a special time indicator converter traverses the RelNodes and analyzes whether a time indicator is accessed or only forwarded.
- An access to a time indicator means that we need to materialize the rowtime using a ProcessFunction (not yet implemented). The timestamp (no longer an indicator) becomes part of the physical row. E.g. `long.cast(STRING)` would require a materialization.
- Forwarding of time indicators does not materialize the rowtime; it remains a logical attribute. E.g. `.select('long)`
- Windows are only valid if they work on time indicators.
- A new `RowSchema` unifies logical and physical rows and tries to make type handling throughout the classes easier and consistent (it is not used everywhere yet).
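The forward-vs-access rule above could be pictured with a toy sketch on expression strings. This is purely illustrative and `needsMaterialization` is a made-up name; the real converter traverses Calcite RelNodes rather than comparing strings:

```java
// Toy sketch of the forward-vs-access rule: a time indicator that is only
// forwarded stays logical, while any computation on it (cast, arithmetic,
// function call) forces materialization of the actual timestamp.
public class MaterializationSketch {
    static boolean needsMaterialization(String expression, String timeAttribute) {
        // A pure forward is exactly the attribute reference itself;
        // anything else counts as an access that must be materialized.
        return !expression.equals(timeAttribute);
    }

    public static void main(String[] args) {
        System.out.println(needsMaterialization("long", "long"));                 // false: forwarded
        System.out.println(needsMaterialization("CAST(long AS STRING)", "long")); // true: accessed
    }
}
```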
Missing so far:
- Java API tests for defining time attributes
- Tests for TableSources that define time attributes
- Multi-window tests
- ProcessFunction for accessing the timestamp
- More tests in general
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/twalthr/flink FLINK-5884
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/3808.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #3808
----
commit 5550a6026614ef61a62e98654ee2cd2d45b6d64e
Author: twalthr <tw...@apache.org>
Date: 2017-03-02T15:06:55Z
[FLINK-5884] [table] Integrate time indicators for Table API & SQL
----
> Integrate time indicators for Table API & SQL
> ---------------------------------------------
>
> Key: FLINK-5884
> URL: https://issues.apache.org/jira/browse/FLINK-5884
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: Timo Walther
>
> We already discussed the need for a proper integration of time indicators (event-time or processing-time) for both the Table API & SQL on the ML:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Table-API-SQL-indicators-for-event-and-processing-time-tp15927.html
> This issue will track the progress. I will work on a design document on how we can solve this issue.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)