You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this plain-text version.
Posted to commits@openwhisk.apache.org by bd...@apache.org on 2023/03/07 18:17:42 UTC

[openwhisk] branch master updated: Add Scheduler Queue Metric for Not Processing Any Activations (#5386)

This is an automated email from the ASF dual-hosted git repository.

bdoyle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 60ca6605b Add Scheduler Queue Metric for Not Processing Any Activations (#5386)
60ca6605b is described below

commit 60ca6605bb081f99906cff1a21caf75d47e414fa
Author: Brendan Doyle <bd...@gmail.com>
AuthorDate: Tue Mar 7 10:17:34 2023 -0800

    Add Scheduler Queue Metric for Not Processing Any Activations (#5386)
    
    * Add Scheduler Queue Metric for Not Processing Any Activations
    
    * fix timeout comparison
    
    * account for action timeout being longer than queue retention
    
    ---------
    
    Co-authored-by: Brendan Doyle <br...@qualtrics.com>
---
 .../scala/org/apache/openwhisk/common/Logging.scala   |  7 +++++++
 .../openwhisk/core/scheduler/queue/MemoryQueue.scala  | 19 ++++++++++++++++---
 2 files changed, 23 insertions(+), 3 deletions(-)

diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
index 2bc3b1c81..c2c4bbf59 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
@@ -612,6 +612,13 @@ object LoggingMarkers {
       counter,
       Some(actionWithoutVersion),
       Map("namespace" -> namespace, "action" -> actionWithVersion))(MeasurementUnit.none)
+  def SCHEDULER_QUEUE_NOT_PROCESSING(namespace: String, actionWithVersion: String, actionWithoutVersion: String) =
+    LogMarkerToken(
+      scheduler,
+      "queueNotProcessing",
+      counter,
+      Some(actionWithoutVersion),
+      Map("namespace" -> namespace, "action" -> actionWithVersion))(MeasurementUnit.none)
 
   /*
    * General markers
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
index 8602702dd..d21a6104d 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
@@ -46,10 +46,10 @@ import org.apache.openwhisk.http.Messages.{namespaceLimitUnderZero, tooManyConcu
 import pureconfig.loadConfigOrThrow
 import spray.json._
 import pureconfig.generic.auto._
-import scala.collection.JavaConverters._
 
+import scala.collection.JavaConverters._
 import java.time.{Duration, Instant}
-import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
 import scala.annotation.tailrec
 import scala.collection.immutable.Queue
 import scala.collection.mutable
@@ -139,6 +139,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
                   checkToDropStaleActivation: (Clock,
                                                Queue[TimeSeriesActivationEntry],
                                                Long,
+                                               AtomicLong,
                                                String,
                                                WhiskActionMetaData,
                                                MemoryQueueState,
@@ -173,6 +174,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
 
   private[queue] var queue = Queue.empty[TimeSeriesActivationEntry]
   private[queue] var in = new AtomicInteger(0)
+  private[queue] val lastActivationPulledTime = new AtomicLong(Instant.now.toEpochMilli)
   private[queue] val namespaceContainerCount = NamespaceContainerCount(invocationNamespace, etcdClient, watcherService)
   private[queue] var averageDuration: Option[Double] = None
   private[queue] var averageDurationBuffer = AverageRingBuffer(queueConfig.durationBufferSize)
@@ -574,7 +576,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
     case Event(DropOld, _) =>
       if (queue.nonEmpty && Duration
             .between(queue.head.timestamp, clock.now())
-            .compareTo(Duration.ofMillis(actionRetentionTimeout)) < 0) {
+            .compareTo(Duration.ofMillis(actionRetentionTimeout)) >= 0) {
         logging.error(
           this,
           s"[$invocationNamespace:$action:$stateName] Drop some stale activations for $revision, existing container is ${containers.size}, inProgress container is ${creationIds.size}, state data: $stateData, in is $in, current: ${queue.size}.")
@@ -920,6 +922,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
         clock,
         queue,
         actionRetentionTimeout,
+        lastActivationPulledTime,
         invocationNamespace,
         actionMetaData,
         stateName,
@@ -1024,6 +1027,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
         MetricEmitter.emitHistogramMetric(
           LoggingMarkers.SCHEDULER_WAIT_TIME(action.asString, action.toStringWithoutVersion),
           totalTimeInScheduler.toMillis)
+        lastActivationPulledTime.set(Instant.now.toEpochMilli)
         res.trySuccess(Right(msg))
         in.decrementAndGet()
         stay
@@ -1049,6 +1053,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
       MetricEmitter.emitHistogramMetric(
         LoggingMarkers.SCHEDULER_WAIT_TIME(action.asString, action.toStringWithoutVersion),
         totalTimeInScheduler.toMillis)
+      lastActivationPulledTime.set(Instant.now.toEpochMilli)
 
       sender ! GetActivationResponse(Right(msg))
       tryDisableActionThrottling()
@@ -1186,6 +1191,7 @@ object MemoryQueue {
   def checkToDropStaleActivation(clock: Clock,
                                  queue: Queue[TimeSeriesActivationEntry],
                                  maxRetentionMs: Long,
+                                 lastActivationExecutedTime: AtomicLong,
                                  invocationNamespace: String,
                                  actionMetaData: WhiskActionMetaData,
                                  stateName: MemoryQueueState,
@@ -1201,6 +1207,13 @@ object MemoryQueue {
       logging.info(
         this,
         s"[$invocationNamespace:$action:$stateName] some activations are stale msg: ${queue.head.msg.activationId}.")
+      val timeSinceLastActivationGrabbed = clock.now().toEpochMilli - lastActivationExecutedTime.get()
+      if (timeSinceLastActivationGrabbed > maxRetentionMs && timeSinceLastActivationGrabbed > actionMetaData.limits.timeout.millis) {
+        MetricEmitter.emitGaugeMetric(
+          LoggingMarkers
+            .SCHEDULER_QUEUE_NOT_PROCESSING(invocationNamespace, action.asString, action.toStringWithoutVersion),
+          1)
+      }
 
       queueRef ! DropOld
     }