You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ne...@apache.org on 2020/04/24 09:03:27 UTC

[openwhisk] branch master updated: Support Relabel of Prometheus Metrics Tags based on provided config. (#4876)

This is an automated email from the ASF dual-hosted git repository.

neerajmangal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new d495993  Support Relabel of Prometheus Metrics Tags based on provided config. (#4876)
d495993 is described below

commit d495993a54d81ccbd5c28f0aa971f254722b1f9d
Author: Neeraj Mangal <ne...@users.noreply.github.com>
AuthorDate: Fri Apr 24 14:33:10 2020 +0530

    Support Relabel of Prometheus Metrics Tags based on provided config. (#4876)
    
    * Relabel Prometheus Metrics Tags based on config provided
    
    * Remove commented code
    
    * Fix Tag order and refactoring
    
    * Fix Tests
    
    * actionStatus->userDefinedStatusCode
    
    * Fix Tests and Add Test for rename-tags config
    
    * Review Comments and README updates
    
    Co-authored-by: Neeraj Mangal <ma...@adobe.com>
---
 core/monitoring/user-events/README.md              |  13 ++
 .../user-events/src/main/resources/reference.conf  |   5 +
 .../core/monitoring/metrics/OpenWhiskEvents.scala  |   7 +-
 .../monitoring/metrics/PrometheusRecorder.scala    | 215 ++++++++++-----------
 .../src/test/resources/application.conf            |   3 +
 .../core/monitoring/metrics/ApiTests.scala         |  10 +-
 .../core/monitoring/metrics/EventsTestHelper.scala |   5 +-
 .../monitoring/metrics/OpenWhiskEventsTests.scala  |   6 +-
 .../metrics/PrometheusRecorderTests.scala          |  44 ++++-
 9 files changed, 186 insertions(+), 122 deletions(-)

diff --git a/core/monitoring/user-events/README.md b/core/monitoring/user-events/README.md
index c21475e..39222b8 100644
--- a/core/monitoring/user-events/README.md
+++ b/core/monitoring/user-events/README.md
@@ -45,6 +45,19 @@ whisk {
   }
 }
 ```
+- To rename metrics tags, use the below configuration. Currently, this configuration only applies to the Prometheus
+Metrics. For example, here `namespace` tag name will be replaced by `ow_namespace` in all metrics.
+
+```
+whisk {
+  user-events {
+    rename-tags {
+      # rename/relabel prometheus metrics tags
+      "namespace" = "ow_namespae"
+     }
+  }
+}
+```
 
 Integrations
 ------------
diff --git a/core/monitoring/user-events/src/main/resources/reference.conf b/core/monitoring/user-events/src/main/resources/reference.conf
index 6282614..1491e4a 100644
--- a/core/monitoring/user-events/src/main/resources/reference.conf
+++ b/core/monitoring/user-events/src/main/resources/reference.conf
@@ -26,5 +26,10 @@ whisk {
 
     # Namespaces that should not be monitored
     ignored-namespaces = []
+
+    rename-tags {
+      # rename/relabel prometheus metrics tags
+      # "namespace" = "ow_namespae"
+    }
   }
 }
diff --git a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/OpenWhiskEvents.scala b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/OpenWhiskEvents.scala
index c4e14b6..03f1f3e 100644
--- a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/OpenWhiskEvents.scala
+++ b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/OpenWhiskEvents.scala
@@ -33,7 +33,10 @@ import scala.concurrent.{ExecutionContext, Future}
 
 object OpenWhiskEvents extends SLF4JLogging {
 
-  case class MetricConfig(port: Int, enableKamon: Boolean, ignoredNamespaces: Set[String])
+  case class MetricConfig(port: Int,
+                          enableKamon: Boolean,
+                          ignoredNamespaces: Set[String],
+                          renameTags: Map[String, String])
 
   def start(config: Config)(implicit system: ActorSystem,
                             materializer: ActorMaterializer): Future[Http.ServerBinding] = {
@@ -45,7 +48,7 @@ object OpenWhiskEvents extends SLF4JLogging {
 
     val metricConfig = loadConfigOrThrow[MetricConfig](config, "whisk.user-events")
 
-    val prometheusRecorder = PrometheusRecorder(prometheusReporter)
+    val prometheusRecorder = PrometheusRecorder(prometheusReporter, metricConfig)
     val recorders = if (metricConfig.enableKamon) Seq(prometheusRecorder, KamonRecorder) else Seq(prometheusRecorder)
     val eventConsumer = EventConsumer(eventConsumerSettings(defaultConsumerConfig(config)), recorders, metricConfig)
 
diff --git a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorder.scala b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorder.scala
index c498c6f..baf9188 100644
--- a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorder.scala
+++ b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorder.scala
@@ -25,12 +25,12 @@ import akka.event.slf4j.SLF4JLogging
 import akka.http.scaladsl.model.{HttpEntity, MessageEntity}
 import akka.stream.scaladsl.{Concat, Source}
 import akka.util.ByteString
-import org.apache.openwhisk.core.connector.{Activation, Metric}
 import io.prometheus.client.exporter.common.TextFormat
 import io.prometheus.client.{CollectorRegistry, Counter, Gauge, Histogram}
 import kamon.prometheus.PrometheusReporter
-import org.apache.openwhisk.core.monitoring.metrics.OpenWhiskEvents.MetricConfig
+import org.apache.openwhisk.core.connector.{Activation, Metric}
 import org.apache.openwhisk.core.entity.{ActivationEntityLimit, ActivationResponse}
+import org.apache.openwhisk.core.monitoring.metrics.OpenWhiskEvents.MetricConfig
 
 import scala.collection.JavaConverters._
 import scala.collection.concurrent.TrieMap
@@ -52,13 +52,13 @@ trait PrometheusMetricNames extends MetricNames {
   val timedLimitMetric = "openwhisk_action_limit_timed_total"
 }
 
-case class PrometheusRecorder(kamon: PrometheusReporter)
+case class PrometheusRecorder(kamon: PrometheusReporter, config: MetricConfig)
     extends MetricRecorder
     with PrometheusExporter
     with SLF4JLogging {
-  import PrometheusRecorder._
   private val activationMetrics = new TrieMap[String, ActivationPromMetrics]
   private val limitMetrics = new TrieMap[String, LimitPromMetrics]
+  private val promMetrics = PrometheusMetrics()
 
   override def processActivation(activation: Activation, initiator: String, metricConfig: MetricConfig): Unit = {
     lookup(activation, initiator).record(activation, initiator, metricConfig)
@@ -85,8 +85,8 @@ case class PrometheusRecorder(kamon: PrometheusReporter)
   }
 
   case class LimitPromMetrics(namespace: String) {
-    private val concurrentLimit = concurrentLimitCounter.labels(namespace)
-    private val timedLimit = timedLimitCounter.labels(namespace)
+    private val concurrentLimit = promMetrics.concurrentLimitCounter.labels(namespace)
+    private val timedLimit = promMetrics.timedLimitCounter.labels(namespace)
 
     def record(m: Metric): Unit = {
       m.metricName match {
@@ -103,24 +103,25 @@ case class PrometheusRecorder(kamon: PrometheusReporter)
                                    kind: String,
                                    memory: String,
                                    initiatorNamespace: String) {
-    private val namespaceActivations = namespaceActivationCounter.labels(namespace, initiatorNamespace)
-    private val activations = activationCounter.labels(namespace, initiatorNamespace, action, kind, memory)
-    private val coldStarts = coldStartCounter.labels(namespace, initiatorNamespace, action)
-    private val waitTime = waitTimeHisto.labels(namespace, initiatorNamespace, action)
-    private val initTime = initTimeHisto.labels(namespace, initiatorNamespace, action)
-    private val duration = durationHisto.labels(namespace, initiatorNamespace, action)
-    private val responseSize = responseSizeHisto.labels(namespace, initiatorNamespace, action)
 
-    private val gauge = memoryGauge.labels(namespace, initiatorNamespace, action)
+    private val namespaceActivations = promMetrics.namespaceActivationCounter.labels(namespace, initiatorNamespace)
+    private val activations = promMetrics.activationCounter.labels(namespace, initiatorNamespace, action, kind, memory)
+    private val coldStarts = promMetrics.coldStartCounter.labels(namespace, initiatorNamespace, action)
+    private val waitTime = promMetrics.waitTimeHisto.labels(namespace, initiatorNamespace, action)
+    private val initTime = promMetrics.initTimeHisto.labels(namespace, initiatorNamespace, action)
+    private val duration = promMetrics.durationHisto.labels(namespace, initiatorNamespace, action)
+    private val responseSize = promMetrics.responseSizeHisto.labels(namespace, initiatorNamespace, action)
+
+    private val gauge = promMetrics.memoryGauge.labels(namespace, initiatorNamespace, action)
 
     private val statusSuccess =
-      statusCounter.labels(namespace, initiatorNamespace, action, ActivationResponse.statusSuccess)
+      promMetrics.statusCounter.labels(namespace, initiatorNamespace, action, ActivationResponse.statusSuccess)
     private val statusApplicationError =
-      statusCounter.labels(namespace, initiatorNamespace, action, ActivationResponse.statusApplicationError)
+      promMetrics.statusCounter.labels(namespace, initiatorNamespace, action, ActivationResponse.statusApplicationError)
     private val statusDeveloperError =
-      statusCounter.labels(namespace, initiatorNamespace, action, ActivationResponse.statusDeveloperError)
+      promMetrics.statusCounter.labels(namespace, initiatorNamespace, action, ActivationResponse.statusDeveloperError)
     private val statusInternalError =
-      statusCounter.labels(namespace, initiatorNamespace, action, ActivationResponse.statusWhiskError)
+      promMetrics.statusCounter.labels(namespace, initiatorNamespace, action, ActivationResponse.statusWhiskError)
 
     def record(a: Activation, initiator: String, metricConfig: MetricConfig): Unit = {
       namespaceActivations.inc()
@@ -150,15 +151,98 @@ case class PrometheusRecorder(kamon: PrometheusReporter)
         case ActivationResponse.statusApplicationError => statusApplicationError.inc()
         case ActivationResponse.statusDeveloperError   => statusDeveloperError.inc()
         case ActivationResponse.statusWhiskError       => statusInternalError.inc()
-        case x                                         => statusCounter.labels(namespace, initiator, action, x).inc()
+        case x                                         => promMetrics.statusCounter.labels(namespace, initiator, action, x).inc()
       }
 
       a.size.foreach(responseSize.observe(_))
       a.userDefinedStatusCode.foreach(value =>
-        userDefinedStatusCodeCounter.labels(namespace, initiator, action, value.toString).inc())
+        promMetrics.userDefinedStatusCodeCounter.labels(namespace, initiator, action, value.toString).inc())
     }
   }
 
+  case class PrometheusMetrics() extends PrometheusMetricNames {
+
+    private val namespace = config.renameTags.getOrElse(actionNamespace, actionNamespace)
+    private val initiator = config.renameTags.getOrElse(initiatorNamespace, initiatorNamespace)
+    private val action = config.renameTags.getOrElse(actionName, actionName)
+    private val kind = config.renameTags.getOrElse(actionKind, actionKind)
+    private val memory = config.renameTags.getOrElse(actionMemory, actionMemory)
+    private val status = config.renameTags.getOrElse(actionStatus, actionStatus)
+    private val statusCode = config.renameTags.getOrElse(userDefinedStatusCode, userDefinedStatusCode)
+
+    val namespaceActivationCounter =
+      counter(namespaceMetric, "Namespace activations Count", namespace, initiator)
+
+    val activationCounter =
+      counter(activationMetric, "Activation Count", namespace, initiator, action, kind, memory)
+
+    val coldStartCounter =
+      counter(coldStartMetric, "Cold start counts", namespace, initiator, action)
+
+    val statusCounter =
+      counter(statusMetric, "Activation failure status type", namespace, initiator, action, status)
+
+    val userDefinedStatusCodeCounter =
+      counter(
+        userDefinedStatusCodeMetric,
+        "status code returned in action result response set by developer",
+        namespace,
+        initiator,
+        action,
+        statusCode)
+
+    val waitTimeHisto =
+      histogram(waitTimeMetric, "Internal system hold time", namespace, initiator, action)
+
+    val initTimeHisto =
+      histogram(initTimeMetric, "Time it took to initialize an action, e.g. docker init", namespace, initiator, action)
+
+    val durationHisto =
+      histogram(durationMetric, "Actual time the action code was running", namespace, initiator, action)
+
+    val responseSizeHisto =
+      Histogram
+        .build()
+        .name(responseSizeMetric)
+        .help("Activation Response size")
+        .labelNames(namespace, initiator, action)
+        .linearBuckets(0, ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT.toBytes.toDouble, 10)
+        .register()
+
+    val memoryGauge =
+      gauge(memoryMetric, "Memory consumption of the action containers", namespace, initiator, action)
+
+    val concurrentLimitCounter =
+      counter(concurrentLimitMetric, "a user has exceeded its limit for concurrent invocations", namespace)
+
+    val timedLimitCounter =
+      counter(timedLimitMetric, "the user has reached its per minute limit for the number of invocations", namespace)
+
+    private def counter(name: String, help: String, tags: String*) =
+      Counter
+        .build()
+        .name(name)
+        .help(help)
+        .labelNames(tags: _*)
+        .register()
+
+    private def gauge(name: String, help: String, tags: String*) =
+      Gauge
+        .build()
+        .name(name)
+        .help(help)
+        .labelNames(tags: _*)
+        .register()
+
+    private def histogram(name: String, help: String, tags: String*) =
+      Histogram
+        .build()
+        .name(name)
+        .help(help)
+        .labelNames(tags: _*)
+        .register()
+  }
+
   //Returns a floating point number
   private def seconds(time: Duration): Double = time.toUnit(TimeUnit.SECONDS)
 
@@ -190,94 +274,3 @@ case class PrometheusRecorder(kamon: PrometheusReporter)
     }
   }
 }
-
-object PrometheusRecorder extends PrometheusMetricNames {
-  private val namespaceActivationCounter =
-    counter(namespaceMetric, "Namespace activations Count", actionNamespace, initiatorNamespace)
-  private val activationCounter =
-    counter(
-      activationMetric,
-      "Activation Count",
-      actionNamespace,
-      initiatorNamespace,
-      actionName,
-      actionKind,
-      actionMemory)
-  private val coldStartCounter =
-    counter(coldStartMetric, "Cold start counts", actionNamespace, initiatorNamespace, actionName)
-  private val statusCounter =
-    counter(
-      statusMetric,
-      "Activation failure status type",
-      actionNamespace,
-      initiatorNamespace,
-      actionName,
-      actionStatus)
-  private val userDefinedStatusCodeCounter =
-    counter(
-      userDefinedStatusCodeMetric,
-      "status code returned in action result response set by developer",
-      actionNamespace,
-      initiatorNamespace,
-      actionName,
-      userDefinedStatusCode)
-  private val waitTimeHisto =
-    histogram(waitTimeMetric, "Internal system hold time", actionNamespace, initiatorNamespace, actionName)
-  private val initTimeHisto =
-    histogram(
-      initTimeMetric,
-      "Time it took to initialize an action, e.g. docker init",
-      actionNamespace,
-      initiatorNamespace,
-      actionName)
-  private val durationHisto =
-    histogram(
-      durationMetric,
-      "Actual time the action code was running",
-      actionNamespace,
-      initiatorNamespace,
-      actionName)
-  private val responseSizeHisto =
-    Histogram
-      .build()
-      .name(responseSizeMetric)
-      .help("Activation Response size")
-      .labelNames(actionNamespace, initiatorNamespace, actionName)
-      .linearBuckets(0, ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT.toBytes.toDouble, 10)
-      .register()
-  private val memoryGauge =
-    gauge(memoryMetric, "Memory consumption of the action containers", actionNamespace, initiatorNamespace, actionName)
-
-  private val concurrentLimitCounter =
-    counter(concurrentLimitMetric, "a user has exceeded its limit for concurrent invocations", actionNamespace)
-
-  private val timedLimitCounter =
-    counter(
-      timedLimitMetric,
-      "the user has reached its per minute limit for the number of invocations",
-      actionNamespace)
-
-  private def counter(name: String, help: String, tags: String*) =
-    Counter
-      .build()
-      .name(name)
-      .help(help)
-      .labelNames(tags: _*)
-      .register()
-
-  private def gauge(name: String, help: String, tags: String*) =
-    Gauge
-      .build()
-      .name(name)
-      .help(help)
-      .labelNames(tags: _*)
-      .register()
-
-  private def histogram(name: String, help: String, tags: String*) =
-    Histogram
-      .build()
-      .name(name)
-      .help(help)
-      .labelNames(tags: _*)
-      .register()
-}
diff --git a/core/monitoring/user-events/src/test/resources/application.conf b/core/monitoring/user-events/src/test/resources/application.conf
index f7413dc..2c7f65f 100644
--- a/core/monitoring/user-events/src/test/resources/application.conf
+++ b/core/monitoring/user-events/src/test/resources/application.conf
@@ -26,4 +26,7 @@ user-events {
   # Namespaces that should not be monitored
   ignored-namespaces = ["guest"]
 
+  rename-tags {
+    #namespace = "ow_namespace"
+  }
 }
diff --git a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/ApiTests.scala b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/ApiTests.scala
index 21d5d14..1a75942 100644
--- a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/ApiTests.scala
+++ b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/ApiTests.scala
@@ -21,11 +21,16 @@ import akka.http.scaladsl.model.headers.HttpEncodings._
 import akka.http.scaladsl.model.headers.{`Accept-Encoding`, `Content-Encoding`, HttpEncoding, HttpEncodings}
 import akka.http.scaladsl.model.{HttpCharsets, HttpEntity, HttpResponse}
 import akka.http.scaladsl.testkit.ScalatestRouteTest
+import kamon.prometheus.PrometheusReporter
+import org.apache.openwhisk.core.monitoring.metrics.OpenWhiskEvents.MetricConfig
 import org.junit.runner.RunWith
 import org.scalatest.concurrent.ScalaFutures
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.matchers.Matcher
 import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+import pureconfig.loadConfigOrThrow
+import io.prometheus.client.CollectorRegistry
+import pureconfig.generic.auto._
 
 import scala.concurrent.duration.DurationInt
 
@@ -44,7 +49,10 @@ class ApiTests
 
   override protected def beforeAll(): Unit = {
     super.beforeAll()
-    consumer = createConsumer(56754, system.settings.config)
+    CollectorRegistry.defaultRegistry.clear()
+    val metricConfig = loadConfigOrThrow[MetricConfig](system.settings.config, "user-events")
+    val mericRecorder = PrometheusRecorder(new PrometheusReporter, metricConfig)
+    consumer = createConsumer(56754, system.settings.config, mericRecorder)
     api = new PrometheusEventsApi(consumer, createExporter())
   }
 
diff --git a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/EventsTestHelper.scala b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/EventsTestHelper.scala
index ffda3f3..1b53e31 100644
--- a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/EventsTestHelper.scala
+++ b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/EventsTestHelper.scala
@@ -22,16 +22,13 @@ import java.net.ServerSocket
 import akka.actor.ActorSystem
 import akka.stream.ActorMaterializer
 import com.typesafe.config.Config
-import kamon.prometheus.PrometheusReporter
 import org.apache.openwhisk.core.monitoring.metrics.OpenWhiskEvents.MetricConfig
 import pureconfig._
 import pureconfig.generic.auto._
 
 trait EventsTestHelper {
 
-  protected def createConsumer(kport: Int,
-                               globalConfig: Config,
-                               recorder: MetricRecorder = PrometheusRecorder(new PrometheusReporter))(
+  protected def createConsumer(kport: Int, globalConfig: Config, recorder: MetricRecorder)(
     implicit system: ActorSystem,
     materializer: ActorMaterializer) = {
     val settings = OpenWhiskEvents
diff --git a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/OpenWhiskEventsTests.scala b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/OpenWhiskEventsTests.scala
index f8a9a2d..769a040 100644
--- a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/OpenWhiskEventsTests.scala
+++ b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/OpenWhiskEventsTests.scala
@@ -21,6 +21,7 @@ import akka.http.scaladsl.Http
 import akka.http.scaladsl.model.{HttpRequest, StatusCodes}
 import akka.http.scaladsl.unmarshalling.Unmarshal
 import com.typesafe.config.ConfigFactory
+import io.prometheus.client.CollectorRegistry
 import kamon.Kamon
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
@@ -48,10 +49,13 @@ class OpenWhiskEventsTests extends KafkaSpecBase {
            | whisk {
            |  user-events {
            |    port = $httpPort
+           |    rename-tags {
+           |      namespace = "ow_namespace"
+           |    }
            |  }
            | }
          """.stripMargin).withFallback(globalConfig)
-
+    CollectorRegistry.defaultRegistry.clear()
     val binding = OpenWhiskEvents.start(config).futureValue
     val res = get("localhost", httpPort, "/ping")
     res shouldBe Some(StatusCodes.OK, "pong")
diff --git a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorderTests.scala b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorderTests.scala
index 4d75995..3e1e641 100644
--- a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorderTests.scala
+++ b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorderTests.scala
@@ -17,12 +17,17 @@
 
 package org.apache.openwhisk.core.monitoring.metrics
 
+import com.typesafe.config.ConfigFactory
 import io.prometheus.client.CollectorRegistry
+import kamon.prometheus.PrometheusReporter
 import org.apache.openwhisk.core.connector.{Activation, EventMessage}
 import org.apache.openwhisk.core.entity.{ActivationId, ActivationResponse, Subject, UUID}
+import org.apache.openwhisk.core.monitoring.metrics.OpenWhiskEvents.MetricConfig
 import org.junit.runner.RunWith
 import org.scalatest.BeforeAndAfterEach
 import org.scalatest.junit.JUnitRunner
+import pureconfig._
+import pureconfig.generic.auto._
 
 import scala.concurrent.duration._
 
@@ -36,11 +41,13 @@ class PrometheusRecorderTests extends KafkaSpecBase with BeforeAndAfterEach with
   val actionWithDefaultPackage = "createApi"
   val kind = "nodejs:10"
   val memory = "256"
+  createCustomTopic(EventConsumer.userEventTopic)
 
   it should "push user events to kamon" in {
-    createCustomTopic(EventConsumer.userEventTopic)
-
-    val consumer = createConsumer(kafkaPort, system.settings.config)
+    CollectorRegistry.defaultRegistry.clear()
+    val metricConfig = loadConfigOrThrow[MetricConfig](system.settings.config, "user-events")
+    val metricRecorder = PrometheusRecorder(new PrometheusReporter, metricConfig)
+    val consumer = createConsumer(kafkaPort, system.settings.config, metricRecorder)
     publishStringMessageToKafka(
       EventConsumer.userEventTopic,
       newActivationEvent(s"$namespaceDemo/$actionWithCustomPackage", kind, memory).serialize)
@@ -80,6 +87,37 @@ class PrometheusRecorderTests extends KafkaSpecBase with BeforeAndAfterEach with
     namespaceCounterTotal(namespaceMetric, namespaceGuest) shouldBe 1
   }
 
+  it should "push user event to kamon with prometheus metrics tags relabel" in {
+    val httpPort = freePort()
+    val globalConfig = system.settings.config
+    val config = ConfigFactory.parseString(s"""
+            | whisk {
+            |  user-events {
+            |    port = $httpPort
+            |    enable-kamon = false
+            |    ignored-namespaces = ["guest"]
+            |    rename-tags {
+            |      namespace = "ow_namespace"
+            |    }
+            |  }
+            | }
+         """.stripMargin)
+    CollectorRegistry.defaultRegistry.clear()
+    val metricConfig = loadConfigOrThrow[MetricConfig](config, "whisk.user-events")
+    val metricRecorder = PrometheusRecorder(new PrometheusReporter, metricConfig)
+    val consumer = createConsumer(kafkaPort, system.settings.config, metricRecorder)
+
+    publishStringMessageToKafka(
+      EventConsumer.userEventTopic,
+      newActivationEvent(s"$namespaceDemo/$actionWithCustomPackage", kind, memory).serialize)
+
+    sleep(sleepAfterProduce, "sleeping post produce")
+    consumer.shutdown().futureValue
+    CollectorRegistry.defaultRegistry.getSampleValue(
+      activationMetric,
+      Array("ow_namespace", "initiator", "action", "kind", "memory"),
+      Array(namespaceDemo, initiator, actionWithCustomPackage, kind, memory)) shouldBe 1
+  }
   private def newActivationEvent(actionPath: String, kind: String, memory: String) =
     EventMessage(
       "test",