You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ma...@apache.org on 2017/10/27 12:48:41 UTC
[incubator-openwhisk] branch master updated: Emit metrics via
kamon. (#2857)
This is an automated email from the ASF dual-hosted git repository.
markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new e7c05c2 Emit metrics via kamon. (#2857)
e7c05c2 is described below
commit e7c05c2c2d49c6592ccb976679434309dc6f4015
Author: Martin Henke <ma...@web.de>
AuthorDate: Fri Oct 27 14:48:39 2017 +0200
Emit metrics via kamon. (#2857)
---
ansible/group_vars/all | 10 ++++
ansible/roles/controller/tasks/deploy.yml | 6 ++
ansible/roles/invoker/tasks/deploy.yml | 4 ++
common/scala/build.gradle | 2 +
common/scala/src/main/resources/application.conf | 41 +++++++++++++
.../src/main/scala/whisk/common/Logging.scala | 21 ++++++-
.../main/scala/whisk/common/TransactionId.scala | 67 ++++++++++++++++++----
.../main/scala/whisk/http/BasicHttpService.scala | 11 +++-
.../controller/src/main/resources/application.conf | 2 +-
.../scala/whisk/core/controller/Controller.scala | 14 +++++
.../main/scala/whisk/core/invoker/Invoker.scala | 14 +++++
docs/metrics.md | 47 +++++++++++++++
12 files changed, 224 insertions(+), 15 deletions(-)
diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index e7dda4e..d7dfae9 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -326,3 +326,13 @@ cli_arch:
logs:
dir:
become: "{{ logs_dir_become | default(false) }}"
+
+# Metrics Configuration
+metrics:
+ log:
+ enabled: "{{ metrics_log | default(true) }}"
+ kamon:
+ enabled: "{{ metrics_kamon | default(false) }}"
+ host: "{{ metrics_kamon_statsd_host | default('') }}"
+ port: "{{ metrics_kamon_statsd_port | default('8125') }}"
+
diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml
index 3afb764..d128053 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -79,6 +79,12 @@
"AKKA_CLUSTER_SEED_NODES": "{{seed_nodes_list | join(' ') }}"
"AKKA_CLUSTER_BIND_PORT": "{{ controller.akka.cluster.bindPort }}"
"AKKA_ACTOR_PROVIDER": "{{ controller.akka.provider }}"
+
+ "METRICS_KAMON": "{{ metrics.kamon.enabled }}"
+ "METRICS_LOG": "{{ metrics.log.enabled }}"
+ "METRICS_KAMON_HOST": "{{ metrics.kamon.host }}"
+ "METRICS_KAMON_PORT": "{{ metrics.kamon.port }}"
+
volumes:
- "{{ whisk_logs_dir }}/controller{{ groups['controllers'].index(inventory_hostname) }}:/logs"
ports:
diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml
index 0258ff4..0669362 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -144,6 +144,10 @@
-e INVOKER_USE_RUNC='{{ invoker.useRunc }}'
-e INVOKER_NAME='{{ groups['invokers'].index(inventory_hostname) }}'
-e WHISK_LOGS_DIR='{{ whisk_logs_dir }}'
+ -e METRICS_KAMON='{{ metrics.kamon.enabled }}'
+ -e METRICS_LOG='{{ metrics.log.enabled }}'
+ -e METRICS_KAMON_HOST='{{ metrics.kamon.host }}'
+ -e METRICS_KAMON_PORT='{{ metrics.kamon.port }}'
-v /sys/fs/cgroup:/sys/fs/cgroup
-v /run/runc:/run/runc
-v {{ whisk_logs_dir }}/invoker{{ groups['invokers'].index(inventory_hostname) }}:/logs
diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index e6b0a74..c5d70a0 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -35,6 +35,8 @@ dependencies {
}
compile 'com.github.ben-manes.caffeine:caffeine:2.4.0'
compile 'com.google.code.findbugs:jsr305:3.0.2'
+ compile 'io.kamon:kamon-core_2.11:0.6.7'
+ compile 'io.kamon:kamon-statsd_2.11:0.6.7'
}
tasks.withType(ScalaCompile) {
diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index d704334..683a804 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -12,3 +12,44 @@ akka.http {
max-open-requests = 128
}
}
+
+#kamon related configuration
+kamon {
+
+ metric {
+ tick-interval = 1 second
+ }
+
+ statsd {
+ # Hostname and port in which your StatsD is running. Remember that StatsD packets are sent using UDP and
+ # setting unreachable hosts and/or not open ports wont be warned by the Kamon, your data wont go anywhere.
+ hostname = ${?METRICS_KAMON_HOST}
+ port = ${?METRICS_KAMON_PORT}
+
+ # Interval between metrics data flushes to StatsD. It's value must be equal or greater than the
+ # kamon.metrics.tick-interval setting.
+ flush-interval = 1 second
+
+ # Max packet size for UDP metrics data sent to StatsD.
+ max-packet-size = 1024 bytes
+
+ # Subscription patterns used to select which metrics will be pushed to StatsD. Note that first, metrics
+ # collection for your desired entities must be activated under the kamon.metrics.filters settings.
+ includes {
+ actor = [ "*" ]
+ trace = [ "*" ]
+ dispatcher = [ "*" ]
+ }
+
+ simple-metric-key-generator {
+ # Application prefix for all metrics pushed to StatsD. The default namespacing scheme for metrics follows
+ # this pattern:
+ # application.host.entity.entity-name.metric-name
+ application = "openwhisk-statsd"
+ }
+ }
+
+ modules {
+ kamon-statsd.auto-start = yes
+ }
+}
\ No newline at end of file
diff --git a/common/scala/src/main/scala/whisk/common/Logging.scala b/common/scala/src/main/scala/whisk/common/Logging.scala
index 470f9a8..2b971b0 100644
--- a/common/scala/src/main/scala/whisk/common/Logging.scala
+++ b/common/scala/src/main/scala/whisk/common/Logging.scala
@@ -26,6 +26,7 @@ import java.time.format.DateTimeFormatter
import akka.event.Logging.{DebugLevel, ErrorLevel, InfoLevel, WarningLevel}
import akka.event.Logging.LogLevel
import akka.event.LoggingAdapter
+import kamon.Kamon
trait Logging {
@@ -81,7 +82,7 @@ trait Logging {
}
/**
- * Implementaion of Logging, that uses akka logging.
+ * Implementation of Logging, that uses Akka logging.
*/
class AkkaLogging(loggingAdapter: LoggingAdapter) extends Logging {
def emit(loglevel: LogLevel, id: TransactionId, from: AnyRef, message: String) = {
@@ -156,6 +157,7 @@ private object Logging {
if (simpleName.endsWith("$")) simpleName.dropRight(1)
else simpleName
}
+
}
private object Emitter {
@@ -178,6 +180,23 @@ object LogMarkerToken {
}
}
+object MetricEmitter {
+
+ val metrics = Kamon.metrics
+
+ def emitCounterMetric(token: LogMarkerToken) = {
+ metrics
+ .counter(token.toString)
+ .increment(1)
+ }
+
+ def emitHistogramMetric(token: LogMarkerToken, value: Long) = {
+ metrics
+ .histogram(token.toString)
+ .record(value)
+ }
+}
+
object LoggingMarkers {
val start = "start"
diff --git a/common/scala/src/main/scala/whisk/common/TransactionId.scala b/common/scala/src/main/scala/whisk/common/TransactionId.scala
index 1e04ce6..2cb99a7 100644
--- a/common/scala/src/main/scala/whisk/common/TransactionId.scala
+++ b/common/scala/src/main/scala/whisk/common/TransactionId.scala
@@ -55,7 +55,17 @@ case class TransactionId private (meta: TransactionMetadata) extends AnyVal {
*/
def mark(from: AnyRef, marker: LogMarkerToken, message: String = "", logLevel: LogLevel = InfoLevel)(
implicit logging: Logging) = {
- logging.emit(logLevel, this, from, createMessageWithMarker(message, LogMarker(marker, deltaToStart)))
+
+ if (TransactionId.metricsLog) {
+ logging.emit(logLevel, this, from, createMessageWithMarker(message, LogMarker(marker, deltaToStart)))
+ } else if (message.nonEmpty) {
+ logging.emit(logLevel, this, from, message)
+ }
+
+ if (TransactionId.metricsKamon) {
+ MetricEmitter.emitCounterMetric(marker)
+ }
+
}
/**
@@ -71,7 +81,13 @@ case class TransactionId private (meta: TransactionMetadata) extends AnyVal {
*/
def started(from: AnyRef, marker: LogMarkerToken, message: String = "", logLevel: LogLevel = InfoLevel)(
implicit logging: Logging): StartMarker = {
- logging.emit(logLevel, this, from, createMessageWithMarker(message, LogMarker(marker, deltaToStart)))
+
+ if (TransactionId.metricsLog) {
+ logging.emit(logLevel, this, from, createMessageWithMarker(message, LogMarker(marker, deltaToStart)))
+ } else if (message.nonEmpty) {
+ logging.emit(logLevel, this, from, message)
+ }
+
StartMarker(Instant.now, marker)
}
@@ -89,13 +105,24 @@ case class TransactionId private (meta: TransactionMetadata) extends AnyVal {
message: String = "",
logLevel: LogLevel = InfoLevel,
endTime: Instant = Instant.now(Clock.systemUTC))(implicit logging: Logging) = {
+
val endMarker =
LogMarkerToken(startMarker.startMarker.component, startMarker.startMarker.action, LoggingMarkers.finish)
- logging.emit(
- logLevel,
- this,
- from,
- createMessageWithMarker(message, LogMarker(endMarker, deltaToStart, Some(deltaToMarker(startMarker, endTime)))))
+ val deltaToEnd = deltaToMarker(startMarker, endTime)
+
+ if (TransactionId.metricsLog) {
+ logging.emit(
+ logLevel,
+ this,
+ from,
+ createMessageWithMarker(message, LogMarker(endMarker, deltaToStart, Some(deltaToEnd))))
+ } else if (message.nonEmpty) {
+ logging.emit(logLevel, this, from, message)
+ }
+
+ if (TransactionId.metricsKamon) {
+ MetricEmitter.emitHistogramMetric(endMarker, deltaToEnd)
+ }
}
/**
@@ -108,13 +135,24 @@ case class TransactionId private (meta: TransactionMetadata) extends AnyVal {
*/
def failed(from: AnyRef, startMarker: StartMarker, message: String = "", logLevel: LogLevel = WarningLevel)(
implicit logging: Logging) = {
+
val endMarker =
LogMarkerToken(startMarker.startMarker.component, startMarker.startMarker.action, LoggingMarkers.error)
- logging.emit(
- logLevel,
- this,
- from,
- createMessageWithMarker(message, LogMarker(endMarker, deltaToStart, Some(deltaToMarker(startMarker)))))
+ val deltaToEnd = deltaToMarker(startMarker)
+
+ if (TransactionId.metricsLog) {
+ logging.emit(
+ logLevel,
+ this,
+ from,
+ createMessageWithMarker(message, LogMarker(endMarker, deltaToStart, Some(deltaToEnd))))
+ } else if (message.nonEmpty) {
+ logging.emit(logLevel, this, from, message)
+ }
+
+ if (TransactionId.metricsKamon) {
+ MetricEmitter.emitHistogramMetric(endMarker, deltaToEnd)
+ }
}
/**
@@ -158,6 +196,11 @@ case class StartMarker(val start: Instant, startMarker: LogMarkerToken)
protected case class TransactionMetadata(val id: Long, val start: Instant)
object TransactionId {
+
+ // get the metric parameters directly from the environment since WhiskConfig can not be instantiated here
+ val metricsKamon: Boolean = sys.env.get("METRICS_KAMON").getOrElse("False").toBoolean
+ val metricsLog: Boolean = sys.env.get("METRICS_LOG").getOrElse("True").toBoolean
+
val unknown = TransactionId(0)
val testing = TransactionId(-1) // Common id for for unit testing
val invoker = TransactionId(-100) // Invoker startup/shutdown or GC activity
diff --git a/common/scala/src/main/scala/whisk/http/BasicHttpService.scala b/common/scala/src/main/scala/whisk/http/BasicHttpService.scala
index f250ddf..33d6295 100644
--- a/common/scala/src/main/scala/whisk/http/BasicHttpService.scala
+++ b/common/scala/src/main/scala/whisk/http/BasicHttpService.scala
@@ -36,6 +36,7 @@ import whisk.common.LogMarkerToken
import whisk.common.LoggingMarkers
import whisk.common.TransactionCounter
import whisk.common.TransactionId
+import whisk.common.MetricEmitter
/**
* This trait extends the Akka Directives and Actor with logging and transaction counting
@@ -121,7 +122,15 @@ trait BasicHttpService extends Directives with TransactionCounter {
val token = LogMarkerToken("http", s"${m.toLowerCase}.${res.status.intValue}", LoggingMarkers.count)
val marker = LogMarker(token, tid.deltaToStart, Some(tid.deltaToStart))
- Some(LogEntry(s"[$tid] [$name] $marker", l))
+ if (TransactionId.metricsKamon) {
+ MetricEmitter.emitHistogramMetric(token, tid.deltaToStart)
+ }
+ if (TransactionId.metricsLog) {
+ Some(LogEntry(s"[$tid] [$name] $marker", l))
+ } else {
+ None
+ }
+
case _ => None // other kind of responses
}
}
diff --git a/core/controller/src/main/resources/application.conf b/core/controller/src/main/resources/application.conf
index d32150c..6e13354 100644
--- a/core/controller/src/main/resources/application.conf
+++ b/core/controller/src/main/resources/application.conf
@@ -81,4 +81,4 @@ akka {
auto-down-unreachable-after = ${?AUTO_DOWN_UNREACHABLE_AFTER}
#distributed-data.notify-subscribers-interval = 0.01
}
-}
+}
\ No newline at end of file
diff --git a/core/controller/src/main/scala/whisk/core/controller/Controller.scala b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
index fe4c75d..6c8bef5 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Controller.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
@@ -19,15 +19,21 @@ package whisk.core.controller
import scala.concurrent.Await
import scala.concurrent.duration.DurationInt
+import scala.concurrent.Future
import scala.util.{Failure, Success}
+import akka.Done
import akka.actor.ActorSystem
+import akka.actor.CoordinatedShutdown
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.model.Uri
import akka.http.scaladsl.server.Route
import akka.stream.ActorMaterializer
import spray.json._
+
import spray.json.DefaultJsonProtocol._
+import kamon.Kamon
+
import whisk.common.AkkaLogging
import whisk.common.Logging
import whisk.common.LoggingMarkers
@@ -168,9 +174,17 @@ object Controller {
"runtimes" -> runtimes.toJson)
def main(args: Array[String]): Unit = {
+ Kamon.start()
implicit val actorSystem = ActorSystem("controller-actor-system")
implicit val logger = new AkkaLogging(akka.event.Logging.getLogger(actorSystem, this))
+ // Prepare Kamon shutdown
+ CoordinatedShutdown(actorSystem).addTask(CoordinatedShutdown.PhaseActorSystemTerminate, "shutdownKamon") { () =>
+ logger.info(this, s"Shutting down Kamon with coordinated shutdown")
+ Kamon.shutdown()
+ Future.successful(Done)
+ }
+
// extract configuration data from the environment
val config = new WhiskConfig(requiredProperties)
val port = config.servicePort.toInt
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
index 0e47bd4..e1fec08 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
@@ -19,11 +19,16 @@ package whisk.core.invoker
import scala.concurrent.Await
import scala.concurrent.duration._
+import scala.concurrent.Future
import scala.util.Failure
import com.redis.RedisClient
+import kamon.Kamon
+
+import akka.Done
import akka.actor.ActorSystem
+import akka.actor.CoordinatedShutdown
import akka.stream.ActorMaterializer
import whisk.common.AkkaLogging
import whisk.common.Scheduler
@@ -63,11 +68,20 @@ object Invoker {
Map(invokerName -> "")
def main(args: Array[String]): Unit = {
+ Kamon.start()
+
implicit val ec = ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
implicit val actorSystem: ActorSystem =
ActorSystem(name = "invoker-actor-system", defaultExecutionContext = Some(ec))
implicit val logger = new AkkaLogging(akka.event.Logging.getLogger(actorSystem, this))
+ // Prepare Kamon shutdown
+ CoordinatedShutdown(actorSystem).addTask(CoordinatedShutdown.PhaseActorSystemTerminate, "shutdownKamon") { () =>
+ logger.info(this, s"Shutting down Kamon with coordinated shutdown")
+ Kamon.shutdown()
+ Future.successful(Done)
+ }
+
// load values for the required properties from the environment
implicit val config = new WhiskConfig(requiredProperties)
diff --git a/docs/metrics.md b/docs/metrics.md
new file mode 100644
index 0000000..5d4fcc0
--- /dev/null
+++ b/docs/metrics.md
@@ -0,0 +1,47 @@
+# Openwhisk Metric Support
+
+Openwhick contains the capability to send metric information to a statsd server. This capability is disabled per default. Instead metric information is normally written to the log files in logmarker format.
+
+## Configuration
+
+Both capabilties can be enabled or disabled separately during deployment via Ansible configuration in the 'goup_vars/all' file of an environment.
+
+There are four configurations options available:
+
+- **metrics_log** [true / false (default: true)]
+
+ Enable/disable whether the metric information is written out to the log files in logmarker format.
+
+ *Beware: Even if set to false all messages adjourning the log markers are still written out to the log*
+
+- **metrics_kamon** [true / false (default: false)]
+
+ Enable/disable whther metric information is send the configured statsd server.
+
+- **metrics_kamon_statsd_host** [hostname or ip address]
+
+ Hostname or ip address of the statsd server
+
+- **metrics_kamon_statsd_port** [port number (default:8125)]
+
+ Port number of the statsd server
+
+
+Example configuration:
+
+```
+metrics_kamon: true
+metrics_kamon_statsd_host: '192.168.99.100'
+metrics_kamon_statsd_port: '8125'
+metrics_log: true
+```
+
+## Testing the statsd metric support
+
+The Kamon project privides an integrated docker image containing statsd and a connected Grafana dashboard via [this Github project](https://github.com/kamon-io/docker-grafana-graphite). This image is helpful for testing the metrices sent via statsd.
+
+Please follow these [instructions](https://github.com/kamon-io/docker-grafana-graphite/blob/master/README.md) to start the docker image in your local docker environment.
+
+The docker image exposes statsd via the (standard) port 8125 and a Graphana dashboard via port 8080 on your docker host.
+
+The address of your docker host has to be configured in the `metrics_kamon_statsd_host` configuration property.
--
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>'].