You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ra...@apache.org on 2017/08/13 16:41:56 UTC

[incubator-openwhisk] branch master updated: Make LoadbalancerData leaner. (#2587)

This is an automated email from the ASF dual-hosted git repository.

rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new c2a9462  Make LoadbalancerData leaner. (#2587)
c2a9462 is described below

commit c2a94627f7f054bffca3b6b99a91cb01171dc965
Author: Markus Thömmes <ma...@me.com>
AuthorDate: Sun Aug 13 18:41:53 2017 +0200

    Make LoadbalancerData leaner. (#2587)
    
    LoadbalancerData keeps a lot of data it doesn't need. Upon fetching a copy of this data, it creates copies, which becomes more expensive the more users are in the system.
---
 .../core/entitlement/ActivationThrottler.scala     | 30 +--------
 .../whisk/core/loadBalancer/LoadBalancerData.scala | 39 +++++++-----
 .../core/loadBalancer/LoadBalancerService.scala    | 17 ++---
 .../controller/test/ControllerTestCommon.scala     |  3 +-
 .../loadBalancer/test/LoadBalancerDataTests.scala  | 72 +++++++++-------------
 5 files changed, 67 insertions(+), 94 deletions(-)

diff --git a/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala b/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala
index aa44118..b9567c5 100644
--- a/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala
+++ b/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala
@@ -17,15 +17,9 @@
 
 package whisk.core.entitlement
 
-import scala.concurrent.Future
-import scala.concurrent.duration.DurationInt
-
-import akka.actor.ActorSystem
 import whisk.common.Logging
-import whisk.common.Scheduler
 import whisk.common.TransactionId
 import whisk.core.entity.Identity
-import whisk.core.entity.UUID
 import whisk.core.loadBalancer.LoadBalancer
 
 /**
@@ -38,28 +32,15 @@ import whisk.core.loadBalancer.LoadBalancer
  * @param defaultConcurrencyLimit the default max allowed concurrent operations
  * @param systemOverloadLimit the limit when the system is considered overloaded
  */
-class ActivationThrottler(loadBalancer: LoadBalancer, defaultConcurrencyLimit: Int, systemOverloadLimit: Int)(
-    implicit val system: ActorSystem, logging: Logging) {
+class ActivationThrottler(loadBalancer: LoadBalancer, defaultConcurrencyLimit: Int, systemOverloadLimit: Int)(implicit logging: Logging) {
 
     logging.info(this, s"concurrencyLimit = $defaultConcurrencyLimit, systemOverloadLimit = $systemOverloadLimit")
 
-    implicit private val executionContext = system.dispatcher
-
-    /**
-     * holds the values of the last run of the scheduler below to be gettable by outside
-     * services to be able to determine whether a namespace should be throttled or not based on
-     * the number of concurrent invocations it has in the system
-     */
-    @volatile
-    private var namespaceActivationCounter = Map.empty[UUID, Int]
-
-    private val healthCheckInterval = 5.seconds
-
     /**
      * Checks whether the operation should be allowed to proceed.
      */
     def check(user: Identity)(implicit tid: TransactionId): Boolean = {
-        val concurrentActivations = namespaceActivationCounter.getOrElse(user.uuid, 0)
+        val concurrentActivations = loadBalancer.activeActivationsFor(user.uuid)
         val concurrencyLimit = user.limits.concurrentInvocations.getOrElse(defaultConcurrencyLimit)
         logging.info(this, s"namespace = ${user.uuid.asString}, concurrent activations = $concurrentActivations, below limit = $concurrencyLimit")
         concurrentActivations < concurrencyLimit
@@ -69,13 +50,8 @@ class ActivationThrottler(loadBalancer: LoadBalancer, defaultConcurrencyLimit: I
      * Checks whether the system is in a generally overloaded state.
      */
     def isOverloaded()(implicit tid: TransactionId): Boolean = {
-        val concurrentActivations = namespaceActivationCounter.values.sum
+        val concurrentActivations = loadBalancer.totalActiveActivations
         logging.info(this, s"concurrent activations in system = $concurrentActivations, below limit = $systemOverloadLimit")
         concurrentActivations > systemOverloadLimit
     }
-
-    Scheduler.scheduleWaitAtLeast(healthCheckInterval) { () =>
-        namespaceActivationCounter = loadBalancer.getActiveNamespaceActivationCounts
-        Future.successful(Unit)
-    }
 }
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala
index a33b813..9014cc6 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala
@@ -17,9 +17,12 @@
 
 package whisk.core.loadBalancer
 
-import whisk.core.entity.{ ActivationId, UUID, WhiskActivation }
+import java.util.concurrent.atomic.AtomicInteger
+
 import scala.collection.concurrent.TrieMap
 import scala.concurrent.Promise
+
+import whisk.core.entity.{ ActivationId, UUID, WhiskActivation }
 import whisk.core.entity.InstanceId
 
 /** Encapsulates data relevant for a single activation */
@@ -33,28 +36,32 @@ case class ActivationEntry(id: ActivationId, namespaceId: UUID, invokerName: Ins
  */
 class LoadBalancerData() {
 
-    type TrieSet[T] = TrieMap[T, Unit]
+    private val activationByInvoker = TrieMap[InstanceId, AtomicInteger]()
+    private val activationByNamespaceId = TrieMap[UUID, AtomicInteger]()
+    private val activationsById = TrieMap[ActivationId, ActivationEntry]()
+    private val totalActivations = new AtomicInteger(0)
 
-    private val activationByInvoker = new TrieMap[InstanceId, TrieSet[ActivationEntry]]
-    private val activationByNamespaceId = new TrieMap[UUID, TrieSet[ActivationEntry]]
-    private val activationsById = new TrieMap[ActivationId, ActivationEntry]
+    /** Get the number of activations across all namespaces. */
+    def totalActivationCount = totalActivations.get
 
     /**
-     * Get the number of activations for each namespace.
+     * Get the number of activations for a specific namespace.
      *
+     * @param namespace The namespace to get the activation count for
      * @return a map (namespace -> number of activations in the system)
      */
-    def activationCountByNamespace: Map[UUID, Int] = {
-        activationByNamespaceId.toMap.mapValues(_.size)
+    def activationCountOn(namespace: UUID) = {
+        activationByNamespaceId.get(namespace).map(_.get).getOrElse(0)
     }
 
     /**
-     * Get the number of activations for each invoker.
+     * Get the number of activations for a specific invoker.
      *
+     * @param invoker The invoker to get the activation count for
      * @return a map (invoker -> number of activations queued for the invoker)
      */
-    def activationCountByInvoker: Map[InstanceId, Int] = {
-        activationByInvoker.toMap.mapValues(_.size)
+    def activationCountOn(invoker: InstanceId): Int = {
+        activationByInvoker.get(invoker).map(_.get).getOrElse(0)
     }
 
     /**
@@ -80,8 +87,9 @@ class LoadBalancerData() {
     def putActivation(id: ActivationId, update: => ActivationEntry): ActivationEntry = {
         activationsById.getOrElseUpdate(id, {
             val entry = update
-            activationByNamespaceId.getOrElseUpdate(entry.namespaceId, new TrieSet[ActivationEntry]).put(entry, {})
-            activationByInvoker.getOrElseUpdate(entry.invokerName, new TrieSet[ActivationEntry]).put(entry, {})
+            totalActivations.incrementAndGet()
+            activationByNamespaceId.getOrElseUpdate(entry.namespaceId, new AtomicInteger(0)).incrementAndGet()
+            activationByInvoker.getOrElseUpdate(entry.invokerName, new AtomicInteger(0)).incrementAndGet()
             entry
         })
     }
@@ -94,8 +102,9 @@ class LoadBalancerData() {
      */
     def removeActivation(entry: ActivationEntry): Option[ActivationEntry] = {
         activationsById.remove(entry.id).map { x =>
-            activationByNamespaceId.getOrElseUpdate(x.namespaceId, new TrieSet[ActivationEntry]).remove(entry)
-            activationByInvoker.getOrElseUpdate(x.invokerName, new TrieSet[ActivationEntry]).remove(entry)
+            totalActivations.decrementAndGet()
+            activationByNamespaceId.getOrElseUpdate(entry.namespaceId, new AtomicInteger(0)).decrementAndGet()
+            activationByInvoker.getOrElseUpdate(entry.invokerName, new AtomicInteger(0)).decrementAndGet()
             x
         }
     }
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
index b50eebc..b4c79ee 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
@@ -56,12 +56,11 @@ trait LoadBalancer {
 
     val activeAckTimeoutGrace = 1.minute
 
-    /**
-     * Retrieves a per namespace id map of counts representing in-flight activations as seen by the load balancer
-     *
-     * @return a map where the key is the namespace id and the long is total issued activations by that namespace
-     */
-    def getActiveNamespaceActivationCounts: Map[UUID, Int]
+    /** Gets the number of in-flight activations for a specific user. */
+    def activeActivationsFor(namspace: UUID): Int
+
+    /** Gets the number of in-flight activations in the system. */
+    def totalActiveActivations: Int
 
     /**
      * Publishes activation message on internal bus for an invoker to pick up.
@@ -96,7 +95,9 @@ class LoadBalancerService(
 
     private val loadBalancerData = new LoadBalancerData()
 
-    override def getActiveNamespaceActivationCounts: Map[UUID, Int] = loadBalancerData.activationCountByNamespace
+    override def activeActivationsFor(namespace: UUID) = loadBalancerData.activationCountOn(namespace)
+
+    override def totalActiveActivations = loadBalancerData.totalActivationCount
 
     override def publish(action: ExecutableWhiskAction, msg: ActivationMessage)(
         implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] = {
@@ -259,7 +260,7 @@ class LoadBalancerService(
             val invokersToUse = if (action.exec.pull) blackboxInvokers(invokers) else managedInvokers(invokers)
             val invokersWithUsage = invokersToUse.view.map {
                 // Using a view defers the comparably expensive lookup to actual access of the element
-                case (instance, state) => (instance, state, loadBalancerData.activationCountByInvoker.get(instance).getOrElse(0))
+                case (instance, state) => (instance, state, loadBalancerData.activationCountOn(instance))
             }
 
             LoadBalancerService.schedule(invokersWithUsage, config.loadbalancerInvokerBusyThreshold, hash) match {
diff --git a/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala b/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
index d652272..dbfa17f 100644
--- a/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
@@ -180,7 +180,8 @@ class DegenerateLoadBalancerService(config: WhiskConfig)(implicit ec: ExecutionC
     // unit tests that need an activation via active ack/fast path should set this to value expected
     var whiskActivationStub: Option[(FiniteDuration, WhiskActivation)] = None
 
-    override def getActiveNamespaceActivationCounts: Map[UUID, Int] = Map.empty
+    override def totalActiveActivations = 0
+    override def activeActivationsFor(namespace: UUID) = 0
 
     override def publish(action: ExecutableWhiskAction, msg: ActivationMessage)(implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] =
         Future.successful {
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala
index f2619bb..1daf1ee 100644
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala
+++ b/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala
@@ -37,9 +37,8 @@ class LoadBalancerDataTests extends FlatSpec with Matchers {
         val loadBalancerData = new LoadBalancerData()
         loadBalancerData.putActivation(firstEntry.id, firstEntry)
 
-        val result = loadBalancerData.activationCountByNamespace
-        result shouldBe Map(firstEntry.namespaceId -> 1)
-        loadBalancerData.activationCountByInvoker(firstEntry.invokerName) shouldBe 1
+        loadBalancerData.activationCountOn(firstEntry.namespaceId) shouldBe 1
+        loadBalancerData.activationCountOn(firstEntry.invokerName) shouldBe 1
         loadBalancerData.activationById(firstEntry.id) shouldBe Some(firstEntry)
     }
 
@@ -49,48 +48,35 @@ class LoadBalancerDataTests extends FlatSpec with Matchers {
         loadBalancerData.putActivation(firstEntry.id, firstEntry)
         loadBalancerData.putActivation(secondEntry.id, secondEntry)
 
-        val result = loadBalancerData.activationCountByInvoker
-
-        result shouldBe Map(firstEntry.invokerName -> 1, secondEntry.invokerName -> 1)
-
-        loadBalancerData.activationCountByInvoker(firstEntry.invokerName) shouldBe 1
-        loadBalancerData.activationCountByInvoker(secondEntry.invokerName) shouldBe 1
+        loadBalancerData.activationCountOn(firstEntry.invokerName) shouldBe 1
+        loadBalancerData.activationCountOn(secondEntry.invokerName) shouldBe 1
         loadBalancerData.activationById(firstEntry.id) shouldBe Some(firstEntry)
         loadBalancerData.activationById(secondEntry.id) shouldBe Some(secondEntry)
     }
 
-    it should "remove activations from all 3 maps" in {
+    it should "remove activations and reflect that accordingly" in {
 
         val loadBalancerData = new LoadBalancerData()
         loadBalancerData.putActivation(firstEntry.id, firstEntry)
-        loadBalancerData.putActivation(secondEntry.id, secondEntry)
+        loadBalancerData.activationCountOn(firstEntry.invokerName) shouldBe 1
+        loadBalancerData.activationCountOn(firstEntry.namespaceId) shouldBe 1
 
         loadBalancerData.removeActivation(firstEntry)
-        loadBalancerData.removeActivation(secondEntry)
-
-        val activationsByInvoker = loadBalancerData.activationCountByInvoker
-        val activationsByNamespace = loadBalancerData.activationCountByNamespace
 
-        activationsByInvoker.values.sum shouldBe 0
-        activationsByNamespace.values.sum shouldBe 0
+        loadBalancerData.activationCountOn(firstEntry.invokerName) shouldBe 0
+        loadBalancerData.activationCountOn(firstEntry.namespaceId) shouldBe 0
         loadBalancerData.activationById(firstEntry.id) shouldBe None
-        loadBalancerData.activationById(secondEntry.id) shouldBe None
     }
 
     it should "remove activations from all 3 maps by activation id" in {
 
         val loadBalancerData = new LoadBalancerData()
         loadBalancerData.putActivation(firstEntry.id, firstEntry)
+        loadBalancerData.activationCountOn(firstEntry.invokerName) shouldBe 1
 
         loadBalancerData.removeActivation(firstEntry.id)
 
-        val activationsByInvoker = loadBalancerData.activationCountByInvoker
-        val activationsByNamespace = loadBalancerData.activationCountByNamespace
-
-        activationsByInvoker.values.sum shouldBe 0
-        activationsByNamespace.values.sum shouldBe 0
-        loadBalancerData.activationById(firstEntry.id) shouldBe None
-
+        loadBalancerData.activationCountOn(firstEntry.invokerName) shouldBe 0
     }
 
     it should "return None if the entry doesn't exist when we remove it" in {
@@ -106,25 +92,25 @@ class LoadBalancerDataTests extends FlatSpec with Matchers {
 
         val loadBalancerData = new LoadBalancerData()
         loadBalancerData.putActivation(entry.id, entry)
-        loadBalancerData.activationCountByNamespace(entry.namespaceId) shouldBe 1
-        loadBalancerData.activationCountByInvoker(entry.invokerName) shouldBe 1
+        loadBalancerData.activationCountOn(entry.namespaceId) shouldBe 1
+        loadBalancerData.activationCountOn(entry.invokerName) shouldBe 1
 
         loadBalancerData.putActivation(entrySameInvokerAndNamespace.id, entrySameInvokerAndNamespace)
-        loadBalancerData.activationCountByNamespace(entry.namespaceId) shouldBe 2
-        loadBalancerData.activationCountByInvoker(entry.invokerName) shouldBe 2
+        loadBalancerData.activationCountOn(entry.namespaceId) shouldBe 2
+        loadBalancerData.activationCountOn(entry.invokerName) shouldBe 2
 
         loadBalancerData.putActivation(entrySameInvoker.id, entrySameInvoker)
-        loadBalancerData.activationCountByNamespace(entry.namespaceId) shouldBe 2
-        loadBalancerData.activationCountByInvoker(entry.invokerName) shouldBe 3
+        loadBalancerData.activationCountOn(entry.namespaceId) shouldBe 2
+        loadBalancerData.activationCountOn(entry.invokerName) shouldBe 3
 
         loadBalancerData.removeActivation(entrySameInvokerAndNamespace)
-        loadBalancerData.activationCountByNamespace(entry.namespaceId) shouldBe 1
-        loadBalancerData.activationCountByInvoker(entry.invokerName) shouldBe 2
+        loadBalancerData.activationCountOn(entry.namespaceId) shouldBe 1
+        loadBalancerData.activationCountOn(entry.invokerName) shouldBe 2
 
         // removing non existing entry doesn't mess up
         loadBalancerData.removeActivation(entrySameInvokerAndNamespace)
-        loadBalancerData.activationCountByNamespace(entry.namespaceId) shouldBe 1
-        loadBalancerData.activationCountByInvoker(entry.invokerName) shouldBe 2
+        loadBalancerData.activationCountOn(entry.namespaceId) shouldBe 1
+        loadBalancerData.activationCountOn(entry.invokerName) shouldBe 2
 
     }
 
@@ -133,12 +119,12 @@ class LoadBalancerDataTests extends FlatSpec with Matchers {
         val loadBalancerData = new LoadBalancerData()
 
         loadBalancerData.putActivation(firstEntry.id, firstEntry)
-        loadBalancerData.activationCountByInvoker shouldBe Map(firstEntry.invokerName -> 1)
-        loadBalancerData.activationCountByNamespace shouldBe Map(firstEntry.namespaceId -> 1)
+        loadBalancerData.activationCountOn(firstEntry.invokerName) shouldBe 1
+        loadBalancerData.activationCountOn(firstEntry.namespaceId) shouldBe 1
 
         loadBalancerData.putActivation(firstEntry.id, firstEntry)
-        loadBalancerData.activationCountByInvoker shouldBe Map(firstEntry.invokerName -> 1)
-        loadBalancerData.activationCountByNamespace shouldBe Map(firstEntry.namespaceId -> 1)
+        loadBalancerData.activationCountOn(firstEntry.invokerName) shouldBe 1
+        loadBalancerData.activationCountOn(firstEntry.namespaceId) shouldBe 1
     }
 
     it should "not evaluate the given block if an entry already exists" in {
@@ -175,8 +161,8 @@ class LoadBalancerDataTests extends FlatSpec with Matchers {
         })
 
         called shouldBe 1
-        loadBalancerData.activationCountByInvoker shouldBe Map(firstEntry.invokerName -> 1)
-        loadBalancerData.activationCountByNamespace shouldBe Map(firstEntry.namespaceId -> 1)
+        loadBalancerData.activationCountOn(firstEntry.invokerName) shouldBe 1
+        loadBalancerData.activationCountOn(firstEntry.namespaceId) shouldBe 1
 
         // entry already exists, should not evaluate the block and change the state
         val entryAfterSecond = loadBalancerData.putActivation(entrySameId.id, {
@@ -186,8 +172,8 @@ class LoadBalancerDataTests extends FlatSpec with Matchers {
 
         called shouldBe 1
         entry shouldBe entryAfterSecond
-        loadBalancerData.activationCountByInvoker shouldBe Map(firstEntry.invokerName -> 1)
-        loadBalancerData.activationCountByNamespace shouldBe Map(firstEntry.namespaceId -> 1)
+        loadBalancerData.activationCountOn(firstEntry.invokerName) shouldBe 1
+        loadBalancerData.activationCountOn(firstEntry.namespaceId) shouldBe 1
     }
 
 }

-- 
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>'].