Posted to user-zh@flink.apache.org by kcz <57...@qq.com.INVALID> on 2021/10/13 09:29:09 UTC

flink-1.14.0 SQL: error when writing an ARRAY

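My query selects several SUM values, and each SUM corresponds to one `type`. In the end I insert them into a MySQL table with the schema (id, type, value), so I thought of doing this as a column-to-row (unpivot) transformation.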
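For illustration, a column-to-row step like this is commonly written in Flink SQL with `CROSS JOIN UNNEST`, which turns each array element into its own row. This is a minimal sketch, not the poster's code. It assumes the windowed aggregation has been registered as a view named `window_metrics` whose `metrics` column is an `ARRAY<ROW<...>>` of (label, count) pairs. Both names are hypothetical and do not appear in the original post:

```sql
-- Hypothetical view `window_metrics`: one row per (window, vin), with an
-- ARRAY<ROW<label, count>> column named `metrics`.
-- UNNEST emits one row per array element; the two ROW fields are exposed
-- as the columns metric_type and metric_value.
SELECT
  w.vin,
  m.metric_type,
  m.metric_value
FROM window_metrics AS w
CROSS JOIN UNNEST(w.metrics) AS m (metric_type, metric_value);
```

Each output row then carries the (type, value) part of the MySQL layout. The post does not say which column feeds `id` (for example `vin`), so that mapping is left open here.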
The SQL is as follows:
CREATE TABLE kafka_table (
    vin STRING,
    speed DOUBLE,
    brake DOUBLE,
    hard_to DOUBLE,
    distance DOUBLE,
    times TIMESTAMP(3),
    WATERMARK FOR times AS times - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

select window_start, window_end, vin,
    array[row('brakes', sum(if(brake > 3.0451, 1, 0))), row('hard_tos', sum(if(hard_to > 3.0451, 1, 0)))]
from TABLE(
    TUMBLE(TABLE kafka_table, DESCRIPTOR(times), INTERVAL '10' MINUTES))
group by window_start, window_end, vin;


The error is:
Exception in thread "main" java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes:
validated type:
RecordType(TIMESTAMP(3) NOT NULL window_start, TIMESTAMP(3) NOT NULL window_end, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" vin, RecordType(VARCHAR(7) CHARACTER SET "UTF-16LE" NOT NULL EXPR$0, INTEGER EXPR$1) NOT NULL ARRAY NOT NULL EXPR$3) NOT NULL
converted type:
RecordType(TIMESTAMP(3) NOT NULL window_start, TIMESTAMP(3) NOT NULL window_end, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" vin, RecordType(VARCHAR(7) CHARACTER SET "UTF-16LE" NOT NULL EXPR$0, INTEGER NOT NULL EXPR$1) NOT NULL ARRAY NOT NULL EXPR$3) NOT NULL
rel:
LogicalProject(window_start=[$0], window_end=[$1], vin=[$2], EXPR$3=[ARRAY(CAST(ROW(_UTF-16LE'brake', $3)):RecordType(VARCHAR(7) CHARACTER SET "UTF-16LE" NOT NULL EXPR$0, INTEGER NOT NULL EXPR$1) NOT NULL, CAST(ROW(_UTF-16LE'hard_to', $4)):RecordType(VARCHAR(7) CHARACTER SET "UTF-16LE" NOT NULL EXPR$0, INTEGER NOT NULL EXPR$1) NOT NULL)])
  LogicalAggregate(group=[{0, 1, 2}], agg#0=[SUM($3)], agg#1=[SUM($4)])
    LogicalProject(window_start=[$6], window_end=[$7], vin=[$0], $f3=[IF(>($2, 3.0451:DECIMAL(5, 4)), 1, 0)], $f4=[IF(>($3, 3.0451:DECIMAL(5, 4)), 1, 0)])
      LogicalTableFunctionScan(invocation=[TUMBLE($5, DESCRIPTOR($5), 600000:INTERVAL MINUTE)], rowType=[RecordType(VARCHAR(2147483647) vin, DOUBLE speed, DOUBLE brake, DOUBLE hard_to, DOUBLE distance, TIMESTAMP(3) *ROWTIME* times, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
        LogicalProject(vin=[$0], speed=[$1], brake=[$2], hard_to=[$3], distance=[$4], times=[$5])
          LogicalWatermarkAssigner(rowtime=[times], watermark=[-($5, 5000:INTERVAL SECOND)])
            LogicalTableScan(table=[[default_catalog, default_database, kafka_table]])


	at org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:467)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:582)
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:177)
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:169)
	at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1057)
	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1026)
	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:301)
	at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:736)
	at com.hycan.bigdata.utils.SqlUtil.callCommand(SqlUtil.java:48)
	at com.hycan.bigdata.job.SchemaJob.main(SchemaJob.java:87)
Disconnected from the target VM, address: '127.0.0.1:61710', transport: 'socket'


Process finished with exit code 1

Re: flink-1.14.0 SQL: error when writing an ARRAY

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

This looks like a bug. I have filed an issue [1]; you can follow its progress there.

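As described in the issue, it currently appears that the problem can be avoided if the constant strings all have the same length, or if they are all cast to VARCHAR. You can use either workaround for now.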
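Here is a sketch of the cast workaround applied to the query from the original post, assuming Flink 1.14 SQL. Only the string constants change. Each one is wrapped in `CAST(... AS STRING)`, where `STRING` is Flink's synonym for `VARCHAR(2147483647)`, so both ROW elements get the same string type instead of literal types of different lengths. This follows the description in FLINK-24537; the thread itself does not confirm that it runs:

```sql
-- Same windowed aggregation as in the original post; the only change is
-- that the constant labels are cast to STRING (VARCHAR) so that every ROW
-- in the ARRAY has an identical type (workaround for FLINK-24537).
SELECT
  window_start,
  window_end,
  vin,
  ARRAY[
    ROW(CAST('brakes' AS STRING), SUM(IF(brake > 3.0451, 1, 0))),
    ROW(CAST('hard_tos' AS STRING), SUM(IF(hard_to > 3.0451, 1, 0)))
  ] AS metrics
FROM TABLE(
  TUMBLE(TABLE kafka_table, DESCRIPTOR(times), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, vin;
```

The other workaround leaves the literals uncast and instead renames or pads the labels so that every constant has the same number of characters.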

[1] https://issues.apache.org/jira/browse/FLINK-24537
