You are viewing a plain-text version of this content. The hyperlink to the canonical version (originally attached to the word "here") was lost in the plain-text conversion.
Posted to commits@openwhisk.apache.org by bd...@apache.org on 2022/05/24 17:30:16 UTC

[openwhisk] branch master updated: add fpc load balancer metrics (#5240)

This is an automated email from the ASF dual-hosted git repository.

bdoyle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 426aef484 add fpc load balancer metrics (#5240)
426aef484 is described below

commit 426aef484d31f863c3345e321f7f39ee4c23b88a
Author: Brendan Doyle <bd...@gmail.com>
AuthorDate: Tue May 24 10:30:09 2022 -0700

    add fpc load balancer metrics (#5240)
    
    Co-authored-by: Brendan Doyle <br...@qualtrics.com>
---
 .../org/apache/openwhisk/common/Logging.scala      |  9 +++++
 .../core/loadBalancer/FPCPoolBalancer.scala        | 38 +++++++++++++++++++++-
 2 files changed, 46 insertions(+), 1 deletion(-)

diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
index 5bbf43a70..db39081fe 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
@@ -568,6 +568,15 @@ object LoggingMarkers {
   val OFFLINE_INVOKER_BLACKBOX =
     LogMarkerToken(loadbalancer, "totalOfflineInvokerBlackBox", counter)(MeasurementUnit.none)
 
+  val HEALTHY_INVOKERS =
+    LogMarkerToken(loadbalancer, "totalHealthyInvoker", counter)(MeasurementUnit.none)
+  val UNHEALTHY_INVOKERS =
+    LogMarkerToken(loadbalancer, "totalUnhealthyInvoker", counter)(MeasurementUnit.none)
+  val OFFLINE_INVOKERS =
+    LogMarkerToken(loadbalancer, "totalOfflineInvoker", counter)(MeasurementUnit.none)
+
+  val INVOKER_TOTALMEM = LogMarkerToken(loadbalancer, "totalCapacity", counter)(MeasurementUnit.none)
+
   // Kafka related markers
   def KAFKA_QUEUE(topic: String) =
     if (TransactionId.metricsKamonTags)
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala
index 361478213..5705099ab 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala
@@ -3,12 +3,12 @@ package org.apache.openwhisk.core.loadBalancer
 import java.nio.charset.StandardCharsets
 import java.util.concurrent.ThreadLocalRandom
 import java.util.concurrent.atomic.LongAdder
-
 import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Cancellable, Props}
 import akka.event.Logging.InfoLevel
 import akka.pattern.ask
 import akka.util.Timeout
 import org.apache.openwhisk.common.InvokerState.{Healthy, Offline, Unhealthy}
+import org.apache.openwhisk.common.LoggingMarkers._
 import org.apache.openwhisk.common._
 import org.apache.openwhisk.core.connector._
 import org.apache.openwhisk.core.controller.Controller
@@ -335,6 +335,16 @@ class FPCPoolBalancer(config: WhiskConfig,
     }
   }
 
+  // Singletons for counter metrics related to completion acks
+  protected val LOADBALANCER_COMPLETION_ACK_REGULAR =
+    LoggingMarkers.LOADBALANCER_COMPLETION_ACK(controllerInstance, RegularCompletionAck)
+  protected val LOADBALANCER_COMPLETION_ACK_FORCED =
+    LoggingMarkers.LOADBALANCER_COMPLETION_ACK(controllerInstance, ForcedCompletionAck)
+  protected val LOADBALANCER_COMPLETION_ACK_REGULAR_AFTER_FORCED =
+    LoggingMarkers.LOADBALANCER_COMPLETION_ACK(controllerInstance, RegularAfterForcedCompletionAck)
+  protected val LOADBALANCER_COMPLETION_ACK_FORCED_AFTER_REGULAR =
+    LoggingMarkers.LOADBALANCER_COMPLETION_ACK(controllerInstance, ForcedAfterRegularCompletionAck)
+
   /** Process the completion ack and update the state */
   protected[loadBalancer] def processCompletion(aid: ActivationId,
                                                 tid: TransactionId,
@@ -359,8 +369,10 @@ class FPCPoolBalancer(config: WhiskConfig,
           // the active ack is received as expected, and processing that message removed the promise
           // from the corresponding map
           logging.info(this, s"received completion ack for '$aid', system error=$isSystemError")(tid)
+          MetricEmitter.emitCounterMetric(LOADBALANCER_COMPLETION_ACK_REGULAR)
         } else {
           logging.error(this, s"Failed to invoke action ${aid.toString}, error: timeout waiting for the active ack")
+          MetricEmitter.emitCounterMetric(LOADBALANCER_COMPLETION_ACK_FORCED)
 
           // the entry has timed out; if the active ack is still around, remove its entry also
           // and complete the promise with a failure if necessary
@@ -378,11 +390,13 @@ class FPCPoolBalancer(config: WhiskConfig,
         // Logging this condition as a warning because the invoker processed the activation and sent a completion
         // message - but not in time.
         logging.warn(this, s"received completion ack for '$aid' which has no entry, system error=$isSystemError")(tid)
+        MetricEmitter.emitCounterMetric(LOADBALANCER_COMPLETION_ACK_REGULAR_AFTER_FORCED)
       case None =>
         // The entry has already been removed by a completion ack. This part of the code is reached by the timeout and can
         // happen if completion ack and timeout happen roughly at the same time (the timeout was triggered before the completion
         // ack canceled the timer). As the completion ack is already processed we don't have to do anything here.
         logging.debug(this, s"forced completion ack for '$aid' which has no entry")(tid)
+        MetricEmitter.emitCounterMetric(LOADBALANCER_COMPLETION_ACK_FORCED_AFTER_REGULAR)
     }
   }
 
@@ -600,6 +614,28 @@ class FPCPoolBalancer(config: WhiskConfig,
     }
   }
 
+  def emitMetrics() = {
+    invokerHealth().map(invokers => {
+      MetricEmitter.emitGaugeMetric(HEALTHY_INVOKERS, invokers.count(_.status == Healthy))
+      MetricEmitter.emitGaugeMetric(UNHEALTHY_INVOKERS, invokers.count(_.status == Unhealthy))
+      MetricEmitter.emitGaugeMetric(OFFLINE_INVOKERS, invokers.count(_.status == Offline))
+      // Add both user memory and busy memory because user memory represents free memory in this case
+      MetricEmitter.emitGaugeMetric(
+        INVOKER_TOTALMEM,
+        invokers.foldLeft(0L) { (total, curr) =>
+          if (curr.status.isUsable) {
+            curr.id.userMemory.toMB + curr.id.busyMemory.getOrElse(ByteSize(0, SizeUnits.BYTE)).toMB + total
+          } else {
+            total
+          }
+        })
+      MetricEmitter.emitGaugeMetric(LOADBALANCER_ACTIVATIONS_INFLIGHT(controllerInstance), totalActivations.longValue)
+      MetricEmitter.emitGaugeMetric(LOADBALANCER_MEMORY_INFLIGHT(controllerInstance, ""), totalActivationMemory.longValue)
+    })
+  }
+
+  actorSystem.scheduler.scheduleAtFixedRate(10.seconds, 10.seconds)(() => emitMetrics())
+
   /** Gets the number of in-flight activations for a specific user. */
   override def activeActivationsFor(namespace: UUID): Future[Int] =
     Future.successful(activationsPerNamespace.get(namespace).map(_.intValue()).getOrElse(0))