Posted to user@flink.apache.org by Dan Pettersson <da...@gmail.com> on 2020/01/01 14:40:48 UTC

Re: Stateful function metrics

Hi again,

My current implementation of "pubsub" brings quite a lot of boilerplate I
must say, so the "Broadcast by pattern" idea would be very useful for this
specific requirement!
I'll look in the Stateful functions code and see if I can come up with a
suggestion.

I'm new to distributed computing and "Key salting" but it makes sense to
partition out the subscribers to increase the parallelism.
I don't fully understand what you meant by this, though: "You would also have
to have a logic of assigning a subscriber to a token, *and on publish you’d
have to supply the message for each token*."

Wouldn't it be as simple as just adding the line below to my example from
FnLargeTradeByParticipant:

context.send(FnInstrumentIsClosingNotifier.TYPE, trade.getInstrumentId
+ (int)(Math.random()*x),  ......... ) ?

since the publisher is storing the subscribers' addresses?

Thanks,

Regards
Dan

On Tue, 31 Dec 2019 at 20:11, Igal Shilman <ig...@ververica.com> wrote:

> Hi Dan,
>
> I think that your approach makes sense, and maybe there is a case for
> either a built in pubsub mechanism (I’ve been asked something similar at
> Flink Forward this year) or broadcasting a message to a pattern (amazon:*)
>
> For the moment, assuming the way your key is designed is not going to be
> changed, I can imagine an efficient pubsub mechanism implemented in “user
> land” as a stateful function. Having a single instance per topic would
> limit scalability severely, therefore I’d suggest mixing into the topic
> name a finite number of tokens (also called key salting). You would also
> have to have a logic of assigning a subscriber to a token, and on publish
> you’d have to supply the message for each token.
> The number of tokens is up to you to decide, but it is a function of your
> parallelism, the number of subscribers per topic (the number of
> participantId per instrumentId).
> (In your attached solution the number of tokens is 1)
>
> Happy new year,
> Igal
>
> On Monday, December 30, 2019, Dan Pettersson <da...@gmail.com>
> wrote:
>
>>
>> Hi Igal and Thanks for your quick response and yes, you got my second
>> question right.
>>
>> I'm building a small PoC around fraudulent trades and, in short, I've
>> made the functions fine-grained, down to the level
>> participantId + "::" + instrumentId (e.g.
>> "BankA::AMAZON")
>>
>> In this flow of stock exchange messages, there are messages that tell
>> the market
>> whether the instrument has opened, is being halted or is being closed for the day.
>>
>> These messages arrive at the instrumentId level, so I have to route these
>> messages to all functions with the key participantId + "::" +
>> (actual)instrumentId.
>> I had hoped to be able to get a copy of all functions from the repository
>> to loop through them and dispatch, but I can't find a way to get hold of them.
>> Is there any way I can get them?
>>
>> I haven't studied the core functionality enough but could it be an option
>> to open up the repository and return a copy of the
>> ObjectOpenHashMap that holds all the functions? I guess it's not a common
>> requirement, so keeping them hidden is probably the best option.
>>
>> As a workaround, I've created "Function listeners" where functions can
>> subscribe to a certain type of message.
>>
>> For example, FnIsClosingNotifier (key is instrumentId) is holding a
>> PersistedValue with all the function addresses
>> that subscribe to an instrument closing message. The subscription is done
>> from other functions in the configuration by just sending
>> a "Protobuf empty message" and when the closing message arrives the
>> dispatch to the listeners is done in FnIsClosingNotifier.
>>
>> Is there a better way that you can think of to implement this kind of
>> requirement, where one message should be sent to several subscribing
>> functions that are not known beforehand?
>>
>> Below is some code that hopefully describes my current implementation to
>> subscribe to a certain type of message.
>>
>> The function that wants to be notified when the closing message arrives.
>> This function has the id participantId::InstrumentId
>>
>> [image: image.png]
>>
>> And the notifier that holds all subscribers' addresses in the
>> PersistedValue "listeners"
>> [image: image.png]
>> Regards
>> Dan
>>
>> On Mon, 30 Dec 2019 at 00:50, Igal Shilman <ig...@ververica.com> wrote:
>>
>>> Hi Dan,
>>>
>>> You can learn more about Flink’s metrics system at [1]
>>> You would be able to either set up a reporter that would export the
>>> metrics to an external system, or query the metrics via the REST API, or
>>> simply use Flink’s web UI to obtain them.
>>>
>>> If I understand the second part of your question correctly - you have a
>>> persisted value in a base class, but a few different function types that
>>> derive from that base class, and you are wondering what the scope of
>>> that persisted value is?
>>> If that is the question, then the scope is bound to the function
>>> address(type+id) and not to the Java instance.
>>> So it is safe.
>>>
>>> [1] -
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/metrics.html
>>>
>>> Happy hacking,
>>> Igal
>>>
>>>
>>> On Sunday, December 29, 2019, Dan Pettersson <da...@gmail.com>
>>> wrote:
>>>
>>>> Hi all
>>>>
>>>> I'm trying to get hold of some metrics from the functions that I have
>>>> created but can't find a way to get them. It's the metrics mentioned here
>>>> that I'm interested in:
>>>> https://statefun.io/deployment_operations/metrics.html
>>>> Any suggestions are appreciated.
>>>>
>>>> I also have a question regarding "best practice" when dealing with
>>>> functions that share business logic. Is it safe, storage-wise, to extend an
>>>> abstract class that holds the persistent values?
>>>>
>>>> Thanks in advance and Happy coding during the holidays :-)
>>>>
>>>> Regards
>>>> Dan
>>>>
>>>

Stateful function metrics

Posted by Igal Shilman <ig...@ververica.com>.
(re-adding the user-mailing list)

I think that 100 subscribers is very manageable and doesn’t require any
further partitioning.
I think that the general rule of thumb should be having “small” state size
per function instance, but this is a case by case thing.

Thanks,
Igal.

On Thursday, January 2, 2020, Dan Pettersson <da...@gmail.com>
wrote:

>  Aah yes, now I get it :-)
>
> I mistakenly thought that the partitions I created
> of FnInstrumentIsClosingNotifier would automatically receive the Closing
> message.
> Now I realise that they need to be notified as well.
>
> Are there any guidelines/recommendations on how to find a suitable
> partition size, taking for example the number of cores into consideration?
> Or is it just good old experimenting and measuring performance that will show
> the ideal size?
>
> For this case it would be around 5000 instruments that have, let's say, 100
> subscribers each.
>
> Thanks for your help!
>
> /Dan
>
> On Thu, 2 Jan 2020 at 11:39, Igal Shilman <ig...@ververica.com> wrote:
>
>> Hi Dan,
>>
>> Let me clarify what I mean:
>> 1. Registering a subscriber means sending a message to a partition of a
>> FnInstrumentIsClosingNotifier, so you need a
>> mapping from a subscriberId to a partition (this should be deterministic
>> so that you can unregister the subscriber).
>> It can be as simple as subscriberId.hashCode() % numPartitions.
>>
>> 2. When you want to notify all the subscribers across all the partitions,
>> you would need to send the message to all the partitions, so that they in
>> turn would notify their subscribers.
>>
>> I hope that helps,
>> Igal.
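
The two steps in the message above (a deterministic subscriber-to-partition
mapping, and a publish that reaches every partition) can be sketched in plain
Java. This is only a toy model under stated assumptions, not Stateful
Functions code: the class SaltedTopic, the "#" separator in the partition key,
and the in-memory map standing in for each partition's persisted subscriber
list are all hypothetical. It uses Math.floorMod rather than a bare %,
because String.hashCode() can be negative in Java and % would then yield a
negative partition number.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a salted pub/sub topic (hypothetical, not the Stateful Functions API). */
final class SaltedTopic {
    private final String topic;
    private final int numPartitions;
    // Stand-in for the subscriber list that each notifier partition would persist.
    private final Map<String, List<String>> subscribersByPartitionKey = new HashMap<>();

    SaltedTopic(String topic, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        this.topic = topic;
        this.numPartitions = numPartitions;
    }

    /** Deterministic mapping, so the same subscriber always lands on the same partition. */
    int partitionOf(String subscriberId) {
        // floorMod keeps the result in [0, numPartitions) even for negative hash codes.
        return Math.floorMod(subscriberId.hashCode(), numPartitions);
    }

    /** Salted key: the topic name mixed with the partition token. */
    String partitionKey(int partition) {
        return topic + "#" + partition;
    }

    void subscribe(String subscriberId) {
        subscribersByPartitionKey
                .computeIfAbsent(partitionKey(partitionOf(subscriberId)), k -> new ArrayList<>())
                .add(subscriberId);
    }

    void unsubscribe(String subscriberId) {
        List<String> subscribers =
                subscribersByPartitionKey.get(partitionKey(partitionOf(subscriberId)));
        if (subscribers != null) {
            subscribers.remove(subscriberId);
        }
    }

    /** Publishing visits every partition; each partition notifies its own subscribers. */
    List<String> publish() {
        List<String> notified = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            notified.addAll(
                    subscribersByPartitionKey.getOrDefault(partitionKey(p), Collections.emptyList()));
        }
        return notified;
    }

    public static void main(String[] args) {
        SaltedTopic amazon = new SaltedTopic("AMAZON", 4);
        amazon.subscribe("BankA::AMAZON");
        amazon.subscribe("BankB::AMAZON");
        amazon.unsubscribe("BankB::AMAZON");
        System.out.println(amazon.publish());
    }
}
```

In a real Stateful Functions job, each partition key would presumably be the id
of one FnInstrumentIsClosingNotifier instance, and publish() would become one
context.send(...) per partition. Picking a single random partition at publish
time would only reach the subscribers registered with that partition.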