Posted to issues@flink.apache.org by "Fabian Hueske (JIRA)" <ji...@apache.org> on 2017/10/09 20:44:01 UTC

[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource

    [ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16197673#comment-16197673 ] 

Fabian Hueske commented on FLINK-7548:
--------------------------------------

I've created a proposal for a reworked TableSource API that solves several issues. I've pushed a WIP branch to my repository: https://github.com/fhueske/flink/tree/tableWatermarks-extended

The main changes are as follows:
- The {{TableSource}} interface gets a new method {{getTableSchema(): TableSchema}}, which returns the schema with all fields of the resulting table. For streaming tables, this also includes rowtime indicators. These fields have the corresponding {{TimeIndicatorTypeInformation}} types. This means that the {{TypeInformation}} returned by {{getResultType()}} no longer determines the schema and fields of the table. Hence, we need to map the physical input type ({{getResultType()}}) to the logical table schema ({{getTableSchema()}}). By default, this mapping is based on field names, i.e., a table schema field is mapped to the field of the same name in the physical input type. If we cannot find a physical field for a logical field, or if the types do not match, the {{TableSource}} is rejected. This default resembles the current behavior. If fields should not be mapped automatically by name, we will allow users to specify a manual, index-based mapping, similar to the previous {{DefinedFieldNames}} interface (note: this is not yet included in the WIP branch).
- Processing time fields (i.e., fields in the table schema with type {{TimeIndicatorTypeInformation(false)}}) are automatically inserted into the schema during the initial conversion.
- A {{StreamTableSource}} with rowtime fields (i.e., fields in the table schema with type {{TimeIndicatorTypeInformation(true)}}) must additionally implement the {{DefinedRowtimeAttributes}} interface. This interface provides a {{RowtimeAttributeDescriptor}} for each rowtime field. Right now only a single rowtime field is supported, but the design is prepared for more. The {{RowtimeAttributeDescriptor}} provides a {{TimestampExtractor}}, which returns a {{RexNode}} expression that extracts the timestamp field from the input type. A corresponding expression is code-generated into the initial conversion and executed by the table scan operator. Moreover, the {{RowtimeAttributeDescriptor}} provides a {{WatermarkStrategy}}, which is used to generate watermarks for the rowtime attribute. Watermarks are also generated by the table scan operator.
- We can provide built-in implementations for {{TimestampExtractor}} and {{WatermarkStrategy}}. Right now, {{TimestampExtractor}} has only {{ExistingField}}, which converts a {{Long}} or {{Timestamp}} attribute into a rowtime attribute. {{WatermarkStrategy}} currently has {{AscendingWatermarks}} and {{BoundedOutOfOrderWatermarks}}. Additional implementations can be added in later PRs.
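
The following is a minimal sketch of the default name-based mapping from the logical schema to the physical input type. It rests on simplified stand-ins. Field types are plain strings instead of {{TypeInformation}}. {{FieldMapping}}, {{mapByName}}, {{ROWTIME}} and {{PROCTIME}} are hypothetical names, not code from the WIP branch. Time indicator fields are assumed to be skipped (index -1) because they are not read by name. Processing time is inserted during the conversion, and rowtime comes from the {{TimestampExtractor}}.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the proposed name-based mapping.
// Types are plain strings; Flink itself would compare TypeInformation instances.
final class FieldMapping {

    // Hypothetical marker types standing in for TimeIndicatorTypeInformation(true/false).
    static final String ROWTIME = "ROWTIME_INDICATOR";
    static final String PROCTIME = "PROCTIME_INDICATOR";

    // A named, typed field of either the logical schema or the physical type.
    static final class Field {
        final String name;
        final String type;

        Field(String name, String type) {
            this.name = name;
            this.type = type;
        }
    }

    // Returns, for each logical field in schema order, the index of the physical
    // field with the same name. Time indicator fields map to -1 (not read by name).
    // Throws if a regular field is missing or has a different type.
    static Map<String, Integer> mapByName(List<Field> logical, List<Field> physical) {
        Map<String, Integer> mapping = new LinkedHashMap<>();
        for (Field field : logical) {
            if (field.type.equals(ROWTIME) || field.type.equals(PROCTIME)) {
                mapping.put(field.name, -1);
                continue;
            }
            int index = -1;
            for (int i = 0; i < physical.size(); i++) {
                if (physical.get(i).name.equals(field.name)) {
                    index = i;
                    break;
                }
            }
            if (index < 0) {
                throw new IllegalArgumentException("No physical field for '" + field.name + "'");
            }
            if (!physical.get(index).type.equals(field.type)) {
                throw new IllegalArgumentException("Type mismatch for '" + field.name + "'");
            }
            mapping.put(field.name, index);
        }
        return mapping;
    }
}
```

The sketch throws an exception instead of silently dropping unmatched fields. This mirrors the rule above that a missing physical field or a type mismatch rejects the {{TableSource}}.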

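The next sketch shows how an {{ExistingField}}-style timestamp extraction and the two proposed watermark strategies could behave. It is a hypothetical plain-Java model. {{RowtimeSketch}}, {{PeriodicWatermarks}}, {{Ascending}} and {{BoundedOutOfOrder}} are illustrative names, and the proposal itself extracts timestamps via code-generated {{RexNode}} expressions in the table scan operator. The watermark formulas are assumptions based on the usual definitions of these strategies: maximum timestamp minus 1 for ascending timestamps, and maximum timestamp minus a fixed delay for bounded out-of-orderness.

```java
import java.sql.Timestamp;

// Hypothetical plain-Java model of rowtime extraction and watermark strategies.
// These are NOT the classes from the WIP branch; names are illustrative only.
final class RowtimeSketch {

    // Mimics an ExistingField-style extractor: converts a Long or Timestamp
    // field value into epoch milliseconds; any other type is rejected.
    static long extractExistingField(Object value) {
        if (value instanceof Long) {
            return (Long) value;
        }
        if (value instanceof Timestamp) {
            return ((Timestamp) value).getTime();
        }
        throw new IllegalArgumentException("Rowtime field must be a Long or Timestamp");
    }

    // A periodic strategy: observes every extracted timestamp and reports
    // the current watermark whenever the operator asks for it.
    interface PeriodicWatermarks {
        void nextTimestamp(long timestamp);

        long currentWatermark();
    }

    // Assumes ascending timestamps: watermark = max timestamp seen - 1.
    static final class Ascending implements PeriodicWatermarks {
        // Chosen so that the initial watermark is Long.MIN_VALUE without underflow.
        private long maxTimestamp = Long.MIN_VALUE + 1;

        @Override
        public void nextTimestamp(long timestamp) {
            maxTimestamp = Math.max(maxTimestamp, timestamp);
        }

        @Override
        public long currentWatermark() {
            return maxTimestamp - 1;
        }
    }

    // Tolerates records up to `delay` ms late: watermark = max timestamp seen - delay.
    static final class BoundedOutOfOrder implements PeriodicWatermarks {
        private final long delay;
        private long maxTimestamp;

        BoundedOutOfOrder(long delay) {
            if (delay < 0) {
                throw new IllegalArgumentException("delay must be non-negative");
            }
            this.delay = delay;
            // Chosen so that the initial watermark is Long.MIN_VALUE without underflow.
            this.maxTimestamp = Long.MIN_VALUE + delay;
        }

        @Override
        public void nextTimestamp(long timestamp) {
            maxTimestamp = Math.max(maxTimestamp, timestamp);
        }

        @Override
        public long currentWatermark() {
            return maxTimestamp - delay;
        }
    }
}
```

For example, with a delay of 5000 ms and observed timestamps 10000 and then 8000, the bounded strategy reports a watermark of 5000. The late record at 8000 does not move the watermark backwards.
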
I think the proposal is a good solution because:
- the table schema is explicitly defined by the {{TableSource}} and not by the API internals as before (handling of time attributes is currently hidden). This will make it easier to support {{CREATE TABLE}} DDL statements.
- timestamp extraction and watermark assignment can be configured by extensible interfaces without changing the API internals. The actual timestamp extraction and watermark generation happen in the table scan operator, so the {{TableSource}} does not have to deal with it.
- Timestamp extraction happens via {{RexNode}} expressions. This is very versatile, and it should be possible to call UDFs if they have been registered before (this still needs to be checked).
- We can provide built-in implementations for the most common strategies. I think all requirements that we listed in this issue before should be solvable with this design.
- Projection push-down can be enabled for table sources with timestamp attributes, which doesn't work right now (not part of the WIP branch).
- I could remove a few weird artifacts / workarounds of the current design that change the table schema internally.

Of course, there is also a downside:
- We are touching the {{TableSource}} interface and related interfaces. This is not a big deal for Flink's built-in sources (because there are not many of them), but it might be for users who have implemented more {{TableSource}}s.

Please let me know what you think [~jark], [~wheat9], [~twalthr], [~xccui], [~ykt836], and everybody else.


> Support watermark generation for TableSource
> --------------------------------------------
>
>                 Key: FLINK-7548
>                 URL: https://issues.apache.org/jira/browse/FLINK-7548
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: Jark Wu
>            Assignee: Fabian Hueske
>
> As discussed in FLINK-7446, currently a TableSource can only define a rowtime field but cannot extract watermarks from it. We can provide a new interface called {{DefinedWatermark}}, which has two methods: {{getRowtimeAttribute}} (can only be an existing field) and {{getWatermarkGenerator}}. The {{DefinedRowtimeAttribute}} interface will be deprecated.
> How to support periodic and punctuated watermarks, as well as some built-in strategies, needs further discussion.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)