Posted to issues@flink.apache.org by "Dawid Wysakowicz (Jira)" <ji...@apache.org> on 2019/11/29 15:55:00 UTC

[jira] [Commented] (FLINK-14994) StreamTableEnvironment.connect throw exception when using "FileSystem" connector and "CSV" format

    [ https://issues.apache.org/jira/browse/FLINK-14994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16985091#comment-16985091 ] 

Dawid Wysakowicz commented on FLINK-14994:
------------------------------------------

The {{Csv}} format does not support the filesystem connector. Use {{OldCsv}} instead:

{code}

    streamTableEnvironment.connect(
            new FileSystem()
            .path("c:/work/1.csv")
    ).withFormat(
            new OldCsv()
            ...
    ).withSchema(
            new Schema()
            .field("last_update_dt", Types.SQL_TIMESTAMP)
    ).inAppendMode().registerTableSource("test");
{code}
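
To illustrate the root cause reported in the issue description quoted below, here is a minimal, self-contained sketch of a "supported properties" check. It is not Flink's implementation: the class {{SupportedPropertiesSketch}}, its methods, the {{SUPPORTED}} key list, and the sample property values are all hypothetical, and the real check also handles context keys and other wildcards. It only shows why one undeclared key such as "format.schema" is enough to leave no matching factory.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SupportedPropertiesSketch {

    // Hypothetical, simplified list of keys a filesystem CSV source factory declares.
    // '#' stands for an array index. Note that "format.schema" is not in the list.
    public static final List<String> SUPPORTED = Arrays.asList(
            "connector.path",
            "format.fields.#.name",
            "format.fields.#.type",
            "schema.#.name",
            "schema.#.type");

    // Replace array indices with '#', e.g. "schema.0.name" -> "schema.#.name".
    public static String normalize(String key) {
        return key.replaceAll("\\.\\d+(?=\\.|$)", ".#");
    }

    // Return every requested key that the factory does not declare as supported.
    // In this sketch a factory is usable only when the returned list is empty.
    public static List<String> unsupportedKeys(Map<String, String> properties,
                                               List<String> supported) {
        List<String> unsupported = new ArrayList<>();
        for (String key : properties.keySet()) {
            if (!supported.contains(normalize(key))) {
                unsupported.add(key);
            }
        }
        return unsupported;
    }

    public static void main(String[] args) {
        // Assumed shape of the properties produced by a Csv descriptor with a schema.
        Map<String, String> csv = new LinkedHashMap<>();
        csv.put("connector.path", "c:/work/1.csv");
        csv.put("format.schema", "ROW<...>"); // illustrative value only
        csv.put("schema.0.name", "last_update_dt");
        csv.put("schema.0.type", "TIMESTAMP");

        // Assumed shape of the properties produced by an OldCsv descriptor with one field.
        Map<String, String> oldCsv = new LinkedHashMap<>();
        oldCsv.put("connector.path", "c:/work/1.csv");
        oldCsv.put("format.fields.0.name", "last_update_dt");
        oldCsv.put("format.fields.0.type", "TIMESTAMP");
        oldCsv.put("schema.0.name", "last_update_dt");
        oldCsv.put("schema.0.type", "TIMESTAMP");

        System.out.println("Csv descriptor:    " + unsupportedKeys(csv, SUPPORTED));
        System.out.println("OldCsv descriptor: " + unsupportedKeys(oldCsv, SUPPORTED));
    }
}
```

Under these assumptions the {{Csv}} property set is rejected because of "format.schema", while the {{OldCsv}} property set passes, which matches the behaviour described in the issue.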

> StreamTableEnvironment.connect throw exception when using "FileSystem" connector and "CSV" format
> -------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-14994
>                 URL: https://issues.apache.org/jira/browse/FLINK-14994
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / FileSystem, Table SQL / API
>    Affects Versions: 1.9.0, 1.9.1
>            Reporter: Dezhi Cai
>            Priority: Blocker
>              Labels: pull-request-available
>         Attachments: 1.csv, image-2019-11-29-22-50-50-367.png
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> I used two approaches to register a table in StreamTableEnvironment. The DDL approach runs fine, but the "StreamTableEnvironment.connect" approach throws an exception.
> {color:#FF0000}Root cause:{color}
> "CsvTableSourceFactoryBase.supportedProperties" doesn't include "format.schema", so "org.apache.flink.table.factories.TableFactoryService#filterBySupportedProperties" returns no "TableSourceFactory".
>  
> This approach runs successfully (using DDL):
> {code:java}
> public static void main1(String[] args) throws Exception{
>     StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
>     EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>     StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(environment, settings);
>     String sql = "create table test(last_update_dt TIMESTAMP) " +
>                     "with (" +
>                         "'connector.type' = 'filesystem'," +
>                         "'connector.path' = 'C:/work/1.csv'," +
>                         "'format.type' = 'csv'," +
>                         "'format.fields.0.name' = 'last_update_dt'," +
>                         "'format.fields.0.type' = 'TIMESTAMP'" +
>                     ")";
>     streamTableEnvironment.sqlUpdate(sql);
>     Table data = streamTableEnvironment.sqlQuery("select * from test");
>     streamTableEnvironment.toAppendStream(data, Row.class).print();
>     environment.execute();
> }
> {code}
>  
> This approach throws an exception:
> {code:java}
> public static void main(String[] args) throws Exception{
>     StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
>     EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>     StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(environment, settings);
>     streamTableEnvironment.connect(
>             new FileSystem()
>             .path("c:/work/1.csv")
>     ).withFormat(
>             new Csv()
>             .schema(Types.ROW(Types.SQL_TIMESTAMP))
>     ).withSchema(
>             new Schema()
>             .field("last_update_dt", Types.SQL_TIMESTAMP)
>     ).inAppendMode().registerTableSource("test");
>     Table data = streamTableEnvironment.sqlQuery("select * from test");
>     streamTableEnvironment.toAppendStream(data, Row.class).print();
>     environment.execute();
> }
> {code}
>   !image-2019-11-29-22-50-50-367.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)