You are viewing a plain-text version of this content. The canonical link to it appeared as a hyperlink ("here") on the original page and is not preserved in this plain-text version.
Posted to commits@openwhisk.apache.org by st...@apache.org on 2022/06/15 11:24:26 UTC

[openwhisk] branch master updated: rework scheduler wait time metric (#5258)

This is an automated email from the ASF dual-hosted git repository.

style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 052d4d218 rework scheduler wait time metric (#5258)
052d4d218 is described below

commit 052d4d21889b38808f96958f553eeaa73947a553
Author: Brendan Doyle <bd...@gmail.com>
AuthorDate: Wed Jun 15 04:24:18 2022 -0700

    rework scheduler wait time metric (#5258)
    
    Co-authored-by: Brendan Doyle <br...@qualtrics.com>
---
 .../scala/org/apache/openwhisk/common/Logging.scala  |  6 ++++--
 .../openwhisk/core/scheduler/queue/MemoryQueue.scala | 20 ++++++++++++--------
 .../core/scheduler/queue/QueueManager.scala          |  2 +-
 3 files changed, 17 insertions(+), 11 deletions(-)

diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
index 5a1fe89f0..c0dbb783a 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
@@ -591,8 +591,9 @@ object LoggingMarkers {
 
   // Time that is needed to produce message in kafka
   val SCHEDULER_KAFKA = LogMarkerToken(scheduler, kafka, start)(MeasurementUnit.time.milliseconds)
-  val SCHEDULER_WAIT_TIME =
-    LogMarkerToken(scheduler, "waitTime", counter)(MeasurementUnit.none)
+  val SCHEDULER_KAFKA_WAIT_TIME =
+    LogMarkerToken(scheduler, "kafkaWaitTime", counter)(MeasurementUnit.none)
+  def SCHEDULER_WAIT_TIME(action: String) = LogMarkerToken(scheduler, "waitTime", counter, Some(action), Map("action" -> action))(MeasurementUnit.none)
 
   def SCHEDULER_KEEP_ALIVE(leaseId: Long) =
     LogMarkerToken(scheduler, "keepAlive", counter, None, Map("leaseId" -> leaseId.toString))(MeasurementUnit.none)
@@ -603,6 +604,7 @@ object LoggingMarkers {
     LogMarkerToken(scheduler, "queueUpdate", counter, None, Map("reason" -> reason))(MeasurementUnit.none)
   def SCHEDULER_QUEUE_WAITING_ACTIVATION(action: String) =
     LogMarkerToken(scheduler, "queueActivation", counter, Some(action), Map("action" -> action))(MeasurementUnit.none)
+
   /*
    * General markers
    */
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
index d54e697e2..51a674457 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
@@ -19,7 +19,6 @@ package org.apache.openwhisk.core.scheduler.queue
 
 import java.time.{Duration, Instant}
 import java.util.concurrent.atomic.AtomicInteger
-
 import akka.actor.Status.{Failure => FailureMessage}
 import akka.actor.{ActorRef, ActorSystem, Cancellable, FSM, Props, Stash}
 import akka.util.Timeout
@@ -28,6 +27,7 @@ import org.apache.openwhisk.core.ConfigKeys
 import org.apache.openwhisk.core.ack.ActiveAck
 import org.apache.openwhisk.core.connector.ContainerCreationError.{TooManyConcurrentRequests, ZeroNamespaceLimit}
 import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.containerpool.Interval
 import org.apache.openwhisk.core.database.{NoDocumentException, UserContext}
 import org.apache.openwhisk.core.entity._
 import org.apache.openwhisk.core.entity.size._
@@ -35,12 +35,7 @@ import org.apache.openwhisk.core.etcd.EtcdClient
 import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.containerPrefix
 import org.apache.openwhisk.core.etcd.EtcdKV.{ContainerKeys, QueueKeys, ThrottlingKeys}
 import org.apache.openwhisk.core.scheduler.grpc.{GetActivation, ActivationResponse => GetActivationResponse}
-import org.apache.openwhisk.core.scheduler.message.{
-  ContainerCreation,
-  ContainerDeletion,
-  FailedCreationJob,
-  SuccessfulCreationJob
-}
+import org.apache.openwhisk.core.scheduler.message.{ContainerCreation, ContainerDeletion, FailedCreationJob, SuccessfulCreationJob}
 import org.apache.openwhisk.core.scheduler.{SchedulerEndpoints, SchedulingConfig}
 import org.apache.openwhisk.core.service._
 import org.apache.openwhisk.http.Messages.{namespaceLimitUnderZero, tooManyConcurrentRequests}
@@ -52,7 +47,7 @@ import scala.annotation.tailrec
 import scala.collection.immutable.Queue
 import scala.collection.mutable
 import scala.concurrent.duration._
-import scala.concurrent.{duration, ExecutionContextExecutor, Future, Promise}
+import scala.concurrent.{ExecutionContextExecutor, Future, Promise, duration}
 import scala.util.{Failure, Success}
 
 // States
@@ -769,6 +764,10 @@ class MemoryQueue(private val etcdClient: EtcdClient,
       this,
       s"[$invocationNamespace:$action:$stateName] complete activation ${activation.activationId} with error $message")(
       activation.transid)
+
+    val totalTimeInScheduler = Interval(activation.transid.meta.start, Instant.now()).duration
+    MetricEmitter.emitHistogramMetric(LoggingMarkers.SCHEDULER_WAIT_TIME(action.asString), totalTimeInScheduler.toMillis)
+
     val activationResponse =
       if (isWhiskError)
         generateFallbackActivation(activation, ActivationResponse.whiskError(message))
@@ -938,6 +937,8 @@ class MemoryQueue(private val etcdClient: EtcdClient,
     in.incrementAndGet()
     takeUncompletedRequest()
       .map { res =>
+        val totalTimeInScheduler = Interval(msg.transid.meta.start, Instant.now()).duration
+        MetricEmitter.emitHistogramMetric(LoggingMarkers.SCHEDULER_WAIT_TIME(action.asString), totalTimeInScheduler.toMillis)
         res.trySuccess(Right(msg))
         in.decrementAndGet()
         stay
@@ -958,6 +959,9 @@ class MemoryQueue(private val etcdClient: EtcdClient,
       logging.info(
         this,
         s"[$invocationNamespace:$action:$stateName] Get activation request ${request.containerId}, send one message: ${msg.activationId}")
+      val totalTimeInScheduler = Interval(msg.transid.meta.start, Instant.now()).duration
+      MetricEmitter.emitHistogramMetric(LoggingMarkers.SCHEDULER_WAIT_TIME(action.asString), totalTimeInScheduler.toMillis)
+
       sender ! GetActivationResponse(Right(msg))
       tryDisableActionThrottling()
     } else {
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala
index 6dbaffb0b..5178d2a3f 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala
@@ -391,7 +391,7 @@ class QueueManager(
 
     // Drop the message that has not been scheduled for a long time
     val schedulingWaitTime = Interval(msg.transid.meta.start, Instant.now()).duration
-    MetricEmitter.emitHistogramMetric(LoggingMarkers.SCHEDULER_WAIT_TIME, schedulingWaitTime.toMillis)
+    MetricEmitter.emitHistogramMetric(LoggingMarkers.SCHEDULER_KAFKA_WAIT_TIME, schedulingWaitTime.toMillis)
 
     if (schedulingWaitTime > queueManagerConfig.maxSchedulingTime) {
       logging.warn(