You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Timo Walther (Jira)" <ji...@apache.org> on 2021/10/07 14:11:00 UTC

[jira] [Closed] (FLINK-24359) Migrate FileSystem connector to ResolvedSchema

     [ https://issues.apache.org/jira/browse/FLINK-24359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Timo Walther closed FLINK-24359.
--------------------------------
    Fix Version/s: 1.15.0
       Resolution: Fixed

Fixed in master: 991dd0466ff28995a22ded0727ef2a1706d9bddc

> Migrate FileSystem connector to ResolvedSchema
> ----------------------------------------------
>
>                 Key: FLINK-24359
>                 URL: https://issues.apache.org/jira/browse/FLINK-24359
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Ecosystem
>         Environment: Flink 1.14-SNAPSHOT
>            Reporter: Francesco Guardiani
>            Assignee: Francesco Guardiani
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.15.0
>
>
> Filesystem connector uses the TableSchema deprecated APIs. This causes issues with Table APIs, because TableSchema#fromResolvedSchema(ResolvedSchema) requires the expressions to be serializable strings (ResolvedExpression#asSerializableString).
> For example:
> {code:java}
> TableDescriptor inputTable = TableDescriptor.forConnector("filesystem")
>         .schema(
>                 Schema.newBuilder()
>                         .column("character", DataTypes.STRING())
>                         .column("latitude", DataTypes.STRING())
>                         .column("longitude", DataTypes.STRING())
>                         .column("time", DataTypes.TIMESTAMP(3))
>                         .watermark("time", $("time").minus(lit(2).seconds()))
>                         .build()
>         )
>         // Other options
>         .build();
> {code}
> When used in a table pipeline, throws the following exception:
> {code:java}
> Caused by: org.apache.flink.table.api.TableException: Expression 'minus(time, 2000)' is not string serializable. Currently, only expressions that originated from a SQL expression have a well-defined string representation.
> 	at org.apache.flink.table.expressions.ResolvedExpression.asSerializableString(ResolvedExpression.java:51)
> 	at org.apache.flink.table.api.TableSchema.lambda$fromResolvedSchema$13(TableSchema.java:455)
> 	at java.base/java.util.Collections$SingletonList.forEach(Collections.java:4976)
> 	at org.apache.flink.table.api.TableSchema.fromResolvedSchema(TableSchema.java:451)
> 	at org.apache.flink.table.catalog.ResolvedCatalogBaseTable.getSchema(ResolvedCatalogBaseTable.java:54)
> 	at org.apache.flink.table.filesystem.AbstractFileSystemTable.<init>(AbstractFileSystemTable.java:52)
> 	at org.apache.flink.table.filesystem.FileSystemTableSource.<init>(FileSystemTableSource.java:91)
> 	at org.apache.flink.table.filesystem.FileSystemTableFactory.createDynamicTableSource(FileSystemTableFactory.java:74)
> 	at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:145)
> {code}
> The same table definition using SQL works fine:
> {code:java}
> CREATE TABLE IF NOT EXISTS LocationEvents (
>     `character` STRING,
>     `latitude` STRING,
>     `longitude` STRING,
>     `time` TIMESTAMP(3),
>     WATERMARK FOR `time` AS `time` - INTERVAL '5' MINUTES
> ) WITH (
>     -- Load from filesystem
>     'connector' = 'filesystem',
>     --- Other configs
> );
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)