You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by bd...@apache.org on 2022/05/24 17:30:16 UTC
[openwhisk] branch master updated: add fpc load balancer metrics (#5240)
This is an automated email from the ASF dual-hosted git repository.
bdoyle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 426aef484 add fpc load balancer metrics (#5240)
426aef484 is described below
commit 426aef484d31f863c3345e321f7f39ee4c23b88a
Author: Brendan Doyle <bd...@gmail.com>
AuthorDate: Tue May 24 10:30:09 2022 -0700
add fpc load balancer metrics (#5240)
Co-authored-by: Brendan Doyle <br...@qualtrics.com>
---
.../org/apache/openwhisk/common/Logging.scala | 9 +++++
.../core/loadBalancer/FPCPoolBalancer.scala | 38 +++++++++++++++++++++-
2 files changed, 46 insertions(+), 1 deletion(-)
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
index 5bbf43a70..db39081fe 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
@@ -568,6 +568,15 @@ object LoggingMarkers {
val OFFLINE_INVOKER_BLACKBOX =
LogMarkerToken(loadbalancer, "totalOfflineInvokerBlackBox", counter)(MeasurementUnit.none)
+ // Invoker counts by health state; FPCPoolBalancer.emitMetrics publishes each one via emitGaugeMetric.
+ val HEALTHY_INVOKERS = LogMarkerToken(loadbalancer, "totalHealthyInvoker", counter)(MeasurementUnit.none)
+ val UNHEALTHY_INVOKERS = LogMarkerToken(loadbalancer, "totalUnhealthyInvoker", counter)(MeasurementUnit.none)
+ val OFFLINE_INVOKERS = LogMarkerToken(loadbalancer, "totalOfflineInvoker", counter)(MeasurementUnit.none)
+
+ // Memory in MB (user memory plus busy memory) summed over usable invokers by FPCPoolBalancer.emitMetrics.
+ val INVOKER_TOTALMEM = LogMarkerToken(loadbalancer, "totalCapacity", counter)(MeasurementUnit.none)
+
// Kafka related markers
def KAFKA_QUEUE(topic: String) =
if (TransactionId.metricsKamonTags)
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala
index 361478213..5705099ab 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala
@@ -3,12 +3,12 @@ package org.apache.openwhisk.core.loadBalancer
import java.nio.charset.StandardCharsets
import java.util.concurrent.ThreadLocalRandom
import java.util.concurrent.atomic.LongAdder
-
import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Cancellable, Props}
import akka.event.Logging.InfoLevel
import akka.pattern.ask
import akka.util.Timeout
import org.apache.openwhisk.common.InvokerState.{Healthy, Offline, Unhealthy}
+import org.apache.openwhisk.common.LoggingMarkers._
import org.apache.openwhisk.common._
import org.apache.openwhisk.core.connector._
import org.apache.openwhisk.core.controller.Controller
@@ -335,6 +335,16 @@ class FPCPoolBalancer(config: WhiskConfig,
}
}
+ // Counter-metric markers for completion acks, built once per balancer instance and reused by
+ // processCompletion for every ack it handles.
+
+ // The ack arrived as expected while the activation entry was still present.
+ protected val LOADBALANCER_COMPLETION_ACK_REGULAR = LoggingMarkers.LOADBALANCER_COMPLETION_ACK(
+   controllerInstance,
+   RegularCompletionAck)
+
+ // The entry timed out while waiting for the active ack.
+ protected val LOADBALANCER_COMPLETION_ACK_FORCED = LoggingMarkers.LOADBALANCER_COMPLETION_ACK(
+   controllerInstance,
+   ForcedCompletionAck)
+
+ // A real ack arrived, but the entry no longer exists (the invoker answered, just not in time).
+ protected val LOADBALANCER_COMPLETION_ACK_REGULAR_AFTER_FORCED = LoggingMarkers.LOADBALANCER_COMPLETION_ACK(
+   controllerInstance,
+   RegularAfterForcedCompletionAck)
+
+ // The timeout fired, but a completion ack had already removed the entry.
+ protected val LOADBALANCER_COMPLETION_ACK_FORCED_AFTER_REGULAR = LoggingMarkers.LOADBALANCER_COMPLETION_ACK(
+   controllerInstance,
+   ForcedAfterRegularCompletionAck)
+
/** Process the completion ack and update the state */
protected[loadBalancer] def processCompletion(aid: ActivationId,
tid: TransactionId,
@@ -359,8 +369,10 @@ class FPCPoolBalancer(config: WhiskConfig,
// the active ack is received as expected, and processing that message removed the promise
// from the corresponding map
logging.info(this, s"received completion ack for '$aid', system error=$isSystemError")(tid)
+ MetricEmitter.emitCounterMetric(LOADBALANCER_COMPLETION_ACK_REGULAR)
} else {
logging.error(this, s"Failed to invoke action ${aid.toString}, error: timeout waiting for the active ack")
+ MetricEmitter.emitCounterMetric(LOADBALANCER_COMPLETION_ACK_FORCED)
// the entry has timed out; if the active ack is still around, remove its entry also
// and complete the promise with a failure if necessary
@@ -378,11 +390,13 @@ class FPCPoolBalancer(config: WhiskConfig,
// Logging this condition as a warning because the invoker processed the activation and sent a completion
// message - but not in time.
logging.warn(this, s"received completion ack for '$aid' which has no entry, system error=$isSystemError")(tid)
+ MetricEmitter.emitCounterMetric(LOADBALANCER_COMPLETION_ACK_REGULAR_AFTER_FORCED)
case None =>
// The entry has already been removed by a completion ack. This part of the code is reached by the timeout and can
// happen if completion ack and timeout happen roughly at the same time (the timeout was triggered before the completion
// ack canceled the timer). As the completion ack is already processed we don't have to do anything here.
logging.debug(this, s"forced completion ack for '$aid' which has no entry")(tid)
+ MetricEmitter.emitCounterMetric(LOADBALANCER_COMPLETION_ACK_FORCED_AFTER_REGULAR)
}
}
@@ -600,6 +614,28 @@ class FPCPoolBalancer(config: WhiskConfig,
}
}
+ /**
+  * Publishes the load balancer gauges: invoker counts by health state, the total memory capacity of the
+  * usable invokers, and this controller's in-flight activation count and memory.
+  *
+  * A failed invoker health lookup is logged rather than dropped silently, and the in-flight gauges are
+  * emitted whether or not the lookup succeeded, because they only read this balancer's own counters.
+  *
+  * @return a future that completes once the invoker gauges were emitted; it still fails when the invoker
+  *         health lookup fails, so callers observe the same outcome as before
+  */
+ def emitMetrics(): Future[Unit] = {
+   invokerHealth()
+     .map[Unit] { invokers =>
+       MetricEmitter.emitGaugeMetric(HEALTHY_INVOKERS, invokers.count(_.status == Healthy))
+       MetricEmitter.emitGaugeMetric(UNHEALTHY_INVOKERS, invokers.count(_.status == Unhealthy))
+       MetricEmitter.emitGaugeMetric(OFFLINE_INVOKERS, invokers.count(_.status == Offline))
+
+       // User memory represents free memory in this case, so capacity is user memory plus busy memory.
+       // Only invokers whose status is usable contribute to the total.
+       val totalCapacityMB = invokers.foldLeft(0L) { (total, curr) =>
+         if (curr.status.isUsable) {
+           curr.id.userMemory.toMB + curr.id.busyMemory.getOrElse(ByteSize(0, SizeUnits.BYTE)).toMB + total
+         } else {
+           total
+         }
+       }
+       MetricEmitter.emitGaugeMetric(INVOKER_TOTALMEM, totalCapacityMB)
+     }
+     .andThen {
+       // The scheduler discards the returned future, so this is the only place a failure becomes visible.
+       case scala.util.Failure(t) =>
+         logging.error(this, s"failed to emit invoker metrics: ${t.getMessage}")
+     }
+     .andThen {
+       // andThen keeps the original result; these gauges do not depend on the invoker lookup.
+       case _ =>
+         MetricEmitter.emitGaugeMetric(LOADBALANCER_ACTIVATIONS_INFLIGHT(controllerInstance), totalActivations.longValue)
+         MetricEmitter.emitGaugeMetric(LOADBALANCER_MEMORY_INFLIGHT(controllerInstance, ""), totalActivationMemory.longValue)
+     }
+ }
+
+ // Refresh the gauges every 10 seconds, with the first run 10 seconds after this balancer is constructed.
+ actorSystem.scheduler.scheduleAtFixedRate(10.seconds, 10.seconds)(() => emitMetrics())
+
/** Gets the number of in-flight activations for a specific user. */
override def activeActivationsFor(namespace: UUID): Future[Int] =
Future.successful(activationsPerNamespace.get(namespace).map(_.intValue()).getOrElse(0))