You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by Night_xing <10...@qq.com> on 2020/04/10 08:13:37 UTC

Flink SQL文件连接器中proctime()创建表失败的问题

Flink版本:1.10.0

使用了BlinkPlanner之后,使用java代码创建本地的CSV表时,不支持proctime的配置
创建代码如下:

&nbsp; &nbsp; &nbsp; &nbsp; tableEnv.connect(new FileSystem()
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .path("file:///Users/test/csv/demo.csv")
&nbsp; &nbsp; &nbsp; &nbsp; )
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .withFormat(new Csv())
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .withSchema(
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; new Schema()
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .field("id", DataTypes.STRING())
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .field("name", DataTypes.STRING())
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .field("user_action_time", DataTypes.TIMESTAMP(3)).proctime()
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; )
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .registerTableSource("csv_table");

异常信息如下:
Reason: No factory supports all properties.

The matching candidates:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
Unsupported property keys:
schema.#.proctime

如果去掉proctime的设置就没有问题。使用DDL创建则没有任何问题:
&nbsp; &nbsp; &nbsp; &nbsp; tableEnv.sqlUpdate("create table csv_table(" +
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "id string," +
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "name string," +
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "user_action_time as PROCTIME()" +
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ") with (" +
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; " 'connector.type' = 'filesystem'," +
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; " 'connector.path' = 'file:///Users/test/csv/demo.csv'," +
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; " 'format.type' = 'csv'" +
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ")");