You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/02/14 07:12:31 UTC

[GitHub] cbickel closed pull request #3240: Add a loadbalancer with local state and horizontal invoker sharding.

cbickel closed pull request #3240: Add a loadbalancer with local state and horizontal invoker sharding.
URL: https://github.com/apache/incubator-openwhisk/pull/3240
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 6978fb97ac..2041e6f673 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -64,6 +64,8 @@ controller:
       seedNodes: "{{ groups['controllers'] | map('extract', hostvars, 'ansible_host') | list }}"
   # We recommend to enable HA for the controllers only, if bookkeeping data are shared too. (localBookkeeping: false)
   ha: "{{ controller_enable_ha | default(True) and groups['controllers'] | length > 1 }}"
+  loadbalancer:
+    spi: "{{ controller_loadbalancer_spi | default('') }}"
   loglevel: "{{ controller_loglevel | default(whisk_loglevel) | default('INFO') }}"
 
 jmx:
diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml
index 8601a2985d..86f5a14d1b 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -152,6 +152,7 @@
       "CONFIG_kamon_statsd_port": "{{ metrics.kamon.port }}"
 
       "CONFIG_whisk_spi_LogStoreProvider": "{{ userLogs.spi }}"
+      "CONFIG_whisk_spi_LoadBalancerProvider": "{{ controller.loadbalancer.spi }}"
       
       "CONFIG_logback_log_level": "{{ controller.loglevel }}"
 
