You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by st...@apache.org on 2022/06/15 11:24:26 UTC
[openwhisk] branch master updated: rework scheduler wait time metric (#5258)
This is an automated email from the ASF dual-hosted git repository.
style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 052d4d218 rework scheduler wait time metric (#5258)
052d4d218 is described below
commit 052d4d21889b38808f96958f553eeaa73947a553
Author: Brendan Doyle <bd...@gmail.com>
AuthorDate: Wed Jun 15 04:24:18 2022 -0700
rework scheduler wait time metric (#5258)
Co-authored-by: Brendan Doyle <br...@qualtrics.com>
---
.../scala/org/apache/openwhisk/common/Logging.scala | 6 ++++--
.../openwhisk/core/scheduler/queue/MemoryQueue.scala | 20 ++++++++++++--------
.../core/scheduler/queue/QueueManager.scala | 2 +-
3 files changed, 17 insertions(+), 11 deletions(-)
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
index 5a1fe89f0..c0dbb783a 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
@@ -591,8 +591,9 @@ object LoggingMarkers {
// Time that is needed to produce message in kafka
val SCHEDULER_KAFKA = LogMarkerToken(scheduler, kafka, start)(MeasurementUnit.time.milliseconds)
- val SCHEDULER_WAIT_TIME =
- LogMarkerToken(scheduler, "waitTime", counter)(MeasurementUnit.none)
+ val SCHEDULER_KAFKA_WAIT_TIME =
+ LogMarkerToken(scheduler, "kafkaWaitTime", counter)(MeasurementUnit.none)
+ def SCHEDULER_WAIT_TIME(action: String) = LogMarkerToken(scheduler, "waitTime", counter, Some(action), Map("action" -> action))(MeasurementUnit.none)
def SCHEDULER_KEEP_ALIVE(leaseId: Long) =
LogMarkerToken(scheduler, "keepAlive", counter, None, Map("leaseId" -> leaseId.toString))(MeasurementUnit.none)
@@ -603,6 +604,7 @@ object LoggingMarkers {
LogMarkerToken(scheduler, "queueUpdate", counter, None, Map("reason" -> reason))(MeasurementUnit.none)
def SCHEDULER_QUEUE_WAITING_ACTIVATION(action: String) =
LogMarkerToken(scheduler, "queueActivation", counter, Some(action), Map("action" -> action))(MeasurementUnit.none)
+
/*
* General markers
*/
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
index d54e697e2..51a674457 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
@@ -19,7 +19,6 @@ package org.apache.openwhisk.core.scheduler.queue
import java.time.{Duration, Instant}
import java.util.concurrent.atomic.AtomicInteger
-
import akka.actor.Status.{Failure => FailureMessage}
import akka.actor.{ActorRef, ActorSystem, Cancellable, FSM, Props, Stash}
import akka.util.Timeout
@@ -28,6 +27,7 @@ import org.apache.openwhisk.core.ConfigKeys
import org.apache.openwhisk.core.ack.ActiveAck
import org.apache.openwhisk.core.connector.ContainerCreationError.{TooManyConcurrentRequests, ZeroNamespaceLimit}
import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.containerpool.Interval
import org.apache.openwhisk.core.database.{NoDocumentException, UserContext}
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.size._
@@ -35,12 +35,7 @@ import org.apache.openwhisk.core.etcd.EtcdClient
import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.containerPrefix
import org.apache.openwhisk.core.etcd.EtcdKV.{ContainerKeys, QueueKeys, ThrottlingKeys}
import org.apache.openwhisk.core.scheduler.grpc.{GetActivation, ActivationResponse => GetActivationResponse}
-import org.apache.openwhisk.core.scheduler.message.{
- ContainerCreation,
- ContainerDeletion,
- FailedCreationJob,
- SuccessfulCreationJob
-}
+import org.apache.openwhisk.core.scheduler.message.{ContainerCreation, ContainerDeletion, FailedCreationJob, SuccessfulCreationJob}
import org.apache.openwhisk.core.scheduler.{SchedulerEndpoints, SchedulingConfig}
import org.apache.openwhisk.core.service._
import org.apache.openwhisk.http.Messages.{namespaceLimitUnderZero, tooManyConcurrentRequests}
@@ -52,7 +47,7 @@ import scala.annotation.tailrec
import scala.collection.immutable.Queue
import scala.collection.mutable
import scala.concurrent.duration._
-import scala.concurrent.{duration, ExecutionContextExecutor, Future, Promise}
+import scala.concurrent.{ExecutionContextExecutor, Future, Promise, duration}
import scala.util.{Failure, Success}
// States
@@ -769,6 +764,10 @@ class MemoryQueue(private val etcdClient: EtcdClient,
this,
s"[$invocationNamespace:$action:$stateName] complete activation ${activation.activationId} with error $message")(
activation.transid)
+
+ val totalTimeInScheduler = Interval(activation.transid.meta.start, Instant.now()).duration
+ MetricEmitter.emitHistogramMetric(LoggingMarkers.SCHEDULER_WAIT_TIME(action.asString), totalTimeInScheduler.toMillis)
+
val activationResponse =
if (isWhiskError)
generateFallbackActivation(activation, ActivationResponse.whiskError(message))
@@ -938,6 +937,8 @@ class MemoryQueue(private val etcdClient: EtcdClient,
in.incrementAndGet()
takeUncompletedRequest()
.map { res =>
+ val totalTimeInScheduler = Interval(msg.transid.meta.start, Instant.now()).duration
+ MetricEmitter.emitHistogramMetric(LoggingMarkers.SCHEDULER_WAIT_TIME(action.asString), totalTimeInScheduler.toMillis)
res.trySuccess(Right(msg))
in.decrementAndGet()
stay
@@ -958,6 +959,9 @@ class MemoryQueue(private val etcdClient: EtcdClient,
logging.info(
this,
s"[$invocationNamespace:$action:$stateName] Get activation request ${request.containerId}, send one message: ${msg.activationId}")
+ val totalTimeInScheduler = Interval(msg.transid.meta.start, Instant.now()).duration
+ MetricEmitter.emitHistogramMetric(LoggingMarkers.SCHEDULER_WAIT_TIME(action.asString), totalTimeInScheduler.toMillis)
+
sender ! GetActivationResponse(Right(msg))
tryDisableActionThrottling()
} else {
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala
index 6dbaffb0b..5178d2a3f 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala
@@ -391,7 +391,7 @@ class QueueManager(
// Drop the message that has not been scheduled for a long time
val schedulingWaitTime = Interval(msg.transid.meta.start, Instant.now()).duration
- MetricEmitter.emitHistogramMetric(LoggingMarkers.SCHEDULER_WAIT_TIME, schedulingWaitTime.toMillis)
+ MetricEmitter.emitHistogramMetric(LoggingMarkers.SCHEDULER_KAFKA_WAIT_TIME, schedulingWaitTime.toMillis)
if (schedulingWaitTime > queueManagerConfig.maxSchedulingTime) {
logging.warn(