You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ne...@apache.org on 2020/07/22 06:34:26 UTC

[openwhisk] branch master updated: activation metrics are recorded for namespace even if set to ignore by user-events service. (#4934)

This is an automated email from the ASF dual-hosted git repository.

neerajmangal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 46f8349  activation metrics are recorded for namespace even if set to ignore by user-events service. (#4934)
46f8349 is described below

commit 46f8349a1abccb0b01fa700c34e47fd4b6c77018
Author: Neeraj Mangal <ne...@users.noreply.github.com>
AuthorDate: Wed Jul 22 12:03:30 2020 +0530

    activation metrics are recorded for namespace even if set to ignore by user-events service. (#4934)
    
    * Ignored namespaces still have activation metrics recorded
    
    * Refactoring and Test fixes
    
    * Remove extra 'total' from metric name
    
    Co-authored-by: Neeraj Mangal <ma...@adobe.com>
---
 .../core/monitoring/metrics/EventConsumer.scala    | 17 +++++++++----
 .../core/monitoring/metrics/KamonRecorder.scala    | 15 ++++--------
 .../monitoring/metrics/PrometheusRecorder.scala    | 18 ++++----------
 .../monitoring/metrics/KamonRecorderTests.scala    | 13 ++++------
 .../metrics/PrometheusRecorderTests.scala          | 28 +++++++---------------
 5 files changed, 35 insertions(+), 56 deletions(-)

diff --git a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/EventConsumer.scala b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/EventConsumer.scala
index d890130..2ec5ace 100644
--- a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/EventConsumer.scala
+++ b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/EventConsumer.scala
@@ -30,6 +30,7 @@ import javax.management.ObjectName
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import kamon.Kamon
 import kamon.metric.MeasurementUnit
+import kamon.tag.TagSet
 import org.apache.kafka.common
 import org.apache.kafka.common.MetricName
 import org.apache.openwhisk.connector.kafka.{KafkaMetricsProvider, KamonMetricsReporter}
@@ -41,7 +42,7 @@ import org.apache.openwhisk.core.entity.ActivationResponse
 import org.apache.openwhisk.core.monitoring.metrics.OpenWhiskEvents.MetricConfig
 
 trait MetricRecorder {
-  def processActivation(activation: Activation, initiatorNamespace: String, metricConfig: MetricConfig): Unit
+  def processActivation(activation: Activation, initiatorNamespace: String): Unit
   def processMetric(metric: Metric, initiatorNamespace: String): Unit
 }
 
@@ -54,6 +55,7 @@ case class EventConsumer(settings: ConsumerSettings[String, String],
   private implicit val ec: ExecutionContext = system.dispatcher
 
   //Record the rate of events received
+  private val activationNamespaceCounter = Kamon.counter("openwhisk.namespace.activations")
   private val activationCounter = Kamon.counter("openwhisk.userevents.global.activations").withoutTags()
   private val metricCounter = Kamon.counter("openwhisk.userevents.global.metric").withoutTags()
 
@@ -122,15 +124,22 @@ case class EventConsumer(settings: ConsumerSettings[String, String],
       .foreach { e =>
         e.body match {
           case a: Activation =>
-            recorders.foreach(_.processActivation(a, e.namespace, metricConfig))
-            updateGlobalMetrics(a)
+            // only record activation if not executed in an ignored namespace
+            if (!metricConfig.ignoredNamespaces.contains(e.namespace)) {
+              recorders.foreach(_.processActivation(a, e.namespace))
+            }
+            updateGlobalMetrics(a, e.namespace)
           case m: Metric =>
             recorders.foreach(_.processMetric(m, e.namespace))
         }
       }
   }
 
-  private def updateGlobalMetrics(a: Activation): Unit = {
+  private def updateGlobalMetrics(a: Activation, e: String): Unit = {
+    val namespaceTag: String = metricConfig.renameTags.getOrElse("namespace", "namespace")
+    val initiatorTag: String = metricConfig.renameTags.getOrElse("initiator", "initiator")
+    val tagSet = TagSet.from(Map(initiatorTag -> e, namespaceTag -> e))
+    activationNamespaceCounter.withTags(tagSet).increment()
     a.status match {
       case ActivationResponse.statusSuccess          => statusSuccess.increment()
       case ActivationResponse.statusApplicationError => statusApplicationError.increment()
diff --git a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/KamonRecorder.scala b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/KamonRecorder.scala
index 6301349..3ee829e 100644
--- a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/KamonRecorder.scala
+++ b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/KamonRecorder.scala
@@ -22,7 +22,6 @@ import org.apache.openwhisk.core.connector.{Activation, Metric}
 import kamon.Kamon
 import kamon.metric.MeasurementUnit
 import kamon.tag.TagSet
-import org.apache.openwhisk.core.monitoring.metrics.OpenWhiskEvents.MetricConfig
 
 import scala.collection.concurrent.TrieMap
 
@@ -45,8 +44,8 @@ object KamonRecorder extends MetricRecorder with KamonMetricNames with SLF4JLogg
   private val activationMetrics = new TrieMap[String, ActivationKamonMetrics]
   private val limitMetrics = new TrieMap[String, LimitKamonMetrics]
 
-  override def processActivation(activation: Activation, initiator: String, metricConfig: MetricConfig): Unit = {
-    lookup(activation, initiator).record(activation, metricConfig)
+  override def processActivation(activation: Activation, initiator: String): Unit = {
+    lookup(activation, initiator).record(activation)
   }
 
   override def processMetric(metric: Metric, initiator: String): Unit = {
@@ -97,7 +96,6 @@ object KamonRecorder extends MetricRecorder with KamonMetricNames with SLF4JLogg
     private val tags =
       TagSet.from(Map(`actionNamespace` -> namespace, `initiatorNamespace` -> initiator, `actionName` -> action))
 
-    private val namespaceActivations = Kamon.counter(namespaceActivationMetric).withTags(namespaceActivationsTags)
     private val activations = Kamon.counter(activationMetric).withTags(activationTags)
     private val coldStarts = Kamon.counter(coldStartMetric).withTags(tags)
     private val waitTime = Kamon.histogram(waitTimeMetric, MeasurementUnit.time.milliseconds).withTags(tags)
@@ -106,13 +104,8 @@ object KamonRecorder extends MetricRecorder with KamonMetricNames with SLF4JLogg
     private val responseSize = Kamon.histogram(responseSizeMetric, MeasurementUnit.information.bytes).withTags(tags)
     private val userDefinedStatusCode = Kamon.counter(userDefinedStatusCodeMetric).withTags(tags)
 
-    def record(a: Activation, metricConfig: MetricConfig): Unit = {
-      namespaceActivations.increment()
-
-      // only record activation if not executed in an ignored namespace
-      if (!metricConfig.ignoredNamespaces.contains(a.namespace)) {
-        recordActivation(a)
-      }
+    def record(a: Activation): Unit = {
+      recordActivation(a)
     }
 
     def recordActivation(a: Activation): Unit = {
diff --git a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorder.scala b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorder.scala
index baf9188..ff7cb9c 100644
--- a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorder.scala
+++ b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorder.scala
@@ -37,7 +37,6 @@ import scala.collection.concurrent.TrieMap
 import scala.concurrent.duration.Duration
 
 trait PrometheusMetricNames extends MetricNames {
-  val namespaceMetric = "openwhisk_namespace_activations_total"
   val activationMetric = "openwhisk_action_activations_total"
   val coldStartMetric = "openwhisk_action_coldStarts_total"
   val waitTimeMetric = "openwhisk_action_waitTime_seconds"
@@ -60,8 +59,8 @@ case class PrometheusRecorder(kamon: PrometheusReporter, config: MetricConfig)
   private val limitMetrics = new TrieMap[String, LimitPromMetrics]
   private val promMetrics = PrometheusMetrics()
 
-  override def processActivation(activation: Activation, initiator: String, metricConfig: MetricConfig): Unit = {
-    lookup(activation, initiator).record(activation, initiator, metricConfig)
+  override def processActivation(activation: Activation, initiator: String): Unit = {
+    lookup(activation, initiator).record(activation, initiator)
   }
 
   override def processMetric(metric: Metric, initiator: String): Unit = {
@@ -104,7 +103,6 @@ case class PrometheusRecorder(kamon: PrometheusReporter, config: MetricConfig)
                                    memory: String,
                                    initiatorNamespace: String) {
 
-    private val namespaceActivations = promMetrics.namespaceActivationCounter.labels(namespace, initiatorNamespace)
     private val activations = promMetrics.activationCounter.labels(namespace, initiatorNamespace, action, kind, memory)
     private val coldStarts = promMetrics.coldStartCounter.labels(namespace, initiatorNamespace, action)
     private val waitTime = promMetrics.waitTimeHisto.labels(namespace, initiatorNamespace, action)
@@ -123,13 +121,8 @@ case class PrometheusRecorder(kamon: PrometheusReporter, config: MetricConfig)
     private val statusInternalError =
       promMetrics.statusCounter.labels(namespace, initiatorNamespace, action, ActivationResponse.statusWhiskError)
 
-    def record(a: Activation, initiator: String, metricConfig: MetricConfig): Unit = {
-      namespaceActivations.inc()
-
-      // only record activation if not executed in an ignored namespace
-      if (!metricConfig.ignoredNamespaces.contains(a.namespace)) {
-        recordActivation(a, initiator)
-      }
+    def record(a: Activation, initiator: String): Unit = {
+      recordActivation(a, initiator)
     }
 
     def recordActivation(a: Activation, initiator: String): Unit = {
@@ -170,9 +163,6 @@ case class PrometheusRecorder(kamon: PrometheusReporter, config: MetricConfig)
     private val status = config.renameTags.getOrElse(actionStatus, actionStatus)
     private val statusCode = config.renameTags.getOrElse(userDefinedStatusCode, userDefinedStatusCode)
 
-    val namespaceActivationCounter =
-      counter(namespaceMetric, "Namespace activations Count", namespace, initiator)
-
     val activationCounter =
       counter(activationMetric, "Activation Count", namespace, initiator, action, kind, memory)
 
diff --git a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/KamonRecorderTests.scala b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/KamonRecorderTests.scala
index df75c2b..1c2a277 100644
--- a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/KamonRecorderTests.scala
+++ b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/KamonRecorderTests.scala
@@ -58,7 +58,6 @@ class KamonRecorderTests extends KafkaSpecBase with BeforeAndAfterEach with Kamo
 
   behavior of "KamonConsumer"
 
-  val initiator = "initiatorTest"
   val namespaceDemo = "demo"
   val namespaceGuest = "guest"
   val actionWithCustomPackage = "apimgmt/createApi"
@@ -109,10 +108,8 @@ class KamonRecorderTests extends KafkaSpecBase with BeforeAndAfterEach with Kamo
     TestReporter.histogram(durationMetric, namespaceDemo, actionWithDefaultPackage).size shouldBe 1
 
     // Blacklisted namespace should not be tracked
-    TestReporter.counter(activationMetric, namespaceGuest, actionWithDefaultPackage)(0).value shouldBe 0
+    TestReporter.counter(activationMetric, namespaceGuest, actionWithDefaultPackage) shouldBe empty
 
-    // Blacklisted should be counted in "openwhisk.namespace.activations" metric
-    TestReporter.namespaceCounter(namespaceActivationMetric, namespaceGuest)(0).value shouldBe 1
   }
 
   private def newActivationEvent(actionPath: String) =
@@ -130,7 +127,7 @@ class KamonRecorderTests extends KafkaSpecBase with BeforeAndAfterEach with Kamo
         memory,
         None),
       Subject("testuser"),
-      initiator,
+      actionPath.split("/")(0),
       UUID("test"),
       Activation.typeName)
 
@@ -154,7 +151,7 @@ class KamonRecorderTests extends KafkaSpecBase with BeforeAndAfterEach with Kamo
         .filter(_.name == metricName)
         .flatMap(_.instruments)
         .filter(_.tags.get(Lookups.plain(actionNamespace)) == namespace)
-        .filter(_.tags.get(Lookups.plain(initiatorNamespace)) == initiator)
+        .filter(_.tags.get(Lookups.plain(initiatorNamespace)) == namespace)
         .filter(_.tags.get(Lookups.plain(actionName)) == action)
     }
 
@@ -165,7 +162,7 @@ class KamonRecorderTests extends KafkaSpecBase with BeforeAndAfterEach with Kamo
         .filter(_.name == metricName)
         .flatMap(_.instruments)
         .filter(_.tags.get(Lookups.plain(actionNamespace)) == namespace)
-        .filter(_.tags.get(Lookups.plain(initiatorNamespace)) == initiator)
+        .filter(_.tags.get(Lookups.plain(initiatorNamespace)) == namespace)
     }
 
     def histogram(metricName: String, namespace: String, action: String) = {
@@ -175,7 +172,7 @@ class KamonRecorderTests extends KafkaSpecBase with BeforeAndAfterEach with Kamo
         .filter(_.name == metricName)
         .flatMap(_.instruments)
         .filter(_.tags.get(Lookups.plain(actionNamespace)) == namespace)
-        .filter(_.tags.get(Lookups.plain(initiatorNamespace)) == initiator)
+        .filter(_.tags.get(Lookups.plain(initiatorNamespace)) == namespace)
         .filter(_.tags.get(Lookups.plain(actionName)) == action)
     }
   }
diff --git a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorderTests.scala b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorderTests.scala
index a5bec19..7394a57 100644
--- a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorderTests.scala
+++ b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorderTests.scala
@@ -34,7 +34,6 @@ import scala.concurrent.duration._
 @RunWith(classOf[JUnitRunner])
 class PrometheusRecorderTests extends KafkaSpecBase with BeforeAndAfterEach with PrometheusMetricNames {
   behavior of "PrometheusConsumer"
-  val initiator = "initiatorTest"
   val namespaceDemo = "demo"
   val namespaceGuest = "guest"
   val actionWithCustomPackage = "apimgmt/createApiOne"
@@ -81,10 +80,7 @@ class PrometheusRecorderTests extends KafkaSpecBase with BeforeAndAfterEach with
     counterTotal(activationMetric, namespaceDemo, actionWithDefaultPackage) shouldBe 1
 
     // Blacklisted namespace should not be tracked
-    counterTotal(activationMetric, namespaceGuest, actionWithDefaultPackage) shouldBe 0
-
-    // Blacklisted should be counted in "openwhisk_namespace_activations_total" metric
-    namespaceCounterTotal(namespaceMetric, namespaceGuest) shouldBe 1
+    counterTotal(activationMetric, namespaceGuest, actionWithDefaultPackage) shouldBe (null)
   }
 
   it should "push user event to kamon with prometheus metrics tags relabel" in {
@@ -122,7 +118,7 @@ class PrometheusRecorderTests extends KafkaSpecBase with BeforeAndAfterEach with
     CollectorRegistry.defaultRegistry.getSampleValue(
       activationMetric,
       Array("ow_namespace", "initiator", "action", "kind", "memory"),
-      Array(namespaceDemo, initiator, actionWithCustomPackage, kind, memory)) shouldBe 1
+      Array(namespaceDemo, namespaceDemo, actionWithCustomPackage, kind, memory)) shouldBe 1
   }
   private def newActivationEvent(actionPath: String, kind: String, memory: String) =
     EventMessage(
@@ -139,7 +135,7 @@ class PrometheusRecorderTests extends KafkaSpecBase with BeforeAndAfterEach with
         memory.toInt,
         None),
       Subject("testuser"),
-      initiator,
+      actionPath.split("/")(0),
       UUID("test"),
       Activation.typeName)
 
@@ -147,43 +143,37 @@ class PrometheusRecorderTests extends KafkaSpecBase with BeforeAndAfterEach with
     CollectorRegistry.defaultRegistry.getSampleValue(
       metricName,
       Array("namespace", "initiator", "action"),
-      Array(namespace, initiator, action))
+      Array(namespace, namespace, action))
 
   private def counter(metricName: String, namespace: String, action: String) =
     CollectorRegistry.defaultRegistry.getSampleValue(
       metricName,
       Array("namespace", "initiator", "action"),
-      Array(namespace, initiator, action))
+      Array(namespace, namespace, action))
 
   private def counterTotal(metricName: String, namespace: String, action: String) =
     CollectorRegistry.defaultRegistry.getSampleValue(
       metricName,
       Array("namespace", "initiator", "action", "kind", "memory"),
-      Array(namespace, initiator, action, kind, memory))
-
-  private def namespaceCounterTotal(metricName: String, namespace: String) =
-    CollectorRegistry.defaultRegistry.getSampleValue(
-      metricName,
-      Array("namespace", "initiator"),
-      Array(namespace, initiator))
+      Array(namespace, namespace, action, kind, memory))
 
   private def counterStatus(metricName: String, namespace: String, action: String, status: String) =
     CollectorRegistry.defaultRegistry.getSampleValue(
       metricName,
       Array("namespace", "initiator", "action", "status"),
-      Array(namespace, initiator, action, status))
+      Array(namespace, namespace, action, status))
 
   private def histogramCount(metricName: String, namespace: String, action: String) =
     CollectorRegistry.defaultRegistry.getSampleValue(
       s"${metricName}_count",
       Array("namespace", "initiator", "action"),
-      Array(namespace, initiator, action))
+      Array(namespace, namespace, action))
 
   private def histogramSum(metricName: String, namespace: String, action: String) =
     CollectorRegistry.defaultRegistry
       .getSampleValue(
         s"${metricName}_sum",
         Array("namespace", "initiator", "action"),
-        Array(namespace, initiator, action))
+        Array(namespace, namespace, action))
       .doubleValue()
 }