You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/04/17 06:54:21 UTC

[GitHub] markusthoemmes closed pull request #3180: Emit metrics relevant for the usage of the system

markusthoemmes closed pull request #3180: Emit metrics relevant for the usage of the system
URL: https://github.com/apache/incubator-openwhisk/pull/3180
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not display its
diff once it has been merged, so the diff is reproduced below:

diff --git a/common/scala/src/main/scala/whisk/common/Logging.scala b/common/scala/src/main/scala/whisk/common/Logging.scala
index e63c824f30..06078ba545 100644
--- a/common/scala/src/main/scala/whisk/common/Logging.scala
+++ b/common/scala/src/main/scala/whisk/common/Logging.scala
@@ -22,12 +22,15 @@ import java.time.Clock
 import java.time.Instant
 import java.time.ZoneId
 import java.time.format.DateTimeFormatter
+import java.util.concurrent.atomic.AtomicLong
 
 import akka.event.Logging.{DebugLevel, ErrorLevel, InfoLevel, WarningLevel}
 import akka.event.Logging.LogLevel
 import akka.event.LoggingAdapter
 import kamon.Kamon
 
+import scala.concurrent.duration._
+
 trait Logging {
 
   /**
@@ -165,10 +168,11 @@ private object Emitter {
 }
 
 case class LogMarkerToken(component: String, action: String, state: String) {
-  override def toString() = component + "_" + action + "_" + state
+  override def toString: String = asString
+  def asString: String = component + "_" + action + "_" + state
 
-  def asFinish = copy(state = LoggingMarkers.finish)
-  def asError = copy(state = LoggingMarkers.error)
+  def asFinish: LogMarkerToken = copy(state = LoggingMarkers.finish)
+  def asError: LogMarkerToken = copy(state = LoggingMarkers.error)
 }
 
 object LogMarkerToken {
@@ -184,16 +188,22 @@ object MetricEmitter {
 
   val metrics = Kamon.metrics
 
-  def emitCounterMetric(token: LogMarkerToken) = {
-    metrics
-      .counter(token.toString)
-      .increment(1)
-  }
+  def incrementCounter(token: LogMarkerToken): Unit = metrics.counter(token.asString).increment(1)
+  def emitHistogramMetric(token: LogMarkerToken, value: Long): Unit = metrics.histogram(token.asString).record(value)
+
+  /**
+   * Creating a gauge to record values to.
+   *
+   * Uses a StableGauge which will always report the same value until that value is changed.
+   *
+   * @param token name of the gauge
+   * @return a stable gauge to record values to
+   */
+  def setupGauge(token: LogMarkerToken) = new StableGauge(token.asString)
+  class StableGauge(name: String, value: AtomicLong = new AtomicLong(0)) {
+    metrics.gauge(name, 1.second)(value.get)
 
-  def emitHistogramMetric(token: LogMarkerToken, value: Long) = {
-    metrics
-      .histogram(token.toString)
-      .record(value)
+    def record(newValue: Long): Unit = value.set(newValue)
   }
 }
 
@@ -234,6 +244,9 @@ object LoggingMarkers {
   // Check invoker healthy state from loadbalancer
   val LOADBALANCER_INVOKER_OFFLINE = LogMarkerToken(loadbalancer, "invokerOffline", count)
   val LOADBALANCER_INVOKER_UNHEALTHY = LogMarkerToken(loadbalancer, "invokerUnhealthy", count)
+  val LOADBALANCER_INVOKER_HEALTHY = LogMarkerToken(loadbalancer, "invokerHealthy", count)
+
+  val LOADBALANCER_ACTIVATIONS_INFLIGHT = LogMarkerToken(loadbalancer, "activationsInFlight", count)
 
   // Time that is needed to execute the action
   val INVOKER_ACTIVATION_RUN = LogMarkerToken(invoker, "activationRun", start)
diff --git a/common/scala/src/main/scala/whisk/common/TransactionId.scala b/common/scala/src/main/scala/whisk/common/TransactionId.scala
index 95e6eefc09..bf907162a7 100644
--- a/common/scala/src/main/scala/whisk/common/TransactionId.scala
+++ b/common/scala/src/main/scala/whisk/common/TransactionId.scala
@@ -63,7 +63,7 @@ case class TransactionId private (meta: TransactionMetadata) extends AnyVal {
     }
 
     if (TransactionId.metricsKamon) {
-      MetricEmitter.emitCounterMetric(marker)
+      MetricEmitter.incrementCounter(marker)
     }
 
   }
@@ -89,7 +89,7 @@ case class TransactionId private (meta: TransactionMetadata) extends AnyVal {
     }
 
     if (TransactionId.metricsKamon) {
-      MetricEmitter.emitCounterMetric(marker)
+      MetricEmitter.incrementCounter(marker)
     }
 
     StartMarker(Instant.now, marker)
@@ -156,7 +156,7 @@ case class TransactionId private (meta: TransactionMetadata) extends AnyVal {
 
     if (TransactionId.metricsKamon) {
       MetricEmitter.emitHistogramMetric(endMarker, deltaToEnd)
-      MetricEmitter.emitCounterMetric(endMarker)
+      MetricEmitter.incrementCounter(endMarker)
     }
   }
 
diff --git a/common/scala/src/main/scala/whisk/http/BasicHttpService.scala b/common/scala/src/main/scala/whisk/http/BasicHttpService.scala
index 798bb1eff8..379ddebd0a 100644
--- a/common/scala/src/main/scala/whisk/http/BasicHttpService.scala
+++ b/common/scala/src/main/scala/whisk/http/BasicHttpService.scala
@@ -114,7 +114,7 @@ trait BasicHttpService extends Directives with TransactionCounter {
 
       if (TransactionId.metricsKamon) {
         MetricEmitter.emitHistogramMetric(token, tid.deltaToStart)
-        MetricEmitter.emitCounterMetric(token)
+        MetricEmitter.incrementCounter(token)
       }
       if (TransactionId.metricsLog) {
         Some(LogEntry(s"[$tid] [$name] $marker", l))
diff --git a/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala b/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala
index 6256af2a5b..9e2192556a 100644
--- a/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala
+++ b/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala
@@ -17,8 +17,7 @@
 
 package whisk.core.entitlement
 
-import whisk.common.Logging
-import whisk.common.TransactionId
+import whisk.common.{Logging, LoggingMarkers, MetricEmitter, TransactionId}
 import whisk.core.entity.Identity
 import whisk.core.loadBalancer.LoadBalancer
 import whisk.http.Messages
@@ -55,11 +54,14 @@ class ActivationThrottler(loadBalancer: LoadBalancer, defaultConcurrencyLimit: I
     }
   }
 
+  private val inflightActivationsReporter = MetricEmitter.setupGauge(LoggingMarkers.LOADBALANCER_ACTIVATIONS_INFLIGHT)
+
   /**
    * Checks whether the system is in a generally overloaded state.
    */
   def isOverloaded()(implicit tid: TransactionId): Future[Boolean] = {
     loadBalancer.totalActiveActivations.map { concurrentActivations =>
+      inflightActivationsReporter.record(concurrentActivations)
       logging.info(
         this,
         s"concurrent activations in system = $concurrentActivations, below limit = $systemOverloadLimit")
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
index cd80a8ee12..cb3259efdb 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
@@ -35,10 +35,7 @@ import akka.actor.FSM.Transition
 import akka.actor.Props
 import akka.pattern.pipe
 import akka.util.Timeout
-import whisk.common.AkkaLogging
-import whisk.common.LoggingMarkers
-import whisk.common.RingBuffer
-import whisk.common.TransactionId
+import whisk.common._
 import whisk.core.connector._
 import whisk.core.entitlement.Privilege
 import whisk.core.entity.ActivationId.ActivationIdGenerator
@@ -117,7 +114,15 @@ class InvokerPool(childFactory: (ActorRefFactory, InstanceId) => ActorRef,
     case msg: ActivationRequest => sendActivationToInvoker(msg.msg, msg.invoker).pipeTo(sender)
   }
 
-  def logStatus() = {
+  val healthyInvokers = MetricEmitter.setupGauge(LoggingMarkers.LOADBALANCER_INVOKER_HEALTHY)
+  val unhealthyInvokers = MetricEmitter.setupGauge(LoggingMarkers.LOADBALANCER_INVOKER_UNHEALTHY)
+  val offlineInvokers = MetricEmitter.setupGauge(LoggingMarkers.LOADBALANCER_INVOKER_OFFLINE)
+
+  def logStatus(): Unit = {
+    healthyInvokers.record(status.count(_._2 == Healthy).toLong)
+    unhealthyInvokers.record(status.count(_._2 == UnHealthy).toLong)
+    offlineInvokers.record(status.count(_._2 == Offline).toLong)
+
     val pretty = status.map { case (instance, state) => s"${instance.toInt} -> $state" }
     logging.info(this, s"invoker status changed to ${pretty.mkString(", ")}")
   }
@@ -254,19 +259,9 @@ class InvokerActor(invokerInstance: InstanceId, controllerInstance: InstanceId)
 
   /** Logging on Transition change */
   onTransition {
-    case _ -> Offline =>
-      transid.mark(
-        this,
-        LoggingMarkers.LOADBALANCER_INVOKER_OFFLINE,
-        s"$name is offline",
-        akka.event.Logging.WarningLevel)
-    case _ -> UnHealthy =>
-      transid.mark(
-        this,
-        LoggingMarkers.LOADBALANCER_INVOKER_UNHEALTHY,
-        s"$name is unhealthy",
-        akka.event.Logging.WarningLevel)
-    case _ -> Healthy => logging.info(this, s"$name is healthy")
+    case _ -> Offline   => logging.warn(this, s"$name is offline")
+    case _ -> UnHealthy => logging.warn(this, s"$name is unhealthy")
+    case _ -> Healthy   => logging.info(this, s"$name is healthy")
   }
 
   /** Scheduler to send test activations when the invoker is unhealthy. */


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services