Posted to issues@flink.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2019/09/08 06:23:00 UTC

[jira] [Updated] (FLINK-12584) Add Bucket File System Table Sink

     [ https://issues.apache.org/jira/browse/FLINK-12584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-12584:
-----------------------------------
    Labels: pull-request-available  (was: )

> Add Bucket File System Table Sink
> ---------------------------------
>
>                 Key: FLINK-12584
>                 URL: https://issues.apache.org/jira/browse/FLINK-12584
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table SQL / API
>    Affects Versions: 1.8.0, 1.9.0
>            Reporter: zhangjun
>            Assignee: zhangjun
>            Priority: Major
>              Labels: pull-request-available
>
> 1. *Motivation*
> In Flink, the file system (especially HDFS) is a very common output target, but users working in SQL cannot write data to a file system directly. I therefore propose adding a bucket file system table sink that users can register with a StreamTableEnvironment, so that the Table API and the SQL API can use it to write stream data directly to a file system.
> 2. *Example*
> tEnv.connect(new Bucket().basePath("hdfs://localhost/tmp/flink-data"))
>                    .withFormat(new Json().deriveSchema())
>                    .withSchema(new Schema()
>                           .field("name", Types.STRING())
>                           .field("age", Types.INT()))
>                    .inAppendMode()
>                    .registerTableSink("myhdfssink");
> tEnv.sqlUpdate("insert into myhdfssink SELECT * FROM mytablesource");
>  
> 3. *Some ideas to achieve this function*
> 1) Add a class Bucket that extends ConnectorDescriptor and carries properties such as basePath.
> 2) Add a class BucketValidator that extends ConnectorDescriptorValidator and validates the Bucket descriptor.
> 3) Add a class FileSystemTableSink that implements the StreamTableSink interface. In its emitDataStream method, construct a StreamingFileSink, configured according to the given properties, to write data to the file system.
> 4) Add a factory class FileSystemTableSinkFactory that implements the StreamTableSinkFactory interface and constructs the FileSystemTableSink.
> 5) The parameter of the withFormat method is an implementation of the FormatDescriptor interface, such as Json or Csv; Parquet and Orc can be added later.
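
Steps 1) and 2) above could be sketched roughly as follows. This is a plain-Java stand-in with no Flink dependency, not the proposed implementation: the class name BucketDescriptorSketch, the property keys "connector.type" and "connector.basepath", and the value "bucket" are all assumptions made for illustration, since the issue does not specify them. In Flink itself, Bucket would extend ConnectorDescriptor and the checks would live in a ConnectorDescriptorValidator subclass.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class BucketDescriptorSketch {

    // Hypothetical property keys and values; the issue does not define them.
    static final String CONNECTOR_TYPE = "connector.type";
    static final String CONNECTOR_TYPE_VALUE = "bucket";
    static final String CONNECTOR_BASE_PATH = "connector.basepath";

    /** Stand-in for the proposed Bucket descriptor (step 1). */
    static final class Bucket {
        private String basePath;

        /** Fluent setter, mirroring the basePath(...) call in the example. */
        Bucket basePath(String basePath) {
            this.basePath = basePath;
            return this;
        }

        /** Flattens the descriptor into string properties for a factory to read. */
        Map<String, String> toProperties() {
            Map<String, String> props = new LinkedHashMap<>();
            props.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE);
            if (basePath != null) {
                props.put(CONNECTOR_BASE_PATH, basePath);
            }
            return props;
        }
    }

    /** Stand-in for the proposed BucketValidator (step 2). */
    static void validate(Map<String, String> props) {
        if (!CONNECTOR_TYPE_VALUE.equals(props.get(CONNECTOR_TYPE))) {
            throw new IllegalArgumentException(
                    "Expected " + CONNECTOR_TYPE + "=" + CONNECTOR_TYPE_VALUE);
        }
        String basePath = props.get(CONNECTOR_BASE_PATH);
        if (basePath == null || basePath.trim().isEmpty()) {
            throw new IllegalArgumentException(
                    "Missing required property " + CONNECTOR_BASE_PATH);
        }
    }

    public static void main(String[] args) {
        Map<String, String> props =
                new Bucket().basePath("hdfs://localhost/tmp/flink-data").toProperties();
        validate(props); // throws if the descriptor is incomplete
        System.out.println(props);
    }
}
```

A sink factory as in step 4) would then read the same validated properties to decide how to build the sink; a descriptor without a base path is rejected before any sink is constructed.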



--
This message was sent by Atlassian Jira
(v8.3.2#803003)