You are viewing a plain-text version of this content. The canonical link to it (shown as "here" in the original) is not preserved in this plain-text copy.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/02/07 14:47:20 UTC

[GitHub] jeremiaswerner closed pull request #2968: add additional metrics and logs

jeremiaswerner closed pull request #2968: add additional metrics and logs
URL: https://github.com/apache/incubator-openwhisk/pull/2968
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff once such a PR is merged, the diff
is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would not otherwise display it):

diff --git a/common/scala/src/main/scala/whisk/common/Logging.scala b/common/scala/src/main/scala/whisk/common/Logging.scala
index 873bcef81b..1902e83328 100644
--- a/common/scala/src/main/scala/whisk/common/Logging.scala
+++ b/common/scala/src/main/scala/whisk/common/Logging.scala
@@ -188,17 +188,18 @@ object MetricEmitter {
 
   val metrics = Kamon.metrics
 
-  def emitCounterMetric(token: LogMarkerToken) = {
+  def emitCounterMetric(token: LogMarkerToken): Unit = {
     metrics
       .counter(token.toString)
       .increment(1)
   }
 
-  def emitHistogramMetric(token: LogMarkerToken, value: Long) = {
+  def emitHistogramMetric(token: LogMarkerToken, value: Long): Unit = {
     metrics
       .histogram(token.toString)
       .record(value)
   }
+
 }
 
 object LoggingMarkers {
@@ -238,6 +239,8 @@ object LoggingMarkers {
   // Check invoker healthy state from loadbalancer
   val LOADBALANCER_INVOKER_OFFLINE = LogMarkerToken(loadbalancer, "invokerOffline", count)
   val LOADBALANCER_INVOKER_UNHEALTHY = LogMarkerToken(loadbalancer, "invokerUnhealthy", count)
+  def LOADBALANCER_ACTIVATION_START(namespaceId: String) =
+    LogMarkerToken(loadbalancer, s"activations_$namespaceId", count)
 
   // Time that is needed to execute the action
   val INVOKER_ACTIVATION_RUN = LogMarkerToken(invoker, "activationRun", start)
@@ -252,6 +255,8 @@ object LoggingMarkers {
   val INVOKER_ACTIVATION = LogMarkerToken(invoker, activation, start)
   def INVOKER_DOCKER_CMD(cmd: String) = LogMarkerToken(invoker, s"docker.$cmd", start)
   def INVOKER_RUNC_CMD(cmd: String) = LogMarkerToken(invoker, s"runc.$cmd", start)
+  def INVOKER_CONTAINER_START(actionName: String, namespaceName: String, containerState: String) =
+    LogMarkerToken(invoker, s"container_start_${containerState}_${namespaceName}_$actionName", count)
 
   /*
    * General markers
@@ -265,4 +270,5 @@ object LoggingMarkers {
   val DATABASE_QUERY = LogMarkerToken(database, "queryView", start)
   val DATABASE_ATT_GET = LogMarkerToken(database, "getDocumentAttachment", start)
   val DATABASE_ATT_SAVE = LogMarkerToken(database, "saveDocumentAttachment", start)
+  val DATABASE_BATCH_SIZE = LogMarkerToken(database, "batchSize", count)
 }
diff --git a/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala b/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
index 6832bc30c1..efc03cca85 100644
--- a/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
+++ b/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
@@ -20,7 +20,6 @@ package whisk.core.database
 import scala.concurrent.Await
 import scala.concurrent.Future
 import scala.concurrent.duration._
-
 import akka.actor.ActorSystem
 import akka.event.Logging.ErrorLevel
 import akka.http.scaladsl.model._
@@ -28,9 +27,7 @@ import akka.stream.ActorMaterializer
 import akka.stream.scaladsl._
 import akka.util.ByteString
 import spray.json._
-import whisk.common.Logging
-import whisk.common.LoggingMarkers
-import whisk.common.TransactionId
+import whisk.common.{Logging, LoggingMarkers, MetricEmitter, TransactionId}
 import whisk.core.entity.BulkEntityResult
 import whisk.core.entity.DocInfo
 import whisk.core.entity.DocRevision
@@ -141,6 +138,8 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St
     val count = ds.size
     val start = transid.started(this, LoggingMarkers.DATABASE_BULK_SAVE, s"'$dbName' saving $count documents")
 
+    MetricEmitter.emitHistogramMetric(LoggingMarkers.DATABASE_BATCH_SIZE, ds.size)
+
     val f = client.putDocs(ds).map {
       _ match {
         case Right(response) =>
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
index f2614e0ca2..074b370c97 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
@@ -36,6 +36,8 @@ import whisk.core.{ConfigKeys, WhiskConfig}
 import whisk.spi.SpiLoader
 import akka.event.Logging.InfoLevel
 
+import pureconfig._
+
 import scala.annotation.tailrec
 import scala.concurrent.duration._
 import scala.concurrent.{Await, ExecutionContext, Future, Promise}
@@ -158,6 +160,12 @@ class ContainerPoolBalancer(config: WhiskConfig, instance: InstanceId)(implicit
           processCompletion(Left(activationId), transid, forced = true, invoker = invokerName)
         }
 
+        transid.mark(
+          this,
+          LoggingMarkers.LOADBALANCER_ACTIVATION_START(namespaceId.asString),
+          s"loadbalancer: activation started for namespace $namespaceId and activation $activationId",
+          logLevel = InfoLevel)
+
         // please note: timeoutHandler.cancel must be called on all non-timeout paths, e.g. Success
         ActivationEntry(
           activationId,
@@ -203,8 +211,7 @@ class ContainerPoolBalancer(config: WhiskConfig, instance: InstanceId)(implicit
     val start = transid.started(
       this,
       LoggingMarkers.CONTROLLER_KAFKA,
-      s"posting topic '$topic' with activation id '${msg.activationId}'",
-      logLevel = InfoLevel)
+      s"posting topic '$topic' with activation id '${msg.activationId}'")
 
     producer.send(topic, msg).andThen {
       case Success(status) =>
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
index 3e83fcac5a..d0945ab859 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
@@ -18,12 +18,11 @@
 package whisk.core.containerpool
 
 import scala.collection.immutable
-import akka.actor.Actor
-import akka.actor.ActorRef
-import akka.actor.ActorRefFactory
-import akka.actor.Props
-import whisk.common.AkkaLogging
-import whisk.common.TransactionId
+
+import whisk.common.{AkkaLogging, LoggingMarkers, TransactionId}
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, Props}
+
 import whisk.core.entity.ByteSize
 import whisk.core.entity.CodeExec
 import whisk.core.entity.EntityName
@@ -80,6 +79,18 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
     }
   }
 
+  def logContainerStart(r: Run, containerState: String): Unit = {
+    val namespaceName = r.msg.user.namespace.name
+    val actionName = r.action.name.name
+    val activationId = r.msg.activationId.toString
+
+    r.msg.transid.mark(
+      this,
+      LoggingMarkers.INVOKER_CONTAINER_START(actionName, namespaceName, containerState),
+      s"containerStart containerState: $containerState action: $actionName namespace: $namespaceName activationId: $activationId",
+      akka.event.Logging.InfoLevel)
+  }
+
   def receive: Receive = {
     // A job to run on a container
     //
@@ -87,33 +98,46 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
     // their requests and send them back to the pool for rescheduling (this may happen if "docker" operations
     // fail for example, or a container has aged and was destroying itself when a new request was assigned)
     case r: Run =>
-      val container = if (busyPool.size < maxActiveContainers) {
+      val createdContainer = if (busyPool.size < maxActiveContainers) {
+
         // Schedule a job to a warm container
         ContainerPool
           .schedule(r.action, r.msg.user.namespace, freePool)
+          .map(container => {
+            (container, "warm")
+          })
           .orElse {
             if (busyPool.size + freePool.size < maxPoolSize) {
-              takePrewarmContainer(r.action).orElse {
-                Some(createContainer())
-              }
+              takePrewarmContainer(r.action)
+                .map(container => {
+                  (container, "prewarmed")
+                })
+                .orElse {
+                  Some(createContainer(), "cold")
+                }
             } else None
           }
           .orElse {
             // Remove a container and create a new one for the given job
             ContainerPool.remove(freePool).map { toDelete =>
               removeContainer(toDelete)
-              takePrewarmContainer(r.action).getOrElse {
-                createContainer()
-              }
+              takePrewarmContainer(r.action)
+                .map(container => {
+                  (container, "recreated")
+                })
+                .getOrElse {
+                  (createContainer(), "recreated")
+                }
             }
           }
       } else None
 
-      container match {
-        case Some((actor, data)) =>
+      createdContainer match {
+        case Some(((actor, data), containerState)) =>
           busyPool = busyPool + (actor -> data)
           freePool = freePool - actor
           actor ! r // forwards the run request to the container
+          logContainerStart(r, containerState)
         case None =>
           // this can also happen if createContainer fails to start a new container, or
           // if a job is rescheduled but the container it was allocated to has not yet destroyed itself


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to this message, please log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services