Posted to user@flink.apache.org by Rion Williams <ri...@gmail.com> on 2021/03/16 03:13:18 UTC

Unit Testing for Custom Metrics in Flink

Hi all,

Recently, I was working on adding some custom metrics to a Flink job that
required the use of dynamic labels (i.e. capturing various counters that
were "sliceable" by things like tenant / source, etc.).

I ended up handling it in a very naive fashion that would just keep a
dictionary of metrics that had already been registered and update them
accordingly, which looked something like this:

class MyCustomProcessFunction: ProcessFunction<Event, Unit>() {
    private lateinit var metrics: CustomMetricsRegistry

    override fun open(parameters: Configuration) {
        metrics = CustomMetricsRegistry(runtimeContext.metricGroup)
    }

    override fun processElement(event: Event, context: Context, collector: Collector<Unit>) {
        // Insert calls like metrics.inc("metric-name", "tenant-name", 4) here
    }
}

class CustomMetricsRegistry(private val metricGroup: MetricGroup): Serializable {
    // Increments a given metric by key
    fun inc(metric: String, tenant: String, amount: Long = 1) {
        // Store a key for the metric
        val key = "$metric-$tenant"
        // Store/register the metric
        if (!registeredMetrics.containsKey(key)){
            registeredMetrics[key] = metricGroup
                .addGroup("tenant", tenant)
                .counter(metric)
        }

        // Update the metric by a given amount
        registeredMetrics[key]!!.inc(amount)
    }

    companion object {
        private var registeredMetrics: HashMap<String, Counter> = hashMapOf()
    }
}

Basically, this registers and updates new metrics for tenants as they are
encountered, and I've seen them being emitted as expected when hitting the
appropriately configured metrics endpoint (using a PrometheusReporter).
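
As an illustration of the get-or-register pattern described above, here is a
minimal sketch in plain Java (the language of Flink's own API). It is not the
code from the post: LabeledCounters is a hypothetical name, AtomicLong stands
in for Flink's Counter, and the map is an instance field keyed by the
(metric, tenant) pair rather than a static map keyed by a joined string. A
concurrent map is used only so the sketch stays safe if an instance is ever
shared between threads.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical stand-in for the registry in the post. AtomicLong plays the
 * role of a counter here; in a Flink function the stored value would instead
 * be the Counter returned by
 * metricGroup.addGroup("tenant", tenant).counter(metric).
 */
final class LabeledCounters {
    // Instance field rather than a static map, so each LabeledCounters
    // object keeps its own registrations.
    private final Map<List<String>, AtomicLong> counters = new ConcurrentHashMap<>();

    /** Increments the counter for (metric, tenant), registering it on first use. */
    long inc(String metric, String tenant, long amount) {
        // A two-element list is used as a composite key, so no separator
        // character inside a name can make two different pairs collide.
        List<String> key = Arrays.asList(metric, tenant);
        return counters.computeIfAbsent(key, k -> new AtomicLong()).addAndGet(amount);
    }

    /** Returns the current value, or 0 if the pair was never registered. */
    long get(String metric, String tenant) {
        AtomicLong counter = counters.get(Arrays.asList(metric, tenant));
        return counter == null ? 0L : counter.get();
    }

    /** Number of distinct (metric, tenant) pairs registered so far. */
    int size() {
        return counters.size();
    }
}
```

Calling inc("events_seen", "tenant-1", 1) and then
inc("events_seen", "tenant-2", 200) on one instance leaves two separate
counters behind, one per tenant.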

However, while I was trying to write a few unit tests for this, I seemed to
encounter an issue. I was following a Stack Overflow post that was answered
by @Chesnay Schepler <ch...@apache.org> [0] that described the use of an
in-memory/embedded Flink cluster and a custom reporter that would
statically expose the underlying metrics.

So I took a shot at implementing something similar as follows:

*Flink Cluster Definition*

private val metricsConfiguration = Configuration.fromMap(mutableMapOf(
    ConfigConstants.METRICS_REPORTER_PREFIX +
    "MockCustomMetricsReporter." +
    ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX to MockCustomMetricsReporter::class.java.name
))

@ClassRule
@JvmField
val flinkCluster = MiniClusterResource(
    MiniClusterResourceConfiguration.Builder()
        .setConfiguration(metricsConfiguration)
        .setNumberTaskManagers(1)
        .setNumberSlotsPerTaskManager(1)
        .build()
)

*Custom Reporter*

class MockCustomMetricsReporter : MetricReporter {

    override fun open(metricConfig: MetricConfig) {}
    override fun close() {}
    override fun notifyOfAddedMetric(metric: Metric, name: String, metricGroup: MetricGroup) {
        // Store the metrics that are being registered as we see them
        if (!registeredCustomMetrics.containsKey(name)){
            registeredCustomMetrics[name] = metric
        }
    }

    override fun notifyOfRemovedMetric(metric: Metric, name: String, metricGroup: MetricGroup) {
        // Do nothing here
    }

    companion object {
        // Static reference to metrics as they are registered
        var registeredCustomMetrics = HashMap<String, Metric>()
    }
}

*Example Test*

@Test
fun `Example Metrics Use Case`(){
    // Arrange
    val stream = StreamExecutionEnvironment.getExecutionEnvironment()
    val events = listOf(
        eventWithUsers("tenant1", "user1@testing.com"),
        eventWithUsers("tenant2", "user2@testing.com"),
    )

    // Act
    stream
        .fromCollection(events)
        .process(MyCustomProcessFunction())

    // Assert
    stream.execute()
    assertTrue(MockCustomMetricsReporter.registeredCustomMetrics.size > 0)
}

While this test will pass, *the problem is that the custom metrics defined
dynamically (via the CustomMetricsRegistry implementation) do not appear
within the registeredCustomMetrics collection*. In fact, there are 21
metrics that get registered but all of them appear to be classic
out-of-the-box metrics such as CPU usage, number of task managers, load,
various other Netty and JVM stats, but no custom metrics are included.

I've tried multiple different configurations, implementations via a custom
TestHarness, etc. but for some reason the custom metrics being defined are
never triggering the notifyOfAddedMetric function which would be
responsible for adding them to the static collection to be asserted
against.

Any ideas / guidance would be more than welcome. Perhaps a different
approach? Based off examples I've encountered, the code seems like it
should "just work".

Thanks much,

Rion

[0] :
https://stackoverflow.com/questions/51675597/how-to-unitest-gauge-metrics-in-flink

Re: Unit Testing for Custom Metrics in Flink

Posted by Rion Williams <ri...@gmail.com>.
I've made a handful of tweaks to that snippet to try to get the custom
metrics picked up as expected (i.e. adding logging to every available
overload on the interceptors, etc.), using something similar to the following:

fun create(): InterceptingTaskMetricGroup {
    val operatorGroup = object: InterceptingOperatorMetricGroup() {
        override fun addGroup(name: Int): MetricGroup {
            // Include logging here...
        }

        override fun addGroup(name: String?): MetricGroup {
            // Include logging here...
        }

        // Repeat ad nauseam
    }

    return object: InterceptingTaskMetricGroup() {
        override fun getOrAddOperator(id: OperatorID, name: String): OperatorMetricGroup {
            return operatorGroup
        }
    }
}

It still looks like it's only ever registering the built-in metrics and not
hitting any of those overrides for the TestHarness execution. I've even
included a simple test metric in the function's open() call to ensure that
it wasn't some other unrelated issue with something happening in the
processElement() calls / dynamic metrics.

Said differently - I can see the logs being hit in the
InterceptingOperatorMetricGroup.addGroup() calls, but only for the internal
metrics from the Task/JobManagers respectively, nothing custom.

Rion


On Tue, Mar 16, 2021 at 11:00 AM Chesnay Schepler <ch...@apache.org>
wrote:

> Actually you'd have to further subclass the operatorMetricGroup such that
> addGroup works as expected.
> This is admittedly a bit of a drag :/
>
> On 3/16/2021 4:35 PM, Chesnay Schepler wrote:
>
> The test harness is fully independent of the MiniClusterResource; it isn't
> actually running a job. That's why your metrics never arrive at the
> reporter.
>
> You can either:
> a) use the test harness with a custom MetricGroup implementation that
> intercepts registered metrics, set in the MockEnvironment
> b) use the function as part of a job with the custom reporter approach.
> (essentially, fromElements -> function -> discarding sink)
>
> The following would work for a), but it must be noted that this relies on
> quite a few things that are internal to Flink:
>
> ...
>
> InterceptingOperatorMetricGroup operatorMetricGroup =
>         new InterceptingOperatorMetricGroup();
> InterceptingTaskMetricGroup taskMetricGroup =
>         new InterceptingTaskMetricGroup() {
>             @Override
>             public OperatorMetricGroup getOrAddOperator(OperatorID id, String name) {
>                 return operatorMetricGroup;
>             }
>         };
> new MockEnvironmentBuilder()
>     .setMetricGroup(taskMetricGroup)
>
> ...
>
>
> On 3/16/2021 3:42 PM, Rion Williams wrote:
>
> In this case, I was using a harness to test the function. Although, I
> could honestly care less about the unit-test surrounding metrics, I'm much
> more concerned with having something that will actually run and work as
> intended within a job. The only real concern I have or problem that I want
> to solve is building metrics that may vary based on the data coming in from
> a "label" perspective (e.g. keeping track of the events I've seen for a
> given tenant, or some other properties).
>
> Something like:
>
> <metric prefix>_events_seen { tenant = "tenant-1" } 1.0
> <metric prefix>_events_seen { tenant = "tenant-2" } 200.0
>
> If that makes sense. I've used the Prometheus client previously to
> accomplish these types of metrics, but since I'm fairly new to the Flink
> world, I was trying to use the built-in constructs available (thus the
> dynamic groups / metrics being added).
>
> On Tue, Mar 16, 2021 at 9:36 AM Chesnay Schepler <ch...@apache.org>
> wrote:
>
>> Are you actually running a job, or are you using a harness for testing
>> your function?
>>
>> On 3/16/2021 3:24 PM, Rion Williams wrote:
>>
>> Hi Chesnay,
>>
>> Thanks for the prompt response and feedback, it's very much appreciated.
>> Please see the inline responses below to your questions:
>>
>> *Was there anything in the logs (ideally on debug)?*
>>
>>
>> I didn't see anything within the logs that seemed to indicate anything
>> out of the ordinary. I'm currently using a MiniClusterResource for this
>> and attempted to set the logging levels to pick up everything (i.e. ALL),
>> but if there's a way to expose more, I'm not aware of it.
>>
>> *Have you debugged the execution and followed the counter() calls all the
>>> way to the reporter?*
>>
>>
>> With the debugger, I traced one of the counter initializations and it
>> seems that no reporters were being found within the register call in the
>> MetricRegistryImpl (i.e. this.reporters has no registered reporters):
>>
>> if (this.reporters != null) {
>>     for(int i = 0; i < this.reporters.size(); ++i) {
>>         MetricRegistryImpl.ReporterAndSettings reporterAndSettings = (MetricRegistryImpl.ReporterAndSettings)this.reporters.get(i);
>>
>>         try {
>>             if (reporterAndSettings != null) {
>>                 FrontMetricGroup front = new FrontMetricGroup(reporterAndSettings.getSettings(), group);
>>                 reporterAndSettings.getReporter().notifyOfAddedMetric(metric, metricName, front);
>>             }
>>         } catch (Exception var11) {
>>             LOG.warn("Error while registering metric: {}.", metricName, var11);
>>         }
>>     }
>> }
>>
>>  Perhaps this is an error on my part as I had assumed the following would
>> be sufficient to register my reporter (within a local / minicluster
>> environment):
>>
>> private val metricsConfiguration = Configuration.fromMap(mutableMapOf(
>>     ConfigConstants.METRICS_REPORTER_PREFIX +
>>     "MockCustomMetricsReporter." +
>>     ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX to MockCustomMetricsReporter::class.java.name))
>> @ClassRule
>> @JvmField
>> val flink = MiniClusterResource(
>>     MiniClusterResourceConfiguration.Builder()
>>         .setConfiguration(metricsConfiguration)
>>         .setNumberTaskManagers(1)
>>         .setNumberSlotsPerTaskManager(1)
>>         .build()
>> )
>>
>> However, the reporter is clearly being recognized for the built-in
>> metrics, since those trigger the notifyOfAddedMetric() function within
>> the reporter itself; it is just not picking up the custom ones that are
>> being registered.
>>
>> *Do you only see JobManager metrics, or is there somewhere also something
>>> about the TaskManager?*
>>
>>
>> It looks like there are metrics coming from both the JobManager and
>> TaskManagers from the following examples that were coming out:
>>
>> localhost.jobmanager.numRegisteredTaskManagers
>> .taskmanager.ad5aaade-cd54-44f4-a099-a765b53f79b4.Status.Shuffle.Netty.UsedMemorySegments
>> .taskmanager.ad5aaade-cd54-44f4-a099-a765b53f79b4.Status.JVM.Memory.Metaspace.Committed
>> localhost.jobmanager.Status.JVM.Memory.Direct.Count
>>
>> I do agree that a factory implementation with a static reporter would
>> likely be a better approach, so I may explore that a bit more. As well as
>> adding some changes to the existing, albeit ghetto, implementation for
>> handling the dynamic metrics. I did see several references to a
>> MetricRegistry class, however I wasn't sure if that was the most
>> appropriate place to add this type of functionality or if it was needed at
>> all.
>>
>> Thanks much,
>>
>> Rion
>>
>>
>>
>> On Tue, Mar 16, 2021 at 4:45 AM Chesnay Schepler <ch...@apache.org>
>> wrote:
>>
>>> Was there anything in the logs (ideally on debug)?
>>> Have you debugged the execution and followed the counter() calls all the
>>> way to the reporter?
>>> Do you only see JobManager metrics, or is there somewhere also something
>>> about the TaskManager?
>>>
>>> I can see several issues with your code, but none that would fully
>>> explain the issue:
>>>
>>> a) your reporter is not thread-safe
>>> b) you only differentiate metrics by name, which will lead to quite a
>>> few collisions.
>>>
>>> Also be aware that there will be 2 reporter instances; one for the JM
>>> and one for the TM.
>>> To remedy this, I would recommend creating a factory that returns a
>>> static reporter instance instead; overall this tends to be cleaner.
>>>
>>> Alternatively, when using the testing harnesses IIRC you can also
>>> set a custom MetricGroup implementation.
>>
>
>

Re: Unit Testing for Custom Metrics in Flink

Posted by Chesnay Schepler <ch...@apache.org>.
Actually you'd have to further subclass the operatorMetricGroup such 
that addGroup works as expected.
This is admittedly a bit of a drag :/
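
To make the addGroup point concrete, here is a minimal, self-contained Java
sketch of the idea. It is only an illustration under stated assumptions:
RecordingGroup is a hypothetical stand-in that does not extend
InterceptingOperatorMetricGroup or any other Flink class, and AtomicLong
stands in for a counter. What it shows is that a sub-group returned by
addGroup has to write into the same map the test later inspects; otherwise
metrics registered through addGroup("tenant", ...) would presumably never
become visible to the test.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical stand-in for an intercepting metric group. It does NOT extend
 * InterceptingOperatorMetricGroup or any other Flink class; AtomicLong stands
 * in for a counter.
 */
final class RecordingGroup {
    private final List<String> scope;
    // Shared between a group and all of its sub-groups, so a metric
    // registered on a child is still visible from the root.
    private final Map<String, AtomicLong> intercepted;

    RecordingGroup() {
        this(Collections.<String>emptyList(), new ConcurrentHashMap<String, AtomicLong>());
    }

    private RecordingGroup(List<String> scope, Map<String, AtomicLong> intercepted) {
        this.scope = scope;
        this.intercepted = intercepted;
    }

    /** Mirrors the addGroup("tenant", tenant) call shape used in the thread. */
    RecordingGroup addGroup(String key, String value) {
        List<String> childScope = new ArrayList<>(scope);
        childScope.add(key);
        childScope.add(value);
        return new RecordingGroup(childScope, intercepted);
    }

    /** Registers (or returns) a counter under its full scoped name. */
    AtomicLong counter(String name) {
        List<String> path = new ArrayList<>(scope);
        path.add(name);
        return intercepted.computeIfAbsent(String.join(".", path), k -> new AtomicLong());
    }

    /** Read-only view of everything registered through this group tree. */
    Map<String, AtomicLong> intercepted() {
        return Collections.unmodifiableMap(intercepted);
    }
}
```

Keying by the full scoped name (for example tenant.tenant1.events_seen)
rather than by the bare metric name also avoids the name collisions mentioned
earlier in the thread.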

On 3/16/2021 4:35 PM, Chesnay Schepler wrote:
> The test harness is fully independent of the MiniClusterResource; it 
> isn't actually running a job. That's why your metrics never arrive at 
> the reporter.
>
> You can either:
> a) use the test harness with a custom MetricGroup implementation that 
> intercepts registered metrics, set in the MockEnvironment
> b) use the function as part of a job with the custom reporter 
> approach. (essentially, fromElements -> function -> discarding sink)
>
> The following would work for a), but it must be noted that this relies 
> on quite a few things that are internal to Flink:
>
> ...
> InterceptingOperatorMetricGroup operatorMetricGroup =
>          new InterceptingOperatorMetricGroup(); InterceptingTaskMetricGroup taskMetricGroup =
>          new InterceptingTaskMetricGroup() {
>              @Override public OperatorMetricGroupgetOrAddOperator(OperatorID id, String name) {
>                  return operatorMetricGroup; }
>          };
> new MockEnvironmentBuilder()
>      .setMetricGroup(taskMetricGroup)
>
> ...
>
> On 3/16/2021 3:42 PM, Rion Williams wrote:
>> In this case, I was using a harness to test the function. Although, I 
>> could honestly care less about the unit-test surrounding metrics, I'm 
>> much more concerned with having something that will actually run and 
>> work as intended within a job. The only real concern I have or 
>> problem that I want to solve is building metrics that may vary based 
>> on the data coming in from a "label" perspective (e.g. keeping track 
>> of the events I've seen for a given tenant, or some other properties).
>>
>> Something like:
>>
>> <metric prefix>_events_seen { tenant = "tenant-1" } 1.0
>> <metric prefix>_events_seen { tenant = "tenant-2" } 200.0
>>
>> If that makes sense. I've used the Prometheus client previously to 
>> accomplish these types of metrics, but since I'm fairly new to the 
>> Flink world, I was trying to use the built-in constructs available 
>> (thus the dynamic groups / metrics being added).
>>
>> On Tue, Mar 16, 2021 at 9:36 AM Chesnay Schepler <chesnay@apache.org 
>> <ma...@apache.org>> wrote:
>>
>>     Are you actually running a job, or are you using a harness for
>>     testing your function?
>>
>>     On 3/16/2021 3:24 PM, Rion Williams wrote:
>>>     Hi Chesnay,
>>>
>>>     Thanks for the prompt response and feedback, it's very much
>>>     appreciated. Please see the inline responses below to your
>>>     questions:
>>>
>>>         *Was there anything in the logs (ideally on debug)?*
>>>
>>>
>>>     I didn't see anything within the logs that seemed to indicate
>>>     anything out of the ordinary. I'm currently using a
>>>     MiniClusterResources for this and attempted to set the logging
>>>     levels to pick up everything (i.e. ALL), but if there's a way to
>>>     expose more, I'm not aware of it.
>>>
>>>         *Have you debugged the execution and followed the counter()
>>>         calls all the way to the reporter?*
>>>
>>>
>>>     With the debugger, I traced one of the counter initializations
>>>     and it seems that no reporters were being found within the
>>>     register call in the MetricsRegistryImpl (i.e. this.reporters
>>>     has no registered reporters):
>>>     if (this.reporters !=null) {
>>>          for(int i =0; i <this.reporters.size(); ++i) {
>>>              MetricRegistryImpl.ReporterAndSettings reporterAndSettings = (MetricRegistryImpl.ReporterAndSettings)this.reporters.get(i);
>>>
>>>              try {
>>>                  if (reporterAndSettings !=null) {
>>>                      FrontMetricGroup front =new FrontMetricGroup(reporterAndSettings.getSettings(),group);
>>>                      reporterAndSettings.getReporter().notifyOfAddedMetric(metric,metricName, front);
>>>                  }
>>>              }catch (Exception var11) {
>>>                  LOG.warn("Error while registering metric: {}.",metricName,var11);
>>>              }
>>>          }
>>>     }
>>>      Perhaps this is an error on my part as I had assumed the
>>>     following would be sufficient to register my reporter (within a
>>>     local / minicluster environment):
>>>     private val metricsConfiguration =Configuration.fromMap(mutableMapOf(
>>>          ConfigConstants.METRICS_REPORTER_PREFIX +
>>>          "MockCustomMetricsReporter." +
>>>          ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX to MockCustomMetricsReporter::class.java.name ))
>>>
>>>     @ClassRule @JvmField val flink =MiniClusterResource(
>>>          MiniClusterResourceConfiguration.Builder()
>>>              .setConfiguration(metricsConfiguration)
>>>              .setNumberTaskManagers(1)
>>>              .setNumberSlotsPerTaskManager(1)
>>>              .build()
>>>     )
>>>     However, it's clearly being recognized for the built-in metrics,
>>>     just not these custom ones that are being registered as they are
>>>     triggering the notifyOfAddedMetric() function within the
>>>     reporter itself.
>>>
>>>         *Do you only see JobManager metrics, or is there somewhere
>>>         also something about the TaskManager?*
>>>
>>>
>>>     It looks like there are metrics coming from both the JobManager
>>>     and TaskManagers from the following examples that were coming out:
>>>     localhost.jobmanager.numRegisteredTaskManagers
>>>     .taskmanager.ad5aaade-cd54-44f4-a099-a765b53f79b4.Status.Shuffle.Netty.UsedMemorySegments
>>>     .taskmanager.ad5aaade-cd54-44f4-a099-a765b53f79b4.Status.JVM.Memory.Metaspace.Committed
>>>     localhost.jobmanager.Status.JVM.Memory.Direct.Count
>>>     I do agree that a factory implementation with a static reporter
>>>     would likely be a better approach, so I may explore that a bit
>>>     more. As well as adding some changes to the existing, albeit
>>>     ghetto, implementation for handling the dynamic metrics. I did
>>>     see several references to a MetricRegistry class, however I
>>>     wasn't sure if that was the most appropriate place to add this
>>>     type of functionality or if it was needed at all.
>>>
>>>     Thanks much,
>>>
>>>     Rion
>>>
>>>
>>>
>>>     On Tue, Mar 16, 2021 at 4:45 AM Chesnay Schepler
>>>     <chesnay@apache.org <ma...@apache.org>> wrote:
>>>
>>>         Was there anything in the logs (ideally on debug)?
>>>         Have you debugged the execution and followed the counter()
>>>         calls all the way to the reporter?
>>>         Do you only see JobManager metrics, or is there somewhere
>>>         also something about the TaskManager?
>>>
>>>         I can see several issues with your code, but none that would
>>>         fully explain the issue:
>>>
>>>         a) your reporter is not thread-safe
>>>         b) you only differentiate metrics by name, which will lead
>>>         to quite a few collisions.
>>>
>>>         Be also aware that there will be 2 reporter instances; one
>>>         for the JM and one for the TM.
>>>         To remedy this, I would recommend creating a factory that
>>>         returns a static reporter instance instead; overall this
>>>         tends to be cleaner.
>>>
>>>         Alternatively, when using the testing harnesses IIRC you can
>>>         also set set a custom MetricGroup implementation.
>>>
>>>         On 3/16/2021 4:13 AM, Rion Williams wrote:
>>>>         Hi all,
>>>>
>>>>         Recently, I was working on adding some custom metrics to a
>>>>         Flink job that required the use of dynamic labels (i.e.
>>>>         capturing various counters that were "slicable" by things
>>>>         like tenant / source, etc.).
>>>>
>>>>         I ended up handling it in a very naive fashion that would
>>>>         just keep a dictionary of metrics that had already been
>>>>         registered and update them accordingly which looked
>>>>         something like this:
>>>>         class MyCustomProcessFunction:ProcessFunction<Event,Unit>() {
>>>>              private lateinit var metrics:CustomMetricsRegistry override fun open(parameters:Configuration) {
>>>>                  metrics =CustomMetricsRegistry(runtimeContext.metricGroup)
>>>>              }
>>>>
>>>>              override fun processElement(event:Event,context:Context,collector:Collector<Unit>) {
>>>>                  // Insert calls like metrics.inc("tenant-name", 4) here }
>>>>         }
>>>>
>>>>         class CustomMetricsRegistry(private val metricGroup:MetricGroup):Serializable {
>>>>              // Increments a given metric by key fun inc(metric:String,tenant:String,amount:Long =1) {
>>>>                  // Store a key for the metric val key ="$metric-$tenant" // Store/register the metric if (!registeredMetrics.containsKey(key)){
>>>>                      registeredMetrics[key] =metricGroup .addGroup("tenant",tenant)
>>>>                          .counter(metric)
>>>>                  }
>>>>
>>>>                  // Update the metric by a given amount registeredMetrics[key]!!.inc(amount)
>>>>              }
>>>>
>>>>              companion object {
>>>>                  private var registeredMetrics:HashMap<String,Counter> = hashMapOf()
>>>>              }
>>>>         }
>>>>         Basically registering and updating new metrics for tenants
>>>>         as they are encountered, which I've seen being emitted as
>>>>         expected via hitting the appropriately configured metrics
>>>>         endpoint (using a PrometheusReporter).
>>>>
>>>>         However, while I was trying to write a few unit tests for
>>>>         this, I seemed to encounter an issue. I was following a
>>>>         Stack Overflow post that was answered by @Chesnay Schepler
>>>>         <ma...@apache.org> [0] that described the use of
>>>>         an in-memory/embedded Flink cluster and a custom reporter
>>>>         that would statically expose the underlying metrics.
>>>>
>>>>         So I took a shot at implementing something similar as follows:
>>>>
>>>>         *Flink Cluster Definition*
>>>>         private val metricsConfiguration =Configuration.fromMap(mutableMapOf(
>>>>              ConfigConstants.METRICS_REPORTER_PREFIX +
>>>>              "MockCustomMetricsReporter." +
>>>>              ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX to MockCustomMetricsReporter::class.java.name ))
>>>>
>>>>         @ClassRule @JvmField val flinkCluster =MiniClusterResource(
>>>>              MiniClusterResourceConfiguration.Builder()
>>>>                  .setConfiguration(metricsConfiguration)
>>>>                  .setNumberTaskManagers(1)
>>>>                  .setNumberSlotsPerTaskManager(1)
>>>>                  .build()
>>>>         )
>>>>         *Custom Reporter*
>>>>         class MockCustomMetricsReporter :MetricReporter {
>>>>
>>>>              override fun open(metricConfig:MetricConfig) {}
>>>>              override fun close() {}
>>>>              override fun notifyOfAddedMetric(metric:Metric,name:String,metricGroup:MetricGroup) {
>>>>                  // Store the metrics that are being registered as we see
>>>>         them if (!registeredCustomMetrics.containsKey(name)){
>>>>                      registeredCustomMetrics[name] =metric }
>>>>              }
>>>>
>>>>              override fun notifyOfRemovedMetric(metric:Metric,name:String,metricGroup:MetricGroup) {
>>>>                  // Do nothing here }
>>>>
>>>>              companion object {
>>>>                  // Static reference to metrics as they are registered var
>>>>         registeredCustomMetrics =HashMap<String,Metric>()
>>>>              }
>>>>         }
>>>>         *Example Test*
>>>>         @Test fun `Example Metrics Use Case`(){
>>>>              // Arrange val stream =StreamExecutionEnvironment.getExecutionEnvironment()
>>>>              val events =listOf(
>>>>                  eventWithUsers("tenant1","user1@testing.com <ma...@testing.com>"),
>>>>                  eventWithUsers("tenant2","user2@testing.com <ma...@testing.com>"),
>>>>              )
>>>>
>>>>              // Act stream
>>>>                  .fromCollection(events)
>>>>                  .process(MyCustomProcessFunction())
>>>>
>>>>              // Assert stream.execute()
>>>>              assertTrue(MockCustomMetricsReporter.registeredCustomMetrics.size > 0)
>>>>         }
>>>>         While this test will pass, *the problem is that the custom
>>>>         metrics defined dynamically (via the CustomMetricsRegistry
>>>>         implementation) do not appear within the
>>>>         registeredCustomMetrics collection*. In fact, there are 21
>>>>         metrics that get registered but all of them appear to be
>>>>         classic out-of-the-box metrics such as CPU usage, number of
>>>>         task managers, load, various other Netty and JVM stats, but
>>>>         no custom metrics are included.
>>>>
>>>>         I've tried multiple different configurations,
>>>>         implementations via a custom TestHarness, etc. but for some
>>>>         reason the custom metrics being defined are never
>>>>         triggering the notifyOfAddedMetric function which would be
>>>>         responsible for adding them to the static collection to be
>>>>         asserted against.
>>>>
>>>>         Any ideas / guidance would be more than welcome. Perhaps a
>>>>         different approach? Based off examples I've encountered,
>>>>         the code seems like it should "just work".
>>>>
>>>>         Thanks much,
>>>>
>>>>         Rion
>>>>
>>>>         [0] :
>>>>         https://stackoverflow.com/questions/51675597/how-to-unitest-gauge-metrics-in-flink
>>>>
>>>>
>>>
>>
>


Re: Unit Testing for Custom Metrics in Flink

Posted by Chesnay Schepler <ch...@apache.org>.
The test harness is fully independent of the MiniClusterResource; it 
isn't actually running a job. That's why your metrics never arrive at 
the reporter.

You can either:
a) use the test harness with a custom MetricGroup implementation that 
intercepts registered metrics, set in the MockEnvironment
b) use the function as part of a job with the custom reporter approach. 
(essentially, fromElements -> function -> discarding sink)

The following would work for a), but it must be noted that this relies 
on quite a few things that are internal to Flink:

...

InterceptingOperatorMetricGroup operatorMetricGroup =
        new InterceptingOperatorMetricGroup();
InterceptingTaskMetricGroup taskMetricGroup =
        new InterceptingTaskMetricGroup() {
            @Override
            public OperatorMetricGroup getOrAddOperator(OperatorID id, String name) {
                return operatorMetricGroup;
            }
        };
new MockEnvironmentBuilder()
     .setMetricGroup(taskMetricGroup)

...
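
The idea behind option a) can be sketched without any Flink classes: hand the function a metric group that records whatever gets registered on it, then assert against that record. The `SketchCounter`, `SketchMetricGroup`, and `RecordingMetricGroup` types below are hypothetical stand-ins for illustration only; they are not Flink's interfaces, and Flink's real `MetricGroup` has more methods than shown here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-ins for illustration; these are NOT Flink's interfaces.
interface SketchCounter {
    void inc(long n);
    long getCount();
}

interface SketchMetricGroup {
    SketchMetricGroup addGroup(String key, String value);
    SketchCounter counter(String name);
}

// Records every counter registered on it, or on any child group, under its full scope.
final class RecordingMetricGroup implements SketchMetricGroup {
    private final String scope;
    private final Map<String, SketchCounter> registered;

    RecordingMetricGroup() {
        this("", new LinkedHashMap<>());
    }

    private RecordingMetricGroup(String scope, Map<String, SketchCounter> registered) {
        this.scope = scope;
        this.registered = registered;
    }

    @Override
    public SketchMetricGroup addGroup(String key, String value) {
        // Child groups share the parent's map so a test sees everything in one place.
        return new RecordingMetricGroup(scope + key + "." + value + ".", registered);
    }

    @Override
    public SketchCounter counter(String name) {
        SketchCounter counter = new SketchCounter() {
            private long count;
            @Override public void inc(long n) { count += n; }
            @Override public long getCount() { return count; }
        };
        // Keyed by scope + name, so the same counter name under two tenants does not collide.
        registered.put(scope + name, counter);
        return counter;
    }

    Map<String, SketchCounter> registered() {
        return registered;
    }
}
```

A test would then call something like `group.addGroup("tenant", "tenant1").counter("events_seen")` through the code under test and look the counter up in `registered()` under `tenant.tenant1.events_seen`.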


On 3/16/2021 3:42 PM, Rion Williams wrote:
> In this case, I was using a harness to test the function. Although I
> honestly couldn't care less about the unit tests surrounding metrics, I'm
> much more concerned with having something that will actually run and 
> work as intended within a job. The only real concern I have or problem 
> that I want to solve is building metrics that may vary based on the 
> data coming in from a "label" perspective (e.g. keeping track of the 
> events I've seen for a given tenant, or some other properties).
>
> Something like:
>
> <metric prefix>_events_seen { tenant = "tenant-1" } 1.0
> <metric prefix>_events_seen { tenant = "tenant-2" } 200.0
>
> If that makes sense. I've used the Prometheus client previously to 
> accomplish these types of metrics, but since I'm fairly new to the 
> Flink world, I was trying to use the built-in constructs available 
> (thus the dynamic groups / metrics being added).
>
> On Tue, Mar 16, 2021 at 9:36 AM Chesnay Schepler <chesnay@apache.org> wrote:
>
>     Are you actually running a job, or are you using a harness for
>     testing your function?
>
>     On 3/16/2021 3:24 PM, Rion Williams wrote:
>>     Hi Chesnay,
>>
>>     Thanks for the prompt response and feedback, it's very much
>>     appreciated. Please see the inline responses below to your questions:
>>
>>         *Was there anything in the logs (ideally on debug)?*
>>
>>
>>     I didn't see anything within the logs that seemed to indicate
>>     anything out of the ordinary. I'm currently using a
>>     MiniClusterResource for this and attempted to set the logging
>>     levels to pick up everything (i.e. ALL), but if there's a way to
>>     expose more, I'm not aware of it.
>>
>>         *Have you debugged the execution and followed the counter()
>>         calls all the way to the reporter?*
>>
>>
>>     With the debugger, I traced one of the counter initializations
>>     and it seems that no reporters were being found within the
>>     register call in the MetricRegistryImpl (i.e. this.reporters has
>>     no registered reporters):
>>     if (this.reporters !=null) {
>>          for(int i =0; i <this.reporters.size(); ++i) {
>>              MetricRegistryImpl.ReporterAndSettings reporterAndSettings = (MetricRegistryImpl.ReporterAndSettings)this.reporters.get(i);
>>
>>              try {
>>                  if (reporterAndSettings !=null) {
>>                      FrontMetricGroup front =new FrontMetricGroup(reporterAndSettings.getSettings(),group);
>>                      reporterAndSettings.getReporter().notifyOfAddedMetric(metric,metricName, front);
>>                  }
>>              }catch (Exception var11) {
>>                  LOG.warn("Error while registering metric: {}.",metricName,var11);
>>              }
>>          }
>>     }
>>      Perhaps this is an error on my part as I had assumed the
>>     following would be sufficient to register my reporter (within a
>>     local / minicluster environment):
>>     private val metricsConfiguration =Configuration.fromMap(mutableMapOf(
>>          ConfigConstants.METRICS_REPORTER_PREFIX +
>>          "MockCustomMetricsReporter." +
>>          ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX to MockCustomMetricsReporter::class.java.name ))
>>
>>     @ClassRule @JvmField val flink =MiniClusterResource(
>>          MiniClusterResourceConfiguration.Builder()
>>              .setConfiguration(metricsConfiguration)
>>              .setNumberTaskManagers(1)
>>              .setNumberSlotsPerTaskManager(1)
>>              .build()
>>     )
>>     However, it's clearly being recognized for the built-in metrics,
>>     just not for these custom ones being registered, as they are not
>>     triggering the notifyOfAddedMetric() function within the reporter
>>     itself.
>>
>>         *Do you only see JobManager metrics, or is there somewhere
>>         also something about the TaskManager?*
>>
>>
>>     It looks like there are metrics coming from both the JobManager
>>     and TaskManagers from the following examples that were coming out:
>>     localhost.jobmanager.numRegisteredTaskManagers
>>     .taskmanager.ad5aaade-cd54-44f4-a099-a765b53f79b4.Status.Shuffle.Netty.UsedMemorySegments
>>     .taskmanager.ad5aaade-cd54-44f4-a099-a765b53f79b4.Status.JVM.Memory.Metaspace.Committed
>>     localhost.jobmanager.Status.JVM.Memory.Direct.Count
>>     I do agree that a factory implementation with a static reporter
>>     would likely be a better approach, so I may explore that a bit
>>     more. As well as adding some changes to the existing, albeit
>>     ghetto, implementation for handling the dynamic metrics. I did
>>     see several references to a MetricRegistry class, however I
>>     wasn't sure if that was the most appropriate place to add this
>>     type of functionality or if it was needed at all.
>>
>>     Thanks much,
>>
>>     Rion
>>
>>
>>
>>     On Tue, Mar 16, 2021 at 4:45 AM Chesnay Schepler
>>     <chesnay@apache.org> wrote:
>>
>>         Was there anything in the logs (ideally on debug)?
>>         Have you debugged the execution and followed the counter()
>>         calls all the way to the reporter?
>>         Do you only see JobManager metrics, or is there somewhere
>>         also something about the TaskManager?
>>
>>         I can see several issues with your code, but none that would
>>         fully explain the issue:
>>
>>         a) your reporter is not thread-safe
>>         b) you only differentiate metrics by name, which will lead to
>>         quite a few collisions.
>>
>>         Be also aware that there will be 2 reporter instances; one
>>         for the JM and one for the TM.
>>         To remedy this, I would recommend creating a factory that
>>         returns a static reporter instance instead; overall this
>>         tends to be cleaner.
>>
>>         Alternatively, when using the testing harnesses IIRC you can
>>         also set a custom MetricGroup implementation.
>>
>>         On 3/16/2021 4:13 AM, Rion Williams wrote:
>>>         Hi all,
>>>
>>>         Recently, I was working on adding some custom metrics to a
>>>         Flink job that required the use of dynamic labels (i.e.
>>>         capturing various counters that were "slicable" by things
>>>         like tenant / source, etc.).
>>>
>>>         I ended up handling it in a very naive fashion that would
>>>         just keep a dictionary of metrics that had already been
>>>         registered and update them accordingly which looked
>>>         something like this:
>>>         class MyCustomProcessFunction: ProcessFunction<Event, Unit>() {
>>>              private lateinit var metrics: CustomMetricsRegistry
>>>
>>>              override fun open(parameters: Configuration) {
>>>                  metrics = CustomMetricsRegistry(runtimeContext.metricGroup)
>>>              }
>>>
>>>              override fun processElement(event: Event, context: Context, collector: Collector<Unit>) {
>>>                  // Insert calls like metrics.inc("tenant-name", 4) here
>>>              }
>>>         }
>>>
>>>         class CustomMetricsRegistry(private val metricGroup: MetricGroup): Serializable {
>>>              // Increments a given metric by key
>>>              fun inc(metric: String, tenant: String, amount: Long = 1) {
>>>                  // Store a key for the metric
>>>                  val key = "$metric-$tenant"
>>>                  // Store/register the metric
>>>                  if (!registeredMetrics.containsKey(key)){
>>>                      registeredMetrics[key] = metricGroup
>>>                          .addGroup("tenant", tenant)
>>>                          .counter(metric)
>>>                  }
>>>
>>>                  // Update the metric by a given amount
>>>                  registeredMetrics[key]!!.inc(amount)
>>>              }
>>>
>>>              companion object {
>>>                  private var registeredMetrics: HashMap<String, Counter> = hashMapOf()
>>>              }
>>>         }
>>>         Basically registering and updating new metrics for tenants
>>>         as they are encountered, which I've seen being emitted as
>>>         expected via hitting the appropriately configured metrics
>>>         endpoint (using a PrometheusReporter).
>>>
>>>         However, while I was trying to write a few unit tests for
>>>         this, I seemed to encounter an issue. I was following a
>>>         Stack Overflow post that was answered by @Chesnay Schepler
>>>         <ma...@apache.org> [0] that described the use of an
>>>         in-memory/embedded Flink cluster and a custom reporter that
>>>         would statically expose the underlying metrics.
>>>
>>>         So I took a shot at implementing something similar as follows:
>>>
>>>         *Flink Cluster Definition*
>>>         private val metricsConfiguration =Configuration.fromMap(mutableMapOf(
>>>              ConfigConstants.METRICS_REPORTER_PREFIX +
>>>              "MockCustomMetricsReporter." +
>>>              ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX to MockCustomMetricsReporter::class.java.name ))
>>>
>>>         @ClassRule @JvmField val flinkCluster =MiniClusterResource(
>>>              MiniClusterResourceConfiguration.Builder()
>>>                  .setConfiguration(metricsConfiguration)
>>>                  .setNumberTaskManagers(1)
>>>                  .setNumberSlotsPerTaskManager(1)
>>>                  .build()
>>>         )
>>>         *Custom Reporter*
>>>         class MockCustomMetricsReporter : MetricReporter {
>>>
>>>              override fun open(metricConfig: MetricConfig) {}
>>>              override fun close() {}
>>>              override fun notifyOfAddedMetric(metric: Metric, name: String, metricGroup: MetricGroup) {
>>>                  // Store the metrics that are being registered as we see them
>>>                  if (!registeredCustomMetrics.containsKey(name)){
>>>                      registeredCustomMetrics[name] = metric
>>>                  }
>>>              }
>>>
>>>              override fun notifyOfRemovedMetric(metric: Metric, name: String, metricGroup: MetricGroup) {
>>>                  // Do nothing here
>>>              }
>>>
>>>              companion object {
>>>                  // Static reference to metrics as they are registered
>>>                  var registeredCustomMetrics = HashMap<String, Metric>()
>>>              }
>>>         }
>>>         *Example Test*
>>>         @Test
>>>         fun `Example Metrics Use Case`() {
>>>              // Arrange
>>>              val stream = StreamExecutionEnvironment.getExecutionEnvironment()
>>>              val events = listOf(
>>>                  eventWithUsers("tenant1", "user1@testing.com"),
>>>                  eventWithUsers("tenant2", "user2@testing.com"),
>>>              )
>>>
>>>              // Act
>>>              stream
>>>                  .fromCollection(events)
>>>                  .process(MyCustomProcessFunction())
>>>
>>>              // Assert
>>>              stream.execute()
>>>              assertTrue(MockCustomMetricsReporter.registeredCustomMetrics.size > 0)
>>>         }
>>>         While this test will pass, *the problem is that the custom
>>>         metrics defined dynamically (via the CustomMetricsRegistry
>>>         implementation) do not appear within the
>>>         registeredCustomMetrics collection*. In fact, there are 21
>>>         metrics that get registered but all of them appear to be
>>>         classic out-of-the-box metrics such as CPU usage, number of
>>>         task managers, load, various other Netty and JVM stats, but
>>>         no custom metrics are included.
>>>
>>>         I've tried multiple different configurations,
>>>         implementations via a custom TestHarness, etc. but for some
>>>         reason the custom metrics being defined are never triggering
>>>         the notifyOfAddedMetric function which would be responsible
>>>         for adding them to the static collection to be asserted
>>>         against.
>>>
>>>         Any ideas / guidance would be more than welcome. Perhaps a
>>>         different approach? Based off examples I've encountered, the
>>>         code seems like it should "just work".
>>>
>>>         Thanks much,
>>>
>>>         Rion
>>>
>>>         [0] :
>>>         https://stackoverflow.com/questions/51675597/how-to-unitest-gauge-metrics-in-flink
>>>
>>>
>>
>


Re: Unit Testing for Custom Metrics in Flink

Posted by Rion Williams <ri...@gmail.com>.
In this case, I was using a harness to test the function. Although I honestly
couldn't care less about the unit tests surrounding metrics, I'm much more
concerned with having something that will actually run and work as intended
within a job. The only real concern I have or problem that I want to solve
is building metrics that may vary based on the data coming in from a
"label" perspective (e.g. keeping track of the events I've seen for a given
tenant, or some other properties).

Something like:

<metric prefix>_events_seen { tenant = "tenant-1" } 1.0
<metric prefix>_events_seen { tenant = "tenant-2" } 200.0

If that makes sense. I've used the Prometheus client previously to
accomplish these types of metrics, but since I'm fairly new to the Flink
world, I was trying to use the built-in constructs available (thus the
dynamic groups / metrics being added).
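
As a rough, Flink-free illustration of the "counter per tenant, created on first use" idea described above, here is a minimal sketch. The `TenantCounters` class is hypothetical and not part of Flink; in a real job each counter would instead be obtained once via `metricGroup.addGroup("tenant", tenant).counter(metric)`, as in the original code. Keying on a nested map rather than a joined `"$metric-$tenant"` string is an assumption made here to avoid ambiguous keys when either part contains a dash.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical, Flink-free sketch of per-tenant counters created on first use.
final class TenantCounters {
    // metric name -> tenant -> running total; both levels are safe for concurrent use.
    private final Map<String, Map<String, LongAdder>> counters = new ConcurrentHashMap<>();

    // Adds `amount` to the counter for (metric, tenant), creating it if needed.
    void inc(String metric, String tenant, long amount) {
        counters
            .computeIfAbsent(metric, m -> new ConcurrentHashMap<>())
            .computeIfAbsent(tenant, t -> new LongAdder())
            .add(amount);
    }

    // Returns the current total, or 0 if nothing was recorded for (metric, tenant).
    long get(String metric, String tenant) {
        Map<String, LongAdder> byTenant = counters.get(metric);
        LongAdder adder = byTenant == null ? null : byTenant.get(tenant);
        return adder == null ? 0L : adder.sum();
    }
}
```

Because the map is an instance field rather than a static one, each function instance keeps its own counters, which also keeps tests isolated from one another.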

On Tue, Mar 16, 2021 at 9:36 AM Chesnay Schepler <ch...@apache.org> wrote:

> Are you actually running a job, or are you using a harness for testing
> your function?
>
> On 3/16/2021 3:24 PM, Rion Williams wrote:
>
> Hi Chesnay,
>
> Thanks for the prompt response and feedback, it's very much appreciated.
> Please see the inline responses below to your questions:
>
> *Was there anything in the logs (ideally on debug)?*
>
>
> I didn't see anything within the logs that seemed to indicate anything out
> of the ordinary. I'm currently using a MiniClusterResource for this and
> attempted to set the logging levels to pick up everything (i.e. ALL), but
> if there's a way to expose more, I'm not aware of it.
>
> *Have you debugged the execution and followed the counter() calls all the
>> way to the reporter?*
>
>
> With the debugger, I traced one of the counter initializations and it
> seems that no reporters were being found within the register call in the
> MetricRegistryImpl (i.e. this.reporters has no registered reporters):
>
> if (this.reporters != null) {
>     for(int i = 0; i < this.reporters.size(); ++i) {
>         MetricRegistryImpl.ReporterAndSettings reporterAndSettings = (MetricRegistryImpl.ReporterAndSettings)this.reporters.get(i);
>
>         try {
>             if (reporterAndSettings != null) {
>                 FrontMetricGroup front = new FrontMetricGroup(reporterAndSettings.getSettings(), group);
>                 reporterAndSettings.getReporter().notifyOfAddedMetric(metric, metricName, front);
>             }
>         } catch (Exception var11) {
>             LOG.warn("Error while registering metric: {}.", metricName, var11);
>         }
>     }
> }
>
>  Perhaps this is an error on my part as I had assumed the following would
> be sufficient to register my reporter (within a local / minicluster
> environment):
>
> private val metricsConfiguration = Configuration.fromMap(mutableMapOf(
>     ConfigConstants.METRICS_REPORTER_PREFIX +
>     "MockCustomMetricsReporter." +
>     ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX to MockCustomMetricsReporter::class.java.name))
> @ClassRule @JvmField val flink = MiniClusterResource(
>     MiniClusterResourceConfiguration.Builder()
>         .setConfiguration(metricsConfiguration)
>         .setNumberTaskManagers(1)
>         .setNumberSlotsPerTaskManager(1)
>         .build()
> )
>
> However, it's clearly being recognized for the built-in metrics, just not
> for these custom ones being registered, as they are not triggering the
> notifyOfAddedMetric() function within the reporter itself.
>
> *Do you only see JobManager metrics, or is there somewhere also something
>> about the TaskManager?*
>
>
> It looks like there are metrics coming from both the JobManager and
> TaskManagers from the following examples that were coming out:
>
> localhost.jobmanager.numRegisteredTaskManagers
> .taskmanager.ad5aaade-cd54-44f4-a099-a765b53f79b4.Status.Shuffle.Netty.UsedMemorySegments
> .taskmanager.ad5aaade-cd54-44f4-a099-a765b53f79b4.Status.JVM.Memory.Metaspace.Committed
> localhost.jobmanager.Status.JVM.Memory.Direct.Count
>
> I do agree that a factory implementation with a static reporter would
> likely be a better approach, so I may explore that a bit more. As well as
> adding some changes to the existing, albeit ghetto, implementation for
> handling the dynamic metrics. I did see several references to a
> MetricRegistry class, however I wasn't sure if that was the most
> appropriate place to add this type of functionality or if it was needed at
> all.
>
> Thanks much,
>
> Rion
>
>
>
> On Tue, Mar 16, 2021 at 4:45 AM Chesnay Schepler <ch...@apache.org>
> wrote:
>
>> Was there anything in the logs (ideally on debug)?
>> Have you debugged the execution and followed the counter() calls all the
>> way to the reporter?
>> Do you only see JobManager metrics, or is there somewhere also something
>> about the TaskManager?
>>
>> I can see several issues with your code, but none that would fully
>> explain the issue:
>>
>> a) your reporter is not thread-safe
>> b) you only differentiate metrics by name, which will lead to quite a few
>> collisions.
>>
>> Be also aware that there will be 2 reporter instances; one for the JM and
>> one for the TM.
>> To remedy this, I would recommend creating a factory that returns a
>> static reporter instance instead; overall this tends to be cleaner.
>>
>> Alternatively, when using the testing harnesses IIRC you can also set
>> a custom MetricGroup implementation.
>>
>> On 3/16/2021 4:13 AM, Rion Williams wrote:
>>
>> Hi all,
>>
>> Recently, I was working on adding some custom metrics to a Flink job that
>> required the use of dynamic labels (i.e. capturing various counters that
>> were "slicable" by things like tenant / source, etc.).
>>
>> I ended up handling it in a very naive fashion that would just keep a
>> dictionary of metrics that had already been registered and update them
>> accordingly which looked something like this:
>>
>> class MyCustomProcessFunction: ProcessFunction<Event, Unit>() {
>>     private lateinit var metrics: CustomMetricsRegistry
>>
>>     override fun open(parameters: Configuration) {
>>         metrics = CustomMetricsRegistry(runtimeContext.metricGroup)
>>     }
>>
>>     override fun processElement(event: Event, context: Context, collector: Collector<Unit>) {
>>         // Insert calls like metrics.inc("tenant-name", 4) here
>>     }
>> }
>>
>> class CustomMetricsRegistry(private val metricGroup: MetricGroup): Serializable {
>>     // Increments a given metric by key
>>     fun inc(metric: String, tenant: String, amount: Long = 1) {
>>         // Store a key for the metric
>>         val key = "$metric-$tenant"
>>         // Store/register the metric
>>         if (!registeredMetrics.containsKey(key)){
>>             registeredMetrics[key] = metricGroup
>>                 .addGroup("tenant", tenant)
>>                 .counter(metric)
>>         }
>>
>>         // Update the metric by a given amount
>>         registeredMetrics[key]!!.inc(amount)
>>     }
>>
>>     companion object {
>>         private var registeredMetrics: HashMap<String, Counter> = hashMapOf()
>>     }
>> }
>>
>> Basically registering and updating new metrics for tenants as they are
>> encountered, which I've seen being emitted as expected via hitting the
>> appropriately configured metrics endpoint (using a PrometheusReporter).
>>
>> However, while I was trying to write a few unit tests for this, I seemed
>> to encounter an issue. I was following a Stack Overflow post that was
>> answered by @Chesnay Schepler <ch...@apache.org> [0] that described
>> the use of an in-memory/embedded Flink cluster and a custom reporter that
>> would statically expose the underlying metrics.
>>
>> So I took a shot at implementing something similar as follows:
>>
>> *Flink Cluster Definition*
>>
>> private val metricsConfiguration = Configuration.fromMap(mutableMapOf(
>>     ConfigConstants.METRICS_REPORTER_PREFIX +
>>     "MockCustomMetricsReporter." +
>>     ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX to MockCustomMetricsReporter::class.java.name))
>> @ClassRule @JvmField val flinkCluster = MiniClusterResource(
>>     MiniClusterResourceConfiguration.Builder()
>>         .setConfiguration(metricsConfiguration)
>>         .setNumberTaskManagers(1)
>>         .setNumberSlotsPerTaskManager(1)
>>         .build()
>> )
>>
>> *Custom Reporter*
>>
>> class MockCustomMetricsReporter : MetricReporter {
>>
>>     override fun open(metricConfig: MetricConfig) {}
>>     override fun close() {}
>>     override fun notifyOfAddedMetric(metric: Metric, name: String, metricGroup: MetricGroup) {
>>         // Store the metrics that are being registered as we see them
>>         if (!registeredCustomMetrics.containsKey(name)){
>>             registeredCustomMetrics[name] = metric
>>         }
>>     }
>>
>>     override fun notifyOfRemovedMetric(metric: Metric, name: String, metricGroup: MetricGroup) {
>>         // Do nothing here
>>     }
>>
>>     companion object {
>>         // Static reference to metrics as they are registered
>>         var registeredCustomMetrics = HashMap<String, Metric>()
>>     }
>> }
>>
>> *Example Test*
>>
>> @Test
>> fun `Example Metrics Use Case`() {
>>     // Arrange
>>     val stream = StreamExecutionEnvironment.getExecutionEnvironment()
>>     val events = listOf(
>>         eventWithUsers("tenant1", "user1@testing.com"),
>>         eventWithUsers("tenant2", "user2@testing.com"),
>>     )
>>
>>     // Act
>>     stream
>>         .fromCollection(events)
>>         .process(MyCustomProcessFunction())
>>
>>     // Assert
>>     stream.execute()
>>     assertTrue(MockCustomMetricsReporter.registeredCustomMetrics.size > 0)
>> }
>>
>> While this test will pass, *the problem is that the custom metrics
>> defined dynamically (via the CustomMetricsRegistry implementation) do not
>> appear within the registeredCustomMetrics collection*. In fact, there
>> are 21 metrics that get registered but all of them appear to be classic
>> out-of-the-box metrics such as CPU usage, number of task managers, load,
>> various other Netty and JVM stats, but no custom metrics are included.
>>
>> I've tried multiple different configurations, implementations via a
>> custom TestHarness, etc. but for some reason the custom metrics being
>> defined are never triggering the notifyOfAddedMetric function which
>> would be responsible for adding them to the static collection to be
>> asserted against.
>>
>> Any ideas / guidance would be more than welcome. Perhaps a different
>> approach? Based off examples I've encountered, the code seems like it
>> should "just work".
>>
>> Thanks much,
>>
>> Rion
>>
>> [0] :
>> https://stackoverflow.com/questions/51675597/how-to-unitest-gauge-metrics-in-flink
>>
>>
>>
>>
>

Re: Unit Testing for Custom Metrics in Flink

Posted by Chesnay Schepler <ch...@apache.org>.
Are you actually running a job, or are you using a harness for testing 
your function?

On 3/16/2021 3:24 PM, Rion Williams wrote:
> Hi Chesnay,
>
> Thanks for the prompt response and feedback, it's very much 
> appreciated. Please see the inline responses below to your questions:
>
>     *Was there anything in the logs (ideally on debug)?*
>
>
> I didn't see anything within the logs that seemed to indicate anything 
> out of the ordinary. I'm currently using a MiniClusterResource for
> this and attempted to set the logging levels to pick up everything 
> (i.e. ALL), but if there's a way to expose more, I'm not aware of it.
>
>     *Have you debugged the execution and followed the counter() calls
>     all the way to the reporter?*
>
>
> With the debugger, I traced one of the counter initializations and it 
> seems that no reporters were being found within the register call in 
> the MetricRegistryImpl (i.e. this.reporters has no registered reporters):
> if (this.reporters !=null) {
>      for(int i =0; i <this.reporters.size(); ++i) {
>          MetricRegistryImpl.ReporterAndSettings reporterAndSettings = (MetricRegistryImpl.ReporterAndSettings)this.reporters.get(i);
>
>          try {
>              if (reporterAndSettings !=null) {
>                  FrontMetricGroup front =new FrontMetricGroup(reporterAndSettings.getSettings(),group);
>                  reporterAndSettings.getReporter().notifyOfAddedMetric(metric,metricName, front);
>              }
>          }catch (Exception var11) {
>              LOG.warn("Error while registering metric: {}.",metricName,var11);
>          }
>      }
> }
>  Perhaps this is an error on my part as I had assumed the following 
> would be sufficient to register my reporter (within a local / 
> minicluster environment):
> private val metricsConfiguration =Configuration.fromMap(mutableMapOf(
>      ConfigConstants.METRICS_REPORTER_PREFIX +
>      "MockCustomMetricsReporter." +
>      ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX to MockCustomMetricsReporter::class.java.name ))
>
> @ClassRule @JvmField val flink =MiniClusterResource(
>      MiniClusterResourceConfiguration.Builder()
>          .setConfiguration(metricsConfiguration)
>          .setNumberTaskManagers(1)
>          .setNumberSlotsPerTaskManager(1)
>          .build()
> )
> However, it's clearly being recognized for the built-in metrics, just 
> not for these custom ones being registered, as they are not triggering
> the notifyOfAddedMetric() function within the reporter itself.
>
>     *Do you only see JobManager metrics, or is there somewhere also
>     something about the TaskManager?*
>
>
> It looks like there are metrics coming from both the JobManager and 
> TaskManagers from the following examples that were coming out:
> localhost.jobmanager.numRegisteredTaskManagers
> .taskmanager.ad5aaade-cd54-44f4-a099-a765b53f79b4.Status.Shuffle.Netty.UsedMemorySegments
> .taskmanager.ad5aaade-cd54-44f4-a099-a765b53f79b4.Status.JVM.Memory.Metaspace.Committed
> localhost.jobmanager.Status.JVM.Memory.Direct.Count
> I do agree that a factory implementation with a static reporter would 
> likely be a better approach, so I may explore that a bit more. As well 
> as adding some changes to the existing, albeit ghetto, implementation 
> for handling the dynamic metrics. I did see several references to a 
> MetricRegistry class, however I wasn't sure if that was the most 
> appropriate place to add this type of functionality or if it was 
> needed at all.
>
> Thanks much,
>
> Rion
>
>
>
> On Tue, Mar 16, 2021 at 4:45 AM Chesnay Schepler <chesnay@apache.org> wrote:
>
>     Was there anything in the logs (ideally on debug)?
>     Have you debugged the execution and followed the counter() calls
>     all the way to the reporter?
>     Do you only see JobManager metrics, or is there somewhere also
>     something about the TaskManager?
>
>     I can see several issues with your code, but none that would fully
>     explain the issue:
>
>     a) your reporter is not thread-safe
>     b) you only differentiate metrics by name, which will lead to
>     quite a few collisions.
>
>     Be also aware that there will be 2 reporter instances; one for the
>     JM and one for the TM.
>     To remedy this, I would recommend creating a factory that returns
>     a static reporter instance instead; overall this tends to be cleaner.
>
>     Alternatively, when using the testing harnesses IIRC you can also
>     set a custom MetricGroup implementation.
>
>     On 3/16/2021 4:13 AM, Rion Williams wrote:
>>     Hi all,
>>
>>     Recently, I was working on adding some custom metrics to a Flink
>>     job that required the use of dynamic labels (i.e. capturing
>>     various counters that were "slicable" by things like tenant /
>>     source, etc.).
>>
>>     I ended up handling it in a very naive fashion that would just
>>     keep a dictionary of metrics that had already been registered and
>>     update them accordingly which looked something like this:
>>     class MyCustomProcessFunction: ProcessFunction<Event, Unit>() {
>>          private lateinit var metrics: CustomMetricsRegistry
>>
>>          override fun open(parameters: Configuration) {
>>              metrics = CustomMetricsRegistry(runtimeContext.metricGroup)
>>          }
>>
>>          override fun processElement(event: Event, context: Context, collector: Collector<Unit>) {
>>              // Insert calls like metrics.inc("tenant-name", 4) here
>>          }
>>     }
>>
>>     class CustomMetricsRegistry(private val metricGroup: MetricGroup): Serializable {
>>          // Increments a given metric by key
>>          fun inc(metric: String, tenant: String, amount: Long = 1) {
>>              // Store a key for the metric
>>              val key = "$metric-$tenant"
>>              // Store/register the metric
>>              if (!registeredMetrics.containsKey(key)){
>>                  registeredMetrics[key] = metricGroup
>>                      .addGroup("tenant", tenant)
>>                      .counter(metric)
>>              }
>>
>>              // Update the metric by a given amount
>>              registeredMetrics[key]!!.inc(amount)
>>          }
>>
>>          companion object {
>>              private var registeredMetrics: HashMap<String, Counter> = hashMapOf()
>>          }
>>     }
>>     Basically registering and updating new metrics for tenants as
>>     they are encountered, which I've seen being emitted as expected
>>     via hitting the appropriately configured metrics endpoint (using
>>     a PrometheusReporter).
>>
>>     However, while I was trying to write a few unit tests for this, I
>>     seemed to encounter an issue. I was following a Stack Overflow
>>     post that was answered by @Chesnay Schepler
>>     <ma...@apache.org> [0] that described the use of an
>>     in-memory/embedded Flink cluster and a custom reporter that would
>>     statically expose the underlying metrics.
>>
>>     So I took a shot at implementing something similar as follows:
>>
>>     *Flink Cluster Definition*
>>
>>     private val metricsConfiguration = Configuration.fromMap(mutableMapOf(
>>         ConfigConstants.METRICS_REPORTER_PREFIX +
>>         "MockCustomMetricsReporter." +
>>         ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX to MockCustomMetricsReporter::class.java.name
>>     ))
>>
>>     @ClassRule
>>     @JvmField
>>     val flinkCluster = MiniClusterResource(
>>         MiniClusterResourceConfiguration.Builder()
>>             .setConfiguration(metricsConfiguration)
>>             .setNumberTaskManagers(1)
>>             .setNumberSlotsPerTaskManager(1)
>>             .build()
>>     )
>>
>>     *Custom Reporter*
>>
>>     class MockCustomMetricsReporter : MetricReporter {
>>
>>         override fun open(metricConfig: MetricConfig) {}
>>         override fun close() {}
>>         override fun notifyOfAddedMetric(metric: Metric, name: String, metricGroup: MetricGroup) {
>>             // Store the metrics that are being registered as we see them
>>             if (!registeredCustomMetrics.containsKey(name)){
>>                 registeredCustomMetrics[name] = metric
>>             }
>>         }
>>
>>         override fun notifyOfRemovedMetric(metric: Metric, name: String, metricGroup: MetricGroup) {
>>             // Do nothing here
>>         }
>>
>>         companion object {
>>             // Static reference to metrics as they are registered
>>             var registeredCustomMetrics = HashMap<String, Metric>()
>>         }
>>     }
>>
>>     *Example Test*
>>
>>     @Test
>>     fun `Example Metrics Use Case`(){
>>         // Arrange
>>         val stream = StreamExecutionEnvironment.getExecutionEnvironment()
>>         val events = listOf(
>>             eventWithUsers("tenant1", "user1@testing.com"),
>>             eventWithUsers("tenant2", "user2@testing.com"),
>>         )
>>
>>         // Act
>>         stream
>>             .fromCollection(events)
>>             .process(MyCustomProcessFunction())
>>
>>         // Assert
>>         stream.execute()
>>         assertTrue(MockCustomMetricsReporter.registeredCustomMetrics.size > 0)
>>     }
>>
>>     While this test will pass, *the problem is that the custom
>>     metrics defined dynamically (via the CustomMetricsRegistry
>>     implementation) do not appear within the registeredCustomMetrics
>>     collection*. In fact, there are 21 metrics that get registered
>>     but all of them appear to be classic out-of-the-box metrics such
>>     as CPU usage, number of task managers, load, various other Netty
>>     and JVM stats, but no custom metrics are included.
>>
>>     I've tried multiple different configurations, implementations via
>>     a custom TestHarness, etc. but for some reason the custom metrics
>>     being defined are never triggering the notifyOfAddedMetric
>>     function which would be responsible for adding them to the static
>>     collection to be asserted against.
>>
>>     Any ideas / guidance would be more than welcome. Perhaps a
>>     different approach? Based off examples I've encountered, the code
>>     seems like it should "just work".
>>
>>     Thanks much,
>>
>>     Rion
>>
>>     [0] :
>>     https://stackoverflow.com/questions/51675597/how-to-unitest-gauge-metrics-in-flink
>>
>>
>


Re: Unit Testing for Custom Metrics in Flink

Posted by Rion Williams <ri...@gmail.com>.
Hi Chesnay,

Thanks for the prompt response and feedback, it's very much appreciated.
Please see the inline responses below to your questions:

*Was there anything in the logs (ideally on debug)?*


I didn't see anything in the logs that seemed out of the ordinary. I'm
currently using a MiniClusterResource for this and attempted to set the
logging level to pick up everything (i.e. ALL), but if there's a way to
expose more, I'm not aware of it.

*Have you debugged the execution and followed the counter() calls all the
way to the reporter?*


With the debugger, I traced one of the counter initializations and it seems
that no reporters were being found within the register call in
MetricRegistryImpl (i.e. this.reporters has no registered reporters):

if (this.reporters != null) {
    for (int i = 0; i < this.reporters.size(); ++i) {
        MetricRegistryImpl.ReporterAndSettings reporterAndSettings =
            (MetricRegistryImpl.ReporterAndSettings) this.reporters.get(i);

        try {
            if (reporterAndSettings != null) {
                FrontMetricGroup front =
                    new FrontMetricGroup(reporterAndSettings.getSettings(), group);
                reporterAndSettings.getReporter().notifyOfAddedMetric(metric, metricName, front);
            }
        } catch (Exception var11) {
            LOG.warn("Error while registering metric: {}.", metricName, var11);
        }
    }
}

Perhaps this is an error on my part, as I had assumed the following would
be sufficient to register my reporter (within a local / minicluster
environment):

private val metricsConfiguration = Configuration.fromMap(mutableMapOf(
    ConfigConstants.METRICS_REPORTER_PREFIX +
    "MockCustomMetricsReporter." +
    ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX to MockCustomMetricsReporter::class.java.name
))

@ClassRule
@JvmField
val flink = MiniClusterResource(
    MiniClusterResourceConfiguration.Builder()
        .setConfiguration(metricsConfiguration)
        .setNumberTaskManagers(1)
        .setNumberSlotsPerTaskManager(1)
        .build()
)

However, the reporter is clearly being recognized for the built-in metrics,
just not for the custom ones being registered, which never trigger the
notifyOfAddedMetric() function within the reporter itself.

*Do you only see JobManager metrics, or is there somewhere also something
about the TaskManager?*


It looks like there are metrics coming from both the JobManager and the
TaskManagers, judging by the following examples that were coming out:

localhost.jobmanager.numRegisteredTaskManagers
.taskmanager.ad5aaade-cd54-44f4-a099-a765b53f79b4.Status.Shuffle.Netty.UsedMemorySegments
.taskmanager.ad5aaade-cd54-44f4-a099-a765b53f79b4.Status.JVM.Memory.Metaspace.Committed
localhost.jobmanager.Status.JVM.Memory.Direct.Count

I do agree that a factory implementation with a static reporter would
likely be a better approach, so I may explore that a bit more, along with
making some changes to the existing, admittedly crude, implementation for
handling the dynamic metrics. I did see several references to a
MetricRegistry class; however, I wasn't sure if that was the most
appropriate place to add this type of functionality or if it was needed at
all.
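
For reference, a minimal sketch of how the dynamic-metric handling could be
tightened up. Everything named here is hypothetical: SimpleCounter is a
stand-in for Flink's Counter so that the snippet runs without Flink on the
classpath, and the register lambda is where a real job would presumably call
metricGroup.addGroup("tenant", tenant).counter(metric). The two changes from
the original are that the cache lives on the instance rather than in a
companion object, and that it is keyed by a (metric, tenant) pair instead of
the "$metric-$tenant" string, which can collide (e.g. "a-b" + "c" and
"a" + "b-c" both yield "a-b-c").

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

// Hypothetical stand-in for Flink's Counter, so the sketch runs on its own.
class SimpleCounter {
    private val value = AtomicLong()
    fun inc(amount: Long) { value.addAndGet(amount) }
    fun count(): Long = value.get()
}

// Caches one counter per (metric, tenant) pair. `register` is supplied by the
// caller and is invoked only the first time a pair is seen.
class DynamicCounters(
    private val register: (metric: String, tenant: String) -> SimpleCounter
) {
    // Per-instance and thread-safe, instead of a shared static HashMap.
    private val counters = ConcurrentHashMap<Pair<String, String>, SimpleCounter>()

    // Registers the counter on first use, then increments it.
    fun inc(metric: String, tenant: String, amount: Long = 1) {
        counters.computeIfAbsent(metric to tenant) { register(metric, tenant) }.inc(amount)
    }

    // Current value, or 0 if the pair has never been incremented.
    fun count(metric: String, tenant: String): Long =
        counters[metric to tenant]?.count() ?: 0
}

fun main() {
    var registrations = 0
    val counters = DynamicCounters { _, _ -> registrations++; SimpleCounter() }

    counters.inc("events", "tenant1", 4)
    counters.inc("events", "tenant1")
    counters.inc("events", "tenant2")

    println(counters.count("events", "tenant1")) // 5
    println(registrations)                       // 2 (one per tenant)
}
```

Passing the registration step in as a lambda also means the caching logic
can be unit tested without a cluster or reporter at all.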

Thanks much,

Rion



On Tue, Mar 16, 2021 at 4:45 AM Chesnay Schepler <ch...@apache.org> wrote:

> Was there anything in the logs (ideally on debug)?
> Have you debugged the execution and followed the counter() calls all the
> way to the reporter?
> Do you only see JobManager metrics, or is there somewhere also something
> about the TaskManager?
>
> I can see several issues with your code, but none that would fully explain
> the issue:
>
> a) your reporter is not thread-safe
> b) you only differentiate metrics by name, which will lead to quite a few
> collisions.
>
> Also be aware that there will be 2 reporter instances; one for the JM and
> one for the TM.
> To remedy this, I would recommend creating a factory that returns a static
> reporter instance instead; overall this tends to be cleaner.
>
> Alternatively, when using the testing harnesses IIRC you can also set
> a custom MetricGroup implementation.

Re: Unit Testing for Custom Metrics in Flink

Posted by Chesnay Schepler <ch...@apache.org>.
Was there anything in the logs (ideally on debug)?
Have you debugged the execution and followed the counter() calls all the 
way to the reporter?
Do you only see JobManager metrics, or is there somewhere also something 
about the TaskManager?

I can see several issues with your code, but none that would fully 
explain the issue:

a) your reporter is not thread-safe
b) you only differentiate metrics by name, which will lead to quite a 
few collisions.

Also be aware that there will be 2 reporter instances; one for the JM 
and one for the TM.
To remedy this, I would recommend creating a factory that returns a 
static reporter instance instead; overall this tends to be cleaner.

Alternatively, when using the testing harnesses IIRC you can also set 
a custom MetricGroup implementation.
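
To illustrate points a) and b), here is a minimal sketch of a shared,
thread-safe store that every reporter instance could delegate to. It is not
Flink API: CapturedMetrics and its functions are hypothetical, and the scope
list stands in for the metric group's scope components so that the snippet
runs without Flink. In a real reporter the key would presumably come from
metricGroup.getMetricIdentifier(name) inside notifyOfAddedMetric.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Process-wide store shared by all reporter instances (JM and TM alike).
object CapturedMetrics {
    // Keyed by the full identifier rather than the bare metric name.
    private val byIdentifier = ConcurrentHashMap<String, Any>()

    // `scope` is a stand-in for the metric group's scope components.
    fun added(scope: List<String>, name: String, metric: Any) {
        byIdentifier.putIfAbsent((scope + name).joinToString("."), metric)
    }

    fun removed(scope: List<String>, name: String) {
        byIdentifier.remove((scope + name).joinToString("."))
    }

    // Lets a test look up custom metrics regardless of host or operator scope.
    fun identifiersEndingWith(suffix: String): List<String> =
        byIdentifier.keys.filter { it.endsWith(suffix) }.sorted()

    // Call between tests so state does not leak from one test to the next.
    fun clear() = byIdentifier.clear()
}

fun main() {
    CapturedMetrics.added(listOf("taskmanager", "tenant", "tenant1"), "events", 1L)
    CapturedMetrics.added(listOf("taskmanager", "tenant", "tenant2"), "events", 2L)

    // Both entries are kept; keyed by the bare name "events", the second
    // registration would have been dropped.
    println(CapturedMetrics.identifiersEndingWith(".events"))
}
```

A test could then assert on the presence of a specific identifier suffix
instead of only checking that the collection is non-empty.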