diff --git a/common/scala/src/main/scala/whisk/common/ForcableSemaphore.scala b/common/scala/src/main/scala/whisk/common/ForcableSemaphore.scala
new file mode 100644
index 0000000000..9544a3079c
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/common/ForcableSemaphore.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.common
+
+import java.util.concurrent.locks.AbstractQueuedSynchronizer
+
+import scala.annotation.tailrec
+
+/**
+ * A Semaphore, which in addition to the usual features has means to force more clients to get permits.
+ *
+ * Like any usual Semaphore, this implementation will give away at most `maxAllowed` permits when used the "usual" way.
+ * In addition to that, it also has a `forceAcquire` method which will push the Semaphore's remaining permits into a
+ * negative value. Getting permits using `tryAcquire` will only be possible once the permits value is in a positive
+ * state again.
+ *
+ * As this is (now) only used for the loadbalancer's scheduling, this does not implement the "whole" Java Semaphore's
+ * interface but only the methods needed.
+ *
+ * @param maxAllowed maximum number of permits given away by `tryAcquire`
+ */
+class ForcableSemaphore(maxAllowed: Int) {
+  class Sync extends AbstractQueuedSynchronizer {
+    setState(maxAllowed)
+
+    def permits: Int = getState
+
+    /** Try to release a permit and return whether or not that operation was successful. */
+    @tailrec
+    override final def tryReleaseShared(releases: Int): Boolean = {
+      val current = getState
+      val next = current + releases
+      if (next < current) { // integer overflow
+        throw new Error("Maximum permit count exceeded, permit variable overflowed")
+      }
+      if (compareAndSetState(current, next)) {
+        true
+      } else {
+        tryReleaseShared(releases)
+      }
+    }
+
+    /**
+     * Try to acquire a permit and return whether or not that operation was successful. Requests may not finish in FIFO
+     * order, hence this method is not necessarily fair.
+     */
+    @tailrec
+    final def nonFairTryAcquireShared(acquires: Int): Int = {
+      val available = getState
+      val remaining = available - acquires
+      if (remaining < 0 || compareAndSetState(available, remaining)) {
+        remaining
+      } else {
+        nonFairTryAcquireShared(acquires)
+      }
+    }
+
+    /**
+     * Basically the same as `nonFairTryAcquireShared`, but does bound to a minimal value of 0 so permits can get
+     * negative.
+     */
+    @tailrec
+    final def forceAquireShared(acquires: Int): Unit = {
+      val available = getState
+      val remaining = available - acquires
+      if (!compareAndSetState(available, remaining)) {
+        forceAquireShared(acquires)
+      }
+    }
+  }
+
+  val sync = new Sync
+
+  /**
+   * Acquires the given numbers of permits.
+   *
+   * @param acquires the number of permits to get
+   * @return `true`, iff the internal semaphore's number of permits is positive, `false` if negative
+   */
+  def tryAcquire(acquires: Int = 1): Boolean = {
+    require(acquires > 0, "cannot acquire negative or no permits")
+    sync.nonFairTryAcquireShared(acquires) >= 0
+  }
+
+  /**
+   * Forces the amount of permits.
+   *
+   * This possibly pushes the internal number of available permits to a negative value.
+   *
+   * @param acquires the number of permits to get
+   */
+  def forceAcquire(acquires: Int = 1): Unit = {
+    require(acquires > 0, "cannot force acquire negative or no permits")
+    sync.forceAquireShared(acquires)
+  }
+
+  /**
+   * Releases the given amount of permits
+   *
+   * @param acquires the number of permits to release
+   */
+  def release(acquires: Int = 1): Unit = {
+    require(acquires > 0, "cannot release negative or no permits")
+    sync.releaseShared(acquires)
+  }
+
+  /** Returns the number of currently available permits. Possibly negative. */
+  def availablePermits: Int = sync.permits
+}
diff --git a/core/controller/src/main/scala/whisk/core/controller/Controller.scala b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
index 2ccbf2f642..00901bf2b2 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Controller.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
@@ -118,6 +118,8 @@ class Controller(val instance: InstanceId,
   // initialize backend services
   private implicit val loadBalancer =
     SpiLoader.get[LoadBalancerProvider].loadBalancer(whiskConfig, instance)
+  logging.info(this, s"loadbalancer initialized: ${loadBalancer.getClass.getSimpleName}")(TransactionId.controller)
+
   private implicit val entitlementProvider = new LocalEntitlementProvider(whiskConfig, loadBalancer)
   private implicit val activationIdFactory = new ActivationIdGenerator {}
   private implicit val logStore = SpiLoader.get[LogStoreProvider].logStore(actorSystem)
diff --git a/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala b/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala
index 1a787e5b55..563e7fe0dc 100644
--- a/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala
+++ b/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala
@@ -26,32 +26,28 @@ import whisk.http.Messages
 import scala.concurrent.{ExecutionContext, Future}
 
 /**
- * Determines user limits and activation counts as seen by the invoker and the loadbalancer
- * in a scheduled, repeating task for other services to get the cached information to be able
- * to calculate and determine whether the namespace currently invoking a new action should
- * be allowed to do so.
+ * Determine whether the namespace currently invoking a new action should be allowed to do so.
  *
- * @param loadbalancer contains active quotas
- * @param defaultConcurrencyLimit the default max allowed concurrent operations
+ * @param loadBalancer contains active quotas
+ * @param concurrencyLimit a calculated limit relative to the user using the system
  * @param systemOverloadLimit the limit when the system is considered overloaded
  */
-class ActivationThrottler(loadBalancer: LoadBalancer, defaultConcurrencyLimit: Int, systemOverloadLimit: Int)(
+class ActivationThrottler(loadBalancer: LoadBalancer, concurrencyLimit: Identity => Int, systemOverloadLimit: Int)(
   implicit logging: Logging,
   executionContext: ExecutionContext) {
 
-  logging.info(this, s"concurrencyLimit = $defaultConcurrencyLimit, systemOverloadLimit = $systemOverloadLimit")(
-    TransactionId.controller)
+  logging.info(this, s"systemOverloadLimit = $systemOverloadLimit")(TransactionId.controller)
 
   /**
    * Checks whether the operation should be allowed to proceed.
    */
   def check(user: Identity)(implicit tid: TransactionId): Future[RateLimit] = {
     loadBalancer.activeActivationsFor(user.uuid).map { concurrentActivations =>
-      val concurrencyLimit = user.limits.concurrentInvocations.getOrElse(defaultConcurrencyLimit)
+      val currentLimit = concurrencyLimit(user)
       logging.debug(
         this,
-        s"namespace = ${user.uuid.asString}, concurrent activations = $concurrentActivations, below limit = $concurrencyLimit")
-      ConcurrentRateLimit(concurrentActivations, concurrencyLimit)
+        s"namespace = ${user.uuid.asString}, concurrent activations = $concurrentActivations, below limit = $currentLimit")
+      ConcurrentRateLimit(concurrentActivations, currentLimit)
     }
   }
 
@@ -76,11 +72,11 @@ sealed trait RateLimit {
 }
 
 case class ConcurrentRateLimit(count: Int, allowed: Int) extends RateLimit {
-  val ok = count < allowed // must have slack for the current activation request
-  override def errorMsg = Messages.tooManyConcurrentRequests(count, allowed)
+  val ok: Boolean = count < allowed // must have slack for the current activation request
+  override def errorMsg: String = Messages.tooManyConcurrentRequests(count, allowed)
 }
 
 case class TimedRateLimit(count: Int, allowed: Int) extends RateLimit {
-  val ok = count <= allowed // the count is already updated to account for the current request
-  override def errorMsg = Messages.tooManyRequests(count, allowed)
+  val ok: Boolean = count <= allowed // the count is already updated to account for the current request
+  override def errorMsg: String = Messages.tooManyRequests(count, allowed)
 }
diff --git a/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala b/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala
index 9f97d719ef..d148eb78fd 100644
--- a/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala
+++ b/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala
@@ -19,23 +19,20 @@ package whisk.core.entitlement
 
 import scala.collection.concurrent.TrieMap
 import scala.collection.immutable.Set
-import scala.concurrent.Future
+import scala.concurrent.{ExecutionContext, Future}
 import scala.util.Failure
 import scala.util.Success
-
 import akka.actor.ActorSystem
 import akka.http.scaladsl.model.StatusCodes.Forbidden
 import akka.http.scaladsl.model.StatusCodes.TooManyRequests
-
 import whisk.core.entitlement.Privilege.ACTIVATE
-import whisk.core.entitlement.Privilege._
 import whisk.core.entitlement.Privilege.REJECT
 import whisk.common.Logging
 import whisk.common.TransactionId
 import whisk.core.WhiskConfig
 import whisk.core.controller.RejectRequest
 import whisk.core.entity._
-import whisk.core.loadBalancer.LoadBalancer
+import whisk.core.loadBalancer.{LoadBalancer, ShardingContainerPoolBalancer}
 import whisk.http.ErrorResponse
 import whisk.http.Messages
 import whisk.http.Messages._
@@ -57,10 +54,10 @@ protected[core] case class Resource(namespace: EntityPath,
                                     collection: Collection,
                                     entity: Option[String],
                                     env: Option[Parameters] = None) {
-  def parent = collection.path + EntityPath.PATHSEP + namespace
-  def id = parent + entity.map(EntityPath.PATHSEP + _).getOrElse("")
-  def fqname = namespace.asString + entity.map(EntityPath.PATHSEP + _).getOrElse("")
-  override def toString = id
+  def parent: String = collection.path + EntityPath.PATHSEP + namespace
+  def id: String = parent + entity.map(EntityPath.PATHSEP + _).getOrElse("")
+  def fqname: String = namespace.asString + entity.map(EntityPath.PATHSEP + _).getOrElse("")
+  override def toString: String = id
 }
 
 protected[core] object EntitlementProvider {
@@ -82,42 +79,69 @@ protected[core] abstract class EntitlementProvider(config: WhiskConfig, loadBala
   implicit actorSystem: ActorSystem,
   logging: Logging) {
 
-  private implicit val executionContext = actorSystem.dispatcher
-
-  /**
-   * The number of controllers if HA is enabled, 1 otherwise
-   */
-  private val diviser = if (config.controllerHighAvailability) config.controllerInstances.toInt else 1
+  private implicit val executionContext: ExecutionContext = actorSystem.dispatcher
 
   /**
    * Allows 20% of additional requests on top of the limit to mitigate possible unfair round-robin loadbalancing between
    * controllers
    */
   private val overcommit = if (config.controllerHighAvailability) 1.2 else 1
+  private def dilateLimit(limit: Int): Int = Math.ceil(limit.toDouble * overcommit).toInt
 
   /**
-   * Adjust the throttles for a single controller with the diviser and the overcommit.
+   * Calculates a possibly dilated limit relative to the current user.
    *
-   * @param originalThrottle The throttle that needs to be adjusted for this controller.
+   * @param defaultLimit the default limit across the whole system
+   * @param user the user to apply that limit to
+   * @return a calculated limit
    */
-  private def dilateThrottle(originalThrottle: Int): Int = {
-    Math.ceil((originalThrottle.toDouble / diviser.toDouble) * overcommit).toInt
+  private def calculateLimit(defaultLimit: Int, overrideLimit: Identity => Option[Int])(user: Identity): Int = {
+    val absoluteLimit = overrideLimit(user).getOrElse(defaultLimit)
+    dilateLimit(absoluteLimit)
+  }
+
+  /**
+   * Calculates a limit which applies only to this instance individually.
+   *
+   * The state needed to correctly check this limit is not shared between all instances, which want to check that
+   * limit, so it needs to be divided between the parties who want to perform that check.
+   *
+   * @param defaultLimit the default limit across the whole system
+   * @param user the user to apply that limit to
+   * @return a calculated limit
+   */
+  private def calculateIndividualLimit(defaultLimit: Int, overrideLimit: Identity => Option[Int])(
+    user: Identity): Int = {
+    val limit = calculateLimit(defaultLimit, overrideLimit)(user)
+    if (limit == 0) {
+      0
+    } else {
+      // Edge case: Iff the divided limit is < 1 no loadbalancer would allow an action to be executed, thus we range
+      // bound to at least 1
+      (limit / loadBalancer.clusterSize).max(1)
+    }
   }
 
   private val invokeRateThrottler =
     new RateThrottler(
       "actions per minute",
-      dilateThrottle(config.actionInvokePerMinuteLimit.toInt),
-      _.limits.invocationsPerMinute.map(dilateThrottle))
+      calculateIndividualLimit(config.actionInvokePerMinuteLimit.toInt, _.limits.invocationsPerMinute))
   private val triggerRateThrottler =
     new RateThrottler(
       "triggers per minute",
-      dilateThrottle(config.triggerFirePerMinuteLimit.toInt),
-      _.limits.firesPerMinute.map(dilateThrottle))
-  private val concurrentInvokeThrottler = new ActivationThrottler(
-    loadBalancer,
-    config.actionInvokeConcurrentLimit.toInt,
-    config.actionInvokeSystemOverloadLimit.toInt)
+      calculateIndividualLimit(config.triggerFirePerMinuteLimit.toInt, _.limits.firesPerMinute))
+
+  private val activationThrottleCalculator = loadBalancer match {
+    // This loadbalancer applies sharding and does not share any state
+    case _: ShardingContainerPoolBalancer => calculateIndividualLimit _
+    // Activation relevant data is shared by all other loadbalancers
+    case _ => calculateLimit _
+  }
+  private val concurrentInvokeThrottler =
+    new ActivationThrottler(
+      loadBalancer,
+      activationThrottleCalculator(config.actionInvokeConcurrentLimit.toInt, _.limits.concurrentInvocations),
+      config.actionInvokeSystemOverloadLimit.toInt)
 
   /**
    * Grants a subject the right to access a resources.
@@ -262,7 +286,7 @@ protected[core] abstract class EntitlementProvider(config: WhiskConfig, loadBala
     implicit transid: TransactionId): Future[Set[(Resource, Boolean)]] = {
     // check the default namespace first, bypassing additional checks if permitted
     val defaultNamespaces = Set(user.namespace.asString)
-    implicit val es = this
+    implicit val es: EntitlementProvider = this
 
     Future.sequence {
       resources.map { resource =>
diff --git a/core/controller/src/main/scala/whisk/core/entitlement/RateThrottler.scala b/core/controller/src/main/scala/whisk/core/entitlement/RateThrottler.scala
index fd0b56a836..9ce7216dea 100644
--- a/core/controller/src/main/scala/whisk/core/entitlement/RateThrottler.scala
+++ b/core/controller/src/main/scala/whisk/core/entitlement/RateThrottler.scala
@@ -30,10 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger
  *
  * For now, we throttle only at a 1-minute granularity.
  */
-class RateThrottler(description: String, defaultMaxPerMinute: Int, overrideMaxPerMinute: Identity => Option[Int])(
-  implicit logging: Logging) {
-
-  logging.debug(this, s"$description: defaultMaxPerMinute = $defaultMaxPerMinute")(TransactionId.controller)
+class RateThrottler(description: String, maxPerMinute: Identity => Int)(implicit logging: Logging) {
 
   /**
    * Maintains map of subject namespace to operations rates.
@@ -50,7 +47,7 @@ class RateThrottler(description: String, defaultMaxPerMinute: Int, overrideMaxPe
   def check(user: Identity)(implicit transid: TransactionId): RateLimit = {
     val uuid = user.uuid // this is namespace identifier
     val throttle = rateMap.getOrElseUpdate(uuid, new RateInfo)
-    val limit = overrideMaxPerMinute(user).getOrElse(defaultMaxPerMinute)
+    val limit = maxPerMinute(user)
     val rate = TimedRateLimit(throttle.update(limit), limit)
     logging.debug(this, s"namespace = ${uuid.asString} rate = ${rate.count}, limit = $limit")
     rate
@@ -61,7 +58,7 @@ class RateThrottler(description: String, defaultMaxPerMinute: Int, overrideMaxPe
  * Tracks the activation rate of one subject at minute-granularity.
  */
 private class RateInfo {
-  @volatile var lastMin = getCurrentMinute
+  @volatile var lastMin: Long = getCurrentMinute
   val lastMinCount = new AtomicInteger()
 
   /**
@@ -77,7 +74,7 @@ private class RateInfo {
     lastMinCount.incrementAndGet()
   }
 
-  def roll() = {
+  def roll(): Unit = {
     val curMin = getCurrentMinute
     if (curMin != lastMin) {
       lastMin = curMin
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
index 46cb15d008..f3c2f0d875 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
@@ -92,6 +92,8 @@ class ContainerPoolBalancer(config: WhiskConfig, controllerInstance: InstanceId)
 
   override def totalActiveActivations = loadBalancerData.totalActivationCount
 
+  override def clusterSize = if (config.controllerHighAvailability) config.controllerInstances.toInt else 1
+
   /**
    * Tries to fill in the result slot (i.e., complete the promise) when a completion message arrives.
    * The promise is removed form the map when the result arrives or upon timeout.
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
index 0c75176208..1ac686d193 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
@@ -53,6 +53,9 @@ case object UnHealthy extends InvokerState { val asString = "unhealthy" }
 case class ActivationRequest(msg: ActivationMessage, invoker: InstanceId)
 case class InvocationFinishedMessage(invokerInstance: InstanceId, successful: Boolean)
 
+// Sent to a monitor if the state changed
+case class CurrentInvokerPoolState(newState: IndexedSeq[InvokerHealth])
+
 // Data stored in the Invoker
 final case class InvokerInfo(buffer: RingBuffer[Boolean])
 
@@ -69,7 +72,8 @@ final case class InvokerInfo(buffer: RingBuffer[Boolean])
  */
 class InvokerPool(childFactory: (ActorRefFactory, InstanceId) => ActorRef,
                   sendActivationToInvoker: (ActivationMessage, InstanceId) => Future[RecordMetadata],
-                  pingConsumer: MessageConsumer)
+                  pingConsumer: MessageConsumer,
+                  monitor: Option[ActorRef])
     extends Actor {
 
   implicit val transid = TransactionId.invokerHealth
@@ -113,6 +117,7 @@ class InvokerPool(childFactory: (ActorRefFactory, InstanceId) => ActorRef,
   }
 
   def logStatus() = {
+    monitor.foreach(_ ! CurrentInvokerPoolState(status))
     val pretty = status.map(i => s"${i.id.toInt} -> ${i.status}")
     logging.info(this, s"invoker status changed to ${pretty.mkString(", ")}")
   }
@@ -208,8 +213,9 @@ object InvokerPool {
 
   def props(f: (ActorRefFactory, InstanceId) => ActorRef,
             p: (ActivationMessage, InstanceId) => Future[RecordMetadata],
-            pc: MessageConsumer) = {
-    Props(new InvokerPool(f, p, pc))
+            pc: MessageConsumer,
+            m: Option[ActorRef] = None) = {
+    Props(new InvokerPool(f, p, pc, m))
   }
 
   /** A stub identity for invoking the test action. This does not need to be a valid identity. */
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancer.scala
index 8f2227feba..ab8db1c589 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancer.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancer.scala
@@ -38,6 +38,8 @@ class InvokerHealth(val id: InstanceId, val status: InvokerState) {
     case that: InvokerHealth => that.id == this.id && that.status == this.status
     case _                   => false
   }
+
+  override def toString = s"InvokerHealth($id, $status)"
 }
 
 trait LoadBalancer {
@@ -69,6 +71,9 @@ trait LoadBalancer {
 
   /** Gets the number of in-flight activations in the system. */
   def totalActiveActivations: Future[Int]
+
+  /** Gets the size of the cluster all loadbalancers are acting in */
+  def clusterSize: Int = 1
 }
 
 /**
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
new file mode 100644
index 0000000000..5da863d715
--- /dev/null
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.loadBalancer
+
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.atomic.LongAdder
+import java.util.concurrent.ThreadLocalRandom
+
+import akka.actor.{Actor, ActorSystem, Props}
+import akka.cluster.ClusterEvent._
+import akka.cluster.{Cluster, Member, MemberStatus}
+import akka.event.Logging.InfoLevel
+import akka.stream.ActorMaterializer
+import org.apache.kafka.clients.producer.RecordMetadata
+import pureconfig._
+import whisk.common._
+import whisk.core.WhiskConfig._
+import whisk.core.connector._
+import whisk.core.entity._
+import whisk.core.{ConfigKeys, WhiskConfig}
+import whisk.spi.SpiLoader
+
+import scala.annotation.tailrec
+import scala.collection.concurrent.TrieMap
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.util.{Failure, Success}
+
+/**
+ * A loadbalancer that uses "horizontal" sharding to not collide with fellow loadbalancers.
+ *
+ * Horizontal sharding means, that each invoker's capacity is evenly divided between the loadbalancers. If an invoker
+ * has at most 16 slots available, those will be divided to 8 slots for each loadbalancer (if there are 2).
+ */
+class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: InstanceId)(
+  implicit val actorSystem: ActorSystem,
+  logging: Logging,
+  materializer: ActorMaterializer)
+    extends LoadBalancer {
+
+  private implicit val executionContext: ExecutionContext = actorSystem.dispatcher
+
+  /** Build a cluster of all loadbalancers */
+  val seedNodesProvider = new StaticSeedNodesProvider(config.controllerSeedNodes, actorSystem.name)
+  val cluster = Cluster(actorSystem)
+  cluster.joinSeedNodes(seedNodesProvider.getSeedNodes())
+
+  /** Used to manage an action for testing invoker health */
+  private val entityStore = WhiskEntityStore.datastore(config)
+
+  /** State related to invocations and throttling */
+  private val activations = TrieMap[ActivationId, ActivationEntry]()
+  private val activationsPerNamespace = TrieMap[UUID, LongAdder]()
+  private val totalActivations = new LongAdder()
+
+  /** State needed for scheduling. */
+  private val schedulingState = ShardingContainerPoolBalancerState()()
+
+  /**
+   * Monitors invoker supervision and the cluster to update the state sequentially
+   *
+   * All state updates should go through this actor to guarantee, that `updateState` and `updateCluster` are called
+   * mutually exclusive and not concurrently.
+   */
+  private val monitor = actorSystem.actorOf(Props(new Actor {
+    override def preStart(): Unit = {
+      cluster.subscribe(self, classOf[MemberEvent], classOf[ReachabilityEvent])
+    }
+
+    // all members of the cluster that are available
+    var availableMembers = Set.empty[Member]
+
+    override def receive: Receive = {
+      case CurrentInvokerPoolState(newState) =>
+        schedulingState.updateInvokers(newState)
+
+      // State of the cluster as it is right now
+      case CurrentClusterState(members, _, _, _, _) =>
+        availableMembers = members.filter(_.status == MemberStatus.Up)
+        schedulingState.updateCluster(availableMembers.size)
+
+      // General lifecycle events and events concerning the reachability of members. Split-brain is not a huge concern
+      // in this case as only the invoker-threshold is adjusted according to the perceived cluster-size.
+      // Taking the unreachable member out of the cluster from that point-of-view results in a better experience
+      // even under split-brain-conditions, as that (in the worst-case) results in premature overloading of invokers vs.
+      // going into overflow mode prematurely.
+      case event: ClusterDomainEvent =>
+        availableMembers = event match {
+          case MemberUp(member)          => availableMembers + member
+          case ReachableMember(member)   => availableMembers + member
+          case MemberRemoved(member, _)  => availableMembers - member
+          case UnreachableMember(member) => availableMembers - member
+          case _                         => availableMembers
+        }
+
+        schedulingState.updateCluster(availableMembers.size)
+    }
+  }))
+
+  /** Loadbalancer interface methods */
+  override def invokerHealth(): Future[IndexedSeq[InvokerHealth]] = Future.successful(schedulingState.invokers)
+  override def activeActivationsFor(namespace: UUID): Future[Int] =
+    Future.successful(activationsPerNamespace.get(namespace).map(_.intValue()).getOrElse(0))
+  override def totalActiveActivations: Future[Int] = Future.successful(totalActivations.intValue())
+  override def clusterSize: Int = schedulingState.clusterSize
+
+  /** 1. Publish a message to the loadbalancer */
+  override def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)(
+    implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] = {
+
+    val invokersToUse = if (!action.exec.pull) schedulingState.managedInvokers else schedulingState.blackboxInvokers
+    val chosen = if (invokersToUse.nonEmpty) {
+      val hash = ShardingContainerPoolBalancer.generateHash(msg.user.namespace, action.fullyQualifiedName(false))
+      val homeInvoker = hash % invokersToUse.size
+      val stepSize = schedulingState.stepSizes(hash % schedulingState.stepSizes.size)
+      ShardingContainerPoolBalancer.schedule(invokersToUse, schedulingState.invokerSlots, homeInvoker, stepSize)
+    } else {
+      None
+    }
+
+    chosen
+      .map { invoker =>
+        val entry = setupActivation(msg, action, invoker)
+        sendActivationToInvoker(messageProducer, msg, invoker).map { _ =>
+          entry.promise.future
+        }
+      }
+      .getOrElse(Future.failed(LoadBalancerException("No invokers available")))
+  }
+
+  /** 2. Update local state with the to be executed activation */
+  private def setupActivation(msg: ActivationMessage,
+                              action: ExecutableWhiskActionMetaData,
+                              instance: InstanceId): ActivationEntry = {
+
+    totalActivations.increment()
+    activationsPerNamespace.getOrElseUpdate(msg.user.uuid, new LongAdder()).increment()
+
+    val timeout = action.limits.timeout.duration.max(TimeLimit.STD_DURATION) + 1.minute
+    // Install a timeout handler for the catastrophic case where an active ack is not received at all
+    // (because say an invoker is down completely, or the connection to the message bus is disrupted) or when
+    // the active ack is significantly delayed (possibly dues to long queues but the subject should not be penalized);
+    // in this case, if the activation handler is still registered, remove it and update the books.
+    activations.getOrElseUpdate(
+      msg.activationId, {
+        val timeoutHandler = actorSystem.scheduler.scheduleOnce(timeout) {
+          processCompletion(Left(msg.activationId), msg.transid, forced = true, invoker = instance)
+        }
+
+        // please note: timeoutHandler.cancel must be called on all non-timeout paths, e.g. Success
+        ActivationEntry(
+          msg.activationId,
+          msg.user.uuid,
+          instance,
+          timeoutHandler,
+          Promise[Either[ActivationId, WhiskActivation]]())
+      })
+  }
+
+  private val messagingProvider = SpiLoader.get[MessagingProvider]
+  private val messageProducer = messagingProvider.getProducer(config, executionContext)
+
+  /** 3. Send the activation to the invoker */
+  private def sendActivationToInvoker(producer: MessageProducer,
+                                      msg: ActivationMessage,
+                                      invoker: InstanceId): Future[RecordMetadata] = {
+    implicit val transid: TransactionId = msg.transid
+
+    val topic = s"invoker${invoker.toInt}"
+
+    MetricEmitter.emitCounterMetric(LoggingMarkers.LOADBALANCER_ACTIVATION_START(msg.user.uuid.asString))
+    val start = transid.started(
+      this,
+      LoggingMarkers.CONTROLLER_KAFKA,
+      s"posting topic '$topic' with activation id '${msg.activationId}'",
+      logLevel = InfoLevel)
+
+    producer.send(topic, msg).andThen {
+      case Success(status) =>
+        transid.finished(
+          this,
+          start,
+          s"posted to ${status.topic()}[${status.partition()}][${status.offset()}]",
+          logLevel = InfoLevel)
+      case Failure(e) => transid.failed(this, start, s"error on posting to topic $topic")
+    }
+  }
+
+  /**
+   * Subscribes to active acks (completion messages from the invokers), and
+   * registers a handler for received active acks from invokers.
+   */
+  private val maxActiveAcksPerPoll = 128
+  private val activeAckPollDuration = 1.second
+  private val activeAckConsumer =
+    messagingProvider.getConsumer(
+      config,
+      "completions",
+      s"completed${controllerInstance.toInt}",
+      maxPeek = maxActiveAcksPerPoll)
+
+  private val activationFeed = actorSystem.actorOf(Props {
+    new MessageFeed(
+      "activeack",
+      logging,
+      activeAckConsumer,
+      maxActiveAcksPerPoll,
+      activeAckPollDuration,
+      processActiveAck)
+  })
+
+  /** 4. Get the active-ack message and parse it */
+  private def processActiveAck(bytes: Array[Byte]): Future[Unit] = Future {
+    val raw = new String(bytes, StandardCharsets.UTF_8)
+    CompletionMessage.parse(raw) match {
+      case Success(m: CompletionMessage) =>
+        processCompletion(m.response, m.transid, forced = false, invoker = m.invoker)
+        activationFeed ! MessageFeed.Processed
+
+      case Failure(t) =>
+        activationFeed ! MessageFeed.Processed
+        logging.error(this, s"failed processing message: $raw with $t")
+    }
+  }
+
+  /** 5. Process the active-ack and update the state accordingly */
+  private def processCompletion(response: Either[ActivationId, WhiskActivation],
+                                tid: TransactionId,
+                                forced: Boolean,
+                                invoker: InstanceId): Unit = {
+    val aid = response.fold(l => l, r => r.activationId)
+
+    // treat left as success (as it is the result of a message exceeding the bus limit)
+    val isSuccess = response.fold(l => true, r => !r.response.isWhiskError)
+
+    activations.remove(aid) match {
+      case Some(entry) =>
+        if (!forced) {
+          entry.timeoutHandler.cancel()
+          entry.promise.trySuccess(response)
+        } else {
+          entry.promise.tryFailure(new Throwable("no active ack received"))
+        }
+
+        totalActivations.decrement()
+        activationsPerNamespace.get(entry.namespaceId).foreach(_.decrement())
+        schedulingState.invokerSlots.lift(invoker.toInt).foreach(_.release())
+
+        logging.info(this, s"${if (!forced) "received" else "forced"} active ack for '$aid'")(tid)
+        // Active acks that are received here are strictly from user actions - health actions are not part of
+        // the load balancer's activation map. Inform the invoker pool supervisor of the user action completion.
+        invokerPool ! InvocationFinishedMessage(invoker, isSuccess)
+      case None if !forced =>
+        // the entry has already been removed but we receive an active ack for this activation Id.
+        // This happens for health actions, because they don't have an entry in Loadbalancerdata or
+        // for activations that already timed out.
+        invokerPool ! InvocationFinishedMessage(invoker, isSuccess)
+        logging.debug(this, s"received active ack for '$aid' which has no entry")(tid)
+      case None =>
+        // the entry has already been removed by an active ack. This part of the code is reached by the timeout.
+        // As the active ack is already processed we don't have to do anything here.
+        logging.debug(this, s"forced active ack for '$aid' which has no entry")(tid)
+    }
+  }
+
+  private val invokerPool = {
+    InvokerPool.prepare(controllerInstance, WhiskEntityStore.datastore(config))
+
+    actorSystem.actorOf(
+      InvokerPool.props(
+        (f, i) => f.actorOf(InvokerActor.props(i, controllerInstance)),
+        (m, i) => sendActivationToInvoker(messageProducer, m, i),
+        messagingProvider.getConsumer(config, s"health${controllerInstance.toInt}", "health", maxPeek = 128),
+        Some(monitor)))
+  }
+}
+
+object ShardingContainerPoolBalancer extends LoadBalancerProvider {
+
+  override def loadBalancer(whiskConfig: WhiskConfig, instance: InstanceId)(
+    implicit actorSystem: ActorSystem,
+    logging: Logging,
+    materializer: ActorMaterializer): LoadBalancer = new ShardingContainerPoolBalancer(whiskConfig, instance)
+
+  def requiredProperties: Map[String, String] =
+    kafkaHosts ++
+      Map(controllerLocalBookkeeping -> null, controllerSeedNodes -> null)
+
+  def generateHash(namespace: EntityName, action: FullyQualifiedEntityName): Int = {
+    (namespace.asString.hashCode() ^ action.asString.hashCode()).abs
+  }
+
+  /**
+   * Scans through all invokers and searches for an invoker tries to get a free slot on an invoker. If no slot can be
+   * obtained, randomly picks a healthy invoker.
+   *
+   * @param invokers a list of available invokers to search in, including their state
+   * @param dispatched semaphores for each invoker to give the slots away from
+   * @param index the index to start from (initially should be the "homeInvoker"
+   * @param step stable identifier of the entity to be scheduled
+   * @return an invoker to schedule to or None of no invoker is available
+   */
+  @tailrec
+  def schedule(invokers: IndexedSeq[InvokerHealth],
+               dispatched: IndexedSeq[ForcableSemaphore],
+               index: Int,
+               step: Int,
+               stepsDone: Int = 0): Option[InstanceId] = {
+    val numInvokers = invokers.size
+
+    if (numInvokers > 0) {
+      val invoker = invokers(index)
+      // If the current invoker is healthy and we can get a slot
+      if (invoker.status == Healthy && dispatched(invoker.id.toInt).tryAcquire()) {
+        Some(invoker.id)
+      } else {
+        // If we've gone through all invokers
+        if (stepsDone == numInvokers + 1) {
+          val healthyInvokers = invokers.filter(_.status == Healthy)
+          if (healthyInvokers.nonEmpty) {
+            // Choose a healthy invoker randomly
+            val random = ThreadLocalRandom.current().nextInt(healthyInvokers.size)
+            dispatched(random).forceAcquire()
+            Some(healthyInvokers(random).id)
+          } else {
+            None
+          }
+        } else {
+          val newIndex = (index + step) % numInvokers
+          schedule(invokers, dispatched, newIndex, step, stepsDone + 1)
+        }
+      }
+    } else {
+      None
+    }
+  }
+}
+
+/**
+ * Holds the state necessary for scheduling of actions.
+ *
+ * @param _invokers all of the known invokers in the system
+ * @param _managedInvokers all invokers for managed runtimes
+ * @param _blackboxInvokers all invokers for blackbox runtimes
+ * @param _stepSizes the step-sizes possible for the current invoker count
+ * @param _invokerSlots state of accessible slots of each invoker
+ */
+case class ShardingContainerPoolBalancerState(
+  private var _invokers: IndexedSeq[InvokerHealth] = IndexedSeq.empty[InvokerHealth],
+  private var _managedInvokers: IndexedSeq[InvokerHealth] = IndexedSeq.empty[InvokerHealth],
+  private var _blackboxInvokers: IndexedSeq[InvokerHealth] = IndexedSeq.empty[InvokerHealth],
+  private var _stepSizes: Seq[Int] = ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(0),
+  private var _invokerSlots: IndexedSeq[ForcableSemaphore] = IndexedSeq.empty[ForcableSemaphore],
+  private var _clusterSize: Int = 1)(
+  lbConfig: ShardingContainerPoolBalancerConfig =
+    loadConfigOrThrow[ShardingContainerPoolBalancerConfig](ConfigKeys.loadbalancer))(implicit logging: Logging) {
+
+  private val totalInvokerThreshold = lbConfig.invokerBusyThreshold
+  private var currentInvokerThreshold = totalInvokerThreshold
+
+  private val blackboxFraction: Double = Math.max(0.0, Math.min(1.0, lbConfig.blackboxFraction))
+  logging.info(this, s"blackboxFraction = $blackboxFraction")(TransactionId.loadbalancer)
+
+  /** Getters for the variables, setting from the outside is only allowed through the update methods below */
+  def invokers: IndexedSeq[InvokerHealth] = _invokers
+  def managedInvokers: IndexedSeq[InvokerHealth] = _managedInvokers
+  def blackboxInvokers: IndexedSeq[InvokerHealth] = _blackboxInvokers
+  def stepSizes: Seq[Int] = _stepSizes
+  def invokerSlots: IndexedSeq[ForcableSemaphore] = _invokerSlots
+  def clusterSize: Int = _clusterSize
+
+  /**
+   * Updates the scheduling state with the new invokers.
+   *
+   * This is okay to not happen atomically since dirty reads of the values set are not dangerous. It is important though
+   * to update the "invokers" variables last, since they will determine the range of invokers to choose from.
+   *
+   * Handling a shrinking invokers list is not necessary, because InvokerPool won't shrink its own list but rather
+   * report the invoker as "Offline".
+   *
+   * It is important that this method does not run concurrently to itself and/or to `updateCluster`
+   */
+  def updateInvokers(newInvokers: IndexedSeq[InvokerHealth]): Unit = {
+    val oldSize = _invokers.size
+    val newSize = newInvokers.size
+
+    if (oldSize != newSize) {
+      _stepSizes = ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(newSize)
+      if (oldSize < newSize) {
+        // Keeps the existing state..
+        _invokerSlots = _invokerSlots.padTo(newSize, new ForcableSemaphore(currentInvokerThreshold))
+      }
+    }
+
+    val blackboxes = Math.max(1, (newSize.toDouble * blackboxFraction).toInt)
+    val managed = Math.max(1, newSize - blackboxes)
+
+    _invokers = newInvokers
+    _blackboxInvokers = _invokers.takeRight(blackboxes)
+    _managedInvokers = _invokers.take(managed)
+
+    logging.info(
+      this,
+      s"loadbalancer invoker status updated. managedInvokers = $managed blackboxInvokers = $blackboxes")(
+      TransactionId.loadbalancer)
+  }
+
+  /**
+   * Updates the size of a cluster. Throws away all state for simplicity.
+   *
+   * This is okay to not happen atomically, since a dirty read of the values set are not dangerous. At worst the
+   * scheduler works on outdated invoker-load data which is acceptable.
+   *
+   * It is important that this method does not run concurrently to itself and/or to `updateState`
+   */
+  def updateCluster(newSize: Int): Unit = {
+    val actualSize = newSize max 1 // if a cluster size < 1 is reported, falls back to a size of 1 (alone)
+    if (_clusterSize != actualSize) {
+      _clusterSize = actualSize
+      val newTreshold = (totalInvokerThreshold / actualSize) max 1 // letting this fall below 1 doesn't make sense
+      currentInvokerThreshold = newTreshold
+      _invokerSlots = _invokerSlots.map(_ => new ForcableSemaphore(currentInvokerThreshold))
+
+      logging.info(
+        this,
+        s"loadbalancer cluster size changed to $actualSize active nodes. invokerThreshold = $currentInvokerThreshold")(
+        TransactionId.loadbalancer)
+    }
+  }
+}
+
+/**
+ * Configuration for the sharding container pool balancer.
+ *
+ * @param blackboxFraction the fraction of all invokers to use exclusively for blackboxes
+ * @param invokerBusyThreshold how many slots an invoker has available in total
+ */
+case class ShardingContainerPoolBalancerConfig(blackboxFraction: Double, invokerBusyThreshold: Int)
diff --git a/docs/deploy.md b/docs/deploy.md
index 99bf5ab551..49c2636758 100644
--- a/docs/deploy.md
+++ b/docs/deploy.md
@@ -17,6 +17,12 @@ How to down the members.
 Link to akka clustering documentation:
 https://doc.akka.io/docs/akka/2.5.4/scala/cluster-usage.html
 
+## Shared state vs. Sharding
+
+OpenWhisk supports both a shared state and a sharding model. By default the shared-state loadbalancer is used. The sharding loadbalancer is the newer implementation and scheduled to eventually supersede the shared-state implementation and become the default. To configure your system to use the sharding implementation, set `controller_loadbalancer_spi` to `whisk.core.loadBalancer.ShardingContainerPoolBalancer`.
+
+The sharding loadbalancer has the caveat of being limited in its scalability in its current implementation. It uses "horizontal" sharding, which means that the slots on each invoker are evenly divided to the loadbalancers. For example: In a system with 2 loadbalancers and invokers which have 16 slots each, each loadbalancer would get 8 slots on each invoker. In this specific case, a cluster of loadbalancers > 16 instances does not make sense, since each loadbalancer would only have a fraction of a slot above that. The code guards against that but it is strongly recommended not to deploy more sharding loadbalancers than there are slots on each invoker.
+
 # Invoker use of docker-runc
 
 To improve performance, Invokers attempt to maintain warm containers for frequently executed actions. To optimize resource usage, the action containers are paused/unpaused between invocations.  The system can be configured to use either docker-runc or docker to perform the pause/unpause operations by setting the value of the environment variable INVOKER_USE_RUNC to true or false respectively. If not set, it will default to true (use docker-runc).
diff --git a/tests/src/test/scala/limits/ThrottleTests.scala b/tests/src/test/scala/limits/ThrottleTests.scala
index dfee5c0a04..bfbc3f69b3 100644
--- a/tests/src/test/scala/limits/ThrottleTests.scala
+++ b/tests/src/test/scala/limits/ThrottleTests.scala
@@ -188,15 +188,12 @@ class ThrottleTests
     val numGroups = (totalInvokes / maximumConcurrentInvokes) + 1
     val invokesPerGroup = (totalInvokes / numGroups) + 1
     val interGroupSleep = 5.seconds
-    val results = (1 to numGroups)
-      .map { i =>
-        if (i != 1) { Thread.sleep(interGroupSleep.toMillis) }
-        untilThrottled(invokesPerGroup) { () =>
-          wsk.action.invoke(name, Map("payload" -> "testWord".toJson), expectedExitCode = DONTCARE_EXIT)
-        }
+    val results = (1 to numGroups).flatMap { i =>
+      if (i != 1) { Thread.sleep(interGroupSleep.toMillis) }
+      untilThrottled(invokesPerGroup) { () =>
+        wsk.action.invoke(name, Map("payload" -> "testWord".toJson), expectedExitCode = DONTCARE_EXIT)
       }
-      .flatten
-      .toList
+    }.toList
     val afterInvokes = Instant.now
 
     try {
@@ -244,10 +241,12 @@ class ThrottleTests
         action.create(name, timeoutAction)
     }
 
-    // The sleep is necessary as the load balancer currently has a latency before recognizing concurency.
+    // The sleep is necessary as the load balancer currently has a latency before recognizing concurrency.
     val sleep = 15.seconds
-    val slowInvokes = maximumConcurrentInvokes
-    val fastInvokes = 2
+    // Adding a bit of overcommit since some loadbalancers rely on some overcommit. This won't hurt those who don't
+    // since all activations are taken into account to check for throttled invokes below.
+    val slowInvokes = (maximumConcurrentInvokes * 1.2).toInt
+    val fastInvokes = 4
     val fastInvokeDuration = 4.seconds
     val slowInvokeDuration = sleep + fastInvokeDuration
 
diff --git a/tests/src/test/scala/whisk/common/ForcableSemaphoreTests.scala b/tests/src/test/scala/whisk/common/ForcableSemaphoreTests.scala
new file mode 100644
index 0000000000..bcd3871620
--- /dev/null
+++ b/tests/src/test/scala/whisk/common/ForcableSemaphoreTests.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.common
+
+import org.junit.runner.RunWith
+import org.scalatest.{FlatSpec, Matchers}
+import org.scalatest.junit.JUnitRunner
+
+@RunWith(classOf[JUnitRunner])
+class ForcableSemaphoreTests extends FlatSpec with Matchers {
+  behavior of "ForcableSemaphore"
+
+  it should "not allow to acquire, force or release negative amounts of permits" in {
+    val s = new ForcableSemaphore(2)
+    an[IllegalArgumentException] should be thrownBy s.tryAcquire(0)
+    an[IllegalArgumentException] should be thrownBy s.tryAcquire(-1)
+
+    an[IllegalArgumentException] should be thrownBy s.forceAcquire(0)
+    an[IllegalArgumentException] should be thrownBy s.forceAcquire(-1)
+
+    an[IllegalArgumentException] should be thrownBy s.release(0)
+    an[IllegalArgumentException] should be thrownBy s.release(-1)
+  }
+
+  it should "allow to acquire the defined amount of permits only" in {
+    val s = new ForcableSemaphore(2)
+    s.tryAcquire() shouldBe true // 1 permit left
+    s.tryAcquire() shouldBe true // 0 permits left
+    s.tryAcquire() shouldBe false
+
+    val s2 = new ForcableSemaphore(4)
+    s2.tryAcquire(5) shouldBe false // only 4 permits available
+    s2.tryAcquire(3) shouldBe true // 1 permit left
+    s2.tryAcquire(2) shouldBe false // only 1 permit available
+    s2.tryAcquire() shouldBe true
+  }
+
+  it should "allow to release permits again" in {
+    val s = new ForcableSemaphore(2)
+    s.tryAcquire() shouldBe true // 1 permit left
+    s.tryAcquire() shouldBe true // 0 permits left
+    s.tryAcquire() shouldBe false
+    s.release() // 1 permit left
+    s.tryAcquire() shouldBe true
+    s.release(2) // 1 permit left
+    s.tryAcquire(2) shouldBe true
+  }
+
+  it should "allow to force permits, delaying the acceptance of 'usual' permits until all of forced permits are released" in {
+    val s = new ForcableSemaphore(2)
+    s.tryAcquire(2) shouldBe true // 0 permits left
+    s.forceAcquire(5) // -5 permits left
+    s.tryAcquire() shouldBe false
+    s.release(4) // -1 permits left
+    s.tryAcquire() shouldBe false
+    s.release() // 0 permits left
+    s.tryAcquire() shouldBe false
+    s.release() // 1 permit left
+    s.tryAcquire() shouldBe true
+  }
+
+  it should "not give away more permits even under concurrent load" in {
+    // 100 iterations of this test
+    (0 until 100).foreach { _ =>
+      val s = new ForcableSemaphore(32)
+      // try to acquire more permits than allowed in parallel
+      val acquires = (0 until 64).par.map(_ => s.tryAcquire()).seq
+
+      val result = Seq.fill(32)(true) ++ Seq.fill(32)(false)
+      acquires should contain theSameElementsAs result
+    }
+  }
+}
diff --git a/tests/src/test/scala/whisk/core/controller/test/RateThrottleTests.scala b/tests/src/test/scala/whisk/core/controller/test/RateThrottleTests.scala
index 720e738dcc..3e2ea2af83 100644
--- a/tests/src/test/scala/whisk/core/controller/test/RateThrottleTests.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/RateThrottleTests.scala
@@ -44,8 +44,8 @@ class RateThrottleTests extends FlatSpec with Matchers with StreamLogging {
   behavior of "Rate Throttle"
 
   it should "throttle when rate exceeds allowed threshold" in {
-    new RateThrottler("test", 0, _.limits.invocationsPerMinute).check(subject).ok shouldBe false
-    val rt = new RateThrottler("test", 1, _.limits.invocationsPerMinute)
+    new RateThrottler("test", _ => 0).check(subject).ok shouldBe false
+    val rt = new RateThrottler("test", _ => 1)
     rt.check(subject).ok shouldBe true
     rt.check(subject).ok shouldBe false
     rt.check(subject).ok shouldBe false
@@ -55,7 +55,7 @@ class RateThrottleTests extends FlatSpec with Matchers with StreamLogging {
 
   it should "check against an alternative limit if passed in" in {
     val withLimits = subject.copy(limits = UserLimits(invocationsPerMinute = Some(5)))
-    val rt = new RateThrottler("test", 1, _.limits.invocationsPerMinute)
+    val rt = new RateThrottler("test", u => u.limits.invocationsPerMinute.getOrElse(1))
     rt.check(withLimits).ok shouldBe true // 1
     rt.check(withLimits).ok shouldBe true // 2
     rt.check(withLimits).ok shouldBe true // 3
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
new file mode 100644
index 0000000000..ca8442f65c
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.loadBalancer.test
+
+import common.StreamLogging
+import org.junit.runner.RunWith
+import org.scalatest.{FlatSpec, Matchers}
+import org.scalatest.junit.JUnitRunner
+import whisk.common.ForcableSemaphore
+import whisk.core.entity.InstanceId
+import whisk.core.loadBalancer._
+
+/**
+ * Unit tests for the ContainerPool object.
+ *
+ * These tests test only the "static" methods "schedule" and "remove"
+ * of the ContainerPool object.
+ */
+@RunWith(classOf[JUnitRunner])
+class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers with StreamLogging {
+  behavior of "ShardingContainerPoolBalancerState"
+
+  def healthy(i: Int) = new InvokerHealth(InstanceId(i), Healthy)
+  def unhealthy(i: Int) = new InvokerHealth(InstanceId(i), UnHealthy)
+  def offline(i: Int) = new InvokerHealth(InstanceId(i), Offline)
+
+  def semaphores(count: Int, max: Int): IndexedSeq[ForcableSemaphore] =
+    IndexedSeq.fill(count)(new ForcableSemaphore(max))
+
+  it should "update invoker's state, growing the slots data and keeping valid old data" in {
+    // start empty
+    val slots = 10
+    val state = ShardingContainerPoolBalancerState()(ShardingContainerPoolBalancerConfig(0.5, slots))
+    state.invokers shouldBe 'empty
+    state.blackboxInvokers shouldBe 'empty
+    state.managedInvokers shouldBe 'empty
+    state.invokerSlots shouldBe 'empty
+    state.stepSizes shouldBe Seq()
+
+    // apply one update, verify everything is updated accordingly
+    val update1 = IndexedSeq(healthy(0))
+    state.updateInvokers(update1)
+
+    state.invokers shouldBe update1
+    state.blackboxInvokers shouldBe update1 // fallback to at least one
+    state.managedInvokers shouldBe update1 // fallback to at least one
+    state.invokerSlots.head.availablePermits shouldBe slots
+    state.stepSizes shouldBe Seq(1)
+
+    // aquire a slot to alter invoker state
+    state.invokerSlots.head.tryAcquire()
+    state.invokerSlots.head.availablePermits shouldBe slots - 1
+
+    // apply second update, growing the state
+    val update2 = IndexedSeq(healthy(0), healthy(1))
+    state.updateInvokers(update2)
+
+    state.invokers shouldBe update2
+    state.managedInvokers shouldBe IndexedSeq(update2.head)
+    state.blackboxInvokers shouldBe IndexedSeq(update2.last)
+    state.invokerSlots.head.availablePermits shouldBe slots - 1
+    state.invokerSlots(1).availablePermits shouldBe slots
+    state.stepSizes shouldBe Seq(1)
+  }
+
+  it should "update the cluster size, adjusting the invoker slots accordingly" in {
+    val slots = 10
+    val state = ShardingContainerPoolBalancerState()(ShardingContainerPoolBalancerConfig(0.5, slots))
+    state.updateInvokers(IndexedSeq(healthy(0)))
+
+    state.invokerSlots.head.tryAcquire()
+    state.invokerSlots.head.availablePermits shouldBe slots - 1
+
+    state.updateCluster(2)
+    state.invokerSlots.head.availablePermits shouldBe slots / 2 // state reset + divided by 2
+  }
+
+  it should "fallback to a size of 1 (alone) if cluster size is < 1" in {
+    val slots = 10
+    val state = ShardingContainerPoolBalancerState()(ShardingContainerPoolBalancerConfig(0.5, slots))
+    state.updateInvokers(IndexedSeq(healthy(0)))
+
+    state.invokerSlots.head.availablePermits shouldBe slots
+
+    state.updateCluster(2)
+    state.invokerSlots.head.availablePermits shouldBe slots / 2
+
+    state.updateCluster(0)
+    state.invokerSlots.head.availablePermits shouldBe slots
+
+    state.updateCluster(-1)
+    state.invokerSlots.head.availablePermits shouldBe slots
+  }
+
+  it should "set the threshold to 1 if the cluster is bigger than there are slots on 1 invoker" in {
+    val slots = 10
+    val state = ShardingContainerPoolBalancerState()(ShardingContainerPoolBalancerConfig(0.5, slots))
+    state.updateInvokers(IndexedSeq(healthy(0)))
+
+    state.invokerSlots.head.availablePermits shouldBe slots
+
+    state.updateCluster(20)
+
+    state.invokerSlots.head.availablePermits shouldBe 1
+  }
+
+  behavior of "schedule"
+
+  it should "return None on an empty invoker list" in {
+    ShardingContainerPoolBalancer.schedule(IndexedSeq.empty, IndexedSeq.empty, index = 0, step = 2) shouldBe None
+  }
+
+  it should "return None if no invokers are healthy" in {
+    val invokerCount = 3
+    val invokerSlots = semaphores(invokerCount, 3)
+    val invokers = (0 until invokerCount).map(unhealthy)
+
+    ShardingContainerPoolBalancer.schedule(invokers, invokerSlots, index = 0, step = 2) shouldBe None
+  }
+
+  it should "choose the first available invoker, jumping in stepSize steps, falling back to randomized scheduling once all invokers are full" in {
+    val invokerCount = 3
+    val invokerSlots = semaphores(invokerCount + 3, 3) // needs to be offset by 3 as well
+    val invokers = (0 until invokerCount).map(i => healthy(i + 3)) // offset by 3 to asset InstanceId is returned
+
+    val expectedResult = Seq(3, 3, 3, 5, 5, 5, 4, 4, 4)
+    val result = expectedResult.map { _ =>
+      ShardingContainerPoolBalancer.schedule(invokers, invokerSlots, index = 0, step = 2).get.toInt
+    }
+
+    result shouldBe expectedResult
+
+    val bruteResult = (0 to 100).map { _ =>
+      ShardingContainerPoolBalancer.schedule(invokers, invokerSlots, index = 0, step = 2).get.toInt
+    }
+
+    bruteResult should contain allOf (3, 4, 5)
+  }
+
+  it should "ignore unhealthy or offline invokers" in {
+    val invokers = IndexedSeq(healthy(0), unhealthy(1), offline(2), healthy(3))
+    val invokerSlots = semaphores(invokers.size, 3)
+
+    val expectedResult = Seq(0, 0, 0, 3, 3, 3)
+    val result = expectedResult.map { _ =>
+      ShardingContainerPoolBalancer.schedule(invokers, invokerSlots, index = 0, step = 1).get.toInt
+    }
+
+    result shouldBe expectedResult
+
+    // more schedules will result in randomized invokers, but the unhealthy and offline invokers should not be part
+    val bruteResult = (0 to 100).map { _ =>
+      ShardingContainerPoolBalancer.schedule(invokers, invokerSlots, index = 0, step = 1).get.toInt
+    }
+
+    bruteResult should contain allOf (0, 3)
+    bruteResult should contain noneOf (1, 2)
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services