Posted to user@flink.apache.org by Christophe Jolif <cj...@gmail.com> on 2018/03/23 17:20:01 UTC

"dynamic" bucketing sink

Hi all,

I'm using the nice topic pattern feature on the KafkaConsumer to read from
multiple topics, automatically discovering new topics added into the system.
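
For context, the consumer side looks roughly like this (a minimal sketch,
assuming Flink 1.4 with the flink-connector-kafka-0.11 dependency; the
broker address, group id and topic pattern are made up):

  import java.util.Properties;
  import java.util.regex.Pattern;

  import org.apache.flink.api.common.serialization.SimpleStringSchema;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

  public class TopicPatternJob {
    public static void main(String[] args) throws Exception {
      StreamExecutionEnvironment env =
          StreamExecutionEnvironment.getExecutionEnvironment();

      Properties props = new Properties();
      props.setProperty("bootstrap.servers", "kafka:9092"); // made-up address
      props.setProperty("group.id", "my-group");
      // poll Kafka for newly created matching topics every 30 seconds
      props.setProperty("flink.partition-discovery.interval-millis", "30000");

      // subscribes to every topic matching the pattern, including topics
      // that only appear after the job has started
      DataStream<String> stream = env.addSource(
          new FlinkKafkaConsumer011<>(
              Pattern.compile("events-.*"), new SimpleStringSchema(), props));

      // ... processing ...
      env.execute("topic-pattern-job");
    }
  }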

At the end of the processing I'm sinking the result into a Hadoop
Filesystem using a BucketingSink.
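
And the sink side, continuing the sketch above (the HDFS URI is made up;
note the single basePath fixed in the constructor, which is exactly the
limitation I describe below):

  import org.apache.flink.streaming.connectors.fs.StringWriter;
  import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
  import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

  // one fixed base path for the whole sink
  BucketingSink<String> sink =
      new BucketingSink<>("hdfs://namenode:8020/data/output");
  sink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd--HH")); // hourly buckets
  sink.setWriter(new StringWriter<>());
  sink.setBatchSize(1024 * 1024 * 128); // roll part files at 128 MB

  stream.addSink(sink);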

All works great until I get the requirement to sink into a different Hadoop
Filesystem based on the input topic.

One way to do this would obviously be to get rid of the topic pattern and
start a (similar) job per topic, each with its own sink to its own
filesystem, starting new jobs as new topics are added. But that's far from
ideal: it would lead to the usual issues with Flink and a dynamic number of
jobs (requiring new task slots, ...), and it would obviously require some
external machinery to detect that new topics have been added and to create
the new jobs, etc.

What would be the recommended way to have a "dynamic" BucketingSink that
can not only write to several basePaths (not too hard, I guess) but also
dynamically add new base paths as new topics come into the system?
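
The closest I can see with the current API is a custom Bucketer that buckets
by topic, but that only gives per-topic subdirectories under the one
basePath, not different filesystems. A sketch, assuming the records reach
the sink as Tuple2<topic, value>:

  import org.apache.flink.api.java.tuple.Tuple2;
  import org.apache.flink.streaming.connectors.fs.Clock;
  import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer;
  import org.apache.hadoop.fs.Path;

  // Buckets records by their source topic, but always below the sink's
  // single basePath; it cannot target a different filesystem per topic.
  public class TopicBucketer implements Bucketer<Tuple2<String, String>> {
    @Override
    public Path getBucketPath(Clock clock, Path basePath,
                              Tuple2<String, String> element) {
      return new Path(basePath, element.f0); // f0 carries the topic name
    }
  }

That covers the "several paths" part within a single filesystem, but not the
"different filesystem per topic" part, which is the real question.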

Thanks,
-- 
Christophe

Re: "dynamic" bucketing sink

Posted by Christophe Jolif <cj...@gmail.com>.
Thanks Timo & Ashish for your input.

I will definitely have a look at Kite SDK (was not aware of it). Otherwise
I'll try to prototype something and share it with the community through a
JIRA issue.

--
Christophe

On Mon, Mar 26, 2018 at 1:34 PM, ashish pok <as...@yahoo.com> wrote:

> Hi Christophe,
>
> Have you looked at Kite SDK? We do something like this, but using Gobblin
> and Kite SDK, in a pipeline that runs in parallel to Flink. It feels like
> if you partition by something logical like the topic name, you should be
> able to sink using Kite SDK. Kite gives you good ways to handle further
> partitioning, e.g. by timestamp, and also schema evolution if you are
> using Avro.
>
> -- Ashish
>
> On Mon, Mar 26, 2018 at 4:57 AM, Timo Walther
> <tw...@apache.org> wrote:
> Hi Christophe,
>
> I think this will require more effort. As far as I know there is no such
> "dynamic" feature. Have you looked in to the bucketing sink code? Maybe you
> can adapt it to your needs?
>
> Otherwise it might also make sense to open an issue for it to discuss a
> design for it. Maybe other contributors are interested in this feature as
> well.
>
> Regards,
> Timo
>
> On 23.03.18 at 18:20, Christophe Jolif wrote:
>
> Hi all,
>
> I'm using the nice topic pattern feature on the KafkaConsumer to read from
> multiple topics, automatically discovering new topics added into the system.
>
> At the end of the processing I'm sinking the result into a Hadoop
> Filesystem using a BucketingSink.
>
> All works great until I get the requirement to sink into a different
> Hadoop Filesystem based on the input topic.
>
> One way to do this would obviously be to get rid of the topic pattern and
> start a (similar) job per topic, each with its own sink to its own
> filesystem, starting new jobs as new topics are added. But that's far from
> ideal: it would lead to the usual issues with Flink and a dynamic number of
> jobs (requiring new task slots, ...), and it would obviously require some
> external machinery to detect that new topics have been added and to create
> the new jobs, etc.
>
> What would be the recommended way to have a "dynamic" BucketingSink that
> can not only write to several basePaths (not too hard, I guess) but also
> dynamically add new base paths as new topics come into the system?
>
> Thanks,
> --
> Christophe

Re: "dynamic" bucketing sink

Posted by ashish pok <as...@yahoo.com>.
Hi Christophe,
Have you looked at Kite SDK? We do something like this, but using Gobblin
and Kite SDK, in a pipeline that runs in parallel to Flink. It feels like if
you partition by something logical like the topic name, you should be able
to sink using Kite SDK. Kite gives you good ways to handle further
partitioning, e.g. by timestamp, and also schema evolution if you are using
Avro.
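
To give a flavour, the partitioning setup looks roughly like this (a rough
sketch against the Kite data API; the dataset URI, the schema file and the
"ts" field are made up, and method names may differ slightly between Kite
versions):

  import org.apache.avro.generic.GenericRecord;
  import org.kitesdk.data.Dataset;
  import org.kitesdk.data.DatasetDescriptor;
  import org.kitesdk.data.Datasets;
  import org.kitesdk.data.PartitionStrategy;

  // partition by year/month/day derived from a "ts" timestamp field
  PartitionStrategy strategy = new PartitionStrategy.Builder()
      .year("ts").month("ts").day("ts")
      .build();

  DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
      .schemaUri("resource:event.avsc") // Avro schema, so evolution is handled
      .partitionStrategy(strategy)
      .build();

  // one dataset per topic, each of which can be rooted somewhere else
  Dataset<GenericRecord> events = Datasets.create(
      "dataset:hdfs://namenode:8020/data/events", descriptor,
      GenericRecord.class);

You would create one such dataset per topic, deriving the dataset URI from
the topic name.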
-- Ashish 
 
On Mon, Mar 26, 2018 at 4:57 AM, Timo Walther <tw...@apache.org> wrote:

> Hi Christophe,
>
> I think this will require more effort. As far as I know there is no such
> "dynamic" feature. Have you looked into the bucketing sink code? Maybe you
> can adapt it to your needs?
>
> Otherwise it might also make sense to open an issue for it to discuss a
> design for it. Maybe other contributors are interested in this feature as
> well.
>
> Regards,
> Timo
>
> On 23.03.18 at 18:20, Christophe Jolif wrote:
>
> Hi all,
>
> I'm using the nice topic pattern feature on the KafkaConsumer to read from
> multiple topics, automatically discovering new topics added into the
> system.
>
> At the end of the processing I'm sinking the result into a Hadoop
> Filesystem using a BucketingSink.
>
> All works great until I get the requirement to sink into a different
> Hadoop Filesystem based on the input topic.
>
> One way to do this would obviously be to get rid of the topic pattern and
> start a (similar) job per topic, each with its own sink to its own
> filesystem, starting new jobs as new topics are added. But that's far from
> ideal: it would lead to the usual issues with Flink and a dynamic number
> of jobs (requiring new task slots, ...), and it would obviously require
> some external machinery to detect that new topics have been added and to
> create the new jobs, etc.
>
> What would be the recommended way to have a "dynamic" BucketingSink that
> can not only write to several basePaths (not too hard, I guess) but also
> dynamically add new base paths as new topics come into the system?
>
> Thanks,
> --
> Christophe

Re: "dynamic" bucketing sink

Posted by Timo Walther <tw...@apache.org>.
Hi Christophe,

I think this will require more effort. As far as I know there is no such
"dynamic" feature. Have you looked into the bucketing sink code? Maybe
you can adapt it to your needs?
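
To illustrate what adapting it could mean: below is a very naive sketch of
the routing mechanics only, writing through the Hadoop FileSystem API
directly instead of delegating to BucketingSink. The path pattern is made
up, and there is no file rolling, no checkpoint integration and no
exactly-once guarantee here, so it is nowhere near a real implementation:

  import java.net.URI;
  import java.nio.charset.StandardCharsets;
  import java.util.HashMap;
  import java.util.Map;

  import org.apache.flink.api.java.tuple.Tuple2;
  import org.apache.flink.configuration.Configuration;
  import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
  import org.apache.hadoop.fs.FSDataOutputStream;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  // Routes each (topic, value) record to an output stream that is created
  // on demand the first time the topic is seen, so new topics do not
  // require a new job. NOT fault tolerant as written.
  public class NaiveDynamicHdfsSink
      extends RichSinkFunction<Tuple2<String, String>> {

    // hypothetical rule mapping a topic to a filesystem + directory,
    // e.g. "hdfs://%s-cluster:8020/events" with the topic substituted in
    private final String basePathPattern;

    private transient Map<String, FSDataOutputStream> streams;

    public NaiveDynamicHdfsSink(String basePathPattern) {
      this.basePathPattern = basePathPattern;
    }

    @Override
    public void open(Configuration parameters) {
      streams = new HashMap<>();
    }

    @Override
    public void invoke(Tuple2<String, String> record) throws Exception {
      FSDataOutputStream out = streams.get(record.f0);
      if (out == null) {
        Path dir = new Path(String.format(basePathPattern, record.f0));
        FileSystem fs = FileSystem.get(
            URI.create(dir.toString()),
            new org.apache.hadoop.conf.Configuration());
        out = fs.create(new Path(dir,
            "part-" + getRuntimeContext().getIndexOfThisSubtask()));
        streams.put(record.f0, out);
      }
      out.write((record.f1 + "\n").getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public void close() throws Exception {
      for (FSDataOutputStream out : streams.values()) {
        out.close();
      }
    }
  }

A proper version would have to replicate what BucketingSink does on top of
this (pending files, rolling, checkpoint hooks), which is why an issue and
a design discussion make sense.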

Otherwise it might also make sense to open an issue for it to discuss a 
design for it. Maybe other contributors are interested in this feature 
as well.

Regards,
Timo

On 23.03.18 at 18:20, Christophe Jolif wrote:
> Hi all,
>
> I'm using the nice topic pattern feature on the KafkaConsumer to read 
> from multiple topics, automatically discovering new topics added into 
> the system.
>
> At the end of the processing I'm sinking the result into a Hadoop 
> Filesystem using a BucketingSink.
>
> All works great until I get the requirement to sink into a different 
> Hadoop Filesystem based on the input topic.
>
> One way to do this would obviously be to get rid of the topic pattern
> and start a (similar) job per topic, each with its own sink to its own
> filesystem, starting new jobs as new topics are added. But that's far
> from ideal: it would lead to the usual issues with Flink and a dynamic
> number of jobs (requiring new task slots, ...), and it would obviously
> require some external machinery to detect that new topics have been
> added and to create the new jobs, etc.
>
> What would be the recommended way to have a "dynamic" BucketingSink
> that can not only write to several basePaths (not too hard, I guess)
> but also dynamically add new base paths as new topics come into the
> system?
>
> Thanks,
> -- 
> Christophe