You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ty...@apache.org on 2018/01/23 00:48:13 UTC
[incubator-openwhisk] branch master updated: Dynamic LoadBalancer
load using SpiLoader (#2984)
This is an automated email from the ASF dual-hosted git repository.
tysonnorris pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 113a915 Dynamic LoadBalancer load using SpiLoader (#2984)
113a915 is described below
commit 113a91500839dd90dfe4ddc74776bd95cad69ec6
Author: kpavel <kp...@il.ibm.com>
AuthorDate: Tue Jan 23 02:48:11 2018 +0200
Dynamic LoadBalancer load using SpiLoader (#2984)
* Add a container pool/invoker loadbalancer SPI.
* Refactor invoker health tuple to proper type for cleaner SPI interface.
Rename LoadBalancerService to ContainerPoolBalancer.
Split into two files.
---
common/scala/src/main/resources/reference.conf | 1 +
.../scala/whisk/core/controller/Controller.scala | 16 +--
.../scala/whisk/core/controller/RestAPIs.scala | 14 +--
...erService.scala => ContainerPoolBalancer.scala} | 124 ++++++++-------------
.../core/loadBalancer/InvokerSupervision.scala | 10 +-
.../whisk/core/loadBalancer/LoadBalancer.scala | 83 ++++++++++++++
.../controller/test/ControllerTestCommon.scala | 1 +
...cala => ContainerPoolBalancerObjectTests.scala} | 54 ++++-----
.../test/InvokerSupervisionTests.scala | 3 +-
9 files changed, 180 insertions(+), 126 deletions(-)
diff --git a/common/scala/src/main/resources/reference.conf b/common/scala/src/main/resources/reference.conf
index 45543e5..4530aef 100644
--- a/common/scala/src/main/resources/reference.conf
+++ b/common/scala/src/main/resources/reference.conf
@@ -3,4 +3,5 @@ whisk.spi{
MessagingProvider = whisk.connector.kafka.KafkaMessagingProvider
ContainerFactoryProvider = whisk.core.containerpool.docker.DockerContainerFactoryProvider
LogStoreProvider = whisk.core.containerpool.logging.DockerToActivationLogStoreProvider
+ LoadBalancerProvider = whisk.core.loadBalancer.ContainerPoolBalancer
}
diff --git a/core/controller/src/main/scala/whisk/core/controller/Controller.scala b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
index ba58452..53408c3 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Controller.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
@@ -46,7 +46,7 @@ import whisk.core.entitlement._
import whisk.core.entity._
import whisk.core.entity.ActivationId.ActivationIdGenerator
import whisk.core.entity.ExecManifest.Runtimes
-import whisk.core.loadBalancer.{LoadBalancerService}
+import whisk.core.loadBalancer.LoadBalancerProvider
import whisk.http.BasicHttpService
import whisk.http.BasicRasService
import whisk.spi.SpiLoader
@@ -117,7 +117,8 @@ class Controller(val instance: InstanceId,
})
// initialize backend services
- private implicit val loadBalancer = new LoadBalancerService(whiskConfig, instance, entityStore)
+ private implicit val loadBalancer =
+ SpiLoader.get[LoadBalancerProvider].loadBalancer(whiskConfig, instance)
private implicit val entitlementProvider = new LocalEntitlementProvider(whiskConfig, loadBalancer)
private implicit val activationIdFactory = new ActivationIdGenerator {}
private implicit val logStore = SpiLoader.get[LogStoreProvider].logStore(actorSystem)
@@ -137,12 +138,13 @@ class Controller(val instance: InstanceId,
*/
private val internalInvokerHealth = {
implicit val executionContext = actorSystem.dispatcher
-
(path("invokers") & get) {
complete {
- loadBalancer.allInvokers.map(_.map {
- case (instance, state) => s"invoker${instance.toInt}" -> state.asString
- }.toMap.toJson.asJsObject)
+ loadBalancer
+ .invokerHealth()
+ .map(_.map {
+ case i => s"invoker${i.id.toInt}" -> i.status.asString
+ }.toMap.toJson.asJsObject)
}
}
}
@@ -163,7 +165,7 @@ object Controller {
Map(WhiskConfig.controllerInstances -> null) ++
ExecManifest.requiredProperties ++
RestApiCommons.requiredProperties ++
- LoadBalancerService.requiredProperties ++
+ SpiLoader.get[LoadBalancerProvider].requiredProperties ++
EntitlementProvider.requiredProperties
private def info(config: WhiskConfig, runtimes: Runtimes, apis: List[String]) =
diff --git a/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala b/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala
index ffcf01b..161ef4b 100644
--- a/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala
@@ -46,7 +46,7 @@ import whisk.core.entity._
import whisk.core.entity.ActivationId.ActivationIdGenerator
import whisk.core.entity.WhiskAuthStore
import whisk.core.entity.types._
-import whisk.core.loadBalancer.LoadBalancerService
+import whisk.core.loadBalancer.LoadBalancer
import whisk.http.Messages
/**
@@ -161,7 +161,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)(
implicit val entityStore: EntityStore,
implicit val entitlementProvider: EntitlementProvider,
implicit val activationIdFactory: ActivationIdGenerator,
- implicit val loadBalancer: LoadBalancerService,
+ implicit val loadBalancer: LoadBalancer,
implicit val cacheChangeNotification: Some[CacheChangeNotification],
implicit val activationStore: ActivationStore,
implicit val logStore: LogStore,
@@ -243,7 +243,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)(
override val activationStore: ActivationStore,
override val entitlementProvider: EntitlementProvider,
override val activationIdFactory: ActivationIdGenerator,
- override val loadBalancer: LoadBalancerService,
+ override val loadBalancer: LoadBalancer,
override val cacheChangeNotification: Some[CacheChangeNotification],
override val executionContext: ExecutionContext,
override val logging: Logging,
@@ -266,7 +266,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)(
implicit override val entityStore: EntityStore,
override val entitlementProvider: EntitlementProvider,
override val activationIdFactory: ActivationIdGenerator,
- override val loadBalancer: LoadBalancerService,
+ override val loadBalancer: LoadBalancer,
override val cacheChangeNotification: Some[CacheChangeNotification],
override val executionContext: ExecutionContext,
override val logging: Logging,
@@ -279,7 +279,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)(
override val entityStore: EntityStore,
override val entitlementProvider: EntitlementProvider,
override val activationIdFactory: ActivationIdGenerator,
- override val loadBalancer: LoadBalancerService,
+ override val loadBalancer: LoadBalancer,
override val cacheChangeNotification: Some[CacheChangeNotification],
override val executionContext: ExecutionContext,
override val logging: Logging,
@@ -293,7 +293,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)(
override val entitlementProvider: EntitlementProvider,
override val activationStore: ActivationStore,
override val activationIdFactory: ActivationIdGenerator,
- override val loadBalancer: LoadBalancerService,
+ override val loadBalancer: LoadBalancer,
override val cacheChangeNotification: Some[CacheChangeNotification],
override val executionContext: ExecutionContext,
override val logging: Logging,
@@ -310,7 +310,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)(
override val activationStore: ActivationStore,
override val entitlementProvider: EntitlementProvider,
override val activationIdFactory: ActivationIdGenerator,
- override val loadBalancer: LoadBalancerService,
+ override val loadBalancer: LoadBalancer,
override val actorSystem: ActorSystem,
override val executionContext: ExecutionContext,
override val logging: Logging,
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
similarity index 82%
rename from core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
rename to core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
index 89ba900..ffd831d 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
@@ -19,82 +19,43 @@ package whisk.core.loadBalancer
import java.nio.charset.StandardCharsets
-import scala.annotation.tailrec
-import scala.concurrent.Await
-import scala.concurrent.ExecutionContext
-import scala.concurrent.Future
-import scala.concurrent.Promise
-import scala.concurrent.duration._
-import scala.util.Failure
-import scala.util.Success
-import org.apache.kafka.clients.producer.RecordMetadata
-import akka.actor.ActorRefFactory
-import akka.actor.ActorSystem
-import akka.actor.Props
+import akka.actor.{ActorRefFactory, ActorSystem, Props}
import akka.cluster.Cluster
-import akka.util.Timeout
import akka.pattern.ask
-import whisk.common.Logging
-import whisk.common.LoggingMarkers
-import whisk.common.TransactionId
-import whisk.core.ConfigKeys
-import whisk.core.WhiskConfig
+import akka.stream.ActorMaterializer
+import akka.util.Timeout
+import org.apache.kafka.clients.producer.RecordMetadata
+import pureconfig._
+import whisk.common.{Logging, LoggingMarkers, TransactionId}
import whisk.core.WhiskConfig._
-import whisk.core.connector.{ActivationMessage, CompletionMessage}
-import whisk.core.connector.MessageFeed
-import whisk.core.connector.MessageProducer
-import whisk.core.connector.MessagingProvider
+import whisk.core.connector._
import whisk.core.database.NoDocumentException
import whisk.core.entity._
-import whisk.core.entity.{ActivationId, WhiskActivation}
-import whisk.core.entity.EntityName
-import whisk.core.entity.ExecutableWhiskActionMetaData
-import whisk.core.entity.Identity
-import whisk.core.entity.InstanceId
-import whisk.core.entity.UUID
-import whisk.core.entity.WhiskAction
import whisk.core.entity.types.EntityStore
+import whisk.core.{ConfigKeys, WhiskConfig}
import whisk.spi.SpiLoader
-import pureconfig._
-case class LoadbalancerConfig(blackboxFraction: Double, invokerBusyThreshold: Int)
-
-trait LoadBalancer {
-
- val activeAckTimeoutGrace = 1.minute
-
- /** Gets the number of in-flight activations for a specific user. */
- def activeActivationsFor(namespace: UUID): Future[Int]
-
- /** Gets the number of in-flight activations in the system. */
- def totalActiveActivations: Future[Int]
-
- /**
- * Publishes activation message on internal bus for an invoker to pick up.
- *
- * @param action the action to invoke
- * @param msg the activation message to publish on an invoker topic
- * @param transid the transaction id for the request
- * @return result a nested Future the outer indicating completion of publishing and
- * the inner the completion of the action (i.e., the result)
- * if it is ready before timeout (Right) otherwise the activation id (Left).
- * The future is guaranteed to complete within the declared action time limit
- * plus a grace period (see activeAckTimeoutGrace).
- */
- def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)(
- implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]]
+import scala.annotation.tailrec
+import scala.concurrent.duration._
+import scala.concurrent.{Await, ExecutionContext, Future, Promise}
+import scala.util.{Failure, Success}
-}
+case class LoadbalancerConfig(blackboxFraction: Double, invokerBusyThreshold: Int)
-class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore: EntityStore)(
- implicit val actorSystem: ActorSystem,
- logging: Logging)
+class ContainerPoolBalancer(config: WhiskConfig, instance: InstanceId)(implicit val actorSystem: ActorSystem,
+ logging: Logging,
+ materializer: ActorMaterializer)
extends LoadBalancer {
private val lbConfig = loadConfigOrThrow[LoadbalancerConfig](ConfigKeys.loadbalancer)
+ /** Used to manage an action for testing invoker health */ /** Used to manage an action for testing invoker health */
+ private val entityStore = WhiskEntityStore.datastore(config)
+
/** The execution context for futures */
- implicit val executionContext: ExecutionContext = actorSystem.dispatcher
+ private implicit val executionContext: ExecutionContext = actorSystem.dispatcher
+
+ private val activeAckTimeoutGrace = 1.minute
/** How many invokers are dedicated to blackbox images. We range bound to something sensical regardless of configuration. */
private val blackboxFraction: Double = Math.max(0.0, Math.min(1.0, lbConfig.blackboxFraction))
@@ -113,10 +74,6 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore
}
}
- override def activeActivationsFor(namespace: UUID) = loadBalancerData.activationCountOn(namespace)
-
- override def totalActiveActivations = loadBalancerData.totalActivationCount
-
override def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)(
implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] = {
chooseInvoker(msg.user, action).flatMap { invokerName =>
@@ -127,11 +84,16 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore
}
}
- /** An indexed sequence of all invokers in the current system */
- def allInvokers: Future[IndexedSeq[(InstanceId, InvokerState)]] =
+ /** An indexed sequence of all invokers in the current system. */
+ override def invokerHealth(): Future[IndexedSeq[InvokerHealth]] = {
invokerPool
.ask(GetStatus)(Timeout(5.seconds))
- .mapTo[IndexedSeq[(InstanceId, InvokerState)]]
+ .mapTo[IndexedSeq[InvokerHealth]]
+ }
+
+ override def activeActivationsFor(namespace: UUID) = loadBalancerData.activationCountOn(namespace)
+
+ override def totalActiveActivations = loadBalancerData.totalActivationCount
/**
* Tries to fill in the result slot (i.e., complete the promise) when a completion message arrives.
@@ -307,9 +269,8 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore
/** Compute the number of blackbox-dedicated invokers by applying a rounded down fraction of all invokers (but at least 1). */
private def numBlackbox(totalInvokers: Int) = Math.max(1, (totalInvokers.toDouble * blackboxFraction).toInt)
- /** Return invokers (almost) dedicated to running blackbox actions. */
- private def blackboxInvokers(
- invokers: IndexedSeq[(InstanceId, InvokerState)]): IndexedSeq[(InstanceId, InvokerState)] = {
+ /** Return invokers dedicated to running blackbox actions. */
+ private def blackboxInvokers(invokers: IndexedSeq[InvokerHealth]): IndexedSeq[InvokerHealth] = {
val blackboxes = numBlackbox(invokers.size)
invokers.takeRight(blackboxes)
}
@@ -318,8 +279,7 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore
* Return (at least one) invokers for running non black-box actions.
* This set can overlap with the blackbox set if there is only one invoker.
*/
- private def managedInvokers(
- invokers: IndexedSeq[(InstanceId, InvokerState)]): IndexedSeq[(InstanceId, InvokerState)] = {
+ private def managedInvokers(invokers: IndexedSeq[InvokerHealth]): IndexedSeq[InvokerHealth] = {
val managed = Math.max(1, invokers.length - numBlackbox(invokers.length))
invokers.take(managed)
}
@@ -329,14 +289,14 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore
val hash = generateHash(user.namespace, action)
loadBalancerData.activationCountPerInvoker.flatMap { currentActivations =>
- allInvokers.flatMap { invokers =>
+ invokerHealth().flatMap { invokers =>
val invokersToUse = if (action.exec.pull) blackboxInvokers(invokers) else managedInvokers(invokers)
val invokersWithUsage = invokersToUse.view.map {
// Using a view defers the comparably expensive lookup to actual access of the element
- case (instance, state) => (instance, state, currentActivations.getOrElse(instance.toString, 0))
+ case invoker => (invoker.id, invoker.status, currentActivations.getOrElse(instance.toString, 0))
}
- LoadBalancerService.schedule(invokersWithUsage, lbConfig.invokerBusyThreshold, hash) match {
+ ContainerPoolBalancer.schedule(invokersWithUsage, lbConfig.invokerBusyThreshold, hash) match {
case Some(invoker) => Future.successful(invoker)
case None =>
logging.error(this, s"all invokers down")(TransactionId.invokerHealth)
@@ -352,7 +312,13 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore
}
}
-object LoadBalancerService {
+object ContainerPoolBalancer extends LoadBalancerProvider {
+
+ override def loadBalancer(whiskConfig: WhiskConfig, instance: InstanceId)(
+ implicit actorSystem: ActorSystem,
+ logging: Logging,
+ materializer: ActorMaterializer): LoadBalancer = new ContainerPoolBalancer(whiskConfig, instance)
+
def requiredProperties =
kafkaHosts ++
Map(controllerLocalBookkeeping -> null, controllerSeedNodes -> null)
@@ -367,7 +333,7 @@ object LoadBalancerService {
def gcd(a: Int, b: Int): Int = if (b == 0) a else gcd(b, a % b)
/** Returns pairwise coprime numbers until x. Result is memoized. */
- val pairwiseCoprimeNumbersUntil: Int => IndexedSeq[Int] = LoadBalancerService.memoize {
+ val pairwiseCoprimeNumbersUntil: Int => IndexedSeq[Int] = ContainerPoolBalancer.memoize {
case x =>
(1 to x).foldLeft(IndexedSeq.empty[Int])((primes, cur) => {
if (gcd(cur, x) == 1 && primes.forall(i => gcd(i, cur) == 1)) {
@@ -394,7 +360,7 @@ object LoadBalancerService {
val numInvokers = invokers.size
if (numInvokers > 0) {
val homeInvoker = hash % numInvokers
- val stepSizes = LoadBalancerService.pairwiseCoprimeNumbersUntil(numInvokers)
+ val stepSizes = ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(numInvokers)
val step = stepSizes(hash % stepSizes.size)
val invokerProgression = Stream
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
index cd80a8e..13c3a70 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
@@ -86,7 +86,7 @@ class InvokerPool(childFactory: (ActorRefFactory, InstanceId) => ActorRef,
// from leaking the state for external mutation
var instanceToRef = immutable.Map.empty[InstanceId, ActorRef]
var refToInstance = immutable.Map.empty[ActorRef, InstanceId]
- var status = IndexedSeq[(InstanceId, InvokerState)]()
+ var status = IndexedSeq[InvokerHealth]()
def receive = {
case p: PingMessage =>
@@ -103,13 +103,13 @@ class InvokerPool(childFactory: (ActorRefFactory, InstanceId) => ActorRef,
case CurrentState(invoker, currentState: InvokerState) =>
refToInstance.get(invoker).foreach { instance =>
- status = status.updated(instance.toInt, (instance, currentState))
+ status = status.updated(instance.toInt, new InvokerHealth(instance, currentState))
}
logStatus()
case Transition(invoker, oldState: InvokerState, newState: InvokerState) =>
refToInstance.get(invoker).foreach { instance =>
- status = status.updated(instance.toInt, (instance, newState))
+ status = status.updated(instance.toInt, new InvokerHealth(instance, newState))
}
logStatus()
@@ -118,7 +118,7 @@ class InvokerPool(childFactory: (ActorRefFactory, InstanceId) => ActorRef,
}
def logStatus() = {
- val pretty = status.map { case (instance, state) => s"${instance.toInt} -> $state" }
+ val pretty = status.map(i => s"${i.id.toInt} -> ${i.status}")
logging.info(this, s"invoker status changed to ${pretty.mkString(", ")}")
}
@@ -155,7 +155,7 @@ class InvokerPool(childFactory: (ActorRefFactory, InstanceId) => ActorRef,
def registerInvoker(instanceId: InstanceId): ActorRef = {
logging.info(this, s"registered a new invoker: invoker${instanceId.toInt}")(TransactionId.invokerHealth)
- status = padToIndexed(status, instanceId.toInt + 1, i => (InstanceId(i), Offline))
+ status = padToIndexed(status, instanceId.toInt + 1, i => new InvokerHealth(InstanceId(i), Offline))
val ref = childFactory(context, instanceId)
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancer.scala
new file mode 100644
index 0000000..8f2227f
--- /dev/null
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancer.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.loadBalancer
+
+import scala.concurrent.Future
+import akka.actor.ActorSystem
+import akka.stream.ActorMaterializer
+import whisk.common.{Logging, TransactionId}
+import whisk.core.WhiskConfig
+import whisk.core.connector._
+import whisk.core.entity._
+import whisk.spi.Spi
+
+/**
+ * Describes an abstract invoker. An invoker is a local container pool manager that
+ * is in charge of the container life cycle management.
+ *
+ * @param id a unique instance identifier for the invoker
+ * @param status it status (healthy, unhealthy, offline)
+ */
+class InvokerHealth(val id: InstanceId, val status: InvokerState) {
+ override def equals(obj: scala.Any): Boolean = obj match {
+ case that: InvokerHealth => that.id == this.id && that.status == this.status
+ case _ => false
+ }
+}
+
+trait LoadBalancer {
+
+ /**
+ * Publishes activation message on internal bus for an invoker to pick up.
+ *
+ * @param action the action to invoke
+ * @param msg the activation message to publish on an invoker topic
+ * @param transid the transaction id for the request
+ * @return result a nested Future the outer indicating completion of publishing and
+ * the inner the completion of the action (i.e., the result)
+ * if it is ready before timeout (Right) otherwise the activation id (Left).
+ * The future is guaranteed to complete within the declared action time limit
+ * plus a grace period (see activeAckTimeoutGrace).
+ */
+ def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)(
+ implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]]
+
+ /**
+ * Returns a message indicating the health of the containers and/or container pool in general.
+ *
+ * @return a Future[IndexedSeq[InvokerHealth]] representing the health of the pools managed by the loadbalancer.
+ */
+ def invokerHealth(): Future[IndexedSeq[InvokerHealth]]
+
+ /** Gets the number of in-flight activations for a specific user. */
+ def activeActivationsFor(namespace: UUID): Future[Int]
+
+ /** Gets the number of in-flight activations in the system. */
+ def totalActiveActivations: Future[Int]
+}
+
+/**
+ * An Spi for providing load balancer implementations.
+ */
+trait LoadBalancerProvider extends Spi {
+ def requiredProperties: Map[String, String]
+
+ def loadBalancer(whiskConfig: WhiskConfig, instance: InstanceId)(implicit actorSystem: ActorSystem,
+ logging: Logging,
+ materializer: ActorMaterializer): LoadBalancer
+}
diff --git a/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala b/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
index f0c9fe5..7344ba7 100644
--- a/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
@@ -200,4 +200,5 @@ class DegenerateLoadBalancerService(config: WhiskConfig)(implicit ec: ExecutionC
} getOrElse Future.failed(new IllegalArgumentException("Unit test does not need fast path"))
}
+ override def invokerHealth() = Future.successful(IndexedSeq.empty)
}
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerServiceObjectTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/ContainerPoolBalancerObjectTests.scala
similarity index 68%
rename from tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerServiceObjectTests.scala
rename to tests/src/test/scala/whisk/core/loadBalancer/test/ContainerPoolBalancerObjectTests.scala
index 3f3dca4..60eda84 100644
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerServiceObjectTests.scala
+++ b/tests/src/test/scala/whisk/core/loadBalancer/test/ContainerPoolBalancerObjectTests.scala
@@ -21,7 +21,7 @@ import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.FlatSpec
import org.scalatest.Matchers
-import whisk.core.loadBalancer.LoadBalancerService
+import whisk.core.loadBalancer.ContainerPoolBalancer
import whisk.core.loadBalancer.Healthy
import whisk.core.loadBalancer.Offline
import whisk.core.loadBalancer.UnHealthy
@@ -34,12 +34,12 @@ import whisk.core.entity.InstanceId
* of the ContainerPool object.
*/
@RunWith(classOf[JUnitRunner])
-class LoadBalancerServiceObjectTests extends FlatSpec with Matchers {
+class ContainerPoolBalancerObjectTests extends FlatSpec with Matchers {
behavior of "memoize"
it should "not recompute a value which was already given" in {
var calls = 0
- val add1: Int => Int = LoadBalancerService.memoize {
+ val add1: Int => Int = ContainerPoolBalancer.memoize {
case second =>
calls += 1
1 + second
@@ -58,18 +58,18 @@ class LoadBalancerServiceObjectTests extends FlatSpec with Matchers {
behavior of "pairwiseCoprimeNumbersUntil"
it should "return an empty set for malformed inputs" in {
- LoadBalancerService.pairwiseCoprimeNumbersUntil(0) shouldBe Seq()
- LoadBalancerService.pairwiseCoprimeNumbersUntil(-1) shouldBe Seq()
+ ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(0) shouldBe Seq()
+ ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(-1) shouldBe Seq()
}
it should "return all coprime numbers until the number given" in {
- LoadBalancerService.pairwiseCoprimeNumbersUntil(1) shouldBe Seq(1)
- LoadBalancerService.pairwiseCoprimeNumbersUntil(2) shouldBe Seq(1)
- LoadBalancerService.pairwiseCoprimeNumbersUntil(3) shouldBe Seq(1, 2)
- LoadBalancerService.pairwiseCoprimeNumbersUntil(4) shouldBe Seq(1, 3)
- LoadBalancerService.pairwiseCoprimeNumbersUntil(5) shouldBe Seq(1, 2, 3)
- LoadBalancerService.pairwiseCoprimeNumbersUntil(9) shouldBe Seq(1, 2, 5, 7)
- LoadBalancerService.pairwiseCoprimeNumbersUntil(10) shouldBe Seq(1, 3, 7)
+ ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(1) shouldBe Seq(1)
+ ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(2) shouldBe Seq(1)
+ ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(3) shouldBe Seq(1, 2)
+ ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(4) shouldBe Seq(1, 3)
+ ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(5) shouldBe Seq(1, 2, 3)
+ ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(9) shouldBe Seq(1, 2, 5, 7)
+ ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(10) shouldBe Seq(1, 3, 7)
}
behavior of "chooseInvoker"
@@ -78,24 +78,24 @@ class LoadBalancerServiceObjectTests extends FlatSpec with Matchers {
def hashInto[A](list: Seq[A], hash: Int) = list(hash % list.size)
it should "return None on an empty invokers list" in {
- LoadBalancerService.schedule(IndexedSeq(), 0, 1) shouldBe None
+ ContainerPoolBalancer.schedule(IndexedSeq(), 0, 1) shouldBe None
}
it should "return None on a list of offline/unhealthy invokers" in {
val invs = IndexedSeq((InstanceId(0), Offline, 0), (InstanceId(1), UnHealthy, 0))
- LoadBalancerService.schedule(invs, 0, 1) shouldBe None
+ ContainerPoolBalancer.schedule(invs, 0, 1) shouldBe None
}
it should "schedule to the home invoker" in {
val invs = invokers(10)
val hash = 2
- LoadBalancerService.schedule(invs, 1, hash) shouldBe Some(InstanceId(hash % invs.size))
+ ContainerPoolBalancer.schedule(invs, 1, hash) shouldBe Some(InstanceId(hash % invs.size))
}
it should "take the only online invoker" in {
- LoadBalancerService.schedule(
+ ContainerPoolBalancer.schedule(
IndexedSeq((InstanceId(0), Offline, 0), (InstanceId(1), UnHealthy, 0), (InstanceId(2), Healthy, 0)),
0,
1) shouldBe Some(InstanceId(2))
@@ -105,7 +105,7 @@ class LoadBalancerServiceObjectTests extends FlatSpec with Matchers {
val hash = 0
val invs = IndexedSeq((InstanceId(0), Healthy, 10), (InstanceId(1), UnHealthy, 0), (InstanceId(2), Healthy, 0))
- LoadBalancerService.schedule(invs, 10, hash) shouldBe Some(InstanceId(2))
+ ContainerPoolBalancer.schedule(invs, 10, hash) shouldBe Some(InstanceId(2))
}
it should "jump to the next invoker determined by a hashed stepsize if the home invoker is overloaded" in {
@@ -114,9 +114,9 @@ class LoadBalancerServiceObjectTests extends FlatSpec with Matchers {
val targetInvoker = hash % invokerCount
val invs = invokers(invokerCount).updated(targetInvoker, (InstanceId(targetInvoker), Healthy, 1))
- val step = hashInto(LoadBalancerService.pairwiseCoprimeNumbersUntil(invokerCount), hash)
+ val step = hashInto(ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(invokerCount), hash)
- LoadBalancerService.schedule(invs, 1, hash) shouldBe Some(InstanceId((hash + step) % invs.size))
+ ContainerPoolBalancer.schedule(invs, 1, hash) shouldBe Some(InstanceId((hash + step) % invs.size))
}
it should "wrap the search at the end of the invoker list" in {
@@ -125,12 +125,12 @@ class LoadBalancerServiceObjectTests extends FlatSpec with Matchers {
val hash = 1
val targetInvoker = hashInto(invs, hash) // will be invoker1
- val step = hashInto(LoadBalancerService.pairwiseCoprimeNumbersUntil(invokerCount), hash) // will be 2
+ val step = hashInto(ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(invokerCount), hash) // will be 2
step shouldBe 2
// invoker1 is overloaded so it will step (2 steps) to the next one --> 1 2 0 --> invoker0 is next target
// invoker0 is overloaded so it will step to the next one --> 0 1 2 --> invoker2 is next target and underloaded
- LoadBalancerService.schedule(invs, 1, hash) shouldBe Some(InstanceId((hash + step + step) % invs.size))
+ ContainerPoolBalancer.schedule(invs, 1, hash) shouldBe Some(InstanceId((hash + step + step) % invs.size))
}
it should "multiply its threshold in 3 iterations to find an invoker with a good warm-chance" in {
@@ -140,22 +140,22 @@ class LoadBalancerServiceObjectTests extends FlatSpec with Matchers {
// even though invoker1 is not the home invoker in this case, it gets chosen over
// the others because it's the first one encountered by the iteration mechanism to be below
// the threshold of 3 * 16 invocations
- LoadBalancerService.schedule(invs, 16, hash) shouldBe Some(InstanceId(0))
+ ContainerPoolBalancer.schedule(invs, 16, hash) shouldBe Some(InstanceId(0))
}
it should "choose the home invoker if all invokers are overloaded even above the muliplied threshold" in {
val invs = IndexedSeq((InstanceId(0), Healthy, 51), (InstanceId(1), Healthy, 50), (InstanceId(2), Healthy, 49))
val hash = 0 // home is 0, stepsize is 1
- LoadBalancerService.schedule(invs, 16, hash) shouldBe Some(InstanceId(0))
+ ContainerPoolBalancer.schedule(invs, 16, hash) shouldBe Some(InstanceId(0))
}
it should "transparently work with partitioned sets of invokers" in {
val invs = IndexedSeq((InstanceId(3), Healthy, 0), (InstanceId(4), Healthy, 0), (InstanceId(5), Healthy, 0))
- LoadBalancerService.schedule(invs, 1, 0) shouldBe Some(InstanceId(3))
- LoadBalancerService.schedule(invs, 1, 1) shouldBe Some(InstanceId(4))
- LoadBalancerService.schedule(invs, 1, 2) shouldBe Some(InstanceId(5))
- LoadBalancerService.schedule(invs, 1, 3) shouldBe Some(InstanceId(3))
+ ContainerPoolBalancer.schedule(invs, 1, 0) shouldBe Some(InstanceId(3))
+ ContainerPoolBalancer.schedule(invs, 1, 1) shouldBe Some(InstanceId(4))
+ ContainerPoolBalancer.schedule(invs, 1, 2) shouldBe Some(InstanceId(5))
+ ContainerPoolBalancer.schedule(invs, 1, 3) shouldBe Some(InstanceId(3))
}
}
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
index 8e7d291..6f0d05c 100644
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
+++ b/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
@@ -68,6 +68,7 @@ import whisk.core.loadBalancer.InvokerPool
import whisk.core.loadBalancer.InvokerState
import whisk.core.loadBalancer.Offline
import whisk.core.loadBalancer.UnHealthy
+import whisk.core.loadBalancer.InvokerHealth
import whisk.utils.retry
import whisk.core.connector.test.TestConnector
import whisk.core.entitlement.Privilege
@@ -101,7 +102,7 @@ class InvokerSupervisionTests
/** Helper to generate a list of (InstanceId, InvokerState) */
def zipWithInstance(list: IndexedSeq[InvokerState]) = list.zipWithIndex.map {
- case (state, index) => (InstanceId(index), state)
+ case (state, index) => new InvokerHealth(InstanceId(index), state)
}
val pC = new TestConnector("pingFeedTtest", 4, false) {}
--
To stop receiving notification emails like this one, please contact
tysonnorris@apache.org.