Posted to issues@flink.apache.org by "Dezhi Cai (Jira)" <ji...@apache.org> on 2019/11/29 15:05:00 UTC
[jira] [Updated] (FLINK-14994) StreamTableEnvironment.connect throw
exception when using "FileSystem" connector and "CSV" format
[ https://issues.apache.org/jira/browse/FLINK-14994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dezhi Cai updated FLINK-14994:
------------------------------
Attachment: 1.csv
> StreamTableEnvironment.connect throw exception when using "FileSystem" connector and "CSV" format
> -------------------------------------------------------------------------------------------------
>
> Key: FLINK-14994
> URL: https://issues.apache.org/jira/browse/FLINK-14994
> Project: Flink
> Issue Type: Bug
> Components: Connectors / FileSystem, Table SQL / API
> Affects Versions: 1.9.0, 1.9.1
> Reporter: Dezhi Cai
> Priority: Blocker
> Attachments: 1.csv, image-2019-11-29-22-50-50-367.png
>
>
> I registered the same table in a StreamTableEnvironment in two ways. The DDL approach runs fine, but the "StreamTableEnvironment.connect" approach throws an exception.
> {color:#FF0000}Root cause:{color}
> "CsvTableSourceFactoryBase.supportedProperties" doesn't include "format.schema", so "org.apache.flink.table.factories.TableFactoryService#filterBySupportedProperties" returns no "TableSourceFactory".
>
> This approach runs successfully (using DDL):
> {code:java}
> public static void main1(String[] args) throws Exception {
>     StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
>     EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>     StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(environment, settings);
>     String sql = "create table test(last_update_dt TIMESTAMP) " +
>         "with (" +
>         "'connector.type' = 'filesystem'," +
>         "'connector.path' = 'C:/work/1.csv'," +
>         "'format.type' = 'csv'," +
>         "'format.fields.0.name' = 'last_update_dt'," +
>         "'format.fields.0.type' = 'TIMESTAMP'" +
>         ")";
>     streamTableEnvironment.sqlUpdate(sql);
>     Table data = streamTableEnvironment.sqlQuery("select * from test");
>     streamTableEnvironment.toAppendStream(data, Row.class).print();
>     environment.execute();
> }{code}
>
> This approach throws an exception:
> {code:java}
> public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
>     EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>     StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(environment, settings);
>     streamTableEnvironment.connect(
>         new FileSystem()
>             .path("c:/work/1.csv")
>     ).withFormat(
>         new Csv()
>             .schema(Types.ROW(Types.SQL_TIMESTAMP))
>     ).withSchema(
>         new Schema()
>             .field("last_update_dt", Types.SQL_TIMESTAMP)
>     ).inAppendMode().registerTableSource("test");
>     Table data = streamTableEnvironment.sqlQuery("select * from test");
>     streamTableEnvironment.toAppendStream(data, Row.class).print();
>     environment.execute();
> }
> {code}
> !image-2019-11-29-22-50-50-367.png!
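
The root cause described above can be illustrated with a small standalone model of supported-property filtering. This is only a sketch, not Flink's actual code: the class name, the SUPPORTED key list, and the property values (including the "format.schema" value) are assumptions chosen for illustration. The only idea taken from the report is that a factory is discarded as soon as one supplied key is missing from its supported list.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SupportedPropertiesSketch {

    // Hypothetical supported keys; '#' stands for a numeric index such as the 0 in format.fields.0.name.
    // Note that "format.schema" is deliberately absent, mirroring the reported root cause.
    static final List<String> SUPPORTED = Arrays.asList(
        "connector.type", "connector.path", "format.type",
        "format.fields.#.name", "format.fields.#.type",
        "schema.#.name", "schema.#.type", "update-mode");

    // Replace every ".<digits>" segment with ".#" so indexed keys match their pattern.
    public static String normalize(String key) {
        return key.replaceAll("\\.\\d+", ".#");
    }

    // A factory is kept only if every supplied key is in its supported list.
    public static boolean accepts(Map<String, String> properties) {
        for (String key : properties.keySet()) {
            if (!SUPPORTED.contains(normalize(key))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Keys as written in the DDL variant.
        Map<String, String> ddl = new LinkedHashMap<>();
        ddl.put("connector.type", "filesystem");
        ddl.put("connector.path", "C:/work/1.csv");
        ddl.put("format.type", "csv");
        ddl.put("format.fields.0.name", "last_update_dt");
        ddl.put("format.fields.0.type", "TIMESTAMP");

        // Keys assumed for the connect() variant; the format.schema value is illustrative.
        Map<String, String> connect = new LinkedHashMap<>();
        connect.put("connector.type", "filesystem");
        connect.put("connector.path", "c:/work/1.csv");
        connect.put("format.type", "csv");
        connect.put("format.schema", "ROW<f0 TIMESTAMP>");
        connect.put("schema.0.name", "last_update_dt");
        connect.put("schema.0.type", "TIMESTAMP");
        connect.put("update-mode", "append");

        System.out.println("DDL accepted: " + accepts(ddl));         // prints "DDL accepted: true"
        System.out.println("connect accepted: " + accepts(connect)); // prints "connect accepted: false"
    }
}
```

In this model the connect() variant is rejected solely because of the "format.schema" key, which matches the behaviour reported for "filterBySupportedProperties".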
--
This message was sent by Atlassian Jira
(v8.3.4#803005)