You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by "wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn> on 2020/07/03 03:50:31 UTC
How to dynamically initialize flink metrics in invoke method and then reuse it?
In one flink operator, i want to initialize multiple flink metrics according to message content.
As the code below.
public void invoke(ObjectNode node, Context context) throws Exception {
String tableName = node.get("metadata").get("topic").asText();
Meter meter = getRuntimeContext().getMetricGroup().meter(tableName, new MeterView(10));
meter.markEvent();
log.info("### counter: " + meter.toString() + "\t" + meter.getCount());
But in this way every invoke call will initialize a new metrics and the count will be from zero again.
How can i reuse the metric initialized before?
Thanks,
Lei
wanglei2@geekplus.com.cn
Re: Re: How to dynamically initialize flink metrics in invoke method and then reuse it?
Posted by Xintong Song <to...@gmail.com>.
Ok, I see your problem. And yes, keeping a map of metrics should work.
Just for double checking, I assume there's an upper bound of your map keys
(table names)?
Because if not, an infinitely increasing in-memory map that is not managed
by Flink's state might become problematic.
Thank you~
Xintong Song
On Fri, Jul 3, 2020 at 2:39 PM wanglei2@geekplus.com.cn <
wanglei2@geekplus.com.cn> wrote:
>
> Seems there's no direct solution.
> Perhaps i can implement this by initializing a HashMap<String, Meter> with
> all the possible value of tableName in `open` mehtod and get the
> corresponding Meter according to tableName in the `invoke` method.
>
>
> Thanks,
> Lei
> ------------------------------
> wanglei2@geekplus.com.cn
>
>
> *Sender:* wanglei2@geekplus.com.cn
> *Send Time:* 2020-07-03 14:27
> *Receiver:* Xintong Song <to...@gmail.com>
> *cc:* user <us...@flink.apache.org>
> *Subject:* Re: Re: How to dynamically initialize flink metrics in invoke
> method and then reuse it?
> Hi Xintong,
>
> Yes, initializing the metric in the `open` method works, but it doesn't
> solve my problem.
> I want to initialize the metric with a name that is extracted from the
> record content. Only in the `invoke` method i can do it.
>
> Actually my scenario is as follows.
> The record is MySQL binlog info. I want to monitor the qps by tableName.
> The tableName is different for every record.
>
> Thanks,
> Lei
>
>
> ------------------------------
> wanglei2@geekplus.com.cn
>
> *Sender:* Xintong Song <to...@gmail.com>
> *Send Time:* 2020-07-03 13:14
> *Receiver:* wanglei2@geekplus.com.cn
> *cc:* user <us...@flink.apache.org>
> *Subject:* Re: How to dynamically initialize flink metrics in invoke
> method and then reuse it?
> Hi Lei,
>
> I think you should initialize the metric in the `open` method. Then you
> can save the initialized metric as a class field, and update it in the
> `invoke` method for each record.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Fri, Jul 3, 2020 at 11:50 AM wanglei2@geekplus.com.cn <
> wanglei2@geekplus.com.cn> wrote:
>
>>
>> In one flink operator, i want to initialize multiple flink metrics
>> according to message content.
>> As the code below.
>>
>> public void invoke(ObjectNode node, Context context) throws Exception {
>>
>> String tableName = node.get("metadata").get("topic").asText();
>> Meter meter = getRuntimeContext().getMetricGroup().meter(tableName,
>> new MeterView(10));
>> meter.markEvent();
>> log.info("### counter: " + meter.toString() + "\t" +
>> meter.getCount());
>>
>>
>> But in this way every invoke call will initialize a new metrics and the
>> count will be from zero again.
>> How can i reuse the metric initialized before?
>>
>> Thanks,
>> Lei
>> ------------------------------
>> wanglei2@geekplus.com.cn
>>
>>
Re: Re: How to dynamically initialize flink metrics in invoke method and then reuse it?
Posted by "wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn>.
Seems there's no direct solution.
Perhaps i can implement this by initializing a HashMap<String, Meter> with all the possible value of tableName in `open` mehtod and get the corresponding Meter according to tableName in the `invoke` method.
Thanks,
Lei
wanglei2@geekplus.com.cn
Sender: wanglei2@geekplus.com.cn
Send Time: 2020-07-03 14:27
Receiver: Xintong Song
cc: user
Subject: Re: Re: How to dynamically initialize flink metrics in invoke method and then reuse it?
Hi Xintong,
Yes, initializing the metric in the `open` method works, but it doesn't solve my problem.
I want to initialize the metric with a name that is extracted from the record content. Only in the `invoke` method i can do it.
Actually my scenario is as follows.
The record is MySQL binlog info. I want to monitor the qps by tableName. The tableName is different for every record.
Thanks,
Lei
wanglei2@geekplus.com.cn
Sender: Xintong Song
Send Time: 2020-07-03 13:14
Receiver: wanglei2@geekplus.com.cn
cc: user
Subject: Re: How to dynamically initialize flink metrics in invoke method and then reuse it?
Hi Lei,
I think you should initialize the metric in the `open` method. Then you can save the initialized metric as a class field, and update it in the `invoke` method for each record.
Thank you~
Xintong Song
On Fri, Jul 3, 2020 at 11:50 AM wanglei2@geekplus.com.cn <wa...@geekplus.com.cn> wrote:
In one flink operator, i want to initialize multiple flink metrics according to message content.
As the code below.
public void invoke(ObjectNode node, Context context) throws Exception {
String tableName = node.get("metadata").get("topic").asText();
Meter meter = getRuntimeContext().getMetricGroup().meter(tableName, new MeterView(10));
meter.markEvent();
log.info("### counter: " + meter.toString() + "\t" + meter.getCount());
But in this way every invoke call will initialize a new metrics and the count will be from zero again.
How can i reuse the metric initialized before?
Thanks,
Lei
wanglei2@geekplus.com.cn
Re: Re: How to dynamically initialize flink metrics in invoke method and then reuse it?
Posted by "wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn>.
Hi Xintong,
Yes, initializing the metric in the `open` method works, but it doesn't solve my problem.
I want to initialize the metric with a name that is extracted from the record content. Only in the `invoke` method i can do it.
Actually my scenario is as follows.
The record is MySQL binlog info. I want to monitor the qps by tableName. The tableName is different for every record.
Thanks,
Lei
wanglei2@geekplus.com.cn
Sender: Xintong Song
Send Time: 2020-07-03 13:14
Receiver: wanglei2@geekplus.com.cn
cc: user
Subject: Re: How to dynamically initialize flink metrics in invoke method and then reuse it?
Hi Lei,
I think you should initialize the metric in the `open` method. Then you can save the initialized metric as a class field, and update it in the `invoke` method for each record.
Thank you~
Xintong Song
On Fri, Jul 3, 2020 at 11:50 AM wanglei2@geekplus.com.cn <wa...@geekplus.com.cn> wrote:
In one flink operator, i want to initialize multiple flink metrics according to message content.
As the code below.
public void invoke(ObjectNode node, Context context) throws Exception {
String tableName = node.get("metadata").get("topic").asText();
Meter meter = getRuntimeContext().getMetricGroup().meter(tableName, new MeterView(10));
meter.markEvent();
log.info("### counter: " + meter.toString() + "\t" + meter.getCount());
But in this way every invoke call will initialize a new metrics and the count will be from zero again.
How can i reuse the metric initialized before?
Thanks,
Lei
wanglei2@geekplus.com.cn
Re: How to dynamically initialize flink metrics in invoke method and then reuse it?
Posted by Xintong Song <to...@gmail.com>.
Hi Lei,
I think you should initialize the metric in the `open` method. Then you can
save the initialized metric as a class field, and update it in the `invoke`
method for each record.
Thank you~
Xintong Song
On Fri, Jul 3, 2020 at 11:50 AM wanglei2@geekplus.com.cn <
wanglei2@geekplus.com.cn> wrote:
>
> In one flink operator, i want to initialize multiple flink metrics
> according to message content.
> As the code below.
>
> public void invoke(ObjectNode node, Context context) throws Exception {
>
> String tableName = node.get("metadata").get("topic").asText();
> Meter meter = getRuntimeContext().getMetricGroup().meter(tableName,
> new MeterView(10));
> meter.markEvent();
> log.info("### counter: " + meter.toString() + "\t" +
> meter.getCount());
>
>
> But in this way every invoke call will initialize a new metrics and the
> count will be from zero again.
> How can i reuse the metric initialized before?
>
> Thanks,
> Lei
> ------------------------------
> wanglei2@geekplus.com.cn
>
>