Posted to issues@flink.apache.org by "Kyle Hamlin (JIRA)" <ji...@apache.org> on 2018/01/05 18:54:00 UTC

[jira] [Comment Edited] (FLINK-8380) Dynamic BucketingSink paths based on ingested Kafka topics

    [ https://issues.apache.org/jira/browse/FLINK-8380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313504#comment-16313504 ] 

Kyle Hamlin edited comment on FLINK-8380 at 1/5/18 6:53 PM:
------------------------------------------------------------

I agree that is the simplest solution. I thought it might be possible, at the very least, to pass the topic through to the getBucketPath method. The topic is available in the deserialize method when extending KeyedDeserializationSchema, so I thought it might also be possible to pass it through to the bucketer.

Another thought would be to pass the message key through to the getBucketPath method; the topic name could then be set as the message key.
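
To make the idea concrete, here is a minimal sketch of the path logic only, written in plain Scala with no Flink dependency. It assumes each record has already been tagged with its source topic (for example inside deserialize); TopicRecord, TopicBucketPath and bucketPath are hypothetical names used for illustration and are not part of Flink. In a real job, logic like this would presumably sit inside a custom bucketer's getBucketPath.

```scala
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

// Hypothetical wrapper (not a Flink class): a record tagged with the
// Kafka topic it was read from.
final case class TopicRecord[T](topic: String, value: T)

object TopicBucketPath {
  // Builds "<basePath>/<topic>/<formatted date>/".
  // The date is derived from epochMillis in UTC; "yyyyMMdd" mirrors the
  // pattern used in the issue description.
  def bucketPath(basePath: String,
                 topic: String,
                 epochMillis: Long,
                 pattern: String = "yyyyMMdd"): String = {
    val date = DateTimeFormatter
      .ofPattern(pattern)
      .withZone(ZoneOffset.UTC)
      .format(Instant.ofEpochMilli(epochMillis))
    // Normalize the base path so it always ends with exactly one slash.
    val base = if (basePath.endsWith("/")) basePath else basePath + "/"
    s"$base$topic/$date/"
  }
}
```

For a record from select-topic-1 with a timestamp on 2018-01-05 (UTC), bucketPath("s3://my-bucket/", "select-topic-1", 1515178440000L) yields s3://my-bucket/select-topic-1/20180105/.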



> Dynamic BucketingSink paths based on ingested Kafka topics
> ----------------------------------------------------------
>
>                 Key: FLINK-8380
>                 URL: https://issues.apache.org/jira/browse/FLINK-8380
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>    Affects Versions: 1.4.0
>            Reporter: Kyle Hamlin
>
> Flink 1.4 released a feature that allows Kafka consumers to dynamically ingest topics based on a regex pattern. If a user wanted to use Flink as a simple (no transformations) but dynamic (automatic topic discovery and automatic output path generation) data persister, they would currently have only half the tools to do so. I believe it would be beneficial to let users define not only automatic topic discovery but also a way to dynamically incorporate those topics into a BucketingSink output path. For example:
> If I had three Kafka topics
> {code:java}
> select-topic-1
> ignore-topic-1
> select-topic-2 
> {code}
> And my Kafka consumer's regex selected only two of those topics
> {code:java}
> val consumer = new FlinkKafkaConsumer010[GenericRecord](Pattern.compile("select-.*?"), new MyDeserializer(), props)
> {code}
> Then each selected topic name would be appended to the BucketingSink base path, followed by any partitions produced by the Bucketer
> {code:java}
> val sink = new BucketingSink[GenericRecord]("s3://my-bucket/")
> sink.setBucketer(new DateTimeBucketer[GenericRecord]("yyyyMMdd"))
> {code}
> The resulting output paths would be
> {code:java}
> s3://my-bucket/select-topic-1/yyyyMMdd/
> s3://my-bucket/select-topic-2/yyyyMMdd/
> {code}
> As new topics are discovered via the regex pattern (while the app is running) the set of BucketingSink output paths would grow. 


