You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Benchao Li (Jira)" <ji...@apache.org> on 2020/02/29 08:34:00 UTC

[jira] [Commented] (FLINK-16345) Computed column can not refer time attribute column

    [ https://issues.apache.org/jira/browse/FLINK-16345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17048248#comment-17048248 ] 

Benchao Li commented on FLINK-16345:
------------------------------------

[~Leonard Xu] Thanks for the reporting, it's indeed a bug.

In {{CatalogTableSource.toRel}} we applied computed column, and the result type differs with the rowType of {{CatalogTableSource}}.
IMO, before we apply `convertToRexNodes`, we can remove time properties from the rowType, because we'll put add watermark later anyway which will add time attribute.

> Computed column can not refer time attribute column 
> ----------------------------------------------------
>
>                 Key: FLINK-16345
>                 URL: https://issues.apache.org/jira/browse/FLINK-16345
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.10.0
>            Reporter: Leonard Xu
>            Priority: Major
>
> If a computed column refer a time attribute column, computed column will lose  time attribute and cause validation fail.
> {code:java}
> CREATE TABLE orders (
>   order_id STRING,
>   order_time TIMESTAMP(3),
>   amount DOUBLE,
>   amount_kg as amount * 1000,
>   // can not select computed column standard_ts which from column order_time that used as WATERMARK
>   standard_ts as order_time + INTERVAL '8' HOUR,
>   WATERMARK FOR order_time AS order_time
> ) WITH (
>   'connector.type' = 'kafka',
>   'connector.version' = '0.10',
>   'connector.topic' = 'flink_orders',
>   'connector.properties.zookeeper.connect' = 'localhost:2181',
>   'connector.properties.bootstrap.servers' = 'localhost:9092',
>   'connector.properties.group.id' = 'testGroup',
>   'connector.startup-mode' = 'earliest-offset',
>   'format.type' = 'json',
>   'format.derive-schema' = 'true'
> );
> {code}
> The query `select amount_kg from orders` runs normally,  
> the` he query `select standard_ts from orders` throws a validation exception message as following:
> {noformat}
> [ERROR] Could not execute SQL statement. Reason:
>  java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes:
>  validated type:
>  RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" order_id, TIME ATTRIBUTE(ROWTIME) order_time, DOUBLE amount, DOUBLE amount_kg, TIMESTAMP(3) ts) NOT NULL
>  converted type:
>  RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" order_id, TIME ATTRIBUTE(ROWTIME) order_time, DOUBLE amount, DOUBLE amount_kg, TIME ATTRIBUTE(ROWTIME) ts) NOT NULL
>  rel:
>  LogicalProject(order_id=[$0], order_time=[$1], amount=[$2], amount_kg=[$3], ts=[$4])
>  LogicalWatermarkAssigner(rowtime=[order_time], watermark=[$1])
>  LogicalProject(order_id=[$0], order_time=[$1], amount=[$2], amount_kg=[*($2, 1000)], ts=[+($1, 28800000:INTERVAL HOUR)])
>  LogicalTableScan(table=[[default_catalog, default_database, orders, source: [Kafka010TableSource(order_id, order_time, amount)]]])
>  {noformat}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)