You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ne...@apache.org on 2020/04/24 09:03:27 UTC
[openwhisk] branch master updated: Support Relabel of Prometheus
Metrics Tags based on provided config. (#4876)
This is an automated email from the ASF dual-hosted git repository.
neerajmangal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new d495993 Support Relabel of Prometheus Metrics Tags based on provided config. (#4876)
d495993 is described below
commit d495993a54d81ccbd5c28f0aa971f254722b1f9d
Author: Neeraj Mangal <ne...@users.noreply.github.com>
AuthorDate: Fri Apr 24 14:33:10 2020 +0530
Support Relabel of Prometheus Metrics Tags based on provided config. (#4876)
* Relabel Prometheus Metrics Tags based on config provided
* Remove commented code
* Fix Tag order and refactoring
* Fix Tests
* actionStatus->userDefinedStatusCode
* Fix Tests and Add Test for rename-tags config
* Review Comments and README updates
Co-authored-by: Neeraj Mangal <ma...@adobe.com>
---
core/monitoring/user-events/README.md | 13 ++
.../user-events/src/main/resources/reference.conf | 5 +
.../core/monitoring/metrics/OpenWhiskEvents.scala | 7 +-
.../monitoring/metrics/PrometheusRecorder.scala | 215 ++++++++++-----------
.../src/test/resources/application.conf | 3 +
.../core/monitoring/metrics/ApiTests.scala | 10 +-
.../core/monitoring/metrics/EventsTestHelper.scala | 5 +-
.../monitoring/metrics/OpenWhiskEventsTests.scala | 6 +-
.../metrics/PrometheusRecorderTests.scala | 44 ++++-
9 files changed, 186 insertions(+), 122 deletions(-)
diff --git a/core/monitoring/user-events/README.md b/core/monitoring/user-events/README.md
index c21475e..39222b8 100644
--- a/core/monitoring/user-events/README.md
+++ b/core/monitoring/user-events/README.md
@@ -45,6 +45,19 @@ whisk {
}
}
```
+- To rename metrics tags, use the below configuration. Currently, this configuration only applies to the Prometheus
+Metrics. For example, here `namespace` tag name will be replaced by `ow_namespace` in all metrics.
+
+```
+whisk {
+ user-events {
+ rename-tags {
+ # rename/relabel prometheus metrics tags
+ "namespace" = "ow_namespae"
+ }
+ }
+}
+```
Integrations
------------
diff --git a/core/monitoring/user-events/src/main/resources/reference.conf b/core/monitoring/user-events/src/main/resources/reference.conf
index 6282614..1491e4a 100644
--- a/core/monitoring/user-events/src/main/resources/reference.conf
+++ b/core/monitoring/user-events/src/main/resources/reference.conf
@@ -26,5 +26,10 @@ whisk {
# Namespaces that should not be monitored
ignored-namespaces = []
+
+ rename-tags {
+ # rename/relabel prometheus metrics tags
+ # "namespace" = "ow_namespae"
+ }
}
}
diff --git a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/OpenWhiskEvents.scala b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/OpenWhiskEvents.scala
index c4e14b6..03f1f3e 100644
--- a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/OpenWhiskEvents.scala
+++ b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/OpenWhiskEvents.scala
@@ -33,7 +33,10 @@ import scala.concurrent.{ExecutionContext, Future}
object OpenWhiskEvents extends SLF4JLogging {
- case class MetricConfig(port: Int, enableKamon: Boolean, ignoredNamespaces: Set[String])
+ case class MetricConfig(port: Int,
+ enableKamon: Boolean,
+ ignoredNamespaces: Set[String],
+ renameTags: Map[String, String])
def start(config: Config)(implicit system: ActorSystem,
materializer: ActorMaterializer): Future[Http.ServerBinding] = {
@@ -45,7 +48,7 @@ object OpenWhiskEvents extends SLF4JLogging {
val metricConfig = loadConfigOrThrow[MetricConfig](config, "whisk.user-events")
- val prometheusRecorder = PrometheusRecorder(prometheusReporter)
+ val prometheusRecorder = PrometheusRecorder(prometheusReporter, metricConfig)
val recorders = if (metricConfig.enableKamon) Seq(prometheusRecorder, KamonRecorder) else Seq(prometheusRecorder)
val eventConsumer = EventConsumer(eventConsumerSettings(defaultConsumerConfig(config)), recorders, metricConfig)
diff --git a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorder.scala b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorder.scala
index c498c6f..baf9188 100644
--- a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorder.scala
+++ b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorder.scala
@@ -25,12 +25,12 @@ import akka.event.slf4j.SLF4JLogging
import akka.http.scaladsl.model.{HttpEntity, MessageEntity}
import akka.stream.scaladsl.{Concat, Source}
import akka.util.ByteString
-import org.apache.openwhisk.core.connector.{Activation, Metric}
import io.prometheus.client.exporter.common.TextFormat
import io.prometheus.client.{CollectorRegistry, Counter, Gauge, Histogram}
import kamon.prometheus.PrometheusReporter
-import org.apache.openwhisk.core.monitoring.metrics.OpenWhiskEvents.MetricConfig
+import org.apache.openwhisk.core.connector.{Activation, Metric}
import org.apache.openwhisk.core.entity.{ActivationEntityLimit, ActivationResponse}
+import org.apache.openwhisk.core.monitoring.metrics.OpenWhiskEvents.MetricConfig
import scala.collection.JavaConverters._
import scala.collection.concurrent.TrieMap
@@ -52,13 +52,13 @@ trait PrometheusMetricNames extends MetricNames {
val timedLimitMetric = "openwhisk_action_limit_timed_total"
}
-case class PrometheusRecorder(kamon: PrometheusReporter)
+case class PrometheusRecorder(kamon: PrometheusReporter, config: MetricConfig)
extends MetricRecorder
with PrometheusExporter
with SLF4JLogging {
- import PrometheusRecorder._
private val activationMetrics = new TrieMap[String, ActivationPromMetrics]
private val limitMetrics = new TrieMap[String, LimitPromMetrics]
+ private val promMetrics = PrometheusMetrics()
override def processActivation(activation: Activation, initiator: String, metricConfig: MetricConfig): Unit = {
lookup(activation, initiator).record(activation, initiator, metricConfig)
@@ -85,8 +85,8 @@ case class PrometheusRecorder(kamon: PrometheusReporter)
}
case class LimitPromMetrics(namespace: String) {
- private val concurrentLimit = concurrentLimitCounter.labels(namespace)
- private val timedLimit = timedLimitCounter.labels(namespace)
+ private val concurrentLimit = promMetrics.concurrentLimitCounter.labels(namespace)
+ private val timedLimit = promMetrics.timedLimitCounter.labels(namespace)
def record(m: Metric): Unit = {
m.metricName match {
@@ -103,24 +103,25 @@ case class PrometheusRecorder(kamon: PrometheusReporter)
kind: String,
memory: String,
initiatorNamespace: String) {
- private val namespaceActivations = namespaceActivationCounter.labels(namespace, initiatorNamespace)
- private val activations = activationCounter.labels(namespace, initiatorNamespace, action, kind, memory)
- private val coldStarts = coldStartCounter.labels(namespace, initiatorNamespace, action)
- private val waitTime = waitTimeHisto.labels(namespace, initiatorNamespace, action)
- private val initTime = initTimeHisto.labels(namespace, initiatorNamespace, action)
- private val duration = durationHisto.labels(namespace, initiatorNamespace, action)
- private val responseSize = responseSizeHisto.labels(namespace, initiatorNamespace, action)
- private val gauge = memoryGauge.labels(namespace, initiatorNamespace, action)
+ private val namespaceActivations = promMetrics.namespaceActivationCounter.labels(namespace, initiatorNamespace)
+ private val activations = promMetrics.activationCounter.labels(namespace, initiatorNamespace, action, kind, memory)
+ private val coldStarts = promMetrics.coldStartCounter.labels(namespace, initiatorNamespace, action)
+ private val waitTime = promMetrics.waitTimeHisto.labels(namespace, initiatorNamespace, action)
+ private val initTime = promMetrics.initTimeHisto.labels(namespace, initiatorNamespace, action)
+ private val duration = promMetrics.durationHisto.labels(namespace, initiatorNamespace, action)
+ private val responseSize = promMetrics.responseSizeHisto.labels(namespace, initiatorNamespace, action)
+
+ private val gauge = promMetrics.memoryGauge.labels(namespace, initiatorNamespace, action)
private val statusSuccess =
- statusCounter.labels(namespace, initiatorNamespace, action, ActivationResponse.statusSuccess)
+ promMetrics.statusCounter.labels(namespace, initiatorNamespace, action, ActivationResponse.statusSuccess)
private val statusApplicationError =
- statusCounter.labels(namespace, initiatorNamespace, action, ActivationResponse.statusApplicationError)
+ promMetrics.statusCounter.labels(namespace, initiatorNamespace, action, ActivationResponse.statusApplicationError)
private val statusDeveloperError =
- statusCounter.labels(namespace, initiatorNamespace, action, ActivationResponse.statusDeveloperError)
+ promMetrics.statusCounter.labels(namespace, initiatorNamespace, action, ActivationResponse.statusDeveloperError)
private val statusInternalError =
- statusCounter.labels(namespace, initiatorNamespace, action, ActivationResponse.statusWhiskError)
+ promMetrics.statusCounter.labels(namespace, initiatorNamespace, action, ActivationResponse.statusWhiskError)
def record(a: Activation, initiator: String, metricConfig: MetricConfig): Unit = {
namespaceActivations.inc()
@@ -150,15 +151,98 @@ case class PrometheusRecorder(kamon: PrometheusReporter)
case ActivationResponse.statusApplicationError => statusApplicationError.inc()
case ActivationResponse.statusDeveloperError => statusDeveloperError.inc()
case ActivationResponse.statusWhiskError => statusInternalError.inc()
- case x => statusCounter.labels(namespace, initiator, action, x).inc()
+ case x => promMetrics.statusCounter.labels(namespace, initiator, action, x).inc()
}
a.size.foreach(responseSize.observe(_))
a.userDefinedStatusCode.foreach(value =>
- userDefinedStatusCodeCounter.labels(namespace, initiator, action, value.toString).inc())
+ promMetrics.userDefinedStatusCodeCounter.labels(namespace, initiator, action, value.toString).inc())
}
}
+ case class PrometheusMetrics() extends PrometheusMetricNames {
+
+ private val namespace = config.renameTags.getOrElse(actionNamespace, actionNamespace)
+ private val initiator = config.renameTags.getOrElse(initiatorNamespace, initiatorNamespace)
+ private val action = config.renameTags.getOrElse(actionName, actionName)
+ private val kind = config.renameTags.getOrElse(actionKind, actionKind)
+ private val memory = config.renameTags.getOrElse(actionMemory, actionMemory)
+ private val status = config.renameTags.getOrElse(actionStatus, actionStatus)
+ private val statusCode = config.renameTags.getOrElse(userDefinedStatusCode, userDefinedStatusCode)
+
+ val namespaceActivationCounter =
+ counter(namespaceMetric, "Namespace activations Count", namespace, initiator)
+
+ val activationCounter =
+ counter(activationMetric, "Activation Count", namespace, initiator, action, kind, memory)
+
+ val coldStartCounter =
+ counter(coldStartMetric, "Cold start counts", namespace, initiator, action)
+
+ val statusCounter =
+ counter(statusMetric, "Activation failure status type", namespace, initiator, action, status)
+
+ val userDefinedStatusCodeCounter =
+ counter(
+ userDefinedStatusCodeMetric,
+ "status code returned in action result response set by developer",
+ namespace,
+ initiator,
+ action,
+ statusCode)
+
+ val waitTimeHisto =
+ histogram(waitTimeMetric, "Internal system hold time", namespace, initiator, action)
+
+ val initTimeHisto =
+ histogram(initTimeMetric, "Time it took to initialize an action, e.g. docker init", namespace, initiator, action)
+
+ val durationHisto =
+ histogram(durationMetric, "Actual time the action code was running", namespace, initiator, action)
+
+ val responseSizeHisto =
+ Histogram
+ .build()
+ .name(responseSizeMetric)
+ .help("Activation Response size")
+ .labelNames(namespace, initiator, action)
+ .linearBuckets(0, ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT.toBytes.toDouble, 10)
+ .register()
+
+ val memoryGauge =
+ gauge(memoryMetric, "Memory consumption of the action containers", namespace, initiator, action)
+
+ val concurrentLimitCounter =
+ counter(concurrentLimitMetric, "a user has exceeded its limit for concurrent invocations", namespace)
+
+ val timedLimitCounter =
+ counter(timedLimitMetric, "the user has reached its per minute limit for the number of invocations", namespace)
+
+ private def counter(name: String, help: String, tags: String*) =
+ Counter
+ .build()
+ .name(name)
+ .help(help)
+ .labelNames(tags: _*)
+ .register()
+
+ private def gauge(name: String, help: String, tags: String*) =
+ Gauge
+ .build()
+ .name(name)
+ .help(help)
+ .labelNames(tags: _*)
+ .register()
+
+ private def histogram(name: String, help: String, tags: String*) =
+ Histogram
+ .build()
+ .name(name)
+ .help(help)
+ .labelNames(tags: _*)
+ .register()
+ }
+
//Returns a floating point number
private def seconds(time: Duration): Double = time.toUnit(TimeUnit.SECONDS)
@@ -190,94 +274,3 @@ case class PrometheusRecorder(kamon: PrometheusReporter)
}
}
}
-
-object PrometheusRecorder extends PrometheusMetricNames {
- private val namespaceActivationCounter =
- counter(namespaceMetric, "Namespace activations Count", actionNamespace, initiatorNamespace)
- private val activationCounter =
- counter(
- activationMetric,
- "Activation Count",
- actionNamespace,
- initiatorNamespace,
- actionName,
- actionKind,
- actionMemory)
- private val coldStartCounter =
- counter(coldStartMetric, "Cold start counts", actionNamespace, initiatorNamespace, actionName)
- private val statusCounter =
- counter(
- statusMetric,
- "Activation failure status type",
- actionNamespace,
- initiatorNamespace,
- actionName,
- actionStatus)
- private val userDefinedStatusCodeCounter =
- counter(
- userDefinedStatusCodeMetric,
- "status code returned in action result response set by developer",
- actionNamespace,
- initiatorNamespace,
- actionName,
- userDefinedStatusCode)
- private val waitTimeHisto =
- histogram(waitTimeMetric, "Internal system hold time", actionNamespace, initiatorNamespace, actionName)
- private val initTimeHisto =
- histogram(
- initTimeMetric,
- "Time it took to initialize an action, e.g. docker init",
- actionNamespace,
- initiatorNamespace,
- actionName)
- private val durationHisto =
- histogram(
- durationMetric,
- "Actual time the action code was running",
- actionNamespace,
- initiatorNamespace,
- actionName)
- private val responseSizeHisto =
- Histogram
- .build()
- .name(responseSizeMetric)
- .help("Activation Response size")
- .labelNames(actionNamespace, initiatorNamespace, actionName)
- .linearBuckets(0, ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT.toBytes.toDouble, 10)
- .register()
- private val memoryGauge =
- gauge(memoryMetric, "Memory consumption of the action containers", actionNamespace, initiatorNamespace, actionName)
-
- private val concurrentLimitCounter =
- counter(concurrentLimitMetric, "a user has exceeded its limit for concurrent invocations", actionNamespace)
-
- private val timedLimitCounter =
- counter(
- timedLimitMetric,
- "the user has reached its per minute limit for the number of invocations",
- actionNamespace)
-
- private def counter(name: String, help: String, tags: String*) =
- Counter
- .build()
- .name(name)
- .help(help)
- .labelNames(tags: _*)
- .register()
-
- private def gauge(name: String, help: String, tags: String*) =
- Gauge
- .build()
- .name(name)
- .help(help)
- .labelNames(tags: _*)
- .register()
-
- private def histogram(name: String, help: String, tags: String*) =
- Histogram
- .build()
- .name(name)
- .help(help)
- .labelNames(tags: _*)
- .register()
-}
diff --git a/core/monitoring/user-events/src/test/resources/application.conf b/core/monitoring/user-events/src/test/resources/application.conf
index f7413dc..2c7f65f 100644
--- a/core/monitoring/user-events/src/test/resources/application.conf
+++ b/core/monitoring/user-events/src/test/resources/application.conf
@@ -26,4 +26,7 @@ user-events {
# Namespaces that should not be monitored
ignored-namespaces = ["guest"]
+ rename-tags {
+ #namespace = "ow_namespace"
+ }
}
diff --git a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/ApiTests.scala b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/ApiTests.scala
index 21d5d14..1a75942 100644
--- a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/ApiTests.scala
+++ b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/ApiTests.scala
@@ -21,11 +21,16 @@ import akka.http.scaladsl.model.headers.HttpEncodings._
import akka.http.scaladsl.model.headers.{`Accept-Encoding`, `Content-Encoding`, HttpEncoding, HttpEncodings}
import akka.http.scaladsl.model.{HttpCharsets, HttpEntity, HttpResponse}
import akka.http.scaladsl.testkit.ScalatestRouteTest
+import kamon.prometheus.PrometheusReporter
+import org.apache.openwhisk.core.monitoring.metrics.OpenWhiskEvents.MetricConfig
import org.junit.runner.RunWith
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.junit.JUnitRunner
import org.scalatest.matchers.Matcher
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+import pureconfig.loadConfigOrThrow
+import io.prometheus.client.CollectorRegistry
+import pureconfig.generic.auto._
import scala.concurrent.duration.DurationInt
@@ -44,7 +49,10 @@ class ApiTests
override protected def beforeAll(): Unit = {
super.beforeAll()
- consumer = createConsumer(56754, system.settings.config)
+ CollectorRegistry.defaultRegistry.clear()
+ val metricConfig = loadConfigOrThrow[MetricConfig](system.settings.config, "user-events")
+ val mericRecorder = PrometheusRecorder(new PrometheusReporter, metricConfig)
+ consumer = createConsumer(56754, system.settings.config, mericRecorder)
api = new PrometheusEventsApi(consumer, createExporter())
}
diff --git a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/EventsTestHelper.scala b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/EventsTestHelper.scala
index ffda3f3..1b53e31 100644
--- a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/EventsTestHelper.scala
+++ b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/EventsTestHelper.scala
@@ -22,16 +22,13 @@ import java.net.ServerSocket
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import com.typesafe.config.Config
-import kamon.prometheus.PrometheusReporter
import org.apache.openwhisk.core.monitoring.metrics.OpenWhiskEvents.MetricConfig
import pureconfig._
import pureconfig.generic.auto._
trait EventsTestHelper {
- protected def createConsumer(kport: Int,
- globalConfig: Config,
- recorder: MetricRecorder = PrometheusRecorder(new PrometheusReporter))(
+ protected def createConsumer(kport: Int, globalConfig: Config, recorder: MetricRecorder)(
implicit system: ActorSystem,
materializer: ActorMaterializer) = {
val settings = OpenWhiskEvents
diff --git a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/OpenWhiskEventsTests.scala b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/OpenWhiskEventsTests.scala
index f8a9a2d..769a040 100644
--- a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/OpenWhiskEventsTests.scala
+++ b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/OpenWhiskEventsTests.scala
@@ -21,6 +21,7 @@ import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, StatusCodes}
import akka.http.scaladsl.unmarshalling.Unmarshal
import com.typesafe.config.ConfigFactory
+import io.prometheus.client.CollectorRegistry
import kamon.Kamon
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
@@ -48,10 +49,13 @@ class OpenWhiskEventsTests extends KafkaSpecBase {
| whisk {
| user-events {
| port = $httpPort
+ | rename-tags {
+ | namespace = "ow_namespace"
+ | }
| }
| }
""".stripMargin).withFallback(globalConfig)
-
+ CollectorRegistry.defaultRegistry.clear()
val binding = OpenWhiskEvents.start(config).futureValue
val res = get("localhost", httpPort, "/ping")
res shouldBe Some(StatusCodes.OK, "pong")
diff --git a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorderTests.scala b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorderTests.scala
index 4d75995..3e1e641 100644
--- a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorderTests.scala
+++ b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorderTests.scala
@@ -17,12 +17,17 @@
package org.apache.openwhisk.core.monitoring.metrics
+import com.typesafe.config.ConfigFactory
import io.prometheus.client.CollectorRegistry
+import kamon.prometheus.PrometheusReporter
import org.apache.openwhisk.core.connector.{Activation, EventMessage}
import org.apache.openwhisk.core.entity.{ActivationId, ActivationResponse, Subject, UUID}
+import org.apache.openwhisk.core.monitoring.metrics.OpenWhiskEvents.MetricConfig
import org.junit.runner.RunWith
import org.scalatest.BeforeAndAfterEach
import org.scalatest.junit.JUnitRunner
+import pureconfig._
+import pureconfig.generic.auto._
import scala.concurrent.duration._
@@ -36,11 +41,13 @@ class PrometheusRecorderTests extends KafkaSpecBase with BeforeAndAfterEach with
val actionWithDefaultPackage = "createApi"
val kind = "nodejs:10"
val memory = "256"
+ createCustomTopic(EventConsumer.userEventTopic)
it should "push user events to kamon" in {
- createCustomTopic(EventConsumer.userEventTopic)
-
- val consumer = createConsumer(kafkaPort, system.settings.config)
+ CollectorRegistry.defaultRegistry.clear()
+ val metricConfig = loadConfigOrThrow[MetricConfig](system.settings.config, "user-events")
+ val metricRecorder = PrometheusRecorder(new PrometheusReporter, metricConfig)
+ val consumer = createConsumer(kafkaPort, system.settings.config, metricRecorder)
publishStringMessageToKafka(
EventConsumer.userEventTopic,
newActivationEvent(s"$namespaceDemo/$actionWithCustomPackage", kind, memory).serialize)
@@ -80,6 +87,37 @@ class PrometheusRecorderTests extends KafkaSpecBase with BeforeAndAfterEach with
namespaceCounterTotal(namespaceMetric, namespaceGuest) shouldBe 1
}
+ it should "push user event to kamon with prometheus metrics tags relabel" in {
+ val httpPort = freePort()
+ val globalConfig = system.settings.config
+ val config = ConfigFactory.parseString(s"""
+ | whisk {
+ | user-events {
+ | port = $httpPort
+ | enable-kamon = false
+ | ignored-namespaces = ["guest"]
+ | rename-tags {
+ | namespace = "ow_namespace"
+ | }
+ | }
+ | }
+ """.stripMargin)
+ CollectorRegistry.defaultRegistry.clear()
+ val metricConfig = loadConfigOrThrow[MetricConfig](config, "whisk.user-events")
+ val metricRecorder = PrometheusRecorder(new PrometheusReporter, metricConfig)
+ val consumer = createConsumer(kafkaPort, system.settings.config, metricRecorder)
+
+ publishStringMessageToKafka(
+ EventConsumer.userEventTopic,
+ newActivationEvent(s"$namespaceDemo/$actionWithCustomPackage", kind, memory).serialize)
+
+ sleep(sleepAfterProduce, "sleeping post produce")
+ consumer.shutdown().futureValue
+ CollectorRegistry.defaultRegistry.getSampleValue(
+ activationMetric,
+ Array("ow_namespace", "initiator", "action", "kind", "memory"),
+ Array(namespaceDemo, initiator, actionWithCustomPackage, kind, memory)) shouldBe 1
+ }
private def newActivationEvent(actionPath: String, kind: String, memory: String) =
EventMessage(
"test",