You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by bd...@apache.org on 2023/03/07 18:17:42 UTC
[openwhisk] branch master updated: Add Scheduler Queue Metric for Not Processing Any Activations (#5386)
This is an automated email from the ASF dual-hosted git repository.
bdoyle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 60ca6605b Add Scheduler Queue Metric for Not Processing Any Activations (#5386)
60ca6605b is described below
commit 60ca6605bb081f99906cff1a21caf75d47e414fa
Author: Brendan Doyle <bd...@gmail.com>
AuthorDate: Tue Mar 7 10:17:34 2023 -0800
Add Scheduler Queue Metric for Not Processing Any Activations (#5386)
* Add Scheduler Queue Metric for Not Processing Any Activations
* fix timeout comparison
* account for action timeout being longer than queue retention
---------
Co-authored-by: Brendan Doyle <br...@qualtrics.com>
---
.../scala/org/apache/openwhisk/common/Logging.scala | 7 +++++++
.../openwhisk/core/scheduler/queue/MemoryQueue.scala | 19 ++++++++++++++++---
2 files changed, 23 insertions(+), 3 deletions(-)
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
index 2bc3b1c81..c2c4bbf59 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
@@ -612,6 +612,13 @@ object LoggingMarkers {
counter,
Some(actionWithoutVersion),
Map("namespace" -> namespace, "action" -> actionWithVersion))(MeasurementUnit.none)
+ def SCHEDULER_QUEUE_NOT_PROCESSING(namespace: String, actionWithVersion: String, actionWithoutVersion: String) =
+ LogMarkerToken(
+ scheduler,
+ "queueNotProcessing",
+ counter,
+ Some(actionWithoutVersion),
+ Map("namespace" -> namespace, "action" -> actionWithVersion))(MeasurementUnit.none)
/*
* General markers
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
index 8602702dd..d21a6104d 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
@@ -46,10 +46,10 @@ import org.apache.openwhisk.http.Messages.{namespaceLimitUnderZero, tooManyConcu
import pureconfig.loadConfigOrThrow
import spray.json._
import pureconfig.generic.auto._
-import scala.collection.JavaConverters._
+import scala.collection.JavaConverters._
import java.time.{Duration, Instant}
-import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
import scala.annotation.tailrec
import scala.collection.immutable.Queue
import scala.collection.mutable
@@ -139,6 +139,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
checkToDropStaleActivation: (Clock,
Queue[TimeSeriesActivationEntry],
Long,
+ AtomicLong,
String,
WhiskActionMetaData,
MemoryQueueState,
@@ -173,6 +174,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
private[queue] var queue = Queue.empty[TimeSeriesActivationEntry]
private[queue] var in = new AtomicInteger(0)
+ private[queue] val lastActivationPulledTime = new AtomicLong(Instant.now.toEpochMilli)
private[queue] val namespaceContainerCount = NamespaceContainerCount(invocationNamespace, etcdClient, watcherService)
private[queue] var averageDuration: Option[Double] = None
private[queue] var averageDurationBuffer = AverageRingBuffer(queueConfig.durationBufferSize)
@@ -574,7 +576,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
case Event(DropOld, _) =>
if (queue.nonEmpty && Duration
.between(queue.head.timestamp, clock.now())
- .compareTo(Duration.ofMillis(actionRetentionTimeout)) < 0) {
+ .compareTo(Duration.ofMillis(actionRetentionTimeout)) >= 0) {
logging.error(
this,
s"[$invocationNamespace:$action:$stateName] Drop some stale activations for $revision, existing container is ${containers.size}, inProgress container is ${creationIds.size}, state data: $stateData, in is $in, current: ${queue.size}.")
@@ -920,6 +922,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
clock,
queue,
actionRetentionTimeout,
+ lastActivationPulledTime,
invocationNamespace,
actionMetaData,
stateName,
@@ -1024,6 +1027,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
MetricEmitter.emitHistogramMetric(
LoggingMarkers.SCHEDULER_WAIT_TIME(action.asString, action.toStringWithoutVersion),
totalTimeInScheduler.toMillis)
+ lastActivationPulledTime.set(Instant.now.toEpochMilli)
res.trySuccess(Right(msg))
in.decrementAndGet()
stay
@@ -1049,6 +1053,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
MetricEmitter.emitHistogramMetric(
LoggingMarkers.SCHEDULER_WAIT_TIME(action.asString, action.toStringWithoutVersion),
totalTimeInScheduler.toMillis)
+ lastActivationPulledTime.set(Instant.now.toEpochMilli)
sender ! GetActivationResponse(Right(msg))
tryDisableActionThrottling()
@@ -1186,6 +1191,7 @@ object MemoryQueue {
def checkToDropStaleActivation(clock: Clock,
queue: Queue[TimeSeriesActivationEntry],
maxRetentionMs: Long,
+ lastActivationExecutedTime: AtomicLong,
invocationNamespace: String,
actionMetaData: WhiskActionMetaData,
stateName: MemoryQueueState,
@@ -1201,6 +1207,13 @@ object MemoryQueue {
logging.info(
this,
s"[$invocationNamespace:$action:$stateName] some activations are stale msg: ${queue.head.msg.activationId}.")
+ val timeSinceLastActivationGrabbed = clock.now().toEpochMilli - lastActivationExecutedTime.get()
+ if (timeSinceLastActivationGrabbed > maxRetentionMs && timeSinceLastActivationGrabbed > actionMetaData.limits.timeout.millis) {
+ MetricEmitter.emitGaugeMetric(
+ LoggingMarkers
+ .SCHEDULER_QUEUE_NOT_PROCESSING(invocationNamespace, action.asString, action.toStringWithoutVersion),
+ 1)
+ }
queueRef ! DropOld
}