Posted to user@flink.apache.org by "wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn> on 2020/07/03 03:50:31 UTC

How to dynamically initialize flink metrics in invoke method and then reuse it?

In one Flink operator, I want to initialize multiple Flink metrics according to the message content,
as in the code below.

public void invoke(ObjectNode node, Context context) throws Exception {
    // The metric name is only known once the record has been read.
    String tableName = node.get("metadata").get("topic").asText();
    // A new MeterView is constructed on every call.
    Meter meter = getRuntimeContext().getMetricGroup().meter(tableName, new MeterView(10));
    meter.markEvent();
    log.info("### counter: " + meter.toString() + "\t" + meter.getCount());
}


But this way, every invoke call initializes a new metric and the count starts from zero again.
How can I reuse the metric that was initialized earlier?

Thanks,
Lei


wanglei2@geekplus.com.cn 


Re: Re: How to dynamically initialize flink metrics in invoke method and then reuse it?

Posted by Xintong Song <to...@gmail.com>.
Ok, I see your problem. And yes, keeping a map of metrics should work.

Just to double-check: I assume there is an upper bound on the number of map
keys (table names)?
If not, an ever-growing in-memory map that is not managed by Flink's state
might become problematic.
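
One way to keep such a map bounded is sketched below. This is only an illustration under stated assumptions, not Flink API: `LazyMeter`, `LazyCountingMeter`, `BoundedMeterMap` and the "other" overflow bucket are hypothetical names, and the stand-in interface replaces Flink's `Meter` so that the example runs without Flink on the classpath. In a real sink, the factory would wrap the `getRuntimeContext().getMetricGroup().meter(tableName, new MeterView(10))` call from the original post. A plain HashMap is used on the assumption that each sink instance calls `invoke` from a single thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Stand-in for Flink's Meter, reduced to the two calls used in this thread.
interface LazyMeter {
    void markEvent();
    long getCount();
}

// Trivial stand-in implementation that only counts events.
final class LazyCountingMeter implements LazyMeter {
    private long count;
    @Override public void markEvent() { count++; }
    @Override public long getCount() { return count; }
}

// Creates one meter per table name on first use and reuses it afterwards.
// Once maxTables distinct names exist, further unseen names share one
// overflow meter, so the map holds at most maxTables + 1 entries.
final class BoundedMeterMap {
    static final String OVERFLOW = "other"; // hypothetical bucket name

    private final Map<String, LazyMeter> meters = new HashMap<>();
    private final int maxTables;
    private final Function<String, LazyMeter> factory;

    BoundedMeterMap(int maxTables, Function<String, LazyMeter> factory) {
        this.maxTables = maxTables;
        this.factory = factory;
    }

    LazyMeter meterFor(String tableName) {
        LazyMeter existing = meters.get(tableName);
        if (existing != null) {
            return existing; // reuse: the count keeps growing
        }
        // Unseen name: register it if there is room, else use the overflow bucket.
        String key = meters.size() < maxTables ? tableName : OVERFLOW;
        return meters.computeIfAbsent(key, factory);
    }

    int size() {
        return meters.size();
    }
}
```

Inside `invoke`, the call would then be `meters.meterFor(tableName).markEvent()`, with the `BoundedMeterMap` created once in `open`.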

Thank you~

Xintong Song




Re: Re: How to dynamically initialize flink metrics in invoke method and then reuse it?

Posted by "wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn>.
It seems there is no direct solution.
Perhaps I can implement this by initializing a HashMap<String, Meter> with all the possible values of tableName in the `open` method, and then looking up the corresponding Meter by tableName in the `invoke` method.
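
A rough sketch of that idea follows, assuming the full list of table names is known before the job starts. `TableMeter`, `SimpleTableMeter` and `PerTableSink` are hypothetical stand-ins so that the example runs without Flink; in a real sink, `open` would register each meter through `getRuntimeContext().getMetricGroup().meter(name, new MeterView(10))` as in the original post.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-in for Flink's Meter, reduced to the two calls used in this thread.
interface TableMeter {
    void markEvent();
    long getCount();
}

// Trivial stand-in implementation that only counts events.
final class SimpleTableMeter implements TableMeter {
    private long count;
    @Override public void markEvent() { count++; }
    @Override public long getCount() { return count; }
}

// Mirrors the open()/invoke() lifecycle of a sink: all meters are created
// once in open(), and invoke() only looks them up.
final class PerTableSink {
    private final List<String> knownTables; // assumed to be configured up front
    private final Map<String, TableMeter> meters = new HashMap<>();

    PerTableSink(List<String> knownTables) {
        this.knownTables = knownTables;
    }

    void open() {
        for (String table : knownTables) {
            meters.put(table, new SimpleTableMeter());
        }
    }

    // Returns false when the table name was not registered in open().
    boolean invoke(String tableName) {
        TableMeter meter = meters.get(tableName);
        if (meter == null) {
            return false;
        }
        meter.markEvent();
        return true;
    }

    // Returns -1 for a table name that was never registered.
    long count(String tableName) {
        TableMeter meter = meters.get(tableName);
        return meter == null ? -1 : meter.getCount();
    }
}
```

The trade-off of this variant is that records for a table missing from the configured list are not counted at all, so the list has to be kept in sync with the source database.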


Thanks,
Lei 


wanglei2@geekplus.com.cn 
 


Re: Re: How to dynamically initialize flink metrics in invoke method and then reuse it?

Posted by "wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn>.
Hi Xintong, 

Yes, initializing the metric in the `open` method works, but it doesn't solve my problem.
I want to initialize the metric with a name that is extracted from the record content, and I can only do that in the `invoke` method.

My actual scenario is as follows.
The records are MySQL binlog entries, and I want to monitor the QPS per tableName. The tableName varies from record to record.

Thanks,
Lei




wanglei2@geekplus.com.cn 



Re: How to dynamically initialize flink metrics in invoke method and then reuse it?

Posted by Xintong Song <to...@gmail.com>.
Hi Lei,

I think you should initialize the metric in the `open` method. Then you can
save the initialized metric as a class field, and update it in the `invoke`
method for each record.
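
A minimal sketch of this pattern, for the case of a single metric with a fixed name, is shown here. `SimpleMeter`, `CountingMeter` and `SingleMeterSink` are hypothetical stand-ins so that the example runs without Flink; in a real sink, `open` would obtain the meter from `getRuntimeContext().getMetricGroup().meter(...)`.

```java
// Stand-in for Flink's Meter, reduced to the two calls used in this thread.
interface SimpleMeter {
    void markEvent();
    long getCount();
}

// Trivial stand-in implementation that only counts events.
final class CountingMeter implements SimpleMeter {
    private long count;
    @Override public void markEvent() { count++; }
    @Override public long getCount() { return count; }
}

// Mirrors the open()/invoke() lifecycle of a sink.
final class SingleMeterSink {
    private SimpleMeter meter; // class field: created once, reused for every record

    // Called once before any record is processed.
    void open() {
        meter = new CountingMeter();
    }

    // Called once per record; only updates the existing meter.
    void invoke(String record) {
        meter.markEvent();
    }

    long count() {
        return meter.getCount();
    }
}
```

Because the meter is created once and kept in a field, the count accumulates across `invoke` calls instead of restarting.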

Thank you~

Xintong Song


