You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/02/07 14:47:20 UTC
[GitHub] jeremiaswerner closed pull request #2968: add additional metrics and logs
jeremiaswerner closed pull request #2968: add additional metrics and logs
URL: https://github.com/apache/incubator-openwhisk/pull/2968
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would not otherwise display it):
diff --git a/common/scala/src/main/scala/whisk/common/Logging.scala b/common/scala/src/main/scala/whisk/common/Logging.scala
index 873bcef81b..1902e83328 100644
--- a/common/scala/src/main/scala/whisk/common/Logging.scala
+++ b/common/scala/src/main/scala/whisk/common/Logging.scala
@@ -188,17 +188,18 @@ object MetricEmitter {
val metrics = Kamon.metrics
- def emitCounterMetric(token: LogMarkerToken) = {
+ def emitCounterMetric(token: LogMarkerToken): Unit = {
metrics
.counter(token.toString)
.increment(1)
}
- def emitHistogramMetric(token: LogMarkerToken, value: Long) = {
+ def emitHistogramMetric(token: LogMarkerToken, value: Long): Unit = {
metrics
.histogram(token.toString)
.record(value)
}
+
}
object LoggingMarkers {
@@ -238,6 +239,8 @@ object LoggingMarkers {
// Check invoker healthy state from loadbalancer
val LOADBALANCER_INVOKER_OFFLINE = LogMarkerToken(loadbalancer, "invokerOffline", count)
val LOADBALANCER_INVOKER_UNHEALTHY = LogMarkerToken(loadbalancer, "invokerUnhealthy", count)
+ def LOADBALANCER_ACTIVATION_START(namespaceId: String) =
+ LogMarkerToken(loadbalancer, s"activations_$namespaceId", count)
// Time that is needed to execute the action
val INVOKER_ACTIVATION_RUN = LogMarkerToken(invoker, "activationRun", start)
@@ -252,6 +255,8 @@ object LoggingMarkers {
val INVOKER_ACTIVATION = LogMarkerToken(invoker, activation, start)
def INVOKER_DOCKER_CMD(cmd: String) = LogMarkerToken(invoker, s"docker.$cmd", start)
def INVOKER_RUNC_CMD(cmd: String) = LogMarkerToken(invoker, s"runc.$cmd", start)
+ def INVOKER_CONTAINER_START(actionName: String, namespaceName: String, containerState: String) =
+ LogMarkerToken(invoker, s"container_start_${containerState}_${namespaceName}_$actionName", count)
/*
* General markers
@@ -265,4 +270,5 @@ object LoggingMarkers {
val DATABASE_QUERY = LogMarkerToken(database, "queryView", start)
val DATABASE_ATT_GET = LogMarkerToken(database, "getDocumentAttachment", start)
val DATABASE_ATT_SAVE = LogMarkerToken(database, "saveDocumentAttachment", start)
+ val DATABASE_BATCH_SIZE = LogMarkerToken(database, "batchSize", count)
}
diff --git a/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala b/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
index 6832bc30c1..efc03cca85 100644
--- a/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
+++ b/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
@@ -20,7 +20,6 @@ package whisk.core.database
import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration._
-
import akka.actor.ActorSystem
import akka.event.Logging.ErrorLevel
import akka.http.scaladsl.model._
@@ -28,9 +27,7 @@ import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.util.ByteString
import spray.json._
-import whisk.common.Logging
-import whisk.common.LoggingMarkers
-import whisk.common.TransactionId
+import whisk.common.{Logging, LoggingMarkers, MetricEmitter, TransactionId}
import whisk.core.entity.BulkEntityResult
import whisk.core.entity.DocInfo
import whisk.core.entity.DocRevision
@@ -141,6 +138,8 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St
val count = ds.size
val start = transid.started(this, LoggingMarkers.DATABASE_BULK_SAVE, s"'$dbName' saving $count documents")
+ MetricEmitter.emitHistogramMetric(LoggingMarkers.DATABASE_BATCH_SIZE, ds.size)
+
val f = client.putDocs(ds).map {
_ match {
case Right(response) =>
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
index f2614e0ca2..074b370c97 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
@@ -36,6 +36,8 @@ import whisk.core.{ConfigKeys, WhiskConfig}
import whisk.spi.SpiLoader
import akka.event.Logging.InfoLevel
+import pureconfig._
+
import scala.annotation.tailrec
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
@@ -158,6 +160,12 @@ class ContainerPoolBalancer(config: WhiskConfig, instance: InstanceId)(implicit
processCompletion(Left(activationId), transid, forced = true, invoker = invokerName)
}
+ transid.mark(
+ this,
+ LoggingMarkers.LOADBALANCER_ACTIVATION_START(namespaceId.asString),
+ s"loadbalancer: activation started for namespace $namespaceId and activation $activationId",
+ logLevel = InfoLevel)
+
// please note: timeoutHandler.cancel must be called on all non-timeout paths, e.g. Success
ActivationEntry(
activationId,
@@ -203,8 +211,7 @@ class ContainerPoolBalancer(config: WhiskConfig, instance: InstanceId)(implicit
val start = transid.started(
this,
LoggingMarkers.CONTROLLER_KAFKA,
- s"posting topic '$topic' with activation id '${msg.activationId}'",
- logLevel = InfoLevel)
+ s"posting topic '$topic' with activation id '${msg.activationId}'")
producer.send(topic, msg).andThen {
case Success(status) =>
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
index 3e83fcac5a..d0945ab859 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
@@ -18,12 +18,11 @@
package whisk.core.containerpool
import scala.collection.immutable
-import akka.actor.Actor
-import akka.actor.ActorRef
-import akka.actor.ActorRefFactory
-import akka.actor.Props
-import whisk.common.AkkaLogging
-import whisk.common.TransactionId
+
+import whisk.common.{AkkaLogging, LoggingMarkers, TransactionId}
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, Props}
+
import whisk.core.entity.ByteSize
import whisk.core.entity.CodeExec
import whisk.core.entity.EntityName
@@ -80,6 +79,18 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
}
}
+ def logContainerStart(r: Run, containerState: String): Unit = {
+ val namespaceName = r.msg.user.namespace.name
+ val actionName = r.action.name.name
+ val activationId = r.msg.activationId.toString
+
+ r.msg.transid.mark(
+ this,
+ LoggingMarkers.INVOKER_CONTAINER_START(actionName, namespaceName, containerState),
+ s"containerStart containerState: $containerState action: $actionName namespace: $namespaceName activationId: $activationId",
+ akka.event.Logging.InfoLevel)
+ }
+
def receive: Receive = {
// A job to run on a container
//
@@ -87,33 +98,46 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
// their requests and send them back to the pool for rescheduling (this may happen if "docker" operations
// fail for example, or a container has aged and was destroying itself when a new request was assigned)
case r: Run =>
- val container = if (busyPool.size < maxActiveContainers) {
+ val createdContainer = if (busyPool.size < maxActiveContainers) {
+
// Schedule a job to a warm container
ContainerPool
.schedule(r.action, r.msg.user.namespace, freePool)
+ .map(container => {
+ (container, "warm")
+ })
.orElse {
if (busyPool.size + freePool.size < maxPoolSize) {
- takePrewarmContainer(r.action).orElse {
- Some(createContainer())
- }
+ takePrewarmContainer(r.action)
+ .map(container => {
+ (container, "prewarmed")
+ })
+ .orElse {
+ Some(createContainer(), "cold")
+ }
} else None
}
.orElse {
// Remove a container and create a new one for the given job
ContainerPool.remove(freePool).map { toDelete =>
removeContainer(toDelete)
- takePrewarmContainer(r.action).getOrElse {
- createContainer()
- }
+ takePrewarmContainer(r.action)
+ .map(container => {
+ (container, "recreated")
+ })
+ .getOrElse {
+ (createContainer(), "recreated")
+ }
}
}
} else None
- container match {
- case Some((actor, data)) =>
+ createdContainer match {
+ case Some(((actor, data), containerState)) =>
busyPool = busyPool + (actor -> data)
freePool = freePool - actor
actor ! r // forwards the run request to the container
+ logContainerStart(r, containerState)
case None =>
// this can also happen if createContainer fails to start a new container, or
// if a job is rescheduled but the container it was allocated to has not yet destroyed itself
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services