Posted to issues@flink.apache.org by "Jingsong Lee (Jira)" <ji...@apache.org> on 2020/05/20 04:03:00 UTC
[jira] [Commented] (FLINK-17189) Table with processing time attribute can not be read from Hive catalog
[ https://issues.apache.org/jira/browse/FLINK-17189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17111731#comment-17111731 ]
Jingsong Lee commented on FLINK-17189:
--------------------------------------
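{{TableSourceUtil.getSourceRowType}} should adjust not only rowtime fields but also proctime fields. Currently the validated type keeps L_PROCTIME as a plain TIMESTAMP(3) NOT NULL while the converted type marks it as TIME ATTRIBUTE(PROCTIME), which triggers the assertion quoted below. I will open a PR with the fix.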
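To illustrate the idea, here is a minimal, self-contained sketch. It is not the Flink implementation: the class {{TimeAttributeSketch}}, the method {{adjustTimeAttributes}}, and the use of plain strings for types are all hypothetical and chosen only for illustration. It shows a row-type adjustment that rewrites both rowtime and proctime columns to their time-attribute marker types, so that the validated and converted types would agree.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only; types are modeled as strings, not Flink/Calcite types.
public class TimeAttributeSketch {

    /**
     * Returns a copy of the declared row type in which every rowtime and
     * proctime column carries its time-attribute marker type.
     * Column order is preserved.
     */
    static Map<String, String> adjustTimeAttributes(
            Map<String, String> declared,
            Set<String> rowtimeFields,
            Set<String> proctimeFields) {
        Map<String, String> adjusted = new LinkedHashMap<>();
        for (Map.Entry<String, String> field : declared.entrySet()) {
            String name = field.getKey();
            if (rowtimeFields.contains(name)) {
                adjusted.put(name, "TIME ATTRIBUTE(ROWTIME)");
            } else if (proctimeFields.contains(name)) {
                // The step the comment above asks for: proctime columns are adjusted too.
                adjusted.put(name, "TIME ATTRIBUTE(PROCTIME) NOT NULL");
            } else {
                adjusted.put(name, field.getValue());
            }
        }
        return adjusted;
    }

    public static void main(String[] args) {
        // A reduced version of the PROD_LINEITEM schema from the issue.
        Map<String, String> declared = new LinkedHashMap<>();
        declared.put("L_ORDERKEY", "INTEGER");
        declared.put("L_ORDERTIME", "TIMESTAMP(3)");
        declared.put("L_PROCTIME", "TIMESTAMP(3) NOT NULL");

        Map<String, String> adjusted = adjustTimeAttributes(
                declared,
                Collections.singleton("L_ORDERTIME"),
                Collections.singleton("L_PROCTIME"));

        for (Map.Entry<String, String> field : adjusted.entrySet()) {
            System.out.println(field.getValue() + " " + field.getKey());
        }
    }
}
```

If only the rowtime set were applied, L_PROCTIME would stay TIMESTAMP(3) NOT NULL, which matches the "validated type" shown in the error output of the issue.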
> Table with processing time attribute can not be read from Hive catalog
> ----------------------------------------------------------------------
>
> Key: FLINK-17189
> URL: https://issues.apache.org/jira/browse/FLINK-17189
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Ecosystem, Table SQL / Planner
> Affects Versions: 1.10.1
> Reporter: Timo Walther
> Assignee: Jingsong Lee
> Priority: Blocker
> Fix For: 1.11.0, 1.10.2
>
>
> DDL:
> {code}
> CREATE TABLE PROD_LINEITEM (
> L_ORDERKEY INTEGER,
> L_PARTKEY INTEGER,
> L_SUPPKEY INTEGER,
> L_LINENUMBER INTEGER,
> L_QUANTITY DOUBLE,
> L_EXTENDEDPRICE DOUBLE,
> L_DISCOUNT DOUBLE,
> L_TAX DOUBLE,
> L_CURRENCY STRING,
> L_RETURNFLAG STRING,
> L_LINESTATUS STRING,
> L_ORDERTIME TIMESTAMP(3),
> L_SHIPINSTRUCT STRING,
> L_SHIPMODE STRING,
> L_COMMENT STRING,
> WATERMARK FOR L_ORDERTIME AS L_ORDERTIME - INTERVAL '5' MINUTE,
> L_PROCTIME AS PROCTIME()
> ) WITH (
> 'connector.type' = 'kafka',
> 'connector.version' = 'universal',
> 'connector.topic' = 'Lineitem',
> 'connector.properties.zookeeper.connect' = 'not-needed',
> 'connector.properties.bootstrap.servers' = 'kafka:9092',
> 'connector.startup-mode' = 'earliest-offset',
> 'format.type' = 'csv',
> 'format.field-delimiter' = '|'
> );
> {code}
> Query:
> {code}
> SELECT * FROM prod_lineitem;
> {code}
> Result:
> {code}
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes:
> validated type:
> RecordType(INTEGER L_ORDERKEY, INTEGER L_PARTKEY, INTEGER L_SUPPKEY, INTEGER L_LINENUMBER, DOUBLE L_QUANTITY, DOUBLE L_EXTENDEDPRICE, DOUBLE L_DISCOUNT, DOUBLE L_TAX, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_CURRENCY, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_RETURNFLAG, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_LINESTATUS, TIME ATTRIBUTE(ROWTIME) L_ORDERTIME, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_SHIPINSTRUCT, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_SHIPMODE, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_COMMENT, TIMESTAMP(3) NOT NULL L_PROCTIME) NOT NULL
> converted type:
> RecordType(INTEGER L_ORDERKEY, INTEGER L_PARTKEY, INTEGER L_SUPPKEY, INTEGER L_LINENUMBER, DOUBLE L_QUANTITY, DOUBLE L_EXTENDEDPRICE, DOUBLE L_DISCOUNT, DOUBLE L_TAX, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_CURRENCY, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_RETURNFLAG, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_LINESTATUS, TIME ATTRIBUTE(ROWTIME) L_ORDERTIME, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_SHIPINSTRUCT, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_SHIPMODE, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_COMMENT, TIME ATTRIBUTE(PROCTIME) NOT NULL L_PROCTIME) NOT NULL
> rel:
> LogicalProject(L_ORDERKEY=[$0], L_PARTKEY=[$1], L_SUPPKEY=[$2], L_LINENUMBER=[$3], L_QUANTITY=[$4], L_EXTENDEDPRICE=[$5], L_DISCOUNT=[$6], L_TAX=[$7], L_CURRENCY=[$8], L_RETURNFLAG=[$9], L_LINESTATUS=[$10], L_ORDERTIME=[$11], L_SHIPINSTRUCT=[$12], L_SHIPMODE=[$13], L_COMMENT=[$14], L_PROCTIME=[$15])
> LogicalWatermarkAssigner(rowtime=[L_ORDERTIME], watermark=[-($11, 300000:INTERVAL MINUTE)])
> LogicalProject(L_ORDERKEY=[$0], L_PARTKEY=[$1], L_SUPPKEY=[$2], L_LINENUMBER=[$3], L_QUANTITY=[$4], L_EXTENDEDPRICE=[$5], L_DISCOUNT=[$6], L_TAX=[$7], L_CURRENCY=[$8], L_RETURNFLAG=[$9], L_LINESTATUS=[$10], L_ORDERTIME=[$11], L_SHIPINSTRUCT=[$12], L_SHIPMODE=[$13], L_COMMENT=[$14], L_PROCTIME=[PROCTIME()])
> LogicalTableScan(table=[[hcat, default, prod_lineitem, source: [KafkaTableSource(L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT, L_TAX, L_CURRENCY, L_RETURNFLAG, L_LINESTATUS, L_ORDERTIME, L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT)]]])
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)