Posted to issues@flink.apache.org by "ShenDa (Jira)" <ji...@apache.org> on 2019/12/04 06:06:00 UTC
[jira] [Commented] (FLINK-12584) Add Bucket File System Table Sink
[ https://issues.apache.org/jira/browse/FLINK-12584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16987569#comment-16987569 ]
ShenDa commented on FLINK-12584:
--------------------------------
[~zhangjun] Is there any progress on this issue? Our group also needs a bucket file system table sink.
> Add Bucket File System Table Sink
> ---------------------------------
>
> Key: FLINK-12584
> URL: https://issues.apache.org/jira/browse/FLINK-12584
> Project: Flink
> Issue Type: New Feature
> Components: Table SQL / API
> Affects Versions: 1.8.0, 1.9.0
> Reporter: Jun Zhang
> Assignee: Jun Zhang
> Priority: Major
> Labels: pull-request-available
> Time Spent: 20m
> Remaining Estimate: 0h
>
> 1. *Motivation*
> In Flink, the file system (especially HDFS) is a very common output, but SQL users cannot write data to a file system directly with SQL. I want to add a bucket file system table sink that users can register with a StreamTableEnvironment, so that both the Table API and SQL can use it to write streaming data to a file system.
> 2. *Example*
> // Bucket is the new connector descriptor proposed below
> tEnv.connect(new Bucket().basePath("hdfs://localhost/tmp/flink-data"))
>     .withFormat(new Json().deriveSchema())
>     .withSchema(new Schema()
>         .field("name", Types.STRING())
>         .field("age", Types.INT()))
>     .inAppendMode()
>     .registerTableSink("myhdfssink");
> tEnv.sqlUpdate("INSERT INTO myhdfssink SELECT * FROM mytablesource");
>
> 3. *Ideas for implementing this feature*
> 1) Add a class Bucket that extends ConnectorDescriptor and defines properties such as basePath.
> 2) Add a class BucketValidator that extends ConnectorDescriptorValidator and validates the bucket descriptor.
> 3) Add a class FileSystemTableSink that implements the StreamTableSink interface. Its emitDataStream method constructs a StreamingFileSink that writes data to the file system according to the configured properties.
> 4) Add a factory class FileSystemTableSinkFactory that implements the StreamTableSinkFactory interface and constructs FileSystemTableSink instances.
> 5) The argument of the withFormat method is an implementation of the FormatDescriptor interface, such as Json or Csv; Parquet and ORC can be added later.
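The proposed pieces cooperate through string properties: the descriptor emits them, the validator checks them, and the factory is chosen by matching them (similar in spirit to the requiredContext() matching used by Flink's table factories). Below is a minimal, dependency-free Java sketch of that flow, assuming a flat property map. The classes are simplified stand-ins that reuse the proposal's names. They are not Flink's actual ConnectorDescriptor, ConnectorDescriptorValidator, StreamTableSink, or StreamTableSinkFactory APIs, and the "connector.basepath" key is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class BucketSinkSketch {

    // Stand-in for the proposed Bucket descriptor (item 1). It only records
    // connector properties in a flat string map; "connector.basepath" is a
    // hypothetical key, not one defined by Flink.
    static final class Bucket {
        private final Map<String, String> props = new HashMap<>();

        Bucket() {
            props.put("connector.type", "bucket");
        }

        Bucket basePath(String path) {
            props.put("connector.basepath", path);
            return this;
        }

        Map<String, String> toProperties() {
            return new HashMap<>(props); // defensive copy
        }
    }

    // Stand-in for BucketValidator (item 2): fails fast when the base path is missing.
    static void validate(Map<String, String> props) {
        String path = props.get("connector.basepath");
        if (path == null || path.isEmpty()) {
            throw new IllegalArgumentException("connector.basepath must be set");
        }
    }

    // Stand-in for FileSystemTableSink (item 3). A real sink would build a
    // StreamingFileSink in emitDataStream; this one only holds its settings.
    static final class FileSystemTableSink {
        final String basePath;
        final String format;

        FileSystemTableSink(String basePath, String format) {
            this.basePath = basePath;
            this.format = format;
        }
    }

    // Stand-in for FileSystemTableSinkFactory (item 4): it matches on the
    // connector type and builds a sink from a validated property map.
    static final class FileSystemTableSinkFactory {
        Map<String, String> requiredContext() {
            return Map.of("connector.type", "bucket");
        }

        boolean matches(Map<String, String> props) {
            return requiredContext().entrySet().stream()
                .allMatch(e -> e.getValue().equals(props.get(e.getKey())));
        }

        FileSystemTableSink create(Map<String, String> props) {
            validate(props);
            // The format (item 5) would come from withFormat(...); default to json here.
            return new FileSystemTableSink(
                props.get("connector.basepath"),
                props.getOrDefault("format.type", "json"));
        }
    }

    public static void main(String[] args) {
        Map<String, String> props = new Bucket()
            .basePath("hdfs://localhost/tmp/flink-data")
            .toProperties();
        props.put("format.type", "csv"); // what a Csv format descriptor might contribute

        FileSystemTableSinkFactory factory = new FileSystemTableSinkFactory();
        if (factory.matches(props)) {
            FileSystemTableSink sink = factory.create(props);
            System.out.println(sink.basePath + " (" + sink.format + ")");
        }
    }
}
```

Running validation inside create means a misconfigured descriptor is rejected when the sink is built, rather than when the first record is written.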
--
This message was sent by Atlassian Jira
(v8.3.4#803005)