You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by Asahi Lee <97...@qq.com.INVALID> on 2021/07/22 12:44:25 UTC

flink 1.13.1 再次查询row(a, b)生成的列时报错

hi!
1. 我在使用flink 1.13.1 对通过row(a,b)生成的列再次查询时,发生错误,是否是一个bug?
2. 通过 row函数生成row类型的列时,无法指定row中字段的name,是否考虑支持name的配置?
我的示例程序如下:


package test;


import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.ResolvedSchema;


public class DataGenTest {


&nbsp; &nbsp; public static void main(String[] args) {
&nbsp; &nbsp; &nbsp; &nbsp; StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
&nbsp; &nbsp; &nbsp; &nbsp; StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(streamExecutionEnvironment);


&nbsp; &nbsp; &nbsp; &nbsp; tableEnvironment.executeSql("CREATE TABLE datagen (\n" +
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; " f_sequence INT,\n" +
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; " f_random INT,\n" +
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; " f_random_str STRING,\n" +
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; " ts AS localtimestamp,\n" +
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; " WATERMARK FOR ts AS ts\n" +
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ") WITH (\n" +
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; " 'connector' = 'datagen',\n" +
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; " 'rows-per-second'='5',\n" +
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; " 'fields.f_sequence.kind'='sequence',\n" +
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; " 'fields.f_sequence.start'='1',\n" +
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; " 'fields.f_sequence.end'='1000',\n" +
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; " 'fields.f_random.min'='1',\n" +
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; " 'fields.f_random.max'='1000',\n" +
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; " 'fields.f_random_str.length'='10'\n" +
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ")");


&nbsp; &nbsp; &nbsp; &nbsp; Table table = tableEnvironment.sqlQuery("select row(f_sequence, f_random) as c from datagen");
&nbsp; &nbsp; &nbsp; &nbsp; ResolvedSchema resolvedSchema = table.getResolvedSchema();
&nbsp; &nbsp; &nbsp; &nbsp; System.out.println(resolvedSchema);
&nbsp; &nbsp; &nbsp; &nbsp; /**
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;* 打印如下:
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;* (
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;*&nbsp; &nbsp;`c` ROW<`EXPR$0` INT, `EXPR$1` INT&gt; NOT NULL
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;* )
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;* 问题一,通过使用row函数,我将两个列放入row类型中,那我如何配置row中字段的名称呢?,如下中的c1, c2:
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;* (
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;*&nbsp; &nbsp;`c` ROW<`c1` INT, `c2` INT&gt; NOT NULL
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;* )
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;*/


&nbsp; &nbsp; &nbsp; &nbsp; Table table1 = tableEnvironment.sqlQuery("select * from " + table);
&nbsp; &nbsp; &nbsp; &nbsp; /**
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;* 问题二,查询sql报错:
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;* Exception in thread "main" java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes:
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;* validated type:
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;* RecordType(RecordType:peek_no_expand(INTEGER EXPR$0, INTEGER EXPR$1) NOT NULL c) NOT NULL
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;* converted type:
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;* RecordType(RecordType(INTEGER EXPR$0, INTEGER EXPR$1) NOT NULL c) NOT NULL
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;* rel:
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;* LogicalProject(c=[ROW($0, $1)])
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;*&nbsp; &nbsp;LogicalWatermarkAssigner(rowtime=[ts], watermark=[$3])
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;*&nbsp; &nbsp; &nbsp;LogicalProject(f_sequence=[$0], f_random=[$1], f_random_str=[$2], ts=[LOCALTIMESTAMP])
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;*&nbsp; &nbsp; &nbsp; &nbsp;LogicalTableScan(table=[[default_catalog, default_database, datagen]])
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;*/
&nbsp; &nbsp; &nbsp; &nbsp; ResolvedSchema resolvedSchema1 = table1.getResolvedSchema();
&nbsp; &nbsp; &nbsp; &nbsp; System.out.println(resolvedSchema1);


&nbsp; &nbsp; &nbsp; &nbsp; table.execute().print();




&nbsp; &nbsp; }


}

Re: flink 1.13.1 再次查询row(a, b)生成的列时报错

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

关于问题一(如何配置 row 的字段名),可以通过 cast 语句:
select cast(row(f_sequence, f_random) as row<a int, b int>) as c from datagen

关于问题二,看起来确实是一个 bug,可以去 https://issues.apache.org/jira/projects/FLINK/issues
上开一个 ticket

Asahi Lee <97...@qq.com.invalid> 于2021年7月22日周四 下午8:44写道:

> hi!
> 1. 我在使用flink 1.13.1 对通过row(a,b)生成的列再次查询时,发生错误,是否是一个bug?
> 2. 通过 row函数生成row类型的列时,无法指定row中字段的name,是否考虑支持name的配置?
> 我的示例程序如下:
>
>
> package test;
>
>
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.table.catalog.ResolvedSchema;
>
>
> public class DataGenTest {
>
>
>     public static void main(String[] args) {
>         StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
>         StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(streamExecutionEnvironment);
>
>
>         tableEnvironment.executeSql("CREATE TABLE datagen (\n" +
>                 " f_sequence INT,\n" +
>                 " f_random INT,\n" +
>                 " f_random_str STRING,\n" +
>                 " ts AS localtimestamp,\n" +
>                 " WATERMARK FOR ts AS ts\n" +
>                 ") WITH (\n" +
>                 " 'connector' = 'datagen',\n" +
>                 " 'rows-per-second'='5',\n" +
>                 " 'fields.f_sequence.kind'='sequence',\n" +
>                 " 'fields.f_sequence.start'='1',\n" +
>                 " 'fields.f_sequence.end'='1000',\n" +
>                 " 'fields.f_random.min'='1',\n" +
>                 " 'fields.f_random.max'='1000',\n" +
>                 " 'fields.f_random_str.length'='10'\n" +
>                 ")");
>
>
>         Table table = tableEnvironment.sqlQuery("select row(f_sequence, f_random) as c from datagen");
>         ResolvedSchema resolvedSchema = table.getResolvedSchema();
>         System.out.println(resolvedSchema);
>         /**
>          * 打印如下:
>          * (
>          *   `c` ROW<`EXPR$0` INT, `EXPR$1` INT> NOT NULL
>          * )
>          * 问题一,通过使用row函数,我将两个列放入row类型中,那我如何配置row中字段的名称呢?,如下中的c1, c2:
>          * (
>          *   `c` ROW<`c1` INT, `c2` INT> NOT NULL
>          * )
>          */
>
>
>         Table table1 = tableEnvironment.sqlQuery("select * from " + table);
>         /**
>          * 问题二,查询sql报错:
>          * Exception in thread "main" java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes:
>          * validated type:
>          * RecordType(RecordType:peek_no_expand(INTEGER EXPR$0, INTEGER EXPR$1) NOT NULL c) NOT NULL
>          * converted type:
>          * RecordType(RecordType(INTEGER EXPR$0, INTEGER EXPR$1) NOT NULL c) NOT NULL
>          * rel:
>          * LogicalProject(c=[ROW($0, $1)])
>          *   LogicalWatermarkAssigner(rowtime=[ts], watermark=[$3])
>          *     LogicalProject(f_sequence=[$0], f_random=[$1], f_random_str=[$2], ts=[LOCALTIMESTAMP])
>          *       LogicalTableScan(table=[[default_catalog, default_database, datagen]])
>          */
>         ResolvedSchema resolvedSchema1 = table1.getResolvedSchema();
>         System.out.println(resolvedSchema1);
>
>
>         table.execute().print();
>
>
>
>
>     }
>
>
> }