Posted to dev@flink.apache.org by "Francesco Guardiani (Jira)" <ji...@apache.org> on 2021/09/23 07:40:00 UTC

[jira] [Created] (FLINK-24359) Migrate FileSystem connector to ResolvedSchema

Francesco Guardiani created FLINK-24359:
-------------------------------------------

             Summary: Migrate FileSystem connector to ResolvedSchema
                 Key: FLINK-24359
                 URL: https://issues.apache.org/jira/browse/FLINK-24359
             Project: Flink
          Issue Type: New Feature
          Components: Table SQL / Ecosystem
         Environment: Flink 1.14-SNAPSHOT
            Reporter: Francesco Guardiani


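The filesystem connector still uses the deprecated TableSchema API. This breaks tables defined with the Table API: TableSchema#fromResolvedSchema(ResolvedSchema) requires every expression in the schema, such as a watermark expression, to be convertible to a string via ResolvedExpression#asSerializableString, and expressions built with the Table API have no such string form.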

For example:
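{code:java}
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;

TableDescriptor inputTable = TableDescriptor.forConnector("filesystem")
        .schema(
                Schema.newBuilder()
                        .column("character", DataTypes.STRING())
                        .column("latitude", DataTypes.STRING())
                        .column("longitude", DataTypes.STRING())
                        .column("time", DataTypes.TIMESTAMP(3))
                        // Watermark built from a Table API expression, not from a SQL string
                        .watermark("time", $("time").minus(lit(2).seconds()))
                        .build()
        )
        // Other options
        .build();
{code}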

When this table is used in a Table API pipeline, the following exception is thrown:

{code:java}
Caused by: org.apache.flink.table.api.TableException: Expression 'minus(time, 2000)' is not string serializable. Currently, only expressions that originated from a SQL expression have a well-defined string representation.
	at org.apache.flink.table.expressions.ResolvedExpression.asSerializableString(ResolvedExpression.java:51)
	at org.apache.flink.table.api.TableSchema.lambda$fromResolvedSchema$13(TableSchema.java:455)
	at java.base/java.util.Collections$SingletonList.forEach(Collections.java:4976)
	at org.apache.flink.table.api.TableSchema.fromResolvedSchema(TableSchema.java:451)
	at org.apache.flink.table.catalog.ResolvedCatalogBaseTable.getSchema(ResolvedCatalogBaseTable.java:54)
	at org.apache.flink.table.filesystem.AbstractFileSystemTable.<init>(AbstractFileSystemTable.java:52)
	at org.apache.flink.table.filesystem.FileSystemTableSource.<init>(FileSystemTableSource.java:91)
	at org.apache.flink.table.filesystem.FileSystemTableFactory.createDynamicTableSource(FileSystemTableFactory.java:74)
	at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:145)
{code}

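An equivalent table defined in SQL works fine, because its watermark expression originates from a SQL expression and therefore has a string representation: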
{code:java}
CREATE TABLE IF NOT EXISTS LocationEvents (
    `character` STRING,
    `latitude` STRING,
    `longitude` STRING,
    `time` TIMESTAMP(3),
    WATERMARK FOR `time` AS `time` - INTERVAL '5' MINUTES
) WITH (
    -- Load from filesystem
    'connector' = 'filesystem',
    --- Other configs
);
{code}
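To make the failure mode concrete, below is a minimal, self-contained Java model of the two conversion paths. All names in it (SchemaConversionSketch, ModelExpression, ModelWatermark, toLegacyWatermarks, toResolvedWatermarks) are hypothetical and are not Flink classes. The model only assumes the contract stated in the exception above: an expression has a serializable string only if it originated from SQL. The legacy path stringifies every watermark expression, as TableSchema#fromResolvedSchema does. The migrated path passes the resolved expression objects through untouched, roughly what reading ResolvedSchema directly would allow.

{code:java}
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

// Hypothetical model of FLINK-24359; none of these types are Flink classes.
public class SchemaConversionSketch {

    // Stand-in for a resolved expression. Only expressions parsed from SQL
    // carry their original SQL text (assumption taken from the exception message).
    static final class ModelExpression {
        final String summary;
        final Optional<String> sqlText;

        ModelExpression(String summary, Optional<String> sqlText) {
            this.summary = summary;
            this.sqlText = sqlText;
        }

        // Mirrors the contract of ResolvedExpression#asSerializableString:
        // throws when the expression has no SQL representation.
        String asSerializableString() {
            return sqlText.orElseThrow(() -> new UnsupportedOperationException(
                    "Expression '" + summary + "' is not string serializable."));
        }
    }

    // Stand-in for a watermark spec: rowtime column plus its expression.
    static final class ModelWatermark {
        final String rowtimeColumn;
        final ModelExpression expression;

        ModelWatermark(String rowtimeColumn, ModelExpression expression) {
            this.rowtimeColumn = rowtimeColumn;
            this.expression = expression;
        }
    }

    // Legacy path, analogous to TableSchema#fromResolvedSchema: every watermark
    // expression is turned into a string, so Table API expressions fail here.
    static List<String> toLegacyWatermarks(List<ModelWatermark> watermarks) {
        return watermarks.stream()
                .map(w -> w.rowtimeColumn + " AS " + w.expression.asSerializableString())
                .collect(Collectors.toList());
    }

    // Migrated path, analogous to consuming the resolved schema directly:
    // the resolved expression objects are passed through, never stringified.
    static List<ModelExpression> toResolvedWatermarks(List<ModelWatermark> watermarks) {
        return watermarks.stream()
                .map(w -> w.expression)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<ModelWatermark> tableApi = Collections.singletonList(new ModelWatermark(
                "time", new ModelExpression("minus(time, 2000)", Optional.empty())));
        List<ModelWatermark> sql = Collections.singletonList(new ModelWatermark(
                "time", new ModelExpression("`time` - INTERVAL '5' MINUTES",
                        Optional.of("`time` - INTERVAL '5' MINUTES"))));

        // The migrated path accepts both definitions.
        System.out.println(toResolvedWatermarks(tableApi).size()); // 1
        // The legacy path works for the SQL-originated expression...
        System.out.println(toLegacyWatermarks(sql)); // [time AS `time` - INTERVAL '5' MINUTES]

        // ...but fails for the Table API expression, as in the reported stack trace.
        try {
            toLegacyWatermarks(tableApi);
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage());
        }
    }
}
{code}

In this model, only the legacy path depends on a string form, which is why removing the TableSchema round trip from the connector should make the Table API definition behave like the SQL one.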


