You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ma...@apache.org on 2018/07/03 07:09:13 UTC

[incubator-openwhisk] branch master updated: Add metrics to monitor the overall memory consumed by usercontainers. (#3831)

This is an automated email from the ASF dual-hosted git repository.

markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 4db501b  Add metrics to monitor the overall memory consumed by usercontainers. (#3831)
4db501b is described below

commit 4db501b5032f7e8251e77c4698e5d103bac9a779
Author: Christian Bickel <gi...@cbickel.de>
AuthorDate: Tue Jul 3 09:09:10 2018 +0200

    Add metrics to monitor the overall memory consumed by usercontainers. (#3831)
---
 common/scala/src/main/scala/whisk/common/Logging.scala           | 2 ++
 .../whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala  | 9 ++++++++-
 .../loadBalancer/test/ShardingContainerPoolBalancerTests.scala   | 4 +++-
 3 files changed, 13 insertions(+), 2 deletions(-)

diff --git a/common/scala/src/main/scala/whisk/common/Logging.scala b/common/scala/src/main/scala/whisk/common/Logging.scala
index cf8cb26..17bc2d3 100644
--- a/common/scala/src/main/scala/whisk/common/Logging.scala
+++ b/common/scala/src/main/scala/whisk/common/Logging.scala
@@ -278,6 +278,8 @@ object LoggingMarkers {
 
   def LOADBALANCER_ACTIVATIONS_INFLIGHT(controllerInstance: ControllerInstanceId) =
     LogMarkerToken(loadbalancer + controllerInstance.asString, "activationsInflight", count)
+  def LOADBALANCER_MEMORY_INFLIGHT(controllerInstance: ControllerInstanceId) =
+    LogMarkerToken(loadbalancer + controllerInstance.asString, "memoryInflight", count)
 
   // Time that is needed to execute the action
   val INVOKER_ACTIVATION_RUN = LogMarkerToken(invoker, "activationRun", start)
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
index 72124ec..9ddbf0a 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -35,6 +35,7 @@ import whisk.common._
 import whisk.core.WhiskConfig._
 import whisk.core.connector._
 import whisk.core.entity._
+import whisk.core.entity.size._
 import whisk.core.{ConfigKeys, WhiskConfig}
 import whisk.spi.SpiLoader
 
@@ -159,12 +160,14 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Con
   private val activations = TrieMap[ActivationId, ActivationEntry]()
   private val activationsPerNamespace = TrieMap[UUID, LongAdder]()
   private val totalActivations = new LongAdder()
+  private val totalActivationMemory = new LongAdder()
 
   /** State needed for scheduling. */
   private val schedulingState = ShardingContainerPoolBalancerState()()
 
   actorSystem.scheduler.schedule(0.seconds, 10.seconds) {
     MetricEmitter.emitHistogramMetric(LOADBALANCER_ACTIVATIONS_INFLIGHT(controllerInstance), totalActivations.longValue)
+    MetricEmitter.emitHistogramMetric(LOADBALANCER_MEMORY_INFLIGHT(controllerInstance), totalActivationMemory.longValue)
   }
 
   /**
@@ -248,6 +251,7 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Con
                               instance: InvokerInstanceId): ActivationEntry = {
 
     totalActivations.increment()
+    totalActivationMemory.add(action.limits.memory.megabytes)
     activationsPerNamespace.getOrElseUpdate(msg.user.namespace.uuid, new LongAdder()).increment()
 
     val timeout = action.limits.timeout.duration.max(TimeLimit.STD_DURATION) + 1.minute
@@ -266,6 +270,7 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Con
           msg.activationId,
           msg.user.namespace.uuid,
           instance,
+          action.limits.memory.megabytes.MB,
           timeoutHandler,
           Promise[Either[ActivationId, WhiskActivation]]())
       })
@@ -347,6 +352,7 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Con
     activations.remove(aid) match {
       case Some(entry) =>
         totalActivations.decrement()
+        totalActivationMemory.add(entry.memory.toMB * (-1))
         activationsPerNamespace.get(entry.namespaceId).foreach(_.decrement())
         schedulingState.invokerSlots.lift(invoker.toInt).foreach(_.release())
 
@@ -427,7 +433,7 @@ object ShardingContainerPoolBalancer extends LoadBalancerProvider {
                dispatched: IndexedSeq[ForcableSemaphore],
                index: Int,
                step: Int,
-               stepsDone: Int = 0)(implicit logging: Logging): Option[InvokerInstanceId] = {
+               stepsDone: Int = 0)(implicit logging: Logging, transId: TransactionId): Option[InvokerInstanceId] = {
     val numInvokers = invokers.size
 
     if (numInvokers > 0) {
@@ -588,5 +594,6 @@ case class ShardingContainerPoolBalancerConfig(blackboxFraction: Double, invoker
 case class ActivationEntry(id: ActivationId,
                            namespaceId: UUID,
                            invokerName: InvokerInstanceId,
+                           memory: ByteSize,
                            timeoutHandler: Cancellable,
                            promise: Promise[Either[ActivationId, WhiskActivation]])
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
index 4c8d15f..9b2567e 100644
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
+++ b/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
@@ -21,7 +21,7 @@ import common.StreamLogging
 import org.junit.runner.RunWith
 import org.scalatest.{FlatSpec, Matchers}
 import org.scalatest.junit.JUnitRunner
-import whisk.common.ForcableSemaphore
+import whisk.common.{ForcableSemaphore, TransactionId}
 import whisk.core.entity.InvokerInstanceId
 import whisk.core.loadBalancer._
 
@@ -153,6 +153,8 @@ class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers with Str
 
   behavior of "schedule"
 
+  implicit val transId = TransactionId.testing
+
   it should "return None on an empty invoker list" in {
     ShardingContainerPoolBalancer.schedule(IndexedSeq.empty, IndexedSeq.empty, index = 0, step = 2) shouldBe None
   }