Posted to user@flink.apache.org by Govindarajan Srinivasaraghavan <go...@gmail.com> on 2016/09/26 02:34:44 UTC

Flink: How to handle external app configuration changes in flink

Hi,

My requirement is to stream millions of records a day, and the processing
depends heavily on external configuration parameters. For example, a user can
change the required settings at any time in the web application, and once the
change is made, the streaming has to continue with the new application config
parameters. These are app-level configurations, and we also have some dynamic
exclude parameters that every record has to be passed through and filtered
against.

I see that Flink doesn't have global state that is shared across all task
managers and subtasks. Having a centralized cache is an option, but I would
have to read each parameter from the cache, which would increase latency.
Please advise on a better approach to handle this kind of scenario and how
other applications are handling it. Thanks.

Re: Flink: How to handle external app configuration changes in flink

Posted by Govindarajan Srinivasaraghavan <go...@gmail.com>.
Hi Jamie,

Thanks a lot for the response. Appreciate your help.

Regards,
Govind

On Mon, Sep 26, 2016 at 3:26 AM, Jamie Grier <ja...@data-artisans.com>
wrote:

> Hi Govindarajan,
>
> Typically the way people do this is to create a stream of configuration
> changes and consume it like any other stream.  For the specific case of
> filtering, for example, you may have a data stream and a stream of filters
> that you want to run the data through.  The typical approach in the Flink
> API would then be:
>
> val dataStream = env.addSource(dataSource).keyBy("userId")val
> filterStream = env.addSource(filterSource).keyBy("userId")
> val connectedStream = dataStream
>   .connect(filterStream)
>   .flatMap(yourFilterFunction)
>
> ​
> You would maintain your filters as state in your filter function.  Notice
> that in this example both streams are keyed the same way.
>
> If it is not possible to distribute the configuration by key (it really
> depends on your use case), you can instead "broadcast" the configuration
> stream so that each parallel instance of yourFilterFunction sees the same
> configuration messages and ends up building the same state.  For example:
>
> val dataStream = env.addSource(dataSource).keyBy("userId")val
> filterStream = env.addSource(filterSource).broadcast()
> val connectedStream = dataStream
>   .connect(filterStream)
>   .flatMap(yourFilterFunction)
>
> ​
> I hope that helps.
>
> -Jamie
>
>
>
>
> On Mon, Sep 26, 2016 at 4:34 AM, Govindarajan Srinivasaraghavan <
> govindraghvan@gmail.com> wrote:
>
> > Hi,
> >
> > My requirement is to stream millions of records a day, and the processing
> > depends heavily on external configuration parameters. For example, a user
> > can change the required settings at any time in the web application, and
> > once the change is made, the streaming has to continue with the new
> > application config parameters. These are app-level configurations, and we
> > also have some dynamic exclude parameters that every record has to be
> > passed through and filtered against.
> >
> > I see that Flink doesn't have global state that is shared across all task
> > managers and subtasks. Having a centralized cache is an option, but I
> > would have to read each parameter from the cache, which would increase
> > latency. Please advise on a better approach to handle this kind of
> > scenario and how other applications are handling it. Thanks.
> >
>
>
>
> --
>
> Jamie Grier
> data Artisans, Director of Applications Engineering
> @jamiegrier <https://twitter.com/jamiegrier>
> jamie@data-artisans.com
>

Re: Flink: How to handle external app configuration changes in flink

Posted by Jamie Grier <ja...@data-artisans.com>.
Hi Govindarajan,

Typically the way people do this is to create a stream of configuration
changes and consume it like any other stream.  For the specific case of
filtering, for example, you may have a data stream and a stream of filters
that you want to run the data through.  The typical approach in the Flink
API would then be:

val dataStream = env.addSource(dataSource).keyBy("userId")val
filterStream = env.addSource(filterSource).keyBy("userId")
val connectedStream = dataStream
  .connect(filterStream)
  .flatMap(yourFilterFunction)

​
You would maintain your filters as state in your filter function.  Notice
that in this example both streams are keyed the same way.
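
Here is a minimal sketch of what such a filter function could look like,
assuming hypothetical Event and FilterRule case classes and a simple
substring-based exclude rule; it keeps the latest rule per key in Flink's
keyed ValueState:

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.util.Collector

// Hypothetical record and configuration types, for illustration only.
case class Event(userId: String, payload: String)
case class FilterRule(userId: String, excludePattern: String)

class FilterByConfigFunction
    extends RichCoFlatMapFunction[Event, FilterRule, Event] {

  // Keyed state: the latest filter rule seen for the current userId.
  private var currentRule: ValueState[FilterRule] = _

  override def open(parameters: Configuration): Unit = {
    currentRule = getRuntimeContext.getState(
      new ValueStateDescriptor[FilterRule]("filter-rule", classOf[FilterRule]))
  }

  // Data records: emit only those that pass the current rule for this key.
  override def flatMap1(event: Event, out: Collector[Event]): Unit = {
    val rule = currentRule.value()
    if (rule == null || !event.payload.contains(rule.excludePattern)) {
      out.collect(event)
    }
  }

  // Configuration records: remember the latest rule, emit nothing.
  override def flatMap2(rule: FilterRule, out: Collector[Event]): Unit = {
    currentRule.update(rule)
  }
}

In the snippet above, yourFilterFunction would then be an instance of this
class, e.g. .flatMap(new FilterByConfigFunction). Because the state is keyed,
it is checkpointed and restored with the rest of the job.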

If it is not possible to distribute the configuration by key (it really
depends on your use case), you can instead "broadcast" the configuration
stream so that each parallel instance of yourFilterFunction sees the same
configuration messages and ends up building the same state.  For example:

val dataStream = env.addSource(dataSource).keyBy("userId")val
filterStream = env.addSource(filterSource).broadcast()
val connectedStream = dataStream
  .connect(filterStream)
  .flatMap(yourFilterFunction)

​
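A corresponding sketch of the broadcast variant (same hypothetical types as
above): since every parallel subtask receives every FilterRule, the function
can simply keep the rules in an instance field. Note that a plain field is not
checkpointed, so after a restart the rules are rebuilt from the configuration
stream.

import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
import org.apache.flink.util.Collector
import scala.collection.mutable

class BroadcastFilterFunction
    extends CoFlatMapFunction[Event, FilterRule, Event] {

  // userId -> latest rule for that user; identical on every parallel subtask.
  private val rules = mutable.Map[String, FilterRule]()

  // Data records: drop an event if its user's rule excludes it.
  override def flatMap1(event: Event, out: Collector[Event]): Unit = {
    rules.get(event.userId) match {
      case Some(rule) if event.payload.contains(rule.excludePattern) => // drop
      case _ => out.collect(event)
    }
  }

  // Broadcast configuration records: remember the latest rule per user.
  override def flatMap2(rule: FilterRule, out: Collector[Event]): Unit = {
    rules(rule.userId) = rule
  }
}
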
I hope that helps.

-Jamie




On Mon, Sep 26, 2016 at 4:34 AM, Govindarajan Srinivasaraghavan <
govindraghvan@gmail.com> wrote:

> Hi,
>
> My requirement is to stream millions of records a day, and the processing
> depends heavily on external configuration parameters. For example, a user
> can change the required settings at any time in the web application, and
> once the change is made, the streaming has to continue with the new
> application config parameters. These are app-level configurations, and we
> also have some dynamic exclude parameters that every record has to be
> passed through and filtered against.
>
> I see that Flink doesn't have global state that is shared across all task
> managers and subtasks. Having a centralized cache is an option, but I would
> have to read each parameter from the cache, which would increase latency.
> Please advise on a better approach to handle this kind of scenario and how
> other applications are handling it. Thanks.
>



-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
jamie@data-artisans.com
