Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/05/02 13:23:04 UTC

[jira] [Commented] (FLINK-5884) Integrate time indicators for Table API & SQL

    [ https://issues.apache.org/jira/browse/FLINK-5884?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15992874#comment-15992874 ] 

ASF GitHub Bot commented on FLINK-5884:
---------------------------------------

GitHub user twalthr opened a pull request:

    https://github.com/apache/flink/pull/3808

    [FLINK-5884] [table] Integrate time indicators for Table API & SQL

    This PR introduces time indicators for the Table & SQL API. The PR splits the current row type into a logical and a physical row type. Time indicators are part of every logical row and stay logical as long as they are just forwarded and not accessed in a calculation. If a time attribute is passed to a function or an arithmetic expression, it will be materialized. A ProcessFunction can access the meta timestamp for materialization (the code generation for this still has to be implemented). Every logical plan now includes time attributes and is consistent:
    
    ```
    // define a table with an event-time (rowtime) attribute named 'long
    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)

    // session window defined on the rowtime attribute
    val windowedTable = table
      .window(Session withGap 7.milli on 'long as 'w)
      .groupBy('w, 'string)
      .select('string, 'int.count)

    // expected logical plan: the window operates on the logical time indicator
    val expected = unaryNode(
      "DataStreamAggregate",
      streamTableNode(0),
      term("groupBy", "string"),
      term(
        "window",
        SessionGroupWindow(
          WindowReference("w"),
          'long,
          7.milli)),
      term("select", "string", "COUNT(int) AS TMP_0")
    )
    ```
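
    As a companion to the note above that a ProcessFunction can access the meta timestamp for materialization: here is a minimal, hypothetical sketch of such an operator, copying the physical fields and appending the record's meta timestamp. The class and parameter names are made up for illustration; the actual code-generated operator is not implemented yet:

    ```
    import org.apache.flink.streaming.api.functions.ProcessFunction
    import org.apache.flink.types.Row
    import org.apache.flink.util.Collector

    // hypothetical materializer: forwards all physical fields and appends the
    // record's meta timestamp at the end of the row
    class RowtimeMaterializer(inputArity: Int) extends ProcessFunction[Row, Row] {

      override def processElement(
          in: Row,
          ctx: ProcessFunction[Row, Row]#Context,
          out: Collector[Row]): Unit = {

        val result = new Row(inputArity + 1)
        var i = 0
        while (i < inputArity) {
          result.setField(i, in.getField(i))
          i += 1
        }
        // ctx.timestamp() exposes the meta timestamp of the current record
        result.setField(inputArity, ctx.timestamp())
        out.collect(result)
      }
    }
    ```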
    
    
    
    Main changes of the current solution:
    
    - Processing-time and event-time (rowtime) indicators can be named arbitrarily
    - They can be defined as follows: `stream.toTable(tEnv, 'long, 'int, 'string, 'proctime.proctime)` or `stream.toTable(tEnv, 'long.rowtime, 'int, 'string)` (see the sketch after this list)
    - In a streaming environment: even if the "long" field is already present in the record, it will not be read by the runtime; "long" always represents the meta timestamp of the row.
    - In a batch environment: "long" must be present in the record and will be read by the runtime.
    - The table definition looks equivalent in both batch and streaming (better unification than the current state).
    - Internally, row types are split up into a logical and a physical row type.
    - The logical row type contains time indicators; the physical row type never contains time indicators (the pure "long" will never be in a record).
    - After validation and query decorrelation, a special time indicator converter traverses the RelNodes and analyzes whether a time indicator is accessed or only forwarded.
    - An access to a time indicator means that we need to materialize the rowtime using a ProcessFunction (not yet implemented). The timestamp (no longer an indicator) becomes part of the physical row. E.g., `'long.cast(STRING)` would require a materialization.
    - Forwarding a time indicator does not materialize the rowtime; it remains a logical attribute. E.g., `.select('long)`.
    - Windows are only valid if they work on time indicators.
    - A new `RowSchema` unifies logical and physical rows and tries to make type handling throughout the classes easier and more consistent (it is not used everywhere yet).
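
    The following sketch summarizes the declaration styles and the forwarding vs. materialization distinction described in the list above. It is a minimal sketch, assuming an existing `StreamTableEnvironment` `tEnv` and an input `stream` of type `DataStream[(Long, Int, String)]`:

    ```
    import org.apache.flink.api.scala._
    import org.apache.flink.table.api.Types
    import org.apache.flink.table.api.scala._

    // event time: 'long is declared as the rowtime indicator
    val t1 = stream.toTable(tEnv, 'long.rowtime, 'int, 'string)

    // processing time: an additional 'proctime attribute is appended
    val t2 = stream.toTable(tEnv, 'long, 'int, 'string, 'proctime.proctime)

    // forwarding: 'long stays a logical time indicator, nothing is materialized
    val forwarded = t1.select('long, 'string)

    // access in an expression: the rowtime is materialized into the physical
    // row before the cast can be evaluated
    val materialized = t1.select('long.cast(Types.STRING) as 'ts, 'string)
    ```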
    
    Missing so far:
    - Java API tests for defining time attributes
    - Tests for TableSources that define time attributes
    - Multi-window tests
    - ProcessFunction for accessing the timestamp
    - More tests in general

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/twalthr/flink FLINK-5884

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3808.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3808
    
----
commit 5550a6026614ef61a62e98654ee2cd2d45b6d64e
Author: twalthr <tw...@apache.org>
Date:   2017-03-02T15:06:55Z

    [FLINK-5884] [table] Integrate time indicators for Table API & SQL

----


> Integrate time indicators for Table API & SQL
> ---------------------------------------------
>
>                 Key: FLINK-5884
>                 URL: https://issues.apache.org/jira/browse/FLINK-5884
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Timo Walther
>            Assignee: Timo Walther
>
> We already discussed the need for a proper integration of time indicators (event-time or processing-time) for both the Table API & SQL on the ML:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Table-API-SQL-indicators-for-event-and-processing-time-tp15927.html
> This issue will track the progress. I will work on a design document describing how we can solve this issue.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)