Posted to dev@flink.apache.org by "Francesco Guardiani (Jira)" <ji...@apache.org> on 2021/09/23 07:40:00 UTC
[jira] [Created] (FLINK-24359) Migrate FileSystem connector to ResolvedSchema
Francesco Guardiani created FLINK-24359:
-------------------------------------------
Summary: Migrate FileSystem connector to ResolvedSchema
Key: FLINK-24359
URL: https://issues.apache.org/jira/browse/FLINK-24359
Project: Flink
Issue Type: New Feature
Components: Table SQL / Ecosystem
Environment: Flink 1.14-SNAPSHOT
Reporter: Francesco Guardiani
The filesystem connector still uses the deprecated TableSchema APIs. This causes issues with the Table API, because TableSchema#fromResolvedSchema(ResolvedSchema) requires every expression to be serializable as a string (ResolvedExpression#asSerializableString).
For example:
{code:java}
TableDescriptor inputTable = TableDescriptor.forConnector("filesystem")
    .schema(
        Schema.newBuilder()
            .column("character", DataTypes.STRING())
            .column("latitude", DataTypes.STRING())
            .column("longitude", DataTypes.STRING())
            .column("time", DataTypes.TIMESTAMP(3))
            .watermark("time", $("time").minus(lit(2).seconds()))
            .build()
    )
    // Other options
    .build();
{code}
When this table is used in a table pipeline, it throws the following exception:
{code:java}
Caused by: org.apache.flink.table.api.TableException: Expression 'minus(time, 2000)' is not string serializable. Currently, only expressions that originated from a SQL expression have a well-defined string representation.
    at org.apache.flink.table.expressions.ResolvedExpression.asSerializableString(ResolvedExpression.java:51)
    at org.apache.flink.table.api.TableSchema.lambda$fromResolvedSchema$13(TableSchema.java:455)
    at java.base/java.util.Collections$SingletonList.forEach(Collections.java:4976)
    at org.apache.flink.table.api.TableSchema.fromResolvedSchema(TableSchema.java:451)
    at org.apache.flink.table.catalog.ResolvedCatalogBaseTable.getSchema(ResolvedCatalogBaseTable.java:54)
    at org.apache.flink.table.filesystem.AbstractFileSystemTable.<init>(AbstractFileSystemTable.java:52)
    at org.apache.flink.table.filesystem.FileSystemTableSource.<init>(FileSystemTableSource.java:91)
    at org.apache.flink.table.filesystem.FileSystemTableFactory.createDynamicTableSource(FileSystemTableFactory.java:74)
    at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:145)
{code}
The same table definition using SQL works fine:
{code:sql}
CREATE TABLE IF NOT EXISTS LocationEvents (
    `character` STRING,
    `latitude` STRING,
    `longitude` STRING,
    `time` TIMESTAMP(3),
    WATERMARK FOR `time` AS `time` - INTERVAL '5' MINUTES
) WITH (
    -- Load from filesystem
    'connector' = 'filesystem',
    -- Other configs
);
{code}
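The difference between the two cases can be illustrated with a small, self-contained Java model. This is only a sketch: Expr, ResolvedSchemaModel, toLegacyWatermark, and columnNames are hypothetical stand-ins invented for this illustration, not Flink classes, and the actual connector code will differ. It assumes that an expression built through the Table API carries no SQL text, so an eager conversion to a string fails, while code that reads the schema without that conversion is unaffected.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SchemaSerializationSketch {

    /** Hypothetical stand-in for a resolved expression; not Flink's ResolvedExpression. */
    static final class Expr {
        final String summary; // human-readable form, e.g. "minus(time, 2000)"
        final String sqlText; // null when the expression did not originate from SQL

        Expr(String summary, String sqlText) {
            this.summary = summary;
            this.sqlText = sqlText;
        }

        /** Succeeds only for expressions that carry their original SQL text. */
        String asSerializableString() {
            if (sqlText == null) {
                throw new IllegalStateException(
                        "Expression '" + summary + "' is not string serializable.");
            }
            return sqlText;
        }
    }

    /** Hypothetical resolved schema: column names plus one watermark expression. */
    static final class ResolvedSchemaModel {
        final List<String> columns;
        final Expr watermark;

        ResolvedSchemaModel(List<String> columns, Expr watermark) {
            this.columns = columns;
            this.watermark = watermark;
        }
    }

    /** Legacy-style conversion: eagerly serializes the watermark expression. */
    static String toLegacyWatermark(ResolvedSchemaModel schema) {
        return schema.watermark.asSerializableString();
    }

    /** Migrated-style access: reads the columns without serializing any expression. */
    static List<String> columnNames(ResolvedSchemaModel schema) {
        return new ArrayList<>(schema.columns);
    }

    /** Returns true if the legacy-style conversion throws for the given schema. */
    static boolean legacyConversionFails(ResolvedSchemaModel schema) {
        try {
            toLegacyWatermark(schema);
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<String> cols = Arrays.asList("character", "latitude", "longitude", "time");
        // Watermark built programmatically: no SQL text is available.
        ResolvedSchemaModel fromApi =
                new ResolvedSchemaModel(cols, new Expr("minus(time, 2000)", null));
        // Watermark parsed from DDL: the original SQL text is kept.
        ResolvedSchemaModel fromSql =
                new ResolvedSchemaModel(
                        cols, new Expr("minus(time, 300000)", "`time` - INTERVAL '5' MINUTES"));

        System.out.println("API schema fails legacy conversion: " + legacyConversionFails(fromApi));
        System.out.println("SQL schema fails legacy conversion: " + legacyConversionFails(fromSql));
        System.out.println("Columns read without conversion: " + columnNames(fromApi));
    }
}
```

In this model only the legacy-style path depends on how the watermark expression was created, which is the motivation for having the connector consume ResolvedSchema directly.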
--
This message was sent by Atlassian Jira
(v8.3.4#803005)