Posted to issues@flink.apache.org by "ShenDa (Jira)" <ji...@apache.org> on 2019/12/04 06:06:00 UTC

[jira] [Commented] (FLINK-12584) Add Bucket File System Table Sink

    [ https://issues.apache.org/jira/browse/FLINK-12584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16987569#comment-16987569 ] 

ShenDa commented on FLINK-12584:
--------------------------------

[~zhangjun] Is there any progress on this issue? Our group also needs a bucket file system table sink.

> Add Bucket File System Table Sink
> ---------------------------------
>
>                 Key: FLINK-12584
>                 URL: https://issues.apache.org/jira/browse/FLINK-12584
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table SQL / API
>    Affects Versions: 1.8.0, 1.9.0
>            Reporter: Jun Zhang
>            Assignee: Jun Zhang
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> 1. *Motivation*
> In Flink, the file system (especially HDFS) is a very common output, but users working in SQL cannot write data directly to the file system. I want to add a bucket file system table sink that users can register with the StreamTableEnvironment, so that both the Table API and the SQL API can use it to write stream data to the file system.
> 2. *Example*
> tEnv.connect(new Bucket().basePath("hdfs://localhost/tmp/flink-data"))
>     .withFormat(new Json().deriveSchema())
>     .withSchema(new Schema()
>         .field("name", Types.STRING())
>         .field("age", Types.INT()))
>     .inAppendMode()
>     .registerTableSink("myhdfssink");
> tEnv.sqlUpdate("insert into myhdfssink SELECT * FROM mytablesource");
>  
> 3. *Some ideas to achieve this function*
> 1) Add a class Bucket that extends ConnectorDescriptor, with properties such as basePath.
> 2) Add a class BucketValidator that extends ConnectorDescriptorValidator and validates the bucket descriptor.
> 3) Add a class FileSystemTableSink that implements the StreamTableSink interface. In its emitDataStream method, construct a StreamingFileSink that writes data to the file system according to the configured properties.
> 4) Add a factory class FileSystemTableSinkFactory that implements the StreamTableSinkFactory interface for constructing FileSystemTableSink.
> 5) The parameter of the withFormat method is an implementation class of the FormatDescriptor interface, such as Json or Csv; we can add Parquet and Orc later.
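The steps above follow Flink's descriptor pattern, where a connector descriptor ultimately reduces to a map of string properties that the matching table factory reads back. Here is a minimal, framework-free sketch of step 1: a hypothetical Bucket builder that collects its settings into such a property map. The class and the property keys (`connector.type`, `connector.basepath`) are illustrative assumptions, not the actual Flink API or key names; a real implementation would extend ConnectorDescriptor instead.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the Bucket descriptor from step 1.
// In Flink it would extend ConnectorDescriptor; here we only model
// the property-map contract that descriptors fulfil.
public class BucketSketch {
    static class Bucket {
        private String basePath;

        // Builder-style setter, matching the example usage above.
        public Bucket basePath(String basePath) {
            this.basePath = basePath;
            return this;
        }

        // Descriptors reduce to string properties that the matching
        // factory (here, FileSystemTableSinkFactory) would read back.
        public Map<String, String> toConnectorProperties() {
            Map<String, String> props = new HashMap<>();
            props.put("connector.type", "bucket");         // illustrative key
            if (basePath != null) {
                props.put("connector.basepath", basePath); // illustrative key
            }
            return props;
        }
    }

    public static void main(String[] args) {
        Map<String, String> props = new Bucket()
                .basePath("hdfs://localhost/tmp/flink-data")
                .toConnectorProperties();
        System.out.println(props.get("connector.type"));
        System.out.println(props.get("connector.basepath"));
    }
}
```

A BucketValidator (step 2) would then check this map, e.g. that `connector.basepath` is present, before the factory builds the sink.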



--
This message was sent by Atlassian Jira
(v8.3.4#803005)