Posted to user@flink.apache.org by Kyle Hamlin <ha...@gmail.com> on 2017/12/31 16:10:48 UTC

Separate checkpoint directories

Flink 1.4 added regex pattern matching for FlinkKafkaConsumers, which is a
neat feature. I would like to use this feature, but I'm wondering how it
impacts the FsStateBackend checkpointing mechanism. Before, I would
subscribe to one topic and set a checkpoint path specific to that topic; for
example, if the Kafka topic name was *foo*:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStateBackend(new FsStateBackend("s3://checkpoints/foo/"))

How does one dynamically set these checkpoint paths? Is it even necessary
to do so, or should I have one checkpoint path for all the possible topics
the regex pattern could pick up?
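
For context, a minimal sketch of the pattern-based subscription in question, assuming Flink 1.4's Kafka 0.11 connector; the bootstrap servers, group id, topic pattern, and checkpoint interval are placeholder values:

```scala
import java.util.Properties
import java.util.regex.Pattern

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

val env = StreamExecutionEnvironment.getExecutionEnvironment

// One checkpoint root for the whole job, regardless of how many
// topics the pattern ends up matching.
env.setStateBackend(new FsStateBackend("s3://checkpoints/"))
env.enableCheckpointing(60000L) // checkpoint every 60s

val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092") // placeholder
props.setProperty("group.id", "pattern-demo")            // placeholder

// Subscribe to every topic whose name matches the regex.
val consumer = new FlinkKafkaConsumer011[String](
  Pattern.compile("foo.*"),
  new SimpleStringSchema(),
  props)

val stream = env.addSource(consumer)
```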

Re: Separate checkpoint directories

Posted by Stefan Richter <s....@data-artisans.com>.
Hi,

the state is checkpointed into subdirectories with unique file names, so keeping everything under one root directory is no problem. This all happens automatically.

As far as I know, there is no built-in implementation that generates per-topic output paths for sinks like that. You could open a JIRA issue with a feature request, though.

Best,
Stefan

> On 03.01.2018 at 16:06, Kyle Hamlin <ha...@gmail.com> wrote:
> 
> Hi Stefan,
> 
> In the past, I ran four separate Flink apps to sink data from four separate Kafka topics to s3 without any transformations applied. For each Flink app, I would set the checkpoint directory to s3://some-bucket/checkpoints/topic-name. It appears that with Flink 1.4 I can just use a regex to pick up all four topics, and so what you are telling me is that even if my regex picks up 1000 Kafka topics the only checkpoint path I need is s3://some-bucket/checkpoints/ and Flink will take care of the rest?
> 
> Additionally, I was wondering how this concept might extend to sinking the data from this single Flink app that has picked up 1000 Kafka topics to separate s3 paths? For instance:
> 
> s3://some-bucket/data/topic1/
> s3://some-bucket/data/topic2/
> .
> .
> .
> s3://some-bucket/data/topic1000/
> 
> Thanks very much for your help Stefan!
> 


Re: Separate checkpoint directories

Posted by Kyle Hamlin <ha...@gmail.com>.
Hi Stefan,

In the past, I ran four separate Flink apps to sink data from four separate
Kafka topics to s3 without any transformations applied. For each Flink app,
I would set the checkpoint directory
to s3://some-bucket/checkpoints/topic-name. It appears that with Flink 1.4
I can just use a regex to pick up all four topics, and so what you are
telling me is that even if my regex picks up 1000 Kafka topics the only
checkpoint path I need is s3://some-bucket/checkpoints/ and Flink will take
care of the rest?

Additionally, I was wondering how this concept might extend to sinking the
data from this single Flink app, which has picked up 1000 Kafka topics,
to separate s3 paths. For instance:

s3://some-bucket/data/topic1/
s3://some-bucket/data/topic2/
.
.
.
s3://some-bucket/data/topic1000/

Thanks very much for your help Stefan!

On Wed, Jan 3, 2018 at 10:51 AM Stefan Richter <s....@data-artisans.com>
wrote:

> Hi,
>
> first, let me ask why you want to have a different checkpoint directory
> per topic? It is perfectly ok to have just a single checkpoint directory,
> so I wonder what the intention is? Flink will already create proper
> subdirectories and filenames and can identify the right checkpoint data for
> each operator instance.
>
> Best,
> Stefan
>

Re: Separate checkpoint directories

Posted by Stefan Richter <s....@data-artisans.com>.
Hi,

first, let me ask why you want to have a different checkpoint directory per topic. It is perfectly OK to have just a single checkpoint directory, so I wonder what the intention is. Flink will already create proper subdirectories and file names and can identify the right checkpoint data for each operator instance.

Best,
Stefan

> On 31.12.2017 at 17:10, Kyle Hamlin <ha...@gmail.com> wrote:
> 
> Flink 1.4 added regex pattern matching for FlinkKafkaConsumers, which is a neat feature. I would like to use this feature, but I'm wondering how it impacts the FsStateBackend checkpointing mechanism. Before, I would subscribe to one topic and set a checkpoint path specific to that topic; for example, if the Kafka topic name was foo:
> 
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setStateBackend(new FsStateBackend("s3://checkpoints/foo/"))
> 
> How does one dynamically set these checkpoint paths? Is it even necessary to do so, or should I have one checkpoint path for all the possible topics the regex pattern could pick up?
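
To illustrate Stefan's point about a single checkpoint root: a minimal sketch of the setup, with a comment showing the approximate layout Flink creates underneath it. The exact directory structure is an implementation detail and may differ between versions:

```scala
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStateBackend(new FsStateBackend("s3://some-bucket/checkpoints/"))
env.enableCheckpointing(60000L)

// Flink namespaces the checkpoint data on its own, roughly:
//   s3://some-bucket/checkpoints/<job-id>/chk-<n>/...
// so checkpoints from a job reading any number of topics never collide.
```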