You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ty...@apache.org on 2018/01/23 00:48:13 UTC

[incubator-openwhisk] branch master updated: Dynamic LoadBalancer load using SpiLoader (#2984)

This is an automated email from the ASF dual-hosted git repository.

tysonnorris pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 113a915  Dynamic LoadBalancer load using SpiLoader (#2984)
113a915 is described below

commit 113a91500839dd90dfe4ddc74776bd95cad69ec6
Author: kpavel <kp...@il.ibm.com>
AuthorDate: Tue Jan 23 02:48:11 2018 +0200

    Dynamic LoadBalancer load using SpiLoader (#2984)
    
    * Add a container pool/invoker loadbalancer SPI.
    
    * Refactor invoker health tuple to proper type for cleaner SPI interface.
    
    Rename LoadBalancerService to ContainerPoolBalancer.
    Split into two files.
---
 common/scala/src/main/resources/reference.conf     |   1 +
 .../scala/whisk/core/controller/Controller.scala   |  16 +--
 .../scala/whisk/core/controller/RestAPIs.scala     |  14 +--
 ...erService.scala => ContainerPoolBalancer.scala} | 124 ++++++++-------------
 .../core/loadBalancer/InvokerSupervision.scala     |  10 +-
 .../whisk/core/loadBalancer/LoadBalancer.scala     |  83 ++++++++++++++
 .../controller/test/ControllerTestCommon.scala     |   1 +
 ...cala => ContainerPoolBalancerObjectTests.scala} |  54 ++++-----
 .../test/InvokerSupervisionTests.scala             |   3 +-
 9 files changed, 180 insertions(+), 126 deletions(-)

diff --git a/common/scala/src/main/resources/reference.conf b/common/scala/src/main/resources/reference.conf
index 45543e5..4530aef 100644
--- a/common/scala/src/main/resources/reference.conf
+++ b/common/scala/src/main/resources/reference.conf
@@ -3,4 +3,5 @@ whisk.spi{
   MessagingProvider = whisk.connector.kafka.KafkaMessagingProvider
   ContainerFactoryProvider = whisk.core.containerpool.docker.DockerContainerFactoryProvider
   LogStoreProvider = whisk.core.containerpool.logging.DockerToActivationLogStoreProvider
+  LoadBalancerProvider = whisk.core.loadBalancer.ContainerPoolBalancer
 }
diff --git a/core/controller/src/main/scala/whisk/core/controller/Controller.scala b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
index ba58452..53408c3 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Controller.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
@@ -46,7 +46,7 @@ import whisk.core.entitlement._
 import whisk.core.entity._
 import whisk.core.entity.ActivationId.ActivationIdGenerator
 import whisk.core.entity.ExecManifest.Runtimes
-import whisk.core.loadBalancer.{LoadBalancerService}
+import whisk.core.loadBalancer.LoadBalancerProvider
 import whisk.http.BasicHttpService
 import whisk.http.BasicRasService
 import whisk.spi.SpiLoader
@@ -117,7 +117,8 @@ class Controller(val instance: InstanceId,
   })
 
   // initialize backend services
-  private implicit val loadBalancer = new LoadBalancerService(whiskConfig, instance, entityStore)
+  private implicit val loadBalancer =
+    SpiLoader.get[LoadBalancerProvider].loadBalancer(whiskConfig, instance)
   private implicit val entitlementProvider = new LocalEntitlementProvider(whiskConfig, loadBalancer)
   private implicit val activationIdFactory = new ActivationIdGenerator {}
   private implicit val logStore = SpiLoader.get[LogStoreProvider].logStore(actorSystem)
@@ -137,12 +138,13 @@ class Controller(val instance: InstanceId,
    */
   private val internalInvokerHealth = {
     implicit val executionContext = actorSystem.dispatcher
-
     (path("invokers") & get) {
       complete {
-        loadBalancer.allInvokers.map(_.map {
-          case (instance, state) => s"invoker${instance.toInt}" -> state.asString
-        }.toMap.toJson.asJsObject)
+        loadBalancer
+          .invokerHealth()
+          .map(_.map {
+            case i => s"invoker${i.id.toInt}" -> i.status.asString
+          }.toMap.toJson.asJsObject)
       }
     }
   }
@@ -163,7 +165,7 @@ object Controller {
     Map(WhiskConfig.controllerInstances -> null) ++
       ExecManifest.requiredProperties ++
       RestApiCommons.requiredProperties ++
-      LoadBalancerService.requiredProperties ++
+      SpiLoader.get[LoadBalancerProvider].requiredProperties ++
       EntitlementProvider.requiredProperties
 
   private def info(config: WhiskConfig, runtimes: Runtimes, apis: List[String]) =
diff --git a/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala b/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala
index ffcf01b..161ef4b 100644
--- a/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala
@@ -46,7 +46,7 @@ import whisk.core.entity._
 import whisk.core.entity.ActivationId.ActivationIdGenerator
 import whisk.core.entity.WhiskAuthStore
 import whisk.core.entity.types._
-import whisk.core.loadBalancer.LoadBalancerService
+import whisk.core.loadBalancer.LoadBalancer
 import whisk.http.Messages
 
 /**
@@ -161,7 +161,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)(
   implicit val entityStore: EntityStore,
   implicit val entitlementProvider: EntitlementProvider,
   implicit val activationIdFactory: ActivationIdGenerator,
-  implicit val loadBalancer: LoadBalancerService,
+  implicit val loadBalancer: LoadBalancer,
   implicit val cacheChangeNotification: Some[CacheChangeNotification],
   implicit val activationStore: ActivationStore,
   implicit val logStore: LogStore,
@@ -243,7 +243,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)(
     override val activationStore: ActivationStore,
     override val entitlementProvider: EntitlementProvider,
     override val activationIdFactory: ActivationIdGenerator,
-    override val loadBalancer: LoadBalancerService,
+    override val loadBalancer: LoadBalancer,
     override val cacheChangeNotification: Some[CacheChangeNotification],
     override val executionContext: ExecutionContext,
     override val logging: Logging,
@@ -266,7 +266,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)(
     implicit override val entityStore: EntityStore,
     override val entitlementProvider: EntitlementProvider,
     override val activationIdFactory: ActivationIdGenerator,
-    override val loadBalancer: LoadBalancerService,
+    override val loadBalancer: LoadBalancer,
     override val cacheChangeNotification: Some[CacheChangeNotification],
     override val executionContext: ExecutionContext,
     override val logging: Logging,
@@ -279,7 +279,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)(
     override val entityStore: EntityStore,
     override val entitlementProvider: EntitlementProvider,
     override val activationIdFactory: ActivationIdGenerator,
-    override val loadBalancer: LoadBalancerService,
+    override val loadBalancer: LoadBalancer,
     override val cacheChangeNotification: Some[CacheChangeNotification],
     override val executionContext: ExecutionContext,
     override val logging: Logging,
@@ -293,7 +293,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)(
     override val entitlementProvider: EntitlementProvider,
     override val activationStore: ActivationStore,
     override val activationIdFactory: ActivationIdGenerator,
-    override val loadBalancer: LoadBalancerService,
+    override val loadBalancer: LoadBalancer,
     override val cacheChangeNotification: Some[CacheChangeNotification],
     override val executionContext: ExecutionContext,
     override val logging: Logging,
@@ -310,7 +310,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)(
     override val activationStore: ActivationStore,
     override val entitlementProvider: EntitlementProvider,
     override val activationIdFactory: ActivationIdGenerator,
-    override val loadBalancer: LoadBalancerService,
+    override val loadBalancer: LoadBalancer,
     override val actorSystem: ActorSystem,
     override val executionContext: ExecutionContext,
     override val logging: Logging,
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
similarity index 82%
rename from core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
rename to core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
index 89ba900..ffd831d 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
@@ -19,82 +19,43 @@ package whisk.core.loadBalancer
 
 import java.nio.charset.StandardCharsets
 
-import scala.annotation.tailrec
-import scala.concurrent.Await
-import scala.concurrent.ExecutionContext
-import scala.concurrent.Future
-import scala.concurrent.Promise
-import scala.concurrent.duration._
-import scala.util.Failure
-import scala.util.Success
-import org.apache.kafka.clients.producer.RecordMetadata
-import akka.actor.ActorRefFactory
-import akka.actor.ActorSystem
-import akka.actor.Props
+import akka.actor.{ActorRefFactory, ActorSystem, Props}
 import akka.cluster.Cluster
-import akka.util.Timeout
 import akka.pattern.ask
-import whisk.common.Logging
-import whisk.common.LoggingMarkers
-import whisk.common.TransactionId
-import whisk.core.ConfigKeys
-import whisk.core.WhiskConfig
+import akka.stream.ActorMaterializer
+import akka.util.Timeout
+import org.apache.kafka.clients.producer.RecordMetadata
+import pureconfig._
+import whisk.common.{Logging, LoggingMarkers, TransactionId}
 import whisk.core.WhiskConfig._
-import whisk.core.connector.{ActivationMessage, CompletionMessage}
-import whisk.core.connector.MessageFeed
-import whisk.core.connector.MessageProducer
-import whisk.core.connector.MessagingProvider
+import whisk.core.connector._
 import whisk.core.database.NoDocumentException
 import whisk.core.entity._
-import whisk.core.entity.{ActivationId, WhiskActivation}
-import whisk.core.entity.EntityName
-import whisk.core.entity.ExecutableWhiskActionMetaData
-import whisk.core.entity.Identity
-import whisk.core.entity.InstanceId
-import whisk.core.entity.UUID
-import whisk.core.entity.WhiskAction
 import whisk.core.entity.types.EntityStore
+import whisk.core.{ConfigKeys, WhiskConfig}
 import whisk.spi.SpiLoader
-import pureconfig._
 
-case class LoadbalancerConfig(blackboxFraction: Double, invokerBusyThreshold: Int)
-
-trait LoadBalancer {
-
-  val activeAckTimeoutGrace = 1.minute
-
-  /** Gets the number of in-flight activations for a specific user. */
-  def activeActivationsFor(namespace: UUID): Future[Int]
-
-  /** Gets the number of in-flight activations in the system. */
-  def totalActiveActivations: Future[Int]
-
-  /**
-   * Publishes activation message on internal bus for an invoker to pick up.
-   *
-   * @param action the action to invoke
-   * @param msg the activation message to publish on an invoker topic
-   * @param transid the transaction id for the request
-   * @return result a nested Future the outer indicating completion of publishing and
-   *         the inner the completion of the action (i.e., the result)
-   *         if it is ready before timeout (Right) otherwise the activation id (Left).
-   *         The future is guaranteed to complete within the declared action time limit
-   *         plus a grace period (see activeAckTimeoutGrace).
-   */
-  def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)(
-    implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]]
+import scala.annotation.tailrec
+import scala.concurrent.duration._
+import scala.concurrent.{Await, ExecutionContext, Future, Promise}
+import scala.util.{Failure, Success}
 
-}
+case class LoadbalancerConfig(blackboxFraction: Double, invokerBusyThreshold: Int)
 
-class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore: EntityStore)(
-  implicit val actorSystem: ActorSystem,
-  logging: Logging)
+class ContainerPoolBalancer(config: WhiskConfig, instance: InstanceId)(implicit val actorSystem: ActorSystem,
+                                                                       logging: Logging,
+                                                                       materializer: ActorMaterializer)
     extends LoadBalancer {
 
   private val lbConfig = loadConfigOrThrow[LoadbalancerConfig](ConfigKeys.loadbalancer)
 
+  /** Used to manage an action for testing invoker health */
+  private val entityStore = WhiskEntityStore.datastore(config)
+
   /** The execution context for futures */
-  implicit val executionContext: ExecutionContext = actorSystem.dispatcher
+  private implicit val executionContext: ExecutionContext = actorSystem.dispatcher
+
+  private val activeAckTimeoutGrace = 1.minute
 
   /** How many invokers are dedicated to blackbox images.  We range bound to something sensical regardless of configuration. */
   private val blackboxFraction: Double = Math.max(0.0, Math.min(1.0, lbConfig.blackboxFraction))
@@ -113,10 +74,6 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore
     }
   }
 
-  override def activeActivationsFor(namespace: UUID) = loadBalancerData.activationCountOn(namespace)
-
-  override def totalActiveActivations = loadBalancerData.totalActivationCount
-
   override def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)(
     implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] = {
     chooseInvoker(msg.user, action).flatMap { invokerName =>
@@ -127,11 +84,16 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore
     }
   }
 
