You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ch...@apache.org on 2020/01/16 10:52:52 UTC

[openwhisk] branch master updated: Update kamon-core, instrumentations and reporters to 2.0.x. (#4768)

This is an automated email from the ASF dual-hosted git repository.

chetanm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 963fa11  Update kamon-core, instrumentations and reporters to 2.0.x. (#4768)
963fa11 is described below

commit 963fa112bb907cb00f22ee482d4f1eeb21b4492f
Author: Markus Thömmes <ma...@me.com>
AuthorDate: Thu Jan 16 11:52:43 2020 +0100

    Update kamon-core, instrumentations and reporters to 2.0.x. (#4768)
    
    This is a needed update to be able to support Scala 2.13.
    No changes in behavior are expected from this. All code and configuration should behave the same way as before and has been adjusted to reflect the settings of the new version.
---
 common/scala/build.gradle                          | 13 +++---
 common/scala/src/main/resources/application.conf   | 29 +++++++++----
 .../org/apache/openwhisk/common/Logging.scala      | 30 +++++--------
 .../org/apache/openwhisk/common/Prometheus.scala   |  4 +-
 .../connector/kafka/KamonMetricsReporter.scala     | 23 ++++++----
 .../database/cosmosdb/CosmosDBArtifactStore.scala  |  6 +--
 .../openwhisk/core/controller/Controller.scala     |  4 +-
 .../core/database/cosmosdb/cache/Main.scala        |  2 +-
 .../apache/openwhisk/core/invoker/Invoker.scala    |  5 +--
 .../core/monitoring/metrics/EventConsumer.scala    | 29 +++++++------
 .../core/monitoring/metrics/KamonRecorder.scala    | 41 +++++++++---------
 .../openwhisk/core/monitoring/metrics/Main.scala   |  2 +-
 .../core/monitoring/metrics/OpenWhiskEvents.scala  |  7 ++--
 .../monitoring/metrics/KamonRecorderTests.scala    | 49 +++++++++++-----------
 .../monitoring/metrics/OpenWhiskEventsTests.scala  |  2 +-
 core/standalone/src/main/resources/standalone.conf |  7 +++-
 .../apache/openwhisk/common/PrometheusTests.scala  |  2 +-
 .../cosmosdb/CosmosDBArtifactStoreTests.scala      |  6 +--
 18 files changed, 141 insertions(+), 120 deletions(-)

diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index 846c0fd..9b6f098 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -58,15 +58,18 @@ dependencies {
     compile "com.github.ben-manes.caffeine:caffeine:2.6.2"
     compile "com.google.code.findbugs:jsr305:3.0.2"
     compile "io.fabric8:kubernetes-client:${gradle.kube_client.version}"
-    compile ("io.kamon:kamon-core_${gradle.scala.depVersion}:1.1.3") {
+
+    //metrics
+    compile ("io.kamon:kamon-core_${gradle.scala.depVersion}:2.0.3") {
         exclude group: 'com.lihaoyi'
     }
-    compile "io.kamon:kamon-statsd_${gradle.scala.depVersion}:1.0.0"
-    compile ("io.kamon:kamon-system-metrics_${gradle.scala.depVersion}:1.0.0") {
+    compile "io.kamon:kamon-statsd_${gradle.scala.depVersion}:2.0.0"
+    compile ("io.kamon:kamon-system-metrics_${gradle.scala.depVersion}:2.0.1") {
         exclude group: 'io.kamon', module: 'sigar-loader'
     }
-    compile "io.kamon:kamon-prometheus_${gradle.scala.depVersion}:1.1.1"
-    compile "io.kamon:kamon-datadog_${gradle.scala.depVersion}:1.0.0"
+    compile "io.kamon:kamon-prometheus_${gradle.scala.depVersion}:2.0.1"
+    compile "io.kamon:kamon-datadog_${gradle.scala.depVersion}:2.0.1"
+
     //for mesos
     compile "com.adobe.api.platform.runtime:mesos-actor:0.0.17"
 
diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index 7ab8690..808241d 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -36,6 +36,27 @@ akka.http {
 
 #kamon related configuration
 kamon {
+    modules {
+        # Only statsd is enabled by default.
+        statsd-reporter {
+            enabled = true
+        }
+        datadog-agent {
+            enabled = false
+        }
+        datadog-trace-agent {
+            enabled = false
+        }
+        # This should never be set to true as we register Prometheus reporters manually and surface them via akka-http.
+        prometheus-reporter {
+            enabled = false
+        }
+
+        host-metrics {
+            enabled = false
+        }
+    }
+
     environment {
       # Identifier for this service. For keeping it backward compatible setting to natch previous
       # statsd name
@@ -63,10 +84,6 @@ kamon {
 
         metric-key-generator = org.apache.openwhisk.common.WhiskStatsDMetricKeyGenerator
     }
-    system-metrics {
-        # disable the host metrics as we are only interested in JVM metrics
-        host.enabled = false
-    }
     prometheus {
         # We expose the metrics endpoint over akka http. So default server is disabled
         start-embedded-http-server = no
@@ -79,10 +96,6 @@ kamon {
             }
         }
     }
-
-    reporters = [
-        "kamon.statsd.StatsDReporter"
-    ]
 }
 
 whisk {
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
index 2b03ce8..04be6b3 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
@@ -24,9 +24,9 @@ import java.time.format.DateTimeFormatter
 import akka.event.Logging._
 import akka.event.LoggingAdapter
 import kamon.Kamon
-import kamon.metric.{MeasurementUnit, Counter => KCounter, Histogram => KHistogram, Gauge => KGauge}
+import kamon.metric.{MeasurementUnit, Counter => KCounter, Gauge => KGauge, Histogram => KHistogram}
 import kamon.statsd.{MetricKeyGenerator, SimpleMetricKeyGenerator}
-import kamon.system.SystemMetrics
+import kamon.tag.TagSet
 import org.apache.openwhisk.core.entity.ControllerInstanceId
 
 trait Logging {
@@ -239,31 +239,25 @@ case class LogMarkerToken(
 
   private def createCounter() = {
     if (TransactionId.metricsKamonTags) {
-      Kamon
-        .counter(createName(toString, "counter"))
-        .refine(tags)
+      Kamon.counter(createName(toString, "counter")).withTags(TagSet.from(tags))
     } else {
-      Kamon.counter(createName(toStringWithSubAction, "counter"))
+      Kamon.counter(createName(toStringWithSubAction, "counter")).withoutTags()
     }
   }
 
   private def createHistogram() = {
     if (TransactionId.metricsKamonTags) {
-      Kamon
-        .histogram(createName(toString, "histogram"), measurementUnit)
-        .refine(tags)
+      Kamon.histogram(createName(toString, "histogram"), measurementUnit).withTags(TagSet.from(tags))
     } else {
-      Kamon.histogram(createName(toStringWithSubAction, "histogram"), measurementUnit)
+      Kamon.histogram(createName(toStringWithSubAction, "histogram"), measurementUnit).withoutTags()
     }
   }
 
   private def createGauge() = {
     if (TransactionId.metricsKamonTags) {
-      Kamon
-        .gauge(createName(toString, "gauge"), measurementUnit)
-        .refine(tags)
+      Kamon.gauge(createName(toString, "gauge"), measurementUnit).withTags(TagSet.from(tags))
     } else {
-      Kamon.gauge(createName(toStringWithSubAction, "gauge"), measurementUnit)
+      Kamon.gauge(createName(toStringWithSubAction, "gauge"), measurementUnit).withoutTags()
     }
   }
 
@@ -296,10 +290,6 @@ object LogMarkerToken {
 }
 
 object MetricEmitter {
-  if (TransactionId.metricsKamon) {
-    SystemMetrics.startCollecting()
-  }
-
   def emitCounterMetric(token: LogMarkerToken, times: Long = 1): Unit = {
     if (TransactionId.metricsKamon) {
       token.counter.increment(times)
@@ -314,7 +304,7 @@ object MetricEmitter {
 
   def emitGaugeMetric(token: LogMarkerToken, value: Long): Unit = {
     if (TransactionId.metricsKamon) {
-      token.gauge.set(value)
+      token.gauge.update(value)
     }
   }
 }
@@ -327,7 +317,7 @@ object MetricEmitter {
  */
 class WhiskStatsDMetricKeyGenerator(config: com.typesafe.config.Config) extends MetricKeyGenerator {
   val simpleGen = new SimpleMetricKeyGenerator(config)
-  override def generateKey(name: String, tags: Map[String, String]): String = {
+  override def generateKey(name: String, tags: TagSet): String = {
     val key = simpleGen.generateKey(name, tags)
     if (key.contains(".counter_")) key.replace(".counter_", ".counter.")
     else if (key.contains(".histogram_")) key.replace(".histogram_", ".histogram.")
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/Prometheus.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/Prometheus.scala
index 2c97898..614f836 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/Prometheus.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Prometheus.scala
@@ -27,7 +27,7 @@ import kamon.prometheus.PrometheusReporter
 class KamonPrometheus extends AutoCloseable {
   private val reporter = new PrometheusReporter
   private val v4 = ContentType.parse("text/plain; version=0.0.4; charset=utf-8").right.get
-  private val ref = Kamon.addReporter(reporter)
+  Kamon.registerModule("prometheus", reporter)
 
   def route: Route = path("metrics") {
     get {
@@ -39,7 +39,7 @@ class KamonPrometheus extends AutoCloseable {
 
   private def getReport() = HttpEntity(v4, reporter.scrapeData().getBytes(UTF_8))
 
-  override def close(): Unit = ref.cancel()
+  override def close(): Unit = reporter.stop()
 }
 
 object MetricsRoute {
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KamonMetricsReporter.scala b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KamonMetricsReporter.scala
index 730aa52..fbacc72 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KamonMetricsReporter.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KamonMetricsReporter.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.{ScheduledFuture, TimeUnit}
 
 import kamon.Kamon
 import kamon.metric.{Counter, Gauge, Metric}
+import kamon.tag.TagSet
 import org.apache.kafka.common.MetricName
 import org.apache.kafka.common.metrics.stats.Total
 import org.apache.kafka.common.metrics.{KafkaMetric, MetricsReporter}
@@ -32,6 +33,7 @@ import pureconfig.generic.auto._
 import scala.collection.concurrent.TrieMap
 import scala.concurrent.duration.FiniteDuration
 import scala.util.{Success, Try}
+import scala.collection.JavaConverters._
 
 class KamonMetricsReporter extends MetricsReporter {
   import KamonMetricsReporter._
@@ -60,14 +62,14 @@ class KamonMetricsReporter extends MetricsReporter {
   private def add(metric: KafkaMetric): Unit = {
     val mn = metric.metricName()
     if (metricConfig.names.contains(mn.name()) && shouldIncludeMetric(mn)) {
-      val tags = mn.tags()
+      val tags = kafkaTagsToTagSet(mn.tags())
       val metricName = kamonName(mn)
       val bridge = if (isCounterMetric(metric)) {
         val counter = Kamon.counter(metricName)
-        new CounterBridge(metric, counter, counter.refine(tags))
+        new CounterBridge(metric, counter, counter.withTags(tags))
       } else {
         val gauge = Kamon.gauge(metricName)
-        new GaugeBridge(metric, gauge, gauge.refine(tags))
+        new GaugeBridge(metric, gauge, gauge.withTags(tags))
       }
       metrics.putIfAbsent(mn, bridge)
     }
@@ -84,8 +86,8 @@ object KamonMetricsReporter {
 
   case class KafkaMetricConfig(names: Set[String], reportInterval: FiniteDuration)
 
-  abstract class MetricBridge(val kafkaMetric: KafkaMetric, kamonMetric: Metric[_]) {
-    def remove(): Unit = kamonMetric.remove(kafkaMetric.metricName().tags())
+  abstract class MetricBridge(val kafkaMetric: KafkaMetric, kamonMetric: Metric[_, _]) {
+    def remove(): Unit = kamonMetric.remove(kafkaTagsToTagSet(kafkaMetric.metricName().tags()))
     def update(): Unit
 
     def metricValue: Long =
@@ -97,12 +99,12 @@ object KamonMetricsReporter {
         .getOrElse(0L)
   }
 
-  class GaugeBridge(kafkaMetric: KafkaMetric, kamonMetric: Metric[_], gauge: Gauge)
+  class GaugeBridge(kafkaMetric: KafkaMetric, kamonMetric: Metric.Gauge, gauge: Gauge)
       extends MetricBridge(kafkaMetric, kamonMetric) {
-    override def update(): Unit = gauge.set(metricValue)
+    override def update(): Unit = gauge.update(metricValue)
   }
 
-  class CounterBridge(kafkaMetric: KafkaMetric, kamonMetric: Metric[_], counter: Counter)
+  class CounterBridge(kafkaMetric: KafkaMetric, kamonMetric: Metric.Counter, counter: Counter)
       extends MetricBridge(kafkaMetric, kamonMetric) {
     @volatile
     private var lastValue: Long = 0
@@ -131,4 +133,9 @@ object KamonMetricsReporter {
     if (excludedTopicAttributes.contains(m.name())) !m.tags().containsKey("topic")
     else true
   }
+
+  private def kafkaTagsToTagSet(kafkaTags: util.Map[String, String]): TagSet =
+    kafkaTags.asScala.foldLeft(TagSet.Empty) {
+      case (set, (k, v)) => set.withTag(k, v)
+    }
 }
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
index cec1576..3abc485 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
@@ -464,9 +464,9 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
   private def recordResourceUsage() = {
     getResourceUsage().map { o =>
       o.foreach { u =>
-        u.documentsCount.foreach(documentCountToken.gauge.set(_))
-        u.documentsSize.foreach(ds => documentsSizeToken.gauge.set(ds.toKB))
-        u.indexSize.foreach(is => indexSizeToken.gauge.set(is.toKB))
+        u.documentsCount.foreach(documentCountToken.gauge.update(_))
+        u.documentsSize.foreach(ds => documentsSizeToken.gauge.update(ds.toKB))
+        u.indexSize.foreach(is => indexSizeToken.gauge.update(is.toKB))
         logging.info(this, s"Collection usage stats for [$collName] are ${u.asString}")
         u.indexingProgress.foreach { i =>
           if (i < 100) logging.info(this, s"Indexing for collection [$collName] is at $i%")
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala
index f93fdfd..4559a85 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala
@@ -215,12 +215,12 @@ object Controller {
 
   def start(args: Array[String])(implicit actorSystem: ActorSystem, logger: Logging): Unit = {
     ConfigMXBean.register()
-    Kamon.loadReportersFromConfig()
+    Kamon.init()
 
     // Prepare Kamon shutdown
     CoordinatedShutdown(actorSystem).addTask(CoordinatedShutdown.PhaseActorSystemTerminate, "shutdownKamon") { () =>
       logger.info(this, s"Shutting down Kamon with coordinated shutdown")
-      Kamon.stopAllReporters().map(_ => Done)(Implicits.global)
+      Kamon.stopModules().map(_ => Done)(Implicits.global)
     }
 
     // extract configuration data from the environment
diff --git a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/Main.scala b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/Main.scala
index a5060d4..578c724 100644
--- a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/Main.scala
+++ b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/Main.scala
@@ -30,7 +30,7 @@ object Main {
     implicit val log: Logging = new AkkaLogging(akka.event.Logging.getLogger(system, this))
 
     ConfigMXBean.register()
-    Kamon.loadReportersFromConfig()
+    Kamon.init()
     val port = CacheInvalidatorConfig(system.settings.config).invalidatorConfig.port
     BasicHttpService.startHttpService(new BasicRasService {}.route, port, None)
     CacheInvalidator.start(system.settings.config)
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
index b994425..2854f31 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
@@ -60,12 +60,11 @@ object Invoker {
     // Replace the hostname of the invoker to the assigned id of the invoker.
     val newKamonConfig = Kamon.config
       .withValue("kamon.environment.host", ConfigValueFactory.fromAnyRef(s"invoker$instance"))
-    Kamon.reconfigure(newKamonConfig)
+    Kamon.init(newKamonConfig)
   }
 
   def main(args: Array[String]): Unit = {
     ConfigMXBean.register()
-    Kamon.loadReportersFromConfig()
     implicit val ec = ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
     implicit val actorSystem: ActorSystem =
       ActorSystem(name = "invoker-actor-system", defaultExecutionContext = Some(ec))
@@ -76,7 +75,7 @@ object Invoker {
     // Prepare Kamon shutdown
     CoordinatedShutdown(actorSystem).addTask(CoordinatedShutdown.PhaseActorSystemTerminate, "shutdownKamon") { () =>
       logger.info(this, s"Shutting down Kamon with coordinated shutdown")
-      Kamon.stopAllReporters().map(_ => Done)
+      Kamon.stopModules().map(_ => Done)
     }
 
     // load values for the required properties from the environment
diff --git a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/EventConsumer.scala b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/EventConsumer.scala
index a6eca9b..7b5709e 100644
--- a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/EventConsumer.scala
+++ b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/EventConsumer.scala
@@ -55,23 +55,26 @@ case class EventConsumer(settings: ConsumerSettings[String, String],
   private implicit val ec: ExecutionContext = system.dispatcher
 
   //Record the rate of events received
-  private val activationCounter = Kamon.counter("openwhisk.userevents.global.activations")
-  private val metricCounter = Kamon.counter("openwhisk.userevents.global.metric")
+  private val activationCounter = Kamon.counter("openwhisk.userevents.global.activations").withoutTags()
+  private val metricCounter = Kamon.counter("openwhisk.userevents.global.metric").withoutTags()
 
   private val statusCounter = Kamon.counter("openwhisk.userevents.global.status")
-  private val coldStartCounter = Kamon.counter("openwhisk.userevents.global.coldStarts")
+  private val coldStartCounter = Kamon.counter("openwhisk.userevents.global.coldStarts").withoutTags()
 
-  private val statusSuccess = statusCounter.refine("status" -> ActivationResponse.statusSuccess)
-  private val statusFailure = statusCounter.refine("status" -> "failure")
-  private val statusApplicationError = statusCounter.refine("status" -> ActivationResponse.statusApplicationError)
-  private val statusDeveloperError = statusCounter.refine("status" -> ActivationResponse.statusDeveloperError)
-  private val statusInternalError = statusCounter.refine("status" -> ActivationResponse.statusWhiskError)
+  private val statusSuccess = statusCounter.withTag("status", ActivationResponse.statusSuccess)
+  private val statusFailure = statusCounter.withTag("status", "failure")
+  private val statusApplicationError = statusCounter.withTag("status", ActivationResponse.statusApplicationError)
+  private val statusDeveloperError = statusCounter.withTag("status", ActivationResponse.statusDeveloperError)
+  private val statusInternalError = statusCounter.withTag("status", ActivationResponse.statusWhiskError)
 
-  private val waitTime = Kamon.histogram("openwhisk.userevents.global.waitTime", MeasurementUnit.time.milliseconds)
-  private val initTime = Kamon.histogram("openwhisk.userevents.global.initTime", MeasurementUnit.time.milliseconds)
-  private val duration = Kamon.histogram("openwhisk.userevents.global.duration", MeasurementUnit.time.milliseconds)
+  private val waitTime =
+    Kamon.histogram("openwhisk.userevents.global.waitTime", MeasurementUnit.time.milliseconds).withoutTags()
+  private val initTime =
+    Kamon.histogram("openwhisk.userevents.global.initTime", MeasurementUnit.time.milliseconds).withoutTags()
+  private val duration =
+    Kamon.histogram("openwhisk.userevents.global.duration", MeasurementUnit.time.milliseconds).withoutTags()
 
-  private val lagGauge = Kamon.gauge("openwhisk.userevents.consumer.lag")
+  private val lagGauge = Kamon.gauge("openwhisk.userevents.consumer.lag").withoutTags()
 
   def shutdown(): Future[Done] = {
     lagRecorder.cancel()
@@ -96,7 +99,7 @@ case class EventConsumer(settings: ConsumerSettings[String, String],
     .run()
 
   private val lagRecorder =
-    system.scheduler.schedule(10.seconds, 10.seconds)(lagGauge.set(consumerLag))
+    system.scheduler.schedule(10.seconds, 10.seconds)(lagGauge.update(consumerLag))
 
   private def processEvent(value: String): Unit = {
     EventMessage
diff --git a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/KamonRecorder.scala b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/KamonRecorder.scala
index c2e785b..f371cc5 100644
--- a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/KamonRecorder.scala
+++ b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/KamonRecorder.scala
@@ -21,6 +21,7 @@ import akka.event.slf4j.SLF4JLogging
 import org.apache.openwhisk.core.connector.{Activation, Metric}
 import kamon.Kamon
 import kamon.metric.MeasurementUnit
+import kamon.tag.TagSet
 import org.apache.openwhisk.core.monitoring.metrics.OpenWhiskEvents.MetricConfig
 
 import scala.collection.concurrent.TrieMap
@@ -64,8 +65,8 @@ object KamonRecorder extends MetricRecorder with KamonMetricNames with SLF4JLogg
   }
 
   case class LimitKamonMetrics(namespace: String) {
-    private val concurrentLimit = Kamon.counter(concurrentLimitMetric).refine(`actionNamespace` -> namespace)
-    private val timedLimit = Kamon.counter(timedLimitMetric).refine(`actionNamespace` -> namespace)
+    private val concurrentLimit = Kamon.counter(concurrentLimitMetric).withTag(`actionNamespace`, namespace)
+    private val timedLimit = Kamon.counter(timedLimitMetric).withTag(`actionNamespace`, namespace)
 
     def record(m: Metric): Unit = {
       m.metricName match {
@@ -83,23 +84,25 @@ object KamonRecorder extends MetricRecorder with KamonMetricNames with SLF4JLogg
                                     memory: String,
                                     initiator: String) {
     private val activationTags =
-      Map(
-        `actionNamespace` -> namespace,
-        `initiatorNamespace` -> initiator,
-        `actionName` -> action,
-        `actionKind` -> kind,
-        `actionMemory` -> memory)
+      TagSet.from(
+        Map(
+          `actionNamespace` -> namespace,
+          `initiatorNamespace` -> initiator,
+          `actionName` -> action,
+          `actionKind` -> kind,
+          `actionMemory` -> memory))
     private val namespaceActivationsTags =
-      Map(`actionNamespace` -> namespace, `initiatorNamespace` -> initiator)
-    private val tags = Map(`actionNamespace` -> namespace, `initiatorNamespace` -> initiator, `actionName` -> action)
-
-    private val namespaceActivations = Kamon.counter(namespaceActivationMetric).refine(namespaceActivationsTags)
-    private val activations = Kamon.counter(activationMetric).refine(activationTags)
-    private val coldStarts = Kamon.counter(coldStartMetric).refine(tags)
-    private val waitTime = Kamon.histogram(waitTimeMetric, MeasurementUnit.time.milliseconds).refine(tags)
-    private val initTime = Kamon.histogram(initTimeMetric, MeasurementUnit.time.milliseconds).refine(tags)
-    private val duration = Kamon.histogram(durationMetric, MeasurementUnit.time.milliseconds).refine(tags)
-    private val responseSize = Kamon.histogram(responseSizeMetric, MeasurementUnit.information.bytes).refine(tags)
+      TagSet.from(Map(`actionNamespace` -> namespace, `initiatorNamespace` -> initiator))
+    private val tags =
+      TagSet.from(Map(`actionNamespace` -> namespace, `initiatorNamespace` -> initiator, `actionName` -> action))
+
+    private val namespaceActivations = Kamon.counter(namespaceActivationMetric).withTags(namespaceActivationsTags)
+    private val activations = Kamon.counter(activationMetric).withTags(activationTags)
+    private val coldStarts = Kamon.counter(coldStartMetric).withTags(tags)
+    private val waitTime = Kamon.histogram(waitTimeMetric, MeasurementUnit.time.milliseconds).withTags(tags)
+    private val initTime = Kamon.histogram(initTimeMetric, MeasurementUnit.time.milliseconds).withTags(tags)
+    private val duration = Kamon.histogram(durationMetric, MeasurementUnit.time.milliseconds).withTags(tags)
+    private val responseSize = Kamon.histogram(responseSizeMetric, MeasurementUnit.information.bytes).withTags(tags)
 
     def record(a: Activation, metricConfig: MetricConfig): Unit = {
       namespaceActivations.increment()
@@ -122,7 +125,7 @@ object KamonRecorder extends MetricRecorder with KamonMetricNames with SLF4JLogg
       waitTime.record(a.waitTime.toMillis)
       duration.record(a.duration.toMillis)
 
-      Kamon.counter(statusMetric).refine(tags + ("status" -> a.status)).increment()
+      Kamon.counter(statusMetric).withTags(tags.withTag("status", a.status)).increment()
 
       a.size.foreach(responseSize.record(_))
     }
diff --git a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/Main.scala b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/Main.scala
index 9c1b932..286906f 100644
--- a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/Main.scala
+++ b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/Main.scala
@@ -27,7 +27,7 @@ import scala.concurrent.{Await, ExecutionContextExecutor, Future}
 
 object Main {
   def main(args: Array[String]): Unit = {
-    Kamon.loadReportersFromConfig()
+    Kamon.init()
     implicit val system: ActorSystem = ActorSystem("events-actor-system")
     implicit val materializer: ActorMaterializer = ActorMaterializer()
     val binding = OpenWhiskEvents.start(system.settings.config)
diff --git a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/OpenWhiskEvents.scala b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/OpenWhiskEvents.scala
index caa516f..c4e14b6 100644
--- a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/OpenWhiskEvents.scala
+++ b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/OpenWhiskEvents.scala
@@ -25,7 +25,6 @@ import akka.stream.ActorMaterializer
 import com.typesafe.config.Config
 import kamon.Kamon
 import kamon.prometheus.PrometheusReporter
-import kamon.system.SystemMetrics
 import org.apache.kafka.common.serialization.StringDeserializer
 import pureconfig._
 import pureconfig.generic.auto._
@@ -39,10 +38,10 @@ object OpenWhiskEvents extends SLF4JLogging {
   def start(config: Config)(implicit system: ActorSystem,
                             materializer: ActorMaterializer): Future[Http.ServerBinding] = {
     implicit val ec: ExecutionContext = system.dispatcher
-    Kamon.reconfigure(config)
+
     val prometheusReporter = new PrometheusReporter()
-    Kamon.addReporter(prometheusReporter)
-    SystemMetrics.startCollecting()
+    Kamon.registerModule("prometheus", prometheusReporter)
+    Kamon.init(config)
 
     val metricConfig = loadConfigOrThrow[MetricConfig](config, "whisk.user-events")
 
diff --git a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/KamonRecorderTests.scala b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/KamonRecorderTests.scala
index 446ed98..3ef93d5 100644
--- a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/KamonRecorderTests.scala
+++ b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/KamonRecorderTests.scala
@@ -20,9 +20,10 @@ package org.apache.openwhisk.core.monitoring.metrics
 import java.time.Duration
 
 import com.typesafe.config.{Config, ConfigFactory}
-import kamon.metric.{PeriodSnapshot, PeriodSnapshotAccumulator}
-import kamon.util.Registration
-import kamon.{Kamon, MetricReporter}
+import kamon.metric.PeriodSnapshot
+import kamon.module.MetricReporter
+import kamon.Kamon
+import kamon.tag.Lookups
 import org.apache.openwhisk.core.connector.{Activation, EventMessage}
 import org.apache.openwhisk.core.entity.{ActivationResponse, Subject, UUID}
 import org.junit.runner.RunWith
@@ -33,7 +34,7 @@ import scala.concurrent.duration._
 
 @RunWith(classOf[JUnitRunner])
 class KamonRecorderTests extends KafkaSpecBase with BeforeAndAfterEach with KamonMetricNames {
-  var reporterReg: Registration = _
+  var reporter: MetricReporter = _
 
   override protected def beforeEach(): Unit = {
     super.beforeEach()
@@ -44,12 +45,13 @@ class KamonRecorderTests extends KafkaSpecBase with BeforeAndAfterEach with Kamo
         |    optimistic-tick-alignment = no
         |  }
         |}""".stripMargin).withFallback(ConfigFactory.load())
+    Kamon.registerModule("test", TestReporter)
     Kamon.reconfigure(newConfig)
-    reporterReg = Kamon.addReporter(TestReporter)
+    reporter = TestReporter
   }
 
   override protected def afterEach(): Unit = {
-    reporterReg.cancel()
+    reporter.stop()
     Kamon.reconfigure(ConfigFactory.load())
     super.afterEach()
   }
@@ -88,15 +90,15 @@ class KamonRecorderTests extends KafkaSpecBase with BeforeAndAfterEach with Kamo
     TestReporter.counter(activationMetric, namespaceDemo, actionWithCustomPackage)(0).value shouldBe 1
     TestReporter
       .counter(activationMetric, namespaceDemo, actionWithCustomPackage)
-      .filter((t) => t.tags.get(actionMemory).get == memory.toString)(0)
+      .filter((t) => t.tags.get(Lookups.plain(actionMemory)) == memory.toString)(0)
       .value shouldBe 1
     TestReporter
       .counter(activationMetric, namespaceDemo, actionWithCustomPackage)
-      .filter((t) => t.tags.get(actionKind).get == kind)(0)
+      .filter((t) => t.tags.get(Lookups.plain(actionKind)) == kind)(0)
       .value shouldBe 1
     TestReporter
       .counter(statusMetric, namespaceDemo, actionWithCustomPackage)
-      .filter((t) => t.tags.get(actionStatus).get == ActivationResponse.statusDeveloperError)(0)
+      .filter((t) => t.tags.get(Lookups.plain(actionStatus)) == ActivationResponse.statusDeveloperError)(0)
       .value shouldBe 1
     TestReporter.counter(coldStartMetric, namespaceDemo, actionWithCustomPackage)(0).value shouldBe 1
     TestReporter.histogram(waitTimeMetric, namespaceDemo, actionWithCustomPackage).size shouldBe 1
@@ -123,51 +125,48 @@ class KamonRecorderTests extends KafkaSpecBase with BeforeAndAfterEach with Kamo
       Activation.typeName)
 
   private object TestReporter extends MetricReporter {
-    var snapshotAccumulator = new PeriodSnapshotAccumulator(Duration.ofDays(1), Duration.ZERO)
+    var snapshotAccumulator = PeriodSnapshot.accumulator(Duration.ofDays(1), Duration.ZERO)
     override def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit = {
       snapshotAccumulator.add(snapshot)
     }
 
-    override def start(): Unit = {}
     override def stop(): Unit = {}
     override def reconfigure(config: Config): Unit = {}
 
     def reset(): Unit = {
-      snapshotAccumulator = new PeriodSnapshotAccumulator(Duration.ofDays(1), Duration.ZERO)
+      snapshotAccumulator = PeriodSnapshot.accumulator(Duration.ofDays(1), Duration.ZERO)
     }
 
     def counter(metricName: String, namespace: String, action: String) = {
-      System.out.println()
       snapshotAccumulator
         .peek()
-        .metrics
         .counters
         .filter(_.name == metricName)
-        .filter((t) => t.tags.get(actionNamespace).get == namespace)
-        .filter((t) => t.tags.get(initiatorNamespace).get == initiator)
-        .filter((t) => t.tags.get(actionName).get == action)
+        .flatMap(_.instruments)
+        .filter(_.tags.get(Lookups.plain(actionNamespace)) == namespace)
+        .filter(_.tags.get(Lookups.plain(initiatorNamespace)) == initiator)
+        .filter(_.tags.get(Lookups.plain(actionName)) == action)
     }
 
     def namespaceCounter(metricName: String, namespace: String) = {
-      System.out.println()
       snapshotAccumulator
         .peek()
-        .metrics
         .counters
         .filter(_.name == metricName)
-        .filter((t) => t.tags.get(actionNamespace).get == namespace)
-        .filter((t) => t.tags.get(initiatorNamespace).get == initiator)
+        .flatMap(_.instruments)
+        .filter(_.tags.get(Lookups.plain(actionNamespace)) == namespace)
+        .filter(_.tags.get(Lookups.plain(initiatorNamespace)) == initiator)
     }
 
     def histogram(metricName: String, namespace: String, action: String) = {
       snapshotAccumulator
         .peek()
-        .metrics
         .histograms
         .filter(_.name == metricName)
-        .filter((t) => t.tags.get(actionNamespace).get == namespace)
-        .filter((t) => t.tags.get(initiatorNamespace).get == initiator)
-        .filter((t) => t.tags.get(actionName).get == action)
+        .flatMap(_.instruments)
+        .filter(_.tags.get(Lookups.plain(actionNamespace)) == namespace)
+        .filter(_.tags.get(Lookups.plain(initiatorNamespace)) == initiator)
+        .filter(_.tags.get(Lookups.plain(actionName)) == action)
     }
   }
 }
diff --git a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/OpenWhiskEventsTests.scala b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/OpenWhiskEventsTests.scala
index 9ba4e98..f8a9a2d 100644
--- a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/OpenWhiskEventsTests.scala
+++ b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/OpenWhiskEventsTests.scala
@@ -57,7 +57,7 @@ class OpenWhiskEventsTests extends KafkaSpecBase {
     res shouldBe Some(StatusCodes.OK, "pong")
 
     //Check if metrics using Kamon API gets included in consolidated Prometheus
-    Kamon.counter("fooTest").increment(42)
+    Kamon.counter("fooTest").withoutTags().increment(42)
     sleep(1.second)
     val metricRes = get("localhost", httpPort, "/metrics")
     metricRes.get._2 should include("fooTest")
diff --git a/core/standalone/src/main/resources/standalone.conf b/core/standalone/src/main/resources/standalone.conf
index 5c8d7cf..43f5995 100644
--- a/core/standalone/src/main/resources/standalone.conf
+++ b/core/standalone/src/main/resources/standalone.conf
@@ -18,7 +18,12 @@
 include classpath("application.conf")
 
 kamon {
-  reporters = []
+  modules {
+    # Disable statsd in standalone mode as well.
+    statsd-reporter {
+      enabled = false
+    }
+  }
 }
 
 whisk {
diff --git a/tests/src/test/scala/org/apache/openwhisk/common/PrometheusTests.scala b/tests/src/test/scala/org/apache/openwhisk/common/PrometheusTests.scala
index 710d361..8d0c80c 100644
--- a/tests/src/test/scala/org/apache/openwhisk/common/PrometheusTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/common/PrometheusTests.scala
@@ -55,7 +55,7 @@ class PrometheusTests extends FlatSpec with Matchers with ScalatestRouteTest wit
 
   it should "respond to /metrics" in {
     val api = new KamonPrometheus
-    Kamon.counter("foo_bar").increment(42)
+    Kamon.counter("foo_bar").withoutTags().increment(42)
 
     //Sleep to ensure that Kamon metrics are pushed to reporters
     Thread.sleep(2.seconds.toMillis)
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreTests.scala
index 0cd5f43..566d2ba 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreTests.scala
@@ -23,7 +23,7 @@ import akka.stream.scaladsl.Source
 import com.typesafe.config.ConfigFactory
 import io.netty.util.ResourceLeakDetector
 import io.netty.util.ResourceLeakDetector.Level
-import kamon.metric.LongAdderCounter
+import kamon.metric.Counter.LongAdder
 import org.apache.openwhisk.common.TransactionId
 import org.apache.openwhisk.core.database.DocumentSerializer
 import org.apache.openwhisk.core.database.memory.MemoryAttachmentStoreProvider
@@ -186,8 +186,8 @@ class CosmosDBArtifactStoreTests extends FlatSpec with CosmosDBStoreBehaviorBase
     //would increment a counter
     if (TransactionId.metricsKamonTags) {
       RetryMetricsCollector.getCounter(CosmosDBAction.Create) match {
-        case Some(x: LongAdderCounter) => x.snapshot(false).value.toInt
-        case _                         => 0
+        case Some(x: LongAdder) => x.snapshot(false).intValue()
+        case _                  => 0
       }
     } else {
       RetryMetricsCollector.retryCounter.cur.toInt