Posted to user@flink.apache.org by vishalovercome <vi...@moengage.com> on 2021/04/06 19:36:39 UTC

Dynamic configuration via broadcast state

I have to make my Flink job dynamically configurable and I'm thinking about
using broadcast state. My current static job configuration file contains the
configuration of the entire set of operators, which I load into a case class;
I then explicitly pass the relevant part of it to each operator as
constructor parameters. Will I have to create an individual broadcast stream
for each operator? I.e.:

val o1conf: BroadcastStream[O1Conf] = ...
someStream.connect(o1conf).process(...)

someOtherStream.connect(o2conf).process(...)

and so on?

1. Is there a way to load the configuration as a whole but only pick the
right subset in the connect method, like so:

someStream.connect(jobConfig.o1Conf).process(...)

My job has several operators, and it seems rather clumsy to have to
instantiate one broadcast stream for each dynamically configurable operator.

2. Is there a way to guarantee that processElement isn't called before the
first call to processBroadcastElement? How else can we ensure that each
operator always starts with a valid configuration? Passing the same
configuration as constructor parameters is one way to deal with it, but it's
clumsy because it just repeats code. Loading the configuration in the open
method is even worse, because each operator would then have access to the
entire job configuration.

3. What can we do to make source and sink connectors dynamically
configurable?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Dynamic configuration via broadcast state

Posted by Arvid Heise <ar...@apache.org>.
Hi Vishal,

what you are trying to achieve is quite common and has its own
documentation [1]. Currently, there is no way to hold back elements of the
non-broadcast side (question 2 in your original post), so you have to buffer
them until the configuration arrives.
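A minimal sketch of that buffering, written as plain Scala so it runs without a Flink dependency. `BufferingOperator`, `O1Conf` and the `out` buffer are hypothetical stand-ins: in a real job the two methods correspond to `processElement` and `processBroadcastElement` of a (Keyed)BroadcastProcessFunction, the configuration would live in broadcast state, and the held-back elements would also need to be Flink state (for example keyed `ListState`, flushed from `processBroadcastElement` via `ctx.applyToKeyedState`) so that they survive a failure.

```scala
import scala.collection.mutable

// Hypothetical per-operator configuration.
final case class O1Conf(multiplier: Int)

// Plain-Scala model of the two callbacks of a broadcast-connected operator.
// In Flink, `config` and `pending` would have to be Flink state so they are
// checkpointed; here they are ordinary fields to keep the sketch runnable.
final class BufferingOperator {
  private var config: Option[O1Conf] = None
  private val pending = mutable.ArrayBuffer.empty[Int]

  private def transform(c: O1Conf, v: Int): Int = v * c.multiplier

  // Data side: emit right away once configured, otherwise hold the element back.
  def processElement(value: Int, out: mutable.Buffer[Int]): Unit =
    config match {
      case Some(c) => out += transform(c, value)
      case None    => pending += value
    }

  // Broadcast side: remember the configuration and flush everything held back.
  def processBroadcastElement(conf: O1Conf, out: mutable.Buffer[Int]): Unit = {
    config = Some(conf)
    pending.foreach(v => out += transform(conf, v))
    pending.clear()
  }
}

// Usage: two elements arrive before the configuration, one after it.
val out = mutable.ArrayBuffer.empty[Int]
val op = new BufferingOperator
op.processElement(1, out)
op.processElement(2, out)
op.processBroadcastElement(O1Conf(10), out)
op.processElement(3, out)
println(out.toList) // List(10, 20, 30)
```

Elements that arrive early are not lost, only delayed; how much may pile up depends on how quickly the configuration stream delivers its first element.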

If you have several configurable operators, you could try to create a
generic configuration holder and chain the actual operator to it [2], or you
could create a base class that does all the work, in which you just override
how the configuration is applied to all elements.
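A rough sketch of the base-class variant, again as a plain-Scala model rather than Flink API. It assumes a single broadcast stream carrying the whole (hypothetical) `JobConfig` is connected to every configurable operator; each operator extends `ConfigurableOperator` (also hypothetical), passes a selector `JobConfig => C`, and keeps only its own slice, so the subset is picked inside the operator rather than in `connect`. Subclasses implement only `applyConfig`.

```scala
import scala.collection.mutable

// Hypothetical job-wide configuration; one broadcast stream carries all of it.
final case class O1Conf(multiplier: Int)
final case class O2Conf(prefix: String)
final case class JobConfig(o1: O1Conf, o2: O2Conf)

// Base class doing the common work: keep the current configuration slice,
// buffer elements until the first slice arrives, and delegate the per-element
// logic to applyConfig. `select` picks this operator's slice of JobConfig.
abstract class ConfigurableOperator[C, IN, OUT](select: JobConfig => C) {
  private var conf: Option[C] = None
  private val pending = mutable.ArrayBuffer.empty[IN]

  // The only thing a concrete operator has to define.
  protected def applyConfig(c: C, value: IN): OUT

  def processElement(value: IN, out: mutable.Buffer[OUT]): Unit =
    conf match {
      case Some(c) => out += applyConfig(c, value)
      case None    => pending += value
    }

  def processBroadcastElement(job: JobConfig, out: mutable.Buffer[OUT]): Unit = {
    val c = select(job) // keep only this operator's slice
    conf = Some(c)
    pending.foreach(v => out += applyConfig(c, v))
    pending.clear()
  }
}

// Concrete operators only say which slice they need and how to apply it.
final class Multiplier extends ConfigurableOperator[O1Conf, Int, Int](_.o1) {
  protected def applyConfig(c: O1Conf, v: Int): Int = v * c.multiplier
}

final class Prefixer extends ConfigurableOperator[O2Conf, String, String](_.o2) {
  protected def applyConfig(c: O2Conf, v: String): String = c.prefix + v
}

// Usage: the same JobConfig update is delivered to both operators.
val ints = mutable.ArrayBuffer.empty[Int]
val strs = mutable.ArrayBuffer.empty[String]
val m = new Multiplier
val p = new Prefixer
m.processElement(2, ints) // buffered: no configuration yet
val job = JobConfig(O1Conf(3), O2Conf("id-"))
m.processBroadcastElement(job, ints)
p.processBroadcastElement(job, strs)
p.processElement("42", strs)
```

The trade-off: every operator still receives the full `JobConfig` update, but only one broadcast stream and one configuration type have to be maintained.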

For sources, you have to implement your own source; for sinks, you can use
the same chaining trick.

I currently don't see how watermarks can help. We are still in the process
of providing a way to synchronize sources with different timestamps
automatically, and it will not arrive before Flink 1.14.

---

If configuration changes are quite rare, there is an easier option for you
that is viable if your state is not huge: you could simply load
configuration statically in `open` and fail on configuration change to
trigger a recovery. That keeps the whole DataStream simple at the cost of
additional recoveries.
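The restart-based option could look roughly like this, modeled in plain Scala. `ConfigSource`, `Snapshot`, `ConfigChangedException` and `StaticallyConfiguredMapper` are hypothetical; in Flink the mapper would be a rich function that loads its configuration in `open`, and throwing an exception is what triggers Flink's failover, after which `open` runs again with the new values (state is restored from the last checkpoint, if checkpointing is enabled).

```scala
import scala.util.Try

// Hypothetical snapshot of externally stored configuration (file, DB, ...).
final case class Snapshot(version: Long, values: Map[String, String])

// Hypothetical accessor for wherever the configuration lives.
trait ConfigSource { def current(): Snapshot }

final class ConfigChangedException(msg: String) extends RuntimeException(msg)

// Model of a rich function: configuration is read once in open(). When the
// stored version moves on, map() throws so that the task fails and is restarted.
final class StaticallyConfiguredMapper(source: ConfigSource) {
  private var loaded = Snapshot(-1L, Map.empty)

  def open(): Unit = { loaded = source.current() }

  def map(value: String): String = {
    // Checked per element only to keep the sketch short; a real job would
    // poll far less often (e.g. every N elements or on a schedule).
    val latest = source.current().version
    if (latest != loaded.version)
      throw new ConfigChangedException(s"configuration changed: ${loaded.version} -> $latest")
    loaded.values.getOrElse("prefix", "") + value
  }
}

// Usage with an in-memory source standing in for the real one.
final class InMemorySource(var snapshot: Snapshot) extends ConfigSource {
  def current(): Snapshot = snapshot
}

val src = new InMemorySource(Snapshot(1L, Map("prefix" -> "a-")))
val mapper = new StaticallyConfiguredMapper(src)
mapper.open()
val before = mapper.map("x")                   // "a-x"
src.snapshot = Snapshot(2L, Map("prefix" -> "b-"))
val failed = Try(mapper.map("y")).isFailure    // true: would trigger a recovery
mapper.open()                                  // what the restarted task does
val after = mapper.map("y")                    // "b-y"
```

Comparing a version number rather than the full configuration keeps the check cheap and avoids restarts when nothing actually changed.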

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
[2] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Async-Broadcast-tt42807.html

On Wed, Apr 7, 2021 at 12:37 AM vishalovercome <vi...@moengage.com> wrote:

> I researched a bit more, and another suggested solution is to build a custom
> source function that somehow waits for each operator to load its
> configuration, which is in fact set in the open method of the source itself.
> I'm not sure that's a good idea, as it just exposes the entire job
> configuration to an operator.
>
> Can we leverage watermarks/idle sources somehow? Basically, set the
> timestamp of the configuration stream to a very low number at the start and
> then force it to be read before data from other sources starts flowing in.
> As configurations aren't going to change frequently, we can idle these
> sources.
>
> 1. Is the above approach even possible?
> 2. Can an idle source resume once the configuration changes?
>
> A rough sketch of timestamp assignment and of re-activating an idle source
> would help!