-  /** An indexed sequence of all invokers in the current system */
-  def allInvokers: Future[IndexedSeq[(InstanceId, InvokerState)]] =
+  /** An indexed sequence of all invokers in the current system. */
+  override def invokerHealth(): Future[IndexedSeq[InvokerHealth]] = {
     invokerPool
       .ask(GetStatus)(Timeout(5.seconds))
-      .mapTo[IndexedSeq[(InstanceId, InvokerState)]]
+      .mapTo[IndexedSeq[InvokerHealth]]
+  }
+
+  override def activeActivationsFor(namespace: UUID) = loadBalancerData.activationCountOn(namespace)
+
+  override def totalActiveActivations = loadBalancerData.totalActivationCount
 
   /**
    * Tries to fill in the result slot (i.e., complete the promise) when a completion message arrives.
@@ -307,9 +269,8 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore
   /** Compute the number of blackbox-dedicated invokers by applying a rounded down fraction of all invokers (but at least 1). */
   private def numBlackbox(totalInvokers: Int) = Math.max(1, (totalInvokers.toDouble * blackboxFraction).toInt)
 
-  /** Return invokers (almost) dedicated to running blackbox actions. */
-  private def blackboxInvokers(
-    invokers: IndexedSeq[(InstanceId, InvokerState)]): IndexedSeq[(InstanceId, InvokerState)] = {
+  /** Return invokers dedicated to running blackbox actions. */
+  private def blackboxInvokers(invokers: IndexedSeq[InvokerHealth]): IndexedSeq[InvokerHealth] = {
     val blackboxes = numBlackbox(invokers.size)
     invokers.takeRight(blackboxes)
   }
@@ -318,8 +279,7 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore
    * Return (at least one) invokers for running non black-box actions.
    * This set can overlap with the blackbox set if there is only one invoker.
    */
-  private def managedInvokers(
-    invokers: IndexedSeq[(InstanceId, InvokerState)]): IndexedSeq[(InstanceId, InvokerState)] = {
+  private def managedInvokers(invokers: IndexedSeq[InvokerHealth]): IndexedSeq[InvokerHealth] = {
     val managed = Math.max(1, invokers.length - numBlackbox(invokers.length))
     invokers.take(managed)
   }
@@ -329,14 +289,14 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore
     val hash = generateHash(user.namespace, action)
 
     loadBalancerData.activationCountPerInvoker.flatMap { currentActivations =>
-      allInvokers.flatMap { invokers =>
+      invokerHealth().flatMap { invokers =>
         val invokersToUse = if (action.exec.pull) blackboxInvokers(invokers) else managedInvokers(invokers)
         val invokersWithUsage = invokersToUse.view.map {
           // Using a view defers the comparably expensive lookup to actual access of the element
-          case (instance, state) => (instance, state, currentActivations.getOrElse(instance.toString, 0))
+          case invoker => (invoker.id, invoker.status, currentActivations.getOrElse(invoker.id.toString, 0))
         }
 
-        LoadBalancerService.schedule(invokersWithUsage, lbConfig.invokerBusyThreshold, hash) match {
+        ContainerPoolBalancer.schedule(invokersWithUsage, lbConfig.invokerBusyThreshold, hash) match {
           case Some(invoker) => Future.successful(invoker)
           case None =>
             logging.error(this, s"all invokers down")(TransactionId.invokerHealth)
@@ -352,7 +312,13 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore
   }
 }
 
-object LoadBalancerService {
+object ContainerPoolBalancer extends LoadBalancerProvider {
+
+  override def loadBalancer(whiskConfig: WhiskConfig, instance: InstanceId)(
+    implicit actorSystem: ActorSystem,
+    logging: Logging,
+    materializer: ActorMaterializer): LoadBalancer = new ContainerPoolBalancer(whiskConfig, instance)
+
   def requiredProperties =
     kafkaHosts ++
       Map(controllerLocalBookkeeping -> null, controllerSeedNodes -> null)
@@ -367,7 +333,7 @@ object LoadBalancerService {
   def gcd(a: Int, b: Int): Int = if (b == 0) a else gcd(b, a % b)
 
   /** Returns pairwise coprime numbers until x. Result is memoized. */
-  val pairwiseCoprimeNumbersUntil: Int => IndexedSeq[Int] = LoadBalancerService.memoize {
+  val pairwiseCoprimeNumbersUntil: Int => IndexedSeq[Int] = ContainerPoolBalancer.memoize {
     case x =>
       (1 to x).foldLeft(IndexedSeq.empty[Int])((primes, cur) => {
         if (gcd(cur, x) == 1 && primes.forall(i => gcd(i, cur) == 1)) {
@@ -394,7 +360,7 @@ object LoadBalancerService {
     val numInvokers = invokers.size
     if (numInvokers > 0) {
       val homeInvoker = hash % numInvokers
-      val stepSizes = LoadBalancerService.pairwiseCoprimeNumbersUntil(numInvokers)
+      val stepSizes = ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(numInvokers)
       val step = stepSizes(hash % stepSizes.size)
 
       val invokerProgression = Stream
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
index cd80a8e..13c3a70 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
@@ -86,7 +86,7 @@ class InvokerPool(childFactory: (ActorRefFactory, InstanceId) => ActorRef,
   // from leaking the state for external mutation
   var instanceToRef = immutable.Map.empty[InstanceId, ActorRef]
   var refToInstance = immutable.Map.empty[ActorRef, InstanceId]
-  var status = IndexedSeq[(InstanceId, InvokerState)]()
+  var status = IndexedSeq[InvokerHealth]()
 
   def receive = {
     case p: PingMessage =>
@@ -103,13 +103,13 @@ class InvokerPool(childFactory: (ActorRefFactory, InstanceId) => ActorRef,
 
     case CurrentState(invoker, currentState: InvokerState) =>
       refToInstance.get(invoker).foreach { instance =>
-        status = status.updated(instance.toInt, (instance, currentState))
+        status = status.updated(instance.toInt, new InvokerHealth(instance, currentState))
       }
       logStatus()
 
     case Transition(invoker, oldState: InvokerState, newState: InvokerState) =>
       refToInstance.get(invoker).foreach { instance =>
-        status = status.updated(instance.toInt, (instance, newState))
+        status = status.updated(instance.toInt, new InvokerHealth(instance, newState))
       }
       logStatus()
 
@@ -118,7 +118,7 @@ class InvokerPool(childFactory: (ActorRefFactory, InstanceId) => ActorRef,
   }
 
   def logStatus() = {
-    val pretty = status.map { case (instance, state) => s"${instance.toInt} -> $state" }
+    val pretty = status.map(i => s"${i.id.toInt} -> ${i.status}")
     logging.info(this, s"invoker status changed to ${pretty.mkString(", ")}")
   }
 
@@ -155,7 +155,7 @@ class InvokerPool(childFactory: (ActorRefFactory, InstanceId) => ActorRef,
   def registerInvoker(instanceId: InstanceId): ActorRef = {
     logging.info(this, s"registered a new invoker: invoker${instanceId.toInt}")(TransactionId.invokerHealth)
 
-    status = padToIndexed(status, instanceId.toInt + 1, i => (InstanceId(i), Offline))
+    status = padToIndexed(status, instanceId.toInt + 1, i => new InvokerHealth(InstanceId(i), Offline))
 
     val ref = childFactory(context, instanceId)
 
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancer.scala
new file mode 100644
index 0000000..8f2227f
--- /dev/null
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancer.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.loadBalancer
+
+import scala.concurrent.Future
+import akka.actor.ActorSystem
+import akka.stream.ActorMaterializer
+import whisk.common.{Logging, TransactionId}
+import whisk.core.WhiskConfig
+import whisk.core.connector._
+import whisk.core.entity._
+import whisk.spi.Spi
+
+/**
+ * Describes an abstract invoker: a local container pool manager in charge of the container life cycle management.
+ *
+ * @param id a unique instance identifier for the invoker
+ * @param status its status (healthy, unhealthy, offline)
+ */
+class InvokerHealth(val id: InstanceId, val status: InvokerState) {
+  override def hashCode: Int = (id, status).## // keeps hashCode consistent with equals
+  override def equals(obj: scala.Any): Boolean = obj match {
+    case that: InvokerHealth => that.id == this.id && that.status == this.status
+    case _                   => false
+  }
+}
+
+trait LoadBalancer {
+
+  /**
+   * Publishes activation message on internal bus for an invoker to pick up.
+   *
+   * @param action the action to invoke
+   * @param msg the activation message to publish on an invoker topic
+   * @param transid the transaction id for the request
+   * @return result a nested Future the outer indicating completion of publishing and
+   *         the inner the completion of the action (i.e., the result)
+   *         if it is ready before timeout (Right) otherwise the activation id (Left).
+   *         The future is guaranteed to complete within the declared action time limit
+   *         plus a grace period chosen by the load balancer implementation.
+   */
+  def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)(
+    implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]]
+
+  /**
+   * Returns a message indicating the health of the containers and/or container pool in general.
+   *
+   * @return a Future[IndexedSeq[InvokerHealth]] representing the health of the pools managed by the loadbalancer.
+   */
+  def invokerHealth(): Future[IndexedSeq[InvokerHealth]]
+
+  /** Gets the number of in-flight activations for a specific user. */
+  def activeActivationsFor(namespace: UUID): Future[Int]
+
+  /** Gets the number of in-flight activations in the system. */
+  def totalActiveActivations: Future[Int]
+}
+
+/**
+ * An Spi for providing load balancer implementations.
+ */
+trait LoadBalancerProvider extends Spi {
+  def requiredProperties: Map[String, String]
+
+  def loadBalancer(whiskConfig: WhiskConfig, instance: InstanceId)(implicit actorSystem: ActorSystem,
+                                                                   logging: Logging,
+                                                                   materializer: ActorMaterializer): LoadBalancer
+}
diff --git a/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala b/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
index f0c9fe5..7344ba7 100644
--- a/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
@@ -200,4 +200,5 @@ class DegenerateLoadBalancerService(config: WhiskConfig)(implicit ec: ExecutionC
       } getOrElse Future.failed(new IllegalArgumentException("Unit test does not need fast path"))
     }
 
+  override def invokerHealth() = Future.successful(IndexedSeq.empty)
 }
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerServiceObjectTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/ContainerPoolBalancerObjectTests.scala
similarity index 68%
rename from tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerServiceObjectTests.scala
rename to tests/src/test/scala/whisk/core/loadBalancer/test/ContainerPoolBalancerObjectTests.scala
index 3f3dca4..60eda84 100644
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerServiceObjectTests.scala
+++ b/tests/src/test/scala/whisk/core/loadBalancer/test/ContainerPoolBalancerObjectTests.scala
@@ -21,7 +21,7 @@ import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.FlatSpec
 import org.scalatest.Matchers
-import whisk.core.loadBalancer.LoadBalancerService
+import whisk.core.loadBalancer.ContainerPoolBalancer
 import whisk.core.loadBalancer.Healthy
 import whisk.core.loadBalancer.Offline
 import whisk.core.loadBalancer.UnHealthy
@@ -34,12 +34,12 @@ import whisk.core.entity.InstanceId
  * of the ContainerPool object.
  */
 @RunWith(classOf[JUnitRunner])
-class LoadBalancerServiceObjectTests extends FlatSpec with Matchers {
+class ContainerPoolBalancerObjectTests extends FlatSpec with Matchers {
   behavior of "memoize"
 
   it should "not recompute a value which was already given" in {
     var calls = 0
-    val add1: Int => Int = LoadBalancerService.memoize {
+    val add1: Int => Int = ContainerPoolBalancer.memoize {
       case second =>
         calls += 1
         1 + second
@@ -58,18 +58,18 @@ class LoadBalancerServiceObjectTests extends FlatSpec with Matchers {
   behavior of "pairwiseCoprimeNumbersUntil"
 
   it should "return an empty set for malformed inputs" in {
-    LoadBalancerService.pairwiseCoprimeNumbersUntil(0) shouldBe Seq()
-    LoadBalancerService.pairwiseCoprimeNumbersUntil(-1) shouldBe Seq()
+    ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(0) shouldBe Seq()
+    ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(-1) shouldBe Seq()
   }
 
   it should "return all coprime numbers until the number given" in {
-    LoadBalancerService.pairwiseCoprimeNumbersUntil(1) shouldBe Seq(1)
-    LoadBalancerService.pairwiseCoprimeNumbersUntil(2) shouldBe Seq(1)
-    LoadBalancerService.pairwiseCoprimeNumbersUntil(3) shouldBe Seq(1, 2)
-    LoadBalancerService.pairwiseCoprimeNumbersUntil(4) shouldBe Seq(1, 3)
-    LoadBalancerService.pairwiseCoprimeNumbersUntil(5) shouldBe Seq(1, 2, 3)
-    LoadBalancerService.pairwiseCoprimeNumbersUntil(9) shouldBe Seq(1, 2, 5, 7)
-    LoadBalancerService.pairwiseCoprimeNumbersUntil(10) shouldBe Seq(1, 3, 7)
+    ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(1) shouldBe Seq(1)
+    ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(2) shouldBe Seq(1)
+    ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(3) shouldBe Seq(1, 2)
+    ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(4) shouldBe Seq(1, 3)
+    ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(5) shouldBe Seq(1, 2, 3)
+    ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(9) shouldBe Seq(1, 2, 5, 7)
+    ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(10) shouldBe Seq(1, 3, 7)
   }
 
   behavior of "chooseInvoker"
@@ -78,24 +78,24 @@ class LoadBalancerServiceObjectTests extends FlatSpec with Matchers {
   def hashInto[A](list: Seq[A], hash: Int) = list(hash % list.size)
 
   it should "return None on an empty invokers list" in {
-    LoadBalancerService.schedule(IndexedSeq(), 0, 1) shouldBe None
+    ContainerPoolBalancer.schedule(IndexedSeq(), 0, 1) shouldBe None
   }
 
   it should "return None on a list of offline/unhealthy invokers" in {
     val invs = IndexedSeq((InstanceId(0), Offline, 0), (InstanceId(1), UnHealthy, 0))
 
-    LoadBalancerService.schedule(invs, 0, 1) shouldBe None
+    ContainerPoolBalancer.schedule(invs, 0, 1) shouldBe None
   }
 
   it should "schedule to the home invoker" in {
     val invs = invokers(10)
     val hash = 2
 
-    LoadBalancerService.schedule(invs, 1, hash) shouldBe Some(InstanceId(hash % invs.size))
+    ContainerPoolBalancer.schedule(invs, 1, hash) shouldBe Some(InstanceId(hash % invs.size))
   }
 
   it should "take the only online invoker" in {
-    LoadBalancerService.schedule(
+    ContainerPoolBalancer.schedule(
       IndexedSeq((InstanceId(0), Offline, 0), (InstanceId(1), UnHealthy, 0), (InstanceId(2), Healthy, 0)),
       0,
       1) shouldBe Some(InstanceId(2))
@@ -105,7 +105,7 @@ class LoadBalancerServiceObjectTests extends FlatSpec with Matchers {
     val hash = 0
     val invs = IndexedSeq((InstanceId(0), Healthy, 10), (InstanceId(1), UnHealthy, 0), (InstanceId(2), Healthy, 0))
 
-    LoadBalancerService.schedule(invs, 10, hash) shouldBe Some(InstanceId(2))
+    ContainerPoolBalancer.schedule(invs, 10, hash) shouldBe Some(InstanceId(2))
   }
 
   it should "jump to the next invoker determined by a hashed stepsize if the home invoker is overloaded" in {
@@ -114,9 +114,9 @@ class LoadBalancerServiceObjectTests extends FlatSpec with Matchers {
     val targetInvoker = hash % invokerCount
 
     val invs = invokers(invokerCount).updated(targetInvoker, (InstanceId(targetInvoker), Healthy, 1))
-    val step = hashInto(LoadBalancerService.pairwiseCoprimeNumbersUntil(invokerCount), hash)
+    val step = hashInto(ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(invokerCount), hash)
 
-    LoadBalancerService.schedule(invs, 1, hash) shouldBe Some(InstanceId((hash + step) % invs.size))
+    ContainerPoolBalancer.schedule(invs, 1, hash) shouldBe Some(InstanceId((hash + step) % invs.size))
   }
 
   it should "wrap the search at the end of the invoker list" in {
@@ -125,12 +125,12 @@ class LoadBalancerServiceObjectTests extends FlatSpec with Matchers {
     val hash = 1
 
     val targetInvoker = hashInto(invs, hash) // will be invoker1
-    val step = hashInto(LoadBalancerService.pairwiseCoprimeNumbersUntil(invokerCount), hash) // will be 2
+    val step = hashInto(ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(invokerCount), hash) // will be 2
     step shouldBe 2
 
     // invoker1 is overloaded so it will step (2 steps) to the next one --> 1 2 0 --> invoker0 is next target
     // invoker0 is overloaded so it will step to the next one --> 0 1 2 --> invoker2 is next target and underloaded
-    LoadBalancerService.schedule(invs, 1, hash) shouldBe Some(InstanceId((hash + step + step) % invs.size))
+    ContainerPoolBalancer.schedule(invs, 1, hash) shouldBe Some(InstanceId((hash + step + step) % invs.size))
   }
 
   it should "multiply its threshold in 3 iterations to find an invoker with a good warm-chance" in {
@@ -140,22 +140,22 @@ class LoadBalancerServiceObjectTests extends FlatSpec with Matchers {
     // even though invoker1 is not the home invoker in this case, it gets chosen over
     // the others because it's the first one encountered by the iteration mechanism to be below
     // the threshold of 3 * 16 invocations
-    LoadBalancerService.schedule(invs, 16, hash) shouldBe Some(InstanceId(0))
+    ContainerPoolBalancer.schedule(invs, 16, hash) shouldBe Some(InstanceId(0))
   }
 
   it should "choose the home invoker if all invokers are overloaded even above the muliplied threshold" in {
     val invs = IndexedSeq((InstanceId(0), Healthy, 51), (InstanceId(1), Healthy, 50), (InstanceId(2), Healthy, 49))
     val hash = 0 // home is 0, stepsize is 1
 
-    LoadBalancerService.schedule(invs, 16, hash) shouldBe Some(InstanceId(0))
+    ContainerPoolBalancer.schedule(invs, 16, hash) shouldBe Some(InstanceId(0))
   }
 
   it should "transparently work with partitioned sets of invokers" in {
     val invs = IndexedSeq((InstanceId(3), Healthy, 0), (InstanceId(4), Healthy, 0), (InstanceId(5), Healthy, 0))
 
-    LoadBalancerService.schedule(invs, 1, 0) shouldBe Some(InstanceId(3))
-    LoadBalancerService.schedule(invs, 1, 1) shouldBe Some(InstanceId(4))
-    LoadBalancerService.schedule(invs, 1, 2) shouldBe Some(InstanceId(5))
-    LoadBalancerService.schedule(invs, 1, 3) shouldBe Some(InstanceId(3))
+    ContainerPoolBalancer.schedule(invs, 1, 0) shouldBe Some(InstanceId(3))
+    ContainerPoolBalancer.schedule(invs, 1, 1) shouldBe Some(InstanceId(4))
+    ContainerPoolBalancer.schedule(invs, 1, 2) shouldBe Some(InstanceId(5))
+    ContainerPoolBalancer.schedule(invs, 1, 3) shouldBe Some(InstanceId(3))
   }
 }
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
index 8e7d291..6f0d05c 100644
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
+++ b/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
@@ -68,6 +68,7 @@ import whisk.core.loadBalancer.InvokerPool
 import whisk.core.loadBalancer.InvokerState
 import whisk.core.loadBalancer.Offline
 import whisk.core.loadBalancer.UnHealthy
+import whisk.core.loadBalancer.InvokerHealth
 import whisk.utils.retry
 import whisk.core.connector.test.TestConnector
 import whisk.core.entitlement.Privilege
@@ -101,7 +102,7 @@ class InvokerSupervisionTests
 
   /** Helper to generate a list of (InstanceId, InvokerState) */
   def zipWithInstance(list: IndexedSeq[InvokerState]) = list.zipWithIndex.map {
-    case (state, index) => (InstanceId(index), state)
+    case (state, index) => new InvokerHealth(InstanceId(index), state)
   }
 
   val pC = new TestConnector("pingFeedTtest", 4, false) {}

-- 
To stop receiving notification emails like this one, please contact
tysonnorris@apache.org.