You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ra...@apache.org on 2017/08/13 16:41:56 UTC
[incubator-openwhisk] branch master updated: Make LoadbalancerData
leaner. (#2587)
This is an automated email from the ASF dual-hosted git repository.
rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new c2a9462 Make LoadbalancerData leaner. (#2587)
c2a9462 is described below
commit c2a94627f7f054bffca3b6b99a91cb01171dc965
Author: Markus Thömmes <ma...@me.com>
AuthorDate: Sun Aug 13 18:41:53 2017 +0200
Make LoadbalancerData leaner. (#2587)
LoadbalancerData keeps a lot of data it doesn't need. Upon fetching a copy of this data, it creates copies which can be expensive, the more user are in the system.
---
.../core/entitlement/ActivationThrottler.scala | 30 +--------
.../whisk/core/loadBalancer/LoadBalancerData.scala | 39 +++++++-----
.../core/loadBalancer/LoadBalancerService.scala | 17 ++---
.../controller/test/ControllerTestCommon.scala | 3 +-
.../loadBalancer/test/LoadBalancerDataTests.scala | 72 +++++++++-------------
5 files changed, 67 insertions(+), 94 deletions(-)
diff --git a/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala b/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala
index aa44118..b9567c5 100644
--- a/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala
+++ b/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala
@@ -17,15 +17,9 @@
package whisk.core.entitlement
-import scala.concurrent.Future
-import scala.concurrent.duration.DurationInt
-
-import akka.actor.ActorSystem
import whisk.common.Logging
-import whisk.common.Scheduler
import whisk.common.TransactionId
import whisk.core.entity.Identity
-import whisk.core.entity.UUID
import whisk.core.loadBalancer.LoadBalancer
/**
@@ -38,28 +32,15 @@ import whisk.core.loadBalancer.LoadBalancer
* @param defaultConcurrencyLimit the default max allowed concurrent operations
* @param systemOverloadLimit the limit when the system is considered overloaded
*/
-class ActivationThrottler(loadBalancer: LoadBalancer, defaultConcurrencyLimit: Int, systemOverloadLimit: Int)(
- implicit val system: ActorSystem, logging: Logging) {
+class ActivationThrottler(loadBalancer: LoadBalancer, defaultConcurrencyLimit: Int, systemOverloadLimit: Int)(implicit logging: Logging) {
logging.info(this, s"concurrencyLimit = $defaultConcurrencyLimit, systemOverloadLimit = $systemOverloadLimit")
- implicit private val executionContext = system.dispatcher
-
- /**
- * holds the values of the last run of the scheduler below to be gettable by outside
- * services to be able to determine whether a namespace should be throttled or not based on
- * the number of concurrent invocations it has in the system
- */
- @volatile
- private var namespaceActivationCounter = Map.empty[UUID, Int]
-
- private val healthCheckInterval = 5.seconds
-
/**
* Checks whether the operation should be allowed to proceed.
*/
def check(user: Identity)(implicit tid: TransactionId): Boolean = {
- val concurrentActivations = namespaceActivationCounter.getOrElse(user.uuid, 0)
+ val concurrentActivations = loadBalancer.activeActivationsFor(user.uuid)
val concurrencyLimit = user.limits.concurrentInvocations.getOrElse(defaultConcurrencyLimit)
logging.info(this, s"namespace = ${user.uuid.asString}, concurrent activations = $concurrentActivations, below limit = $concurrencyLimit")
concurrentActivations < concurrencyLimit
@@ -69,13 +50,8 @@ class ActivationThrottler(loadBalancer: LoadBalancer, defaultConcurrencyLimit: I
* Checks whether the system is in a generally overloaded state.
*/
def isOverloaded()(implicit tid: TransactionId): Boolean = {
- val concurrentActivations = namespaceActivationCounter.values.sum
+ val concurrentActivations = loadBalancer.totalActiveActivations
logging.info(this, s"concurrent activations in system = $concurrentActivations, below limit = $systemOverloadLimit")
concurrentActivations > systemOverloadLimit
}
-
- Scheduler.scheduleWaitAtLeast(healthCheckInterval) { () =>
- namespaceActivationCounter = loadBalancer.getActiveNamespaceActivationCounts
- Future.successful(Unit)
- }
}
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala
index a33b813..9014cc6 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala
@@ -17,9 +17,12 @@
package whisk.core.loadBalancer
-import whisk.core.entity.{ ActivationId, UUID, WhiskActivation }
+import java.util.concurrent.atomic.AtomicInteger
+
import scala.collection.concurrent.TrieMap
import scala.concurrent.Promise
+
+import whisk.core.entity.{ ActivationId, UUID, WhiskActivation }
import whisk.core.entity.InstanceId
/** Encapsulates data relevant for a single activation */
@@ -33,28 +36,32 @@ case class ActivationEntry(id: ActivationId, namespaceId: UUID, invokerName: Ins
*/
class LoadBalancerData() {
- type TrieSet[T] = TrieMap[T, Unit]
+ private val activationByInvoker = TrieMap[InstanceId, AtomicInteger]()
+ private val activationByNamespaceId = TrieMap[UUID, AtomicInteger]()
+ private val activationsById = TrieMap[ActivationId, ActivationEntry]()
+ private val totalActivations = new AtomicInteger(0)
- private val activationByInvoker = new TrieMap[InstanceId, TrieSet[ActivationEntry]]
- private val activationByNamespaceId = new TrieMap[UUID, TrieSet[ActivationEntry]]
- private val activationsById = new TrieMap[ActivationId, ActivationEntry]
+ /** Get the number of activations across all namespaces. */
+ def totalActivationCount = totalActivations.get
/**
- * Get the number of activations for each namespace.
+ * Get the number of activations for a specific namespace.
*
+ * @param namespace The namespace to get the activation count for
* @return a map (namespace -> number of activations in the system)
*/
- def activationCountByNamespace: Map[UUID, Int] = {
- activationByNamespaceId.toMap.mapValues(_.size)
+ def activationCountOn(namespace: UUID) = {
+ activationByNamespaceId.get(namespace).map(_.get).getOrElse(0)
}
/**
- * Get the number of activations for each invoker.
+ * Get the number of activations for a specific invoker.
*
+ * @param invoker The invoker to get the activation count for
* @return a map (invoker -> number of activations queued for the invoker)
*/
- def activationCountByInvoker: Map[InstanceId, Int] = {
- activationByInvoker.toMap.mapValues(_.size)
+ def activationCountOn(invoker: InstanceId): Int = {
+ activationByInvoker.get(invoker).map(_.get).getOrElse(0)
}
/**
@@ -80,8 +87,9 @@ class LoadBalancerData() {
def putActivation(id: ActivationId, update: => ActivationEntry): ActivationEntry = {
activationsById.getOrElseUpdate(id, {
val entry = update
- activationByNamespaceId.getOrElseUpdate(entry.namespaceId, new TrieSet[ActivationEntry]).put(entry, {})
- activationByInvoker.getOrElseUpdate(entry.invokerName, new TrieSet[ActivationEntry]).put(entry, {})
+ totalActivations.incrementAndGet()
+ activationByNamespaceId.getOrElseUpdate(entry.namespaceId, new AtomicInteger(0)).incrementAndGet()
+ activationByInvoker.getOrElseUpdate(entry.invokerName, new AtomicInteger(0)).incrementAndGet()
entry
})
}
@@ -94,8 +102,9 @@ class LoadBalancerData() {
*/
def removeActivation(entry: ActivationEntry): Option[ActivationEntry] = {
activationsById.remove(entry.id).map { x =>
- activationByNamespaceId.getOrElseUpdate(x.namespaceId, new TrieSet[ActivationEntry]).remove(entry)
- activationByInvoker.getOrElseUpdate(x.invokerName, new TrieSet[ActivationEntry]).remove(entry)
+ totalActivations.decrementAndGet()
+ activationByNamespaceId.getOrElseUpdate(entry.namespaceId, new AtomicInteger(0)).decrementAndGet()
+ activationByInvoker.getOrElseUpdate(entry.invokerName, new AtomicInteger(0)).decrementAndGet()
x
}
}
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
index b50eebc..b4c79ee 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
@@ -56,12 +56,11 @@ trait LoadBalancer {
val activeAckTimeoutGrace = 1.minute
- /**
- * Retrieves a per namespace id map of counts representing in-flight activations as seen by the load balancer
- *
- * @return a map where the key is the namespace id and the long is total issued activations by that namespace
- */
- def getActiveNamespaceActivationCounts: Map[UUID, Int]
+ /** Gets the number of in-flight activations for a specific user. */
+ def activeActivationsFor(namspace: UUID): Int
+
+ /** Gets the number of in-flight activations in the system. */
+ def totalActiveActivations: Int
/**
* Publishes activation message on internal bus for an invoker to pick up.
@@ -96,7 +95,9 @@ class LoadBalancerService(
private val loadBalancerData = new LoadBalancerData()
- override def getActiveNamespaceActivationCounts: Map[UUID, Int] = loadBalancerData.activationCountByNamespace
+ override def activeActivationsFor(namespace: UUID) = loadBalancerData.activationCountOn(namespace)
+
+ override def totalActiveActivations = loadBalancerData.totalActivationCount
override def publish(action: ExecutableWhiskAction, msg: ActivationMessage)(
implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] = {
@@ -259,7 +260,7 @@ class LoadBalancerService(
val invokersToUse = if (action.exec.pull) blackboxInvokers(invokers) else managedInvokers(invokers)
val invokersWithUsage = invokersToUse.view.map {
// Using a view defers the comparably expensive lookup to actual access of the element
- case (instance, state) => (instance, state, loadBalancerData.activationCountByInvoker.get(instance).getOrElse(0))
+ case (instance, state) => (instance, state, loadBalancerData.activationCountOn(instance))
}
LoadBalancerService.schedule(invokersWithUsage, config.loadbalancerInvokerBusyThreshold, hash) match {
diff --git a/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala b/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
index d652272..dbfa17f 100644
--- a/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
@@ -180,7 +180,8 @@ class DegenerateLoadBalancerService(config: WhiskConfig)(implicit ec: ExecutionC
// unit tests that need an activation via active ack/fast path should set this to value expected
var whiskActivationStub: Option[(FiniteDuration, WhiskActivation)] = None
- override def getActiveNamespaceActivationCounts: Map[UUID, Int] = Map.empty
+ override def totalActiveActivations = 0
+ override def activeActivationsFor(namespace: UUID) = 0
override def publish(action: ExecutableWhiskAction, msg: ActivationMessage)(implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] =
Future.successful {
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala
index f2619bb..1daf1ee 100644
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala
+++ b/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala
@@ -37,9 +37,8 @@ class LoadBalancerDataTests extends FlatSpec with Matchers {
val loadBalancerData = new LoadBalancerData()
loadBalancerData.putActivation(firstEntry.id, firstEntry)
- val result = loadBalancerData.activationCountByNamespace
- result shouldBe Map(firstEntry.namespaceId -> 1)
- loadBalancerData.activationCountByInvoker(firstEntry.invokerName) shouldBe 1
+ loadBalancerData.activationCountOn(firstEntry.namespaceId) shouldBe 1
+ loadBalancerData.activationCountOn(firstEntry.invokerName) shouldBe 1
loadBalancerData.activationById(firstEntry.id) shouldBe Some(firstEntry)
}
@@ -49,48 +48,35 @@ class LoadBalancerDataTests extends FlatSpec with Matchers {
loadBalancerData.putActivation(firstEntry.id, firstEntry)
loadBalancerData.putActivation(secondEntry.id, secondEntry)
- val result = loadBalancerData.activationCountByInvoker
-
- result shouldBe Map(firstEntry.invokerName -> 1, secondEntry.invokerName -> 1)
-
- loadBalancerData.activationCountByInvoker(firstEntry.invokerName) shouldBe 1
- loadBalancerData.activationCountByInvoker(secondEntry.invokerName) shouldBe 1
+ loadBalancerData.activationCountOn(firstEntry.invokerName) shouldBe 1
+ loadBalancerData.activationCountOn(secondEntry.invokerName) shouldBe 1
loadBalancerData.activationById(firstEntry.id) shouldBe Some(firstEntry)
loadBalancerData.activationById(secondEntry.id) shouldBe Some(secondEntry)
}
- it should "remove activations from all 3 maps" in {
+ it should "remove activations and reflect that accordingly" in {
val loadBalancerData = new LoadBalancerData()
loadBalancerData.putActivation(firstEntry.id, firstEntry)
- loadBalancerData.putActivation(secondEntry.id, secondEntry)
+ loadBalancerData.activationCountOn(firstEntry.invokerName) shouldBe 1
+ loadBalancerData.activationCountOn(firstEntry.namespaceId) shouldBe 1
loadBalancerData.removeActivation(firstEntry)
- loadBalancerData.removeActivation(secondEntry)
-
- val activationsByInvoker = loadBalancerData.activationCountByInvoker
- val activationsByNamespace = loadBalancerData.activationCountByNamespace
- activationsByInvoker.values.sum shouldBe 0
- activationsByNamespace.values.sum shouldBe 0
+ loadBalancerData.activationCountOn(firstEntry.invokerName) shouldBe 0
+ loadBalancerData.activationCountOn(firstEntry.namespaceId) shouldBe 0
loadBalancerData.activationById(firstEntry.id) shouldBe None
- loadBalancerData.activationById(secondEntry.id) shouldBe None
}
it should "remove activations from all 3 maps by activation id" in {
val loadBalancerData = new LoadBalancerData()
loadBalancerData.putActivation(firstEntry.id, firstEntry)
+ loadBalancerData.activationCountOn(firstEntry.invokerName) shouldBe 1
loadBalancerData.removeActivation(firstEntry.id)
- val activationsByInvoker = loadBalancerData.activationCountByInvoker
- val activationsByNamespace = loadBalancerData.activationCountByNamespace
-
- activationsByInvoker.values.sum shouldBe 0
- activationsByNamespace.values.sum shouldBe 0
- loadBalancerData.activationById(firstEntry.id) shouldBe None
-
+ loadBalancerData.activationCountOn(firstEntry.invokerName) shouldBe 0
}
it should "return None if the entry doesn't exist when we remove it" in {
@@ -106,25 +92,25 @@ class LoadBalancerDataTests extends FlatSpec with Matchers {
val loadBalancerData = new LoadBalancerData()
loadBalancerData.putActivation(entry.id, entry)
- loadBalancerData.activationCountByNamespace(entry.namespaceId) shouldBe 1
- loadBalancerData.activationCountByInvoker(entry.invokerName) shouldBe 1
+ loadBalancerData.activationCountOn(entry.namespaceId) shouldBe 1
+ loadBalancerData.activationCountOn(entry.invokerName) shouldBe 1
loadBalancerData.putActivation(entrySameInvokerAndNamespace.id, entrySameInvokerAndNamespace)
- loadBalancerData.activationCountByNamespace(entry.namespaceId) shouldBe 2
- loadBalancerData.activationCountByInvoker(entry.invokerName) shouldBe 2
+ loadBalancerData.activationCountOn(entry.namespaceId) shouldBe 2
+ loadBalancerData.activationCountOn(entry.invokerName) shouldBe 2
loadBalancerData.putActivation(entrySameInvoker.id, entrySameInvoker)
- loadBalancerData.activationCountByNamespace(entry.namespaceId) shouldBe 2
- loadBalancerData.activationCountByInvoker(entry.invokerName) shouldBe 3
+ loadBalancerData.activationCountOn(entry.namespaceId) shouldBe 2
+ loadBalancerData.activationCountOn(entry.invokerName) shouldBe 3
loadBalancerData.removeActivation(entrySameInvokerAndNamespace)
- loadBalancerData.activationCountByNamespace(entry.namespaceId) shouldBe 1
- loadBalancerData.activationCountByInvoker(entry.invokerName) shouldBe 2
+ loadBalancerData.activationCountOn(entry.namespaceId) shouldBe 1
+ loadBalancerData.activationCountOn(entry.invokerName) shouldBe 2
// removing non existing entry doesn't mess up
loadBalancerData.removeActivation(entrySameInvokerAndNamespace)
- loadBalancerData.activationCountByNamespace(entry.namespaceId) shouldBe 1
- loadBalancerData.activationCountByInvoker(entry.invokerName) shouldBe 2
+ loadBalancerData.activationCountOn(entry.namespaceId) shouldBe 1
+ loadBalancerData.activationCountOn(entry.invokerName) shouldBe 2
}
@@ -133,12 +119,12 @@ class LoadBalancerDataTests extends FlatSpec with Matchers {
val loadBalancerData = new LoadBalancerData()
loadBalancerData.putActivation(firstEntry.id, firstEntry)
- loadBalancerData.activationCountByInvoker shouldBe Map(firstEntry.invokerName -> 1)
- loadBalancerData.activationCountByNamespace shouldBe Map(firstEntry.namespaceId -> 1)
+ loadBalancerData.activationCountOn(firstEntry.invokerName) shouldBe 1
+ loadBalancerData.activationCountOn(firstEntry.namespaceId) shouldBe 1
loadBalancerData.putActivation(firstEntry.id, firstEntry)
- loadBalancerData.activationCountByInvoker shouldBe Map(firstEntry.invokerName -> 1)
- loadBalancerData.activationCountByNamespace shouldBe Map(firstEntry.namespaceId -> 1)
+ loadBalancerData.activationCountOn(firstEntry.invokerName) shouldBe 1
+ loadBalancerData.activationCountOn(firstEntry.namespaceId) shouldBe 1
}
it should "not evaluate the given block if an entry already exists" in {
@@ -175,8 +161,8 @@ class LoadBalancerDataTests extends FlatSpec with Matchers {
})
called shouldBe 1
- loadBalancerData.activationCountByInvoker shouldBe Map(firstEntry.invokerName -> 1)
- loadBalancerData.activationCountByNamespace shouldBe Map(firstEntry.namespaceId -> 1)
+ loadBalancerData.activationCountOn(firstEntry.invokerName) shouldBe 1
+ loadBalancerData.activationCountOn(firstEntry.namespaceId) shouldBe 1
// entry already exists, should not evaluate the block and change the state
val entryAfterSecond = loadBalancerData.putActivation(entrySameId.id, {
@@ -186,8 +172,8 @@ class LoadBalancerDataTests extends FlatSpec with Matchers {
called shouldBe 1
entry shouldBe entryAfterSecond
- loadBalancerData.activationCountByInvoker shouldBe Map(firstEntry.invokerName -> 1)
- loadBalancerData.activationCountByNamespace shouldBe Map(firstEntry.namespaceId -> 1)
+ loadBalancerData.activationCountOn(firstEntry.invokerName) shouldBe 1
+ loadBalancerData.activationCountOn(firstEntry.namespaceId) shouldBe 1
}
}
--
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>'].