Posted to user-zh@flink.apache.org by Asahi Lee <97...@qq.com> on 2020/07/22 16:07:39 UTC

flink 1.11 DDL SQL: error reading CSV after adding a PROCTIME() column

1. Program
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

String sourceTableDDL = "CREATE TABLE fs_table (" +
        "  user_id STRING," +
        "  order_amount DOUBLE," +
        "  dt TIMESTAMP(3)," +
        "  pt AS PROCTIME() " +
        " ) WITH (" +
        "  'connector'='filesystem'," +
        "  'path'='D:\\Program Files\\JetBrains\\workspace\\table-walkthrough\\src\\main\\resources\\csv\\order.csv'," +
        "  'format'='csv'" +
        " )";

bsTableEnv.executeSql(sourceTableDDL);
bsTableEnv.executeSql("select * from fs_table").print();
2. CSV file
order.csv
zhangsan,12.34,2020-08-03 12:23:50
lisi,234.67,2020-08-03 12:25:50
wangwu,57.6,2020-08-03 12:25:50
zhaoliu,345,2020-08-03 12:28:50



3. Error
- Source: FileSystemTableSource(user_id, order_amount, dt, pt) -> Calc(select=[user_id, order_amount, dt, PROCTIME_MATERIALIZE(()) AS pt]) -> SinkConversionToRow (4/6) (9ee0383d676a190b0a62d206039db26c) switched from RUNNING to FAILED.
java.io.IOException: Failed to deserialize CSV row.
	at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:299)
	at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:210)
	at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:91)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
Caused by: java.lang.RuntimeException: Row length mismatch. 4 fields expected but was 3.
	at org.apache.flink.formats.csv.CsvRowDataDeserializationSchema.validateArity(CsvRowDataDeserializationSchema.java:441)
	at org.apache.flink.formats.csv.CsvRowDataDeserializationSchema.lambda$createRowConverter$1ca9c073$1(CsvRowDataDeserializationSchema.java:244)
	at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:293)
	... 5 more

Re: flink 1.11 DDL SQL: error reading CSV after adding a PROCTIME() column

Posted by godfrey he <go...@gmail.com>.
Combined with Hive, the filesystem does support streaming reads; see [1].

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/hive/hive_streaming.html#streaming-reading
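For reference, a rough, untested sketch of what a streaming read through the Hive connector looks like on 1.11. The catalog name, conf dir, and "orders" table are hypothetical; the streaming-source options are the ones documented in [1]:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

// Untested sketch: "myhive", the conf dir, and the orders table are made up.
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner().inStreamingMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);

// The streaming-source options are passed as SQL hints, which must be
// enabled explicitly in 1.11.
tEnv.getConfig().getConfiguration()
        .setString("table.dynamic-table-options.enabled", "true");

tEnv.registerCatalog("myhive", new HiveCatalog("myhive", "default", "/opt/hive-conf"));
tEnv.useCatalog("myhive");

// Monitor the Hive table for new partitions/files and stream them out
// instead of stopping after one pass.
tEnv.executeSql(
        "SELECT * FROM orders /*+ OPTIONS(" +
        "'streaming-source.enable'='true'," +
        "'streaming-source.monitor-interval'='1 min') */").print();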


Re: flink 1.11 DDL SQL: error reading CSV after adding a PROCTIME() column

Posted by Leonard Xu <xb...@gmail.com>.
Hi,

The filesystem connector supports streaming writes; streaming reads are not yet supported, so the job stops once the file has been read. Judging from the docs [1], streaming-read support appears to be planned.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/filesystem.html
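
Since streaming writes do work, here is a quick untested sketch of the write side against the same schema (the sink path and rolling option are illustrative values from the connector page in [1]):

// Untested sketch reusing bsTableEnv from the original program; the path
// and rolling interval are illustrative.
String sinkDDL = "CREATE TABLE fs_sink (" +
        "  user_id STRING," +
        "  order_amount DOUBLE," +
        "  dt TIMESTAMP(3)" +
        ") WITH (" +
        "  'connector'='filesystem'," +
        "  'path'='file:///tmp/orders_out'," +
        "  'format'='csv'," +
        "  'sink.rolling-policy.rollover-interval'='15 min'" +
        ")";
bsTableEnv.executeSql(sinkDDL);

// This runs continuously only if the source is unbounded (e.g. Kafka);
// with the CSV file source above it still finishes after a single pass.
bsTableEnv.executeSql(
        "INSERT INTO fs_sink SELECT user_id, order_amount, dt FROM fs_table");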


> On Jul 23, 2020, at 22:05, Asahi Lee <97...@qq.com> wrote:
> 
> Using the filesystem connector to read a CSV as the source in a streaming environment, why does my program stop as soon as it runs, instead of waiting for rows to be appended to the file and continuing to compute?
> Or can the filesystem connector only be used for batch jobs?


Re: flink 1.11 DDL SQL: error reading CSV after adding a PROCTIME() column

Posted by Asahi Lee <97...@qq.com>.
Using the filesystem connector to read a CSV as the source in a streaming environment, why does my program stop as soon as it runs, instead of waiting for rows to be appended to the file and continuing to compute?
Or can the filesystem connector only be used for batch jobs?




------------------ Original Message ------------------
From: "user-zh" <xbjtdcq@gmail.com>;
Sent: Thursday, July 23, 2020, 9:55 AM
To: "user-zh" <user-zh@flink.apache.org>;

Subject: Re: flink 1.11 DDL SQL: error reading CSV after adding a PROCTIME() column


Hi, Asahi

This is a known bug [1]: the filesystem connector has a problem handling computed columns. A PR is already up, and the fix will ship in 1.11.2 and 1.12.


Best
Leonard Xu
[1] https://issues.apache.org/jira/browse/FLINK-18665


Re: flink 1.11 DDL SQL: error reading CSV after adding a PROCTIME() column

Posted by Leonard Xu <xb...@gmail.com>.
Hi, Asahi

This is a known bug [1]: the filesystem connector has a problem handling computed columns. A PR is already up, and the fix will ship in 1.11.2 and 1.12.


Best
Leonard Xu
[1] https://issues.apache.org/jira/browse/FLINK-18665
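
Until then, one possible workaround (my own untested sketch, not from the JIRA) is to declare only the three physical columns so the arity matches the CSV, and derive processing time in the query; PROCTIME() should also be callable directly in a SELECT with the blink planner:

// Untested workaround sketch: with no computed column in the DDL, the CSV
// deserializer expects exactly the 3 fields present in order.csv.
String sourceTableDDL = "CREATE TABLE fs_table (" +
        "  user_id STRING," +
        "  order_amount DOUBLE," +
        "  dt TIMESTAMP(3)" +
        ") WITH (" +
        "  'connector'='filesystem'," +
        "  'path'='D:\\Program Files\\JetBrains\\workspace\\table-walkthrough\\src\\main\\resources\\csv\\order.csv'," +
        "  'format'='csv'" +
        ")";
bsTableEnv.executeSql(sourceTableDDL);

// Assumes PROCTIME() is usable in the projection; otherwise upgrade to
// 1.11.2 and keep the computed column.
bsTableEnv.executeSql(
        "SELECT user_id, order_amount, dt, PROCTIME() AS pt FROM fs_table").print();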
