Posted to user@flink.apache.org by Miguel Araújo <up...@gmail.com> on 2021/02/21 22:37:49 UTC

[Statefun] Dynamic behavior

Hi everyone,

What is the recommended way of achieving the equivalent of a broadcast in
Flink when using Stateful Functions?

For instance, assume we are implementing something similar to Flink's demo
fraud detection
<https://flink.apache.org/news/2020/03/24/demo-fraud-detection-2.html> but
in Stateful Functions - how can one dynamically update the application's
logic then?
There was a similar question on this mailing list in the past, where the
recommendation was to move the dynamic logic to a remote function
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Stateful-Functions-ML-model-prediction-tp38286p38302.html>
so that the logic could be updated by deploying a new container. I don't
think that's very realistic, as updates might happen too frequently for
that approach (e.g., sticking to the fraud detection example, updating
fraud detection rules every hour is not unusual). Nor should one have to
deploy a new container when data (not code) changes.

Is there a way of, for example, modifying FunctionProviders
<https://ci.apache.org/projects/flink/flink-statefun-docs-master/sdk/java.html#function-providers-and-dependency-injection>
on the fly?

Thanks,
Miguel

Re: [Statefun] Dynamic behavior

Posted by Miguel Araújo <up...@gmail.com>.
Thanks Seth.
I understood Igal's suggestion. My concern was about maintaining a separate
service (outside Flink/StateFun) when this control stream might be an
incremental stream as well (think of rules in fraud detection - this is not
a fraud detection application, but the example is a good one). I wouldn't
want to implement fault tolerance, checkpointing, HA, etc. myself.
I now see that I wasn't thinking a step ahead - just because it is a
separate service from StateFun's point of view doesn't mean it can't be
implemented in Flink, if that turns out to be the most appropriate tool.

Thanks for all the suggestions, this was definitely helpful.

Miguel

Seth Wiesman <sj...@gmail.com> wrote on Tuesday, 23/02/2021 at 17:08:

> I don't think there is anything StateFun-specific here, and I would follow
> Igal's advice.
>
> Let's say you have a state value called `Behavior` that describes the
> behavior of an instance. There is a default behavior, but any given instance
> may have a customized behavior. What I would do is the following.
>
> Create a state in the TransactionManager called `behavior` that stores the
> instance's customized behavior, if it exists. When a transaction comes in,
> read the behavior state. If it exists (is not None in the case of Python),
> use it. If not, fall back to the default behavior.
>
> The default behavior can be provided in one of several ways, depending on
> the specifics of your use case:
>
> 1) Hard-coded in the function.
> 2) Dynamically loaded via a background thread as a global. As long as that
> default is immutable, this is safe.
> 3) Dynamically loaded via the function instance on first use. Stateful
> functions have strong support for making async requests, so you could simply
> query the behavior for that instance on first use from a third-party service.
>
> Seth
>
>
> On Tue, Feb 23, 2021 at 10:55 AM Miguel Araújo <up...@gmail.com>
> wrote:
>
>> Hi Seth,
>>
>> Thanks for your comment. I've seen that repository in the past and it was
>> really helpful to "validate" that this was the way to go.
>> I think my question is not being addressed there though: how could one
>> add dynamic behavior to your TransactionManager? In this case, state that
>> is available to all TransactionManager instances when they receive a
>> message of type Transaction for the first time.
>>
>> Seth Wiesman <sj...@gmail.com> wrote on Tuesday, 23/02/2021 at 16:02:
>>
>>> Hey Miguel,
>>>
>>> What you are describing is exactly what is implemented in this repo. The
>>> TransactionManager function acts as an orchestrator to work with the other
>>> functions. The repo is structured as an exercise but the full solution
>>> exists on the branch `advanced-solution`.
>>>
>>> https://github.com/ververica/flink-statefun-workshop
>>>
>>> On Tue, Feb 23, 2021 at 8:34 AM Miguel Araújo <up...@gmail.com>
>>> wrote:
>>>
>>>> Another possibility I am considering is handling this in Flink using a
>>>> broadcast and adding all the information needed to the event itself. I'm a
>>>> little concerned about the amount of data that will be serialized and sent
>>>> on every request though, as I'll need to include information about all
>>>> available remote functions, for instance.
>>>>
>>>> Miguel Araújo <up...@gmail.com> wrote on Tuesday, 23/02/2021 at 09:14:
>>>>
>>>>> Hi Gordon, Igal,
>>>>>
>>>>> Thanks for your replies.
>>>>> PubSub would be a good addition, I have a few scenarios where that
>>>>> would be useful.
>>>>>
>>>>> However, after reading your answers I realized that your proposed
>>>>> solutions (which address the most obvious interpretation of my question) do
>>>>> not necessarily solve my problem. I should have just stated what it was,
>>>>> instead of trying to propose a solution by discussing broadcast...
>>>>>
>>>>> I'm trying to implement an "orchestrator" function which, given an
>>>>> event, will trigger multiple remote function calls, aggregate their results
>>>>> and eventually call yet more functions (based on a provided dependency
>>>>> graph). Hence, this orchestrator function has state per event_id and each
>>>>> function instance is short-lived (a couple seconds at most, ideally
>>>>> sub-second). The question then is not about how to modify a long-running
>>>>> function instance (which PubSub would enable), but rather how to have the
>>>>> dependency graph available to new functions.
>>>>>
>>>>> Given this, Igal's answer seems promising because we have the
>>>>> FunctionProvider instantiating a local variable and passing it down on
>>>>> every instantiation. I'm assuming there is one FunctionProvider per
>>>>> TaskManager. Is there an easy way to have the FunctionProvider receiving
>>>>> data coming from a Flink DataStream, or receiving StateFun messages?
>>>>> Otherwise, I could have it subscribe to a Kafka topic directly.
>>>>>
>>>>> I really appreciate your help.
>>>>>
>>>>> Miguel
>>>>>
>>>>> Igal Shilman <ig...@ververica.com> wrote on Monday, 22/02/2021 at 12:09:
>>>>>
>>>>>> Hi Miguel,
>>>>>>
>>>>>> I think that there are a couple of ways to achieve this, and it
>>>>>> really depends on your specific use case, and the trade-offs
>>>>>> that you are willing to accept.
>>>>>>
>>>>>> For example, one way to approach this:
>>>>>> - Suppose you have an external service somewhere that returns a
>>>>>> representation of the logic to be interpreted by
>>>>>> your function at runtime (I think that is the scenario you are
>>>>>> describing)
>>>>>> - Then, you can write a background task (a thread) that periodically
>>>>>> queries that service, and keeps in memory the latest version.
>>>>>> - You can initialize this background task in your FunctionProvider
>>>>>> implementation, or even in your StatefulModule if you wish.
>>>>>> - Then, make sure that your dynamic stateful function has access
>>>>>> to the latest value fetched by your client (for example via a shared
>>>>>> reference like a j.u.c.AtomicReference)
>>>>>> - Then on receive, you can simply get that reference and re-apply
>>>>>> your rules.
>>>>>>
>>>>>> Take a look at [1] for example (it is not exactly the same, but I
>>>>>> believe that it is close enough)
>>>>>>
>>>>>> [1]
>>>>>> https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-async-example/src/main/java/org/apache/flink/statefun/examples/async/Module.java
>>>>>>
>>>>>> Good luck,
>>>>>> Igal.
>>>>>>
>>>>>>
>>>>>> On Mon, Feb 22, 2021 at 4:32 AM Tzu-Li (Gordon) Tai <
>>>>>> tzulitai@apache.org> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> FWIW, there is this JIRA that is tracking a pubsub / broadcast
>>>>>>> messaging primitive in StateFun:
>>>>>>> https://issues.apache.org/jira/browse/FLINK-16319
>>>>>>>
>>>>>>> This is probably what you are looking for. And I do agree, in the
>>>>>>> case that the control stream (which updates the application logic) is high
>>>>>>> volume, redeploying functions may not work well.
>>>>>>>
>>>>>>> I don't think there really is a "recommended" way of doing the
>>>>>>> "broadcast control stream, join with main stream" pattern with StateFun at
>>>>>>> the moment, at least without FLINK-16319.
>>>>>>> On the other hand, it could be possible to use stateful functions to
>>>>>>> implement a pub-sub model in user space for the time being. I've actually
>>>>>>> left some ideas for implementing that in the comments of FLINK-16319.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Gordon
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Feb 22, 2021 at 6:38 AM Miguel Araújo <up...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi everyone,
>>>>>>>>
>>>>>>>> What is the recommended way of achieving the equivalent of a
>>>>>>>> broadcast in Flink when using Stateful Functions?
>>>>>>>>
>>>>>>>> For instance, assume we are implementing something similar to Flink's
>>>>>>>> demo fraud detection
>>>>>>>> <https://flink.apache.org/news/2020/03/24/demo-fraud-detection-2.html> but
>>>>>>>> in Stateful Functions - how can one dynamically update the application's
>>>>>>>> logic then?
>>>>>>>> There was a similar question in this mailing list in the past where
>>>>>>>> it was recommended moving the dynamic logic to a remote function
>>>>>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Stateful-Functions-ML-model-prediction-tp38286p38302.html> so
>>>>>>>> that one could achieve that by deploying a new container. I think that's
>>>>>>>> not very realistic as updates might happen with a frequency that's not
>>>>>>>> compatible with that approach (e.g., sticking to the fraud detection
>>>>>>>> example, updating fraud detection rules every hour is not unusual), nor
>>>>>>>> should one be deploying a new container when data (not code) changes.
>>>>>>>>
>>>>>>>> Is there a way of, for example, modifying FunctionProviders
>>>>>>>> <https://ci.apache.org/projects/flink/flink-statefun-docs-master/sdk/java.html#function-providers-and-dependency-injection>
>>>>>>>> on the fly?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Miguel
>>>>>>>>
>>>>>>>

Re: [Statefun] Dynamic behavior

Posted by Seth Wiesman <sj...@gmail.com>.
I don't think there is anything StateFun-specific here, and I would follow
Igal's advice.

Let's say you have a state value called `Behavior` that describes the
behavior of an instance. There is a default behavior, but any given instance
may have a customized behavior. What I would do is the following.

Create a state in the TransactionManager called `behavior` that stores the
instance's customized behavior, if it exists. When a transaction comes in,
read the behavior state. If it exists (is not None in the case of Python),
use it. If not, fall back to the default behavior.

The default behavior can be provided in one of several ways, depending on
the specifics of your use case:

1) Hard-coded in the function.
2) Dynamically loaded via a background thread as a global. As long as that
default is immutable, this is safe.
3) Dynamically loaded via the function instance on first use. Stateful
functions have strong support for making async requests, so you could simply
query the behavior for that instance on first use from a third-party service.
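
A minimal sketch of the lookup described above, combining the per-instance
state with option 2. It is plain Python and deliberately does not use the
StateFun SDK: the `Behavior` fields, `DefaultBehaviorHolder`,
`resolve_behavior` and the `fetch` callable are hypothetical names chosen
for illustration, and `instance_behavior` stands in for whatever the
function reads from its `behavior` state.

```python
import threading
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass(frozen=True)
class Behavior:
    """Immutable behavior description (the fields are hypothetical)."""
    version: int
    max_amount: float


class DefaultBehaviorHolder:
    """Process-wide default behavior, refreshed by a background thread."""

    def __init__(self, fetch: Callable[[], Behavior], initial: Behavior,
                 interval_s: float) -> None:
        self._fetch = fetch            # assumed client for the external service
        self._current = initial
        self._interval_s = interval_s
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self) -> None:
        self._thread.start()

    def stop(self) -> None:
        self._stop.set()
        self._thread.join()

    def refresh_now(self) -> None:
        # Behavior is immutable and replaced as a whole, so readers see
        # either the old object or the new one, never a partial update.
        self._current = self._fetch()

    def get(self) -> Behavior:
        return self._current

    def _run(self) -> None:
        # wait() returns False on timeout (refresh) and True once stopped.
        while not self._stop.wait(self._interval_s):
            try:
                self.refresh_now()
            except Exception:
                pass  # keep the last good value if the fetch fails


def resolve_behavior(instance_behavior: Optional[Behavior],
                     holder: DefaultBehaviorHolder) -> Behavior:
    """Use the instance's customized behavior if set, else the default."""
    if instance_behavior is not None:
        return instance_behavior
    return holder.get()
```

In a real function, `resolve_behavior` would be called for each incoming
transaction with the value read from state, and the holder would be created
and started once per process rather than once per function instance.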

Seth


Re: [Statefun] Dynamic behavior

Posted by Miguel Araújo <up...@gmail.com>.
Hi Seth,

Thanks for your comment. I've seen that repository in the past and it was
really helpful to "validate" that this was the way to go.
I don't think my question is addressed there, though: how could one add
dynamic behavior to your TransactionManager? By that I mean state that is
available to all TransactionManager instances when they receive a message
of type Transaction for the first time.

Re: [Statefun] Dynamic behavior

Posted by Seth Wiesman <sj...@gmail.com>.
Hey Miguel,

What you are describing is exactly what is implemented in this repo. The
TransactionManager function acts as an orchestrator to work with the other
functions. The repo is structured as an exercise but the full solution
exists on the branch `advanced-solution`.

https://github.com/ververica/flink-statefun-workshop

Re: [Statefun] Dynamic behavior

Posted by Miguel Araújo <up...@gmail.com>.
Another possibility I am considering is handling this in Flink using a
broadcast and adding all the information needed to the event itself. I'm a
little concerned about the amount of data that will be serialized and sent
on every request though, as I'll need to include information about all
available remote functions, for instance.

Miguel Araújo <up...@gmail.com> wrote on Tuesday, 23/02/2021 at
09:14:

> Hi Gordon, Igal,
>
> Thanks for your replies.
> PubSub would be a good addition; I have a few scenarios where that would
> be useful.
>
> However, after reading your answers I realized that your proposed
> solutions (which address the most obvious interpretation of my question) do
> not necessarily solve my problem. I should have just stated what it was,
> instead of trying to propose a solution by discussing broadcast...
>
> I'm trying to implement an "orchestrator" function which, given an event,
> will trigger multiple remote function calls, aggregate their results and
> eventually call yet more functions (based on a provided dependency graph).
> Hence, this orchestrator function has state per event_id and each function
> instance is short-lived (a couple seconds at most, ideally sub-second). The
> question then is not about how to modify a long-running function instance
> (which PubSub would enable), but rather how to have the dependency graph
> available to new functions.
>
> Given this, Igal's answer seems promising because we have the
> FunctionProvider instantiating a local variable and passing it down on
> every instantiation. I'm assuming there is one FunctionProvider per
> TaskManager. Is there an easy way to have the FunctionProvider receive
> data coming from a Flink DataStream, or receive StateFun messages?
> Otherwise, I could have it subscribe to a Kafka topic directly.
>
> I really appreciate your help.
>
> Miguel
>
> Igal Shilman <ig...@ververica.com> wrote on Monday, 22/02/2021
> at 12:09:
>
>> Hi Miguel,
>>
>> I think that there are a couple of ways to achieve this, and it really
>> depends on your specific use case, and the trade-offs
>> that you are willing to accept.
>>
>> For example, one way to approach this:
>> - Suppose you have an external service somewhere that returns a
>> representation of the logic to be interpreted by
>> your function at runtime (I think that is the scenario you are describing)
>> - Then, you can write a background task (a thread) that periodically
>> queries that service, and keeps in memory the latest version.
>> - You can initialize this background task in your FunctionProvider
>> implementation, or even in your StatefulModule if you wish.
>> - Then, make sure that your dynamic stateful function has access to the
>> latest value fetched by your client (for example via a shared reference
>> like a j.u.c.AtomicReference)
>> - Then on receive, you can simply get that reference and re-apply your
>> rules.
>>
>> Take a look at [1] for example (it is not exactly the same, but I believe
>> that it is close enough)
>>
>> [1]
>> https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-async-example/src/main/java/org/apache/flink/statefun/examples/async/Module.java
>>
>> Good luck,
>> Igal.
>>
>>
>> On Mon, Feb 22, 2021 at 4:32 AM Tzu-Li (Gordon) Tai <tz...@apache.org>
>> wrote:
>>
>>> Hi,
>>>
>>> FWIW, there is this JIRA that is tracking a pubsub / broadcast messaging
>>> primitive in StateFun:
>>> https://issues.apache.org/jira/browse/FLINK-16319
>>>
>>> This is probably what you are looking for. And I do agree, in the case
>>> that the control stream (which updates the application logic) is high
>>> volume, redeploying functions may not work well.
>>>
>>> I don't think there really is a "recommended" way of doing the
>>> "broadcast control stream, join with main stream" pattern with StateFun at
>>> the moment, at least without FLINK-16319.
>>> On the other hand, it could be possible to use stateful functions to
>>> implement a pub-sub model in user space for the time being. I've actually
>>> left some ideas for implementing that in the comments of FLINK-16319.
>>>
>>> Cheers,
>>> Gordon
>>>
>>>
>>> On Mon, Feb 22, 2021 at 6:38 AM Miguel Araújo <up...@gmail.com>
>>> wrote:
>>>
>>>> Hi everyone,
>>>>
>>>> What is the recommended way of achieving the equivalent of a broadcast
>>>> in Flink when using Stateful Functions?
>>>>
>>>> For instance, assume we are implementing something similar to Flink's
>>>> demo fraud detection
>>>> <https://flink.apache.org/news/2020/03/24/demo-fraud-detection-2.html> but
>>>> in Stateful Functions - how can one dynamically update the application's
>>>> logic then?
>>>> There was a similar question in this mailing list in the past where it
>>>> was recommended moving the dynamic logic to a remote function
>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Stateful-Functions-ML-model-prediction-tp38286p38302.html> so
>>>> that one could achieve that by deploying a new container. I think that's
>>>> not very realistic as updates might happen with a frequency that's not
>>>> compatible with that approach (e.g., sticking to the fraud detection
>>>> example, updating fraud detection rules every hour is not unusual), nor
>>>> should one be deploying a new container when data (not code) changes.
>>>>
>>>> Is there a way of, for example, modifying FunctionProviders
>>>> <https://ci.apache.org/projects/flink/flink-statefun-docs-master/sdk/java.html#function-providers-and-dependency-injection>
>>>> on the fly?
>>>>
>>>> Thanks,
>>>> Miguel
>>>>
>>>

Re: [Statefun] Dynamic behavior

Posted by Miguel Araújo <up...@gmail.com>.
Hi Gordon, Igal,

Thanks for your replies.
PubSub would be a good addition; I have a few scenarios where that would be
useful.

However, after reading your answers I realized that your proposed solutions
(which address the most obvious interpretation of my question) do not
necessarily solve my problem. I should have just stated what it was,
instead of trying to propose a solution by discussing broadcast...

I'm trying to implement an "orchestrator" function which, given an event,
will trigger multiple remote function calls, aggregate their results and
eventually call yet more functions (based on a provided dependency graph).
Hence, this orchestrator function has state per event_id and each function
instance is short-lived (a couple seconds at most, ideally sub-second). The
question then is not about how to modify a long-running function instance
(which PubSub would enable), but rather how to have the dependency graph
available to new functions.
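
The per-event bookkeeping described above can be sketched in plain Java,
without the StateFun SDK. This is only an illustration under stated
assumptions: the class DependencyGraph, its ready method and the function
names "enrich", "score" and "decide" are hypothetical, and in a real
orchestrator the "completed" and "in flight" sets would presumably live in
the function's state for each event_id.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical dependency graph: function name -> names of the functions it depends on. */
final class DependencyGraph {
    private final Map<String, Set<String>> dependsOn;

    DependencyGraph(Map<String, Set<String>> dependsOn) {
        this.dependsOn = dependsOn;
    }

    /** Functions not yet called whose dependencies have all completed. */
    Set<String> ready(Set<String> completed, Set<String> inFlight) {
        Set<String> result = new TreeSet<>();
        for (Map.Entry<String, Set<String>> entry : dependsOn.entrySet()) {
            String fn = entry.getKey();
            boolean alreadyHandled = completed.contains(fn) || inFlight.contains(fn);
            if (!alreadyHandled && completed.containsAll(entry.getValue())) {
                result.add(fn);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> deps = new LinkedHashMap<>();
        deps.put("enrich", Set.of());
        deps.put("score", Set.of());
        deps.put("decide", Set.of("enrich", "score"));
        DependencyGraph graph = new DependencyGraph(deps);

        // Nothing has run yet: both root functions can be called in parallel.
        System.out.println(graph.ready(Set.of(), Set.of()));
        // Only "enrich" has replied and "score" is still in flight, so "decide" must wait.
        System.out.println(graph.ready(Set.of("enrich"), Set.of("score")));
        // Both results are in: "decide" becomes callable.
        System.out.println(graph.ready(Set.of("enrich", "score"), Set.of()));
    }
}
```

The graph object itself is immutable here, so a new version can be swapped
in for new events without affecting events that are already being
processed with the previous one.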

Given this, Igal's answer seems promising because we have the
FunctionProvider instantiating a local variable and passing it down on
every instantiation. I'm assuming there is one FunctionProvider per
TaskManager. Is there an easy way to have the FunctionProvider receive
data coming from a Flink DataStream, or receive StateFun messages?
Otherwise, I could have it subscribe to a Kafka topic directly.

I really appreciate your help.

Miguel

Igal Shilman <ig...@ververica.com> wrote on Monday, 22/02/2021 at
12:09:

> Hi Miguel,
>
> I think that there are a couple of ways to achieve this, and it really
> depends on your specific use case, and the trade-offs
> that you are willing to accept.
>
> For example, one way to approach this:
> - Suppose you have an external service somewhere that returns a
> representation of the logic to be interpreted by
> your function at runtime (I think that is the scenario you are describing)
> - Then, you can write a background task (a thread) that periodically
> queries that service, and keeps in memory the latest version.
> - You can initialize this background task in your FunctionProvider
> implementation, or even in your StatefulModule if you wish.
> - Then, make sure that your dynamic stateful function has access to the
> latest value fetched by your client (for example via a shared reference
> like a j.u.c.AtomicReference)
> - Then on receive, you can simply get that reference and re-apply your
> rules.
>
> Take a look at [1] for example (it is not exactly the same, but I believe
> that it is close enough)
>
> [1]
> https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-async-example/src/main/java/org/apache/flink/statefun/examples/async/Module.java
>
> Good luck,
> Igal.
>
>
> On Mon, Feb 22, 2021 at 4:32 AM Tzu-Li (Gordon) Tai <tz...@apache.org>
> wrote:
>
>> Hi,
>>
>> FWIW, there is this JIRA that is tracking a pubsub / broadcast messaging
>> primitive in StateFun:
>> https://issues.apache.org/jira/browse/FLINK-16319
>>
>> This is probably what you are looking for. And I do agree, in the case
>> that the control stream (which updates the application logic) is high
>> volume, redeploying functions may not work well.
>>
>> I don't think there really is a "recommended" way of doing the "broadcast
>> control stream, join with main stream" pattern with StateFun at the moment,
>> at least without FLINK-16319.
>> On the other hand, it could be possible to use stateful functions to
>> implement a pub-sub model in user space for the time being. I've actually
>> left some ideas for implementing that in the comments of FLINK-16319.
>>
>> Cheers,
>> Gordon
>>
>>
>> On Mon, Feb 22, 2021 at 6:38 AM Miguel Araújo <up...@gmail.com>
>> wrote:
>>
>>> Hi everyone,
>>>
>>> What is the recommended way of achieving the equivalent of a broadcast
>>> in Flink when using Stateful Functions?
>>>
>>> For instance, assume we are implementing something similar to Flink's
>>> demo fraud detection
>>> <https://flink.apache.org/news/2020/03/24/demo-fraud-detection-2.html> but
>>> in Stateful Functions - how can one dynamically update the application's
>>> logic then?
>>> There was a similar question in this mailing list in the past where it
>>> was recommended moving the dynamic logic to a remote function
>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Stateful-Functions-ML-model-prediction-tp38286p38302.html> so
>>> that one could achieve that by deploying a new container. I think that's
>>> not very realistic as updates might happen with a frequency that's not
>>> compatible with that approach (e.g., sticking to the fraud detection
>>> example, updating fraud detection rules every hour is not unusual), nor
>>> should one be deploying a new container when data (not code) changes.
>>>
>>> Is there a way of, for example, modifying FunctionProviders
>>> <https://ci.apache.org/projects/flink/flink-statefun-docs-master/sdk/java.html#function-providers-and-dependency-injection>
>>> on the fly?
>>>
>>> Thanks,
>>> Miguel
>>>
>>

Re: [Statefun] Dynamic behavior

Posted by Igal Shilman <ig...@ververica.com>.
Hi Miguel,

I think that there are a couple of ways to achieve this, and it really
depends on your specific use case and the trade-offs that you are willing
to accept.

For example, one way to approach this:
- Suppose you have an external service somewhere that returns a
representation of the logic to be interpreted by
your function at runtime (I think that is the scenario you are describing)
- Then, you can write a background task (a thread) that periodically
queries that service, and keeps in memory the latest version.
- You can initialize this background task in your FunctionProvider
implementation, or even in your StatefulModule if you wish.
- Then, make sure that your dynamic stateful function has access to the
latest value fetched by your client (for example via a shared reference
like a j.u.c.AtomicReference)
- Then on receive, you can simply get that reference and re-apply your
rules.
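
The steps above can be sketched roughly as follows, in plain Java with no
StateFun dependency. The names PeriodicRefresher, RuleBasedFunction and the
"rules-vN" strings are hypothetical, and the Supplier stands in for the
call to the external service; wiring the shared reference into an actual
FunctionProvider or module is left out.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical helper: polls a source in the background and keeps the latest value in memory. */
final class PeriodicRefresher<T> implements AutoCloseable {
    private final Supplier<T> fetch;
    private final AtomicReference<T> latest;
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "rules-refresher");
            thread.setDaemon(true); // polling should not keep the JVM alive
            return thread;
        });

    PeriodicRefresher(Supplier<T> fetch, T initial) {
        this.fetch = fetch;
        this.latest = new AtomicReference<>(initial);
    }

    /** Starts the background polling; the first fetch runs immediately. */
    void start(long period, TimeUnit unit) {
        scheduler.scheduleWithFixedDelay(this::refresh, 0, period, unit);
    }

    /** One polling step. Failures keep the previous value instead of cancelling the schedule. */
    void refresh() {
        try {
            latest.set(fetch.get());
        } catch (RuntimeException e) {
            // Deliberately ignored in this sketch; real code would log and possibly back off.
        }
    }

    /** The shared reference to hand to every function instance. */
    AtomicReference<T> reference() {
        return latest;
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) {
        AtomicInteger version = new AtomicInteger();
        // Stand-in for a call to the external rules service.
        Supplier<String> fetchRules = () -> "rules-v" + version.incrementAndGet();

        try (PeriodicRefresher<String> refresher = new PeriodicRefresher<>(fetchRules, "rules-v0")) {
            RuleBasedFunction fn = new RuleBasedFunction(refresher.reference());
            System.out.println(fn.onMessage("tx-1")); // uses the initial rules
            refresher.refresh();                      // what the background thread does on each tick
            System.out.println(fn.onMessage("tx-2")); // sees the refreshed rules
        }
    }
}

/** Stand-in for the stateful function: reads the latest rules on every message. */
final class RuleBasedFunction {
    private final AtomicReference<String> rules;

    RuleBasedFunction(AtomicReference<String> rules) {
        this.rules = rules;
    }

    String onMessage(String event) {
        return event + " checked against " + rules.get();
    }
}
```

In a deployment one would call something like start(1, TimeUnit.HOURS) once
per JVM and pass reference() to each function instance at construction
time. Note that every JVM polls independently, so different workers may
briefly see different versions of the rules.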

Take a look at [1] for example (it is not exactly the same, but I believe
that it is close enough)

[1]
https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-async-example/src/main/java/org/apache/flink/statefun/examples/async/Module.java

Good luck,
Igal.


On Mon, Feb 22, 2021 at 4:32 AM Tzu-Li (Gordon) Tai <tz...@apache.org>
wrote:

> Hi,
>
> FWIW, there is this JIRA that is tracking a pubsub / broadcast messaging
> primitive in StateFun:
> https://issues.apache.org/jira/browse/FLINK-16319
>
> This is probably what you are looking for. And I do agree, in the case
> that the control stream (which updates the application logic) is high
> volume, redeploying functions may not work well.
>
> I don't think there really is a "recommended" way of doing the "broadcast
> control stream, join with main stream" pattern with StateFun at the moment,
> at least without FLINK-16319.
> On the other hand, it could be possible to use stateful functions to
> implement a pub-sub model in user space for the time being. I've actually
> left some ideas for implementing that in the comments of FLINK-16319.
>
> Cheers,
> Gordon
>
>
> On Mon, Feb 22, 2021 at 6:38 AM Miguel Araújo <up...@gmail.com> wrote:
>
>> Hi everyone,
>>
>> What is the recommended way of achieving the equivalent of a broadcast in
>> Flink when using Stateful Functions?
>>
>> For instance, assume we are implementing something similar to Flink's
>> demo fraud detection
>> <https://flink.apache.org/news/2020/03/24/demo-fraud-detection-2.html> but
>> in Stateful Functions - how can one dynamically update the application's
>> logic then?
>> There was a similar question in this mailing list in the past where it
>> was recommended moving the dynamic logic to a remote function
>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Stateful-Functions-ML-model-prediction-tp38286p38302.html> so
>> that one could achieve that by deploying a new container. I think that's
>> not very realistic as updates might happen with a frequency that's not
>> compatible with that approach (e.g., sticking to the fraud detection
>> example, updating fraud detection rules every hour is not unusual), nor
>> should one be deploying a new container when data (not code) changes.
>>
>> Is there a way of, for example, modifying FunctionProviders
>> <https://ci.apache.org/projects/flink/flink-statefun-docs-master/sdk/java.html#function-providers-and-dependency-injection>
>> on the fly?
>>
>> Thanks,
>> Miguel
>>
>

Re: [Statefun] Dynamic behavior

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi,

FWIW, there is this JIRA that is tracking a pubsub / broadcast messaging
primitive in StateFun:
https://issues.apache.org/jira/browse/FLINK-16319

This is probably what you are looking for. And I do agree, in the case that
the control stream (which updates the application logic) is high volume,
redeploying functions may not work well.

I don't think there really is a "recommended" way of doing the "broadcast
control stream, join with main stream" pattern with StateFun at the moment,
at least without FLINK-16319.
On the other hand, it could be possible to use stateful functions to
implement a pub-sub model in user space for the time being. I've actually
left some ideas for implementing that in the comments of FLINK-16319.
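
As a rough illustration of what a user-space pub-sub could look like, here
is a minimal sketch in plain Java. It is not taken from the FLINK-16319
comments and does not use the StateFun SDK: TopicFunction, its method names
and the address strings are hypothetical. It assumes one "topic" function
instance per topic that keeps its subscribers as state and forwards every
published payload to each of them.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.BiConsumer;

/** Hypothetical "topic" function: one instance per topic, subscribers kept as its state. */
final class TopicFunction {
    // In StateFun this set would be persisted state rather than a plain field.
    private final Set<String> subscribers = new LinkedHashSet<>();
    // Stand-in for sending a message: (target address, payload).
    private final BiConsumer<String, String> send;

    TopicFunction(BiConsumer<String, String> send) {
        this.send = send;
    }

    void onSubscribe(String address) {
        subscribers.add(address);
    }

    void onUnsubscribe(String address) {
        subscribers.remove(address);
    }

    /** Fans the payload out to every current subscriber, in subscription order. */
    void onPublish(String payload) {
        for (String address : subscribers) {
            send.accept(address, payload);
        }
    }

    public static void main(String[] args) {
        List<String> delivered = new ArrayList<>();
        TopicFunction topic =
            new TopicFunction((address, payload) -> delivered.add(address + " <- " + payload));
        topic.onSubscribe("orchestrator/a");
        topic.onSubscribe("orchestrator/b");
        topic.onPublish("graph-v2");
        System.out.println(delivered);
    }
}
```

A design like this only reaches function instances that have subscribed
beforehand, so it suits long-running subscribers better than short-lived
ones.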

Cheers,
Gordon


On Mon, Feb 22, 2021 at 6:38 AM Miguel Araújo <up...@gmail.com> wrote:

> Hi everyone,
>
> What is the recommended way of achieving the equivalent of a broadcast in
> Flink when using Stateful Functions?
>
> For instance, assume we are implementing something similar to Flink's
> demo fraud detection
> <https://flink.apache.org/news/2020/03/24/demo-fraud-detection-2.html> but
> in Stateful Functions - how can one dynamically update the application's
> logic then?
> There was a similar question in this mailing list in the past where it
> was recommended moving the dynamic logic to a remote function
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Stateful-Functions-ML-model-prediction-tp38286p38302.html> so
> that one could achieve that by deploying a new container. I think that's
> not very realistic as updates might happen with a frequency that's not
> compatible with that approach (e.g., sticking to the fraud detection
> example, updating fraud detection rules every hour is not unusual), nor
> should one be deploying a new container when data (not code) changes.
>
> Is there a way of, for example, modifying FunctionProviders
> <https://ci.apache.org/projects/flink/flink-statefun-docs-master/sdk/java.html#function-providers-and-dependency-injection>
> on the fly?
>
> Thanks,
> Miguel
>