Posted to commits@openwhisk.apache.org by ma...@apache.org on 2017/10/27 12:48:41 UTC

[incubator-openwhisk] branch master updated: Emit metrics via kamon. (#2857)

This is an automated email from the ASF dual-hosted git repository.

markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new e7c05c2  Emit metrics via kamon. (#2857)
e7c05c2 is described below

commit e7c05c2c2d49c6592ccb976679434309dc6f4015
Author: Martin Henke <ma...@web.de>
AuthorDate: Fri Oct 27 14:48:39 2017 +0200

    Emit metrics via kamon. (#2857)
---
 ansible/group_vars/all                             | 10 ++++
 ansible/roles/controller/tasks/deploy.yml          |  6 ++
 ansible/roles/invoker/tasks/deploy.yml             |  4 ++
 common/scala/build.gradle                          |  2 +
 common/scala/src/main/resources/application.conf   | 41 +++++++++++++
 .../src/main/scala/whisk/common/Logging.scala      | 21 ++++++-
 .../main/scala/whisk/common/TransactionId.scala    | 67 ++++++++++++++++++----
 .../main/scala/whisk/http/BasicHttpService.scala   | 11 +++-
 .../controller/src/main/resources/application.conf |  2 +-
 .../scala/whisk/core/controller/Controller.scala   | 14 +++++
 .../main/scala/whisk/core/invoker/Invoker.scala    | 14 +++++
 docs/metrics.md                                    | 47 +++++++++++++++
 12 files changed, 224 insertions(+), 15 deletions(-)

diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index e7dda4e..d7dfae9 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -326,3 +326,13 @@ cli_arch:
 logs:
   dir:
     become: "{{ logs_dir_become | default(false) }}"
+
+# Metrics Configuration
+metrics:
+  log: 
+    enabled: "{{ metrics_log | default(true) }}"    
+  kamon: 
+    enabled: "{{ metrics_kamon | default(false) }}"
+    host: "{{ metrics_kamon_statsd_host | default('') }}"
+    port: "{{ metrics_kamon_statsd_port | default('8125') }}"
+
diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml
index 3afb764..d128053 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -79,6 +79,12 @@
       "AKKA_CLUSTER_SEED_NODES": "{{seed_nodes_list | join(' ') }}"
       "AKKA_CLUSTER_BIND_PORT": "{{ controller.akka.cluster.bindPort }}"
       "AKKA_ACTOR_PROVIDER": "{{ controller.akka.provider }}"
+
+      "METRICS_KAMON": "{{ metrics.kamon.enabled }}"
+      "METRICS_LOG": "{{ metrics.log.enabled }}"
+      "METRICS_KAMON_HOST": "{{ metrics.kamon.host }}"
+      "METRICS_KAMON_PORT": "{{ metrics.kamon.port }}"
+
     volumes:
       - "{{ whisk_logs_dir }}/controller{{ groups['controllers'].index(inventory_hostname) }}:/logs"
     ports:
diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml
index 0258ff4..0669362 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -144,6 +144,10 @@
         -e INVOKER_USE_RUNC='{{ invoker.useRunc }}'
         -e INVOKER_NAME='{{ groups['invokers'].index(inventory_hostname) }}'
         -e WHISK_LOGS_DIR='{{ whisk_logs_dir }}'
+        -e METRICS_KAMON='{{ metrics.kamon.enabled }}'
+        -e METRICS_LOG='{{ metrics.log.enabled }}'
+        -e METRICS_KAMON_HOST='{{ metrics.kamon.host }}'
+        -e METRICS_KAMON_PORT='{{ metrics.kamon.port }}'
         -v /sys/fs/cgroup:/sys/fs/cgroup
         -v /run/runc:/run/runc
         -v {{ whisk_logs_dir }}/invoker{{ groups['invokers'].index(inventory_hostname) }}:/logs
diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index e6b0a74..c5d70a0 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -35,6 +35,8 @@ dependencies {
     }
     compile 'com.github.ben-manes.caffeine:caffeine:2.4.0'
     compile 'com.google.code.findbugs:jsr305:3.0.2'
+    compile 'io.kamon:kamon-core_2.11:0.6.7'
+    compile 'io.kamon:kamon-statsd_2.11:0.6.7'
 }
 
 tasks.withType(ScalaCompile) {
diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index d704334..683a804 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -12,3 +12,44 @@ akka.http {
         max-open-requests = 128
     }
 }
+
+# Kamon-related configuration
+kamon {
+
+    metric {
+        tick-interval = 1 second
+    }
+
+    statsd {
+        # Hostname and port on which your StatsD server is running. Remember that StatsD packets are sent using UDP;
+        # Kamon will not warn about unreachable hosts or closed ports, your data simply won't go anywhere.
+        hostname = ${?METRICS_KAMON_HOST}
+        port = ${?METRICS_KAMON_PORT}
+
+        # Interval between metrics data flushes to StatsD. Its value must be equal to or greater than the
+        # kamon.metrics.tick-interval setting.
+        flush-interval = 1 second
+
+        # Max packet size for UDP metrics data sent to StatsD.
+        max-packet-size = 1024 bytes
+
+        # Subscription patterns used to select which metrics will be pushed to StatsD. Note that first, metrics
+        # collection for your desired entities must be activated under the kamon.metrics.filters settings.
+        includes {
+            actor      =  [ "*" ]
+            trace      =  [ "*" ]
+            dispatcher =  [ "*" ]
+        }
+
+        simple-metric-key-generator {
+            # Application prefix for all metrics pushed to StatsD. The default namespacing scheme for metrics follows
+            # this pattern:
+            #    application.host.entity.entity-name.metric-name
+            application = "openwhisk-statsd"
+        }
+    }
+
+    modules {
+        kamon-statsd.auto-start = yes
+    }
+}
\ No newline at end of file
diff --git a/common/scala/src/main/scala/whisk/common/Logging.scala b/common/scala/src/main/scala/whisk/common/Logging.scala
index 470f9a8..2b971b0 100644
--- a/common/scala/src/main/scala/whisk/common/Logging.scala
+++ b/common/scala/src/main/scala/whisk/common/Logging.scala
@@ -26,6 +26,7 @@ import java.time.format.DateTimeFormatter
 import akka.event.Logging.{DebugLevel, ErrorLevel, InfoLevel, WarningLevel}
 import akka.event.Logging.LogLevel
 import akka.event.LoggingAdapter
+import kamon.Kamon
 
 trait Logging {
 
@@ -81,7 +82,7 @@ trait Logging {
 }
 
 /**
- * Implementaion of Logging, that uses akka logging.
+ * Implementation of Logging, that uses Akka logging.
  */
 class AkkaLogging(loggingAdapter: LoggingAdapter) extends Logging {
   def emit(loglevel: LogLevel, id: TransactionId, from: AnyRef, message: String) = {
@@ -156,6 +157,7 @@ private object Logging {
     if (simpleName.endsWith("$")) simpleName.dropRight(1)
     else simpleName
   }
+
 }
 
 private object Emitter {
@@ -178,6 +180,23 @@ object LogMarkerToken {
   }
 }
 
+object MetricEmitter {
+
+  val metrics = Kamon.metrics
+
+  def emitCounterMetric(token: LogMarkerToken) = {
+    metrics
+      .counter(token.toString)
+      .increment(1)
+  }
+
+  def emitHistogramMetric(token: LogMarkerToken, value: Long) = {
+    metrics
+      .histogram(token.toString)
+      .record(value)
+  }
+}
+
 object LoggingMarkers {
 
   val start = "start"
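A minimal sketch of how the MetricEmitter added above can be driven directly; the token name and the recorded value are purely illustrative and assume Kamon.start() has already been called:

```scala
import whisk.common.{LogMarkerToken, LoggingMarkers, MetricEmitter}

// Illustrative token only; any component/action pair works the same way.
val token = LogMarkerToken("loadbalancer", "activations", LoggingMarkers.count)

MetricEmitter.emitCounterMetric(token)        // increments the Kamon counter named after the token
MetricEmitter.emitHistogramMetric(token, 42L) // records 42 into the Kamon histogram of the same name
```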
diff --git a/common/scala/src/main/scala/whisk/common/TransactionId.scala b/common/scala/src/main/scala/whisk/common/TransactionId.scala
index 1e04ce6..2cb99a7 100644
--- a/common/scala/src/main/scala/whisk/common/TransactionId.scala
+++ b/common/scala/src/main/scala/whisk/common/TransactionId.scala
@@ -55,7 +55,17 @@ case class TransactionId private (meta: TransactionMetadata) extends AnyVal {
    */
   def mark(from: AnyRef, marker: LogMarkerToken, message: String = "", logLevel: LogLevel = InfoLevel)(
     implicit logging: Logging) = {
-    logging.emit(logLevel, this, from, createMessageWithMarker(message, LogMarker(marker, deltaToStart)))
+
+    if (TransactionId.metricsLog) {
+      logging.emit(logLevel, this, from, createMessageWithMarker(message, LogMarker(marker, deltaToStart)))
+    } else if (message.nonEmpty) {
+      logging.emit(logLevel, this, from, message)
+    }
+
+    if (TransactionId.metricsKamon) {
+      MetricEmitter.emitCounterMetric(marker)
+    }
+
   }
 
   /**
@@ -71,7 +81,13 @@ case class TransactionId private (meta: TransactionMetadata) extends AnyVal {
    */
   def started(from: AnyRef, marker: LogMarkerToken, message: String = "", logLevel: LogLevel = InfoLevel)(
     implicit logging: Logging): StartMarker = {
-    logging.emit(logLevel, this, from, createMessageWithMarker(message, LogMarker(marker, deltaToStart)))
+
+    if (TransactionId.metricsLog) {
+      logging.emit(logLevel, this, from, createMessageWithMarker(message, LogMarker(marker, deltaToStart)))
+    } else if (message.nonEmpty) {
+      logging.emit(logLevel, this, from, message)
+    }
+
     StartMarker(Instant.now, marker)
   }
 
@@ -89,13 +105,24 @@ case class TransactionId private (meta: TransactionMetadata) extends AnyVal {
                message: String = "",
                logLevel: LogLevel = InfoLevel,
                endTime: Instant = Instant.now(Clock.systemUTC))(implicit logging: Logging) = {
+
     val endMarker =
       LogMarkerToken(startMarker.startMarker.component, startMarker.startMarker.action, LoggingMarkers.finish)
-    logging.emit(
-      logLevel,
-      this,
-      from,
-      createMessageWithMarker(message, LogMarker(endMarker, deltaToStart, Some(deltaToMarker(startMarker, endTime)))))
+    val deltaToEnd = deltaToMarker(startMarker, endTime)
+
+    if (TransactionId.metricsLog) {
+      logging.emit(
+        logLevel,
+        this,
+        from,
+        createMessageWithMarker(message, LogMarker(endMarker, deltaToStart, Some(deltaToEnd))))
+    } else if (message.nonEmpty) {
+      logging.emit(logLevel, this, from, message)
+    }
+
+    if (TransactionId.metricsKamon) {
+      MetricEmitter.emitHistogramMetric(endMarker, deltaToEnd)
+    }
   }
 
   /**
@@ -108,13 +135,24 @@ case class TransactionId private (meta: TransactionMetadata) extends AnyVal {
    */
   def failed(from: AnyRef, startMarker: StartMarker, message: String = "", logLevel: LogLevel = WarningLevel)(
     implicit logging: Logging) = {
+
     val endMarker =
       LogMarkerToken(startMarker.startMarker.component, startMarker.startMarker.action, LoggingMarkers.error)
-    logging.emit(
-      logLevel,
-      this,
-      from,
-      createMessageWithMarker(message, LogMarker(endMarker, deltaToStart, Some(deltaToMarker(startMarker)))))
+    val deltaToEnd = deltaToMarker(startMarker)
+
+    if (TransactionId.metricsLog) {
+      logging.emit(
+        logLevel,
+        this,
+        from,
+        createMessageWithMarker(message, LogMarker(endMarker, deltaToStart, Some(deltaToEnd))))
+    } else if (message.nonEmpty) {
+      logging.emit(logLevel, this, from, message)
+    }
+
+    if (TransactionId.metricsKamon) {
+      MetricEmitter.emitHistogramMetric(endMarker, deltaToEnd)
+    }
   }
 
   /**
@@ -158,6 +196,11 @@ case class StartMarker(val start: Instant, startMarker: LogMarkerToken)
 protected case class TransactionMetadata(val id: Long, val start: Instant)
 
 object TransactionId {
+
+  // get the metric parameters directly from the environment since WhiskConfig cannot be instantiated here
+  val metricsKamon: Boolean = sys.env.get("METRICS_KAMON").getOrElse("False").toBoolean
+  val metricsLog: Boolean = sys.env.get("METRICS_LOG").getOrElse("True").toBoolean
+
   val unknown = TransactionId(0)
   val testing = TransactionId(-1) // Common id for unit testing
   val invoker = TransactionId(-100) // Invoker startup/shutdown or GC activity
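A hypothetical call site showing how the reworked markers feed both sinks — log markers when METRICS_LOG is true, Kamon metrics when METRICS_KAMON is true; the component and action names are invented for illustration:

```scala
// Sketch only: assumes an enclosing class with an implicit Logging in scope
// and that Kamon has been started by the controller/invoker main.
implicit val tid: TransactionId = TransactionId.testing

val startToken = LogMarkerToken("database", "getDocument", LoggingMarkers.start)
val started = tid.started(this, startToken, "fetching document")
// ... perform the timed operation ...
tid.finished(this, started, "document fetched") // finish log marker and/or histogram of the elapsed time
```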
diff --git a/common/scala/src/main/scala/whisk/http/BasicHttpService.scala b/common/scala/src/main/scala/whisk/http/BasicHttpService.scala
index f250ddf..33d6295 100644
--- a/common/scala/src/main/scala/whisk/http/BasicHttpService.scala
+++ b/common/scala/src/main/scala/whisk/http/BasicHttpService.scala
@@ -36,6 +36,7 @@ import whisk.common.LogMarkerToken
 import whisk.common.LoggingMarkers
 import whisk.common.TransactionCounter
 import whisk.common.TransactionId
+import whisk.common.MetricEmitter
 
 /**
  * This trait extends the Akka Directives and Actor with logging and transaction counting
@@ -121,7 +122,15 @@ trait BasicHttpService extends Directives with TransactionCounter {
       val token = LogMarkerToken("http", s"${m.toLowerCase}.${res.status.intValue}", LoggingMarkers.count)
       val marker = LogMarker(token, tid.deltaToStart, Some(tid.deltaToStart))
 
-      Some(LogEntry(s"[$tid] [$name] $marker", l))
+      if (TransactionId.metricsKamon) {
+        MetricEmitter.emitHistogramMetric(token, tid.deltaToStart)
+      }
+      if (TransactionId.metricsLog) {
+        Some(LogEntry(s"[$tid] [$name] $marker", l))
+      } else {
+        None
+      }
+
     case _ => None // other kind of responses
   }
 }
diff --git a/core/controller/src/main/resources/application.conf b/core/controller/src/main/resources/application.conf
index d32150c..6e13354 100644
--- a/core/controller/src/main/resources/application.conf
+++ b/core/controller/src/main/resources/application.conf
@@ -81,4 +81,4 @@ akka {
     auto-down-unreachable-after = ${?AUTO_DOWN_UNREACHABLE_AFTER}
     #distributed-data.notify-subscribers-interval = 0.01
   }
-}
+}
\ No newline at end of file
diff --git a/core/controller/src/main/scala/whisk/core/controller/Controller.scala b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
index fe4c75d..6c8bef5 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Controller.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
@@ -19,15 +19,21 @@ package whisk.core.controller
 
 import scala.concurrent.Await
 import scala.concurrent.duration.DurationInt
+import scala.concurrent.Future
 import scala.util.{Failure, Success}
+import akka.Done
 import akka.actor.ActorSystem
+import akka.actor.CoordinatedShutdown
 import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
 import akka.http.scaladsl.model.Uri
 import akka.http.scaladsl.server.Route
 import akka.stream.ActorMaterializer
 import spray.json._
+
 import spray.json.DefaultJsonProtocol._
 
+import kamon.Kamon
+
 import whisk.common.AkkaLogging
 import whisk.common.Logging
 import whisk.common.LoggingMarkers
@@ -168,9 +174,17 @@ object Controller {
       "runtimes" -> runtimes.toJson)
 
   def main(args: Array[String]): Unit = {
+    Kamon.start()
     implicit val actorSystem = ActorSystem("controller-actor-system")
     implicit val logger = new AkkaLogging(akka.event.Logging.getLogger(actorSystem, this))
 
+    // Prepare Kamon shutdown
+    CoordinatedShutdown(actorSystem).addTask(CoordinatedShutdown.PhaseActorSystemTerminate, "shutdownKamon") { () =>
+      logger.info(this, s"Shutting down Kamon with coordinated shutdown")
+      Kamon.shutdown()
+      Future.successful(Done)
+    }
+
     // extract configuration data from the environment
     val config = new WhiskConfig(requiredProperties)
     val port = config.servicePort.toInt
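For anyone wiring this up locally, a minimal, hypothetical smoke test of the Kamon 0.6 lifecycle used here (it only uses calls that already appear in this patch):

```scala
import kamon.Kamon

object KamonSmokeTest extends App {
  Kamon.start()                                              // picks up the kamon.* block from application.conf
  Kamon.metrics.counter("smoke-test_count").increment(1)     // the same calls MetricEmitter wraps
  Kamon.metrics.histogram("smoke-test_duration").record(42L)
  Thread.sleep(2000)                                         // let at least one 1-second tick/flush pass
  Kamon.shutdown()                                           // flush remaining metrics to StatsD
}
```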
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
index 0e47bd4..e1fec08 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
@@ -19,11 +19,16 @@ package whisk.core.invoker
 
 import scala.concurrent.Await
 import scala.concurrent.duration._
+import scala.concurrent.Future
 import scala.util.Failure
 
 import com.redis.RedisClient
 
+import kamon.Kamon
+
+import akka.Done
 import akka.actor.ActorSystem
+import akka.actor.CoordinatedShutdown
 import akka.stream.ActorMaterializer
 import whisk.common.AkkaLogging
 import whisk.common.Scheduler
@@ -63,11 +68,20 @@ object Invoker {
       Map(invokerName -> "")
 
   def main(args: Array[String]): Unit = {
+    Kamon.start()
+
     implicit val ec = ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
     implicit val actorSystem: ActorSystem =
       ActorSystem(name = "invoker-actor-system", defaultExecutionContext = Some(ec))
     implicit val logger = new AkkaLogging(akka.event.Logging.getLogger(actorSystem, this))
 
+    // Prepare Kamon shutdown
+    CoordinatedShutdown(actorSystem).addTask(CoordinatedShutdown.PhaseActorSystemTerminate, "shutdownKamon") { () =>
+      logger.info(this, s"Shutting down Kamon with coordinated shutdown")
+      Kamon.shutdown()
+      Future.successful(Done)
+    }
+
     // load values for the required properties from the environment
     implicit val config = new WhiskConfig(requiredProperties)
 
diff --git a/docs/metrics.md b/docs/metrics.md
new file mode 100644
index 0000000..5d4fcc0
--- /dev/null
+++ b/docs/metrics.md
@@ -0,0 +1,47 @@
+# OpenWhisk Metric Support
+
+OpenWhisk can send metric information to a StatsD server. This capability is disabled by default; instead, metric information is normally written to the log files in log-marker format.
+
+## Configuration
+
+Both capabilities can be enabled or disabled separately at deployment time via the Ansible configuration in the `group_vars/all` file of an environment.
+
+There are four configuration options available:
+
+- **metrics_log** [true / false (default: true)]
+
+  Enable/disable writing metric information to the log files in log-marker format.
+
+  *Beware: even if set to false, all messages accompanying the log markers are still written to the log.*
+
+- **metrics_kamon** [true / false (default: false)]
+
+  Enable/disable whether metric information is sent to the configured StatsD server.
+
+- **metrics_kamon_statsd_host** [hostname or IP address]
+
+  Hostname or IP address of the StatsD server.
+
+- **metrics_kamon_statsd_port** [port number (default: 8125)]
+
+  Port number of the StatsD server.
+
+
+Example configuration:
+
+```
+metrics_kamon: true
+metrics_kamon_statsd_host: '192.168.99.100'
+metrics_kamon_statsd_port: '8125'
+metrics_log: true
+```
+
+## Testing the StatsD metric support
+
+The Kamon project provides an integrated Docker image containing StatsD and a connected Grafana dashboard via [this GitHub project](https://github.com/kamon-io/docker-grafana-graphite). This image is helpful for testing the metrics sent via StatsD.
+
+Please follow these [instructions](https://github.com/kamon-io/docker-grafana-graphite/blob/master/README.md) to start the Docker image in your local Docker environment.
+
+The Docker image exposes StatsD on the standard port 8125 and a Grafana dashboard on port 8080 of your Docker host.
+
+The address of your Docker host has to be configured in the `metrics_kamon_statsd_host` configuration property.
