You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by cb...@apache.org on 2018/04/03 14:57:27 UTC

[incubator-openwhisk] branch master updated: Remove deprecated loadbalancer. (#3413)

This is an automated email from the ASF dual-hosted git repository.

cbickel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new f647224  Remove deprecated loadbalancer. (#3413)
f647224 is described below

commit f64722498bc2f5eadc356b3ae26ebb85de1bcdbf
Author: Markus Thömmes <ma...@me.com>
AuthorDate: Tue Apr 3 16:57:24 2018 +0200

    Remove deprecated loadbalancer. (#3413)
---
 .../core/loadBalancer/ContainerPoolBalancer.scala  | 356 ---------------------
 .../loadBalancer/DistributedLoadBalancerData.scala |  90 ------
 .../whisk/core/loadBalancer/LoadBalancer.scala     |   3 +
 .../whisk/core/loadBalancer/LoadBalancerData.scala |  86 -----
 .../core/loadBalancer/LocalLoadBalancerData.scala  |  76 -----
 .../ShardingContainerPoolBalancer.scala            |  38 ++-
 .../core/loadBalancer/SharedDataService.scala      |  98 ------
 .../test/ContainerPoolBalancerObjectTests.scala    | 164 ----------
 .../loadBalancer/test/LoadBalancerDataTests.scala  | 295 -----------------
 .../test/ShardingContainerPoolBalancerTests.scala  |  17 +
 .../loadBalancer/test/SharedDataServiceTests.scala |  81 -----
 11 files changed, 53 insertions(+), 1251 deletions(-)

diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
deleted file mode 100644
index 570281d..0000000
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
+++ /dev/null
@@ -1,356 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.loadBalancer
-
-import java.nio.charset.StandardCharsets
-import java.util.concurrent.ThreadLocalRandom
-
-import akka.actor.{ActorSystem, Props}
-import akka.cluster.Cluster
-import akka.pattern.ask
-import akka.stream.ActorMaterializer
-import akka.util.Timeout
-import org.apache.kafka.clients.producer.RecordMetadata
-import pureconfig._
-import whisk.common.{Logging, LoggingMarkers, MetricEmitter, TransactionId}
-import whisk.core.WhiskConfig._
-import whisk.core.connector._
-import whisk.core.entity._
-import whisk.core.{ConfigKeys, WhiskConfig}
-import whisk.spi.SpiLoader
-import akka.event.Logging.InfoLevel
-import pureconfig._
-
-import scala.annotation.tailrec
-import scala.concurrent.duration._
-import scala.concurrent.{ExecutionContext, Future, Promise}
-import scala.util.{Failure, Success}
-
-case class LoadbalancerConfig(blackboxFraction: Double, invokerBusyThreshold: Int)
-
-class ContainerPoolBalancer(config: WhiskConfig, controllerInstance: InstanceId)(implicit val actorSystem: ActorSystem,
-                                                                                 logging: Logging,
-                                                                                 materializer: ActorMaterializer)
-    extends LoadBalancer {
-
-  private val lbConfig = loadConfigOrThrow[LoadbalancerConfig](ConfigKeys.loadbalancer)
-
-  /** The execution context for futures */
-  private implicit val executionContext: ExecutionContext = actorSystem.dispatcher
-
-  private val activeAckTimeoutGrace = 1.minute
-
-  /** How many invokers are dedicated to blackbox images.  We range bound to something sensical regardless of configuration. */
-  private val blackboxFraction: Double = Math.max(0.0, Math.min(1.0, lbConfig.blackboxFraction))
-  logging.info(this, s"blackboxFraction = $blackboxFraction")(TransactionId.loadbalancer)
-
-  /** Feature switch for shared load balancer data **/
-  private val loadBalancerData = {
-    if (config.controllerLocalBookkeeping) {
-      new LocalLoadBalancerData()
-    } else {
-
-      /** Specify how seed nodes are generated */
-      val seedNodesProvider = new StaticSeedNodesProvider(config.controllerSeedNodes, actorSystem.name)
-      Cluster(actorSystem).joinSeedNodes(seedNodesProvider.getSeedNodes())
-      new DistributedLoadBalancerData()
-    }
-  }
-
-  override def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)(
-    implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] = {
-    chooseInvoker(msg.user, action).flatMap { invokerName =>
-      val entry = setupActivation(action, msg.activationId, msg.user.uuid, invokerName, transid)
-      sendActivationToInvoker(messageProducer, msg, invokerName).map { _ =>
-        entry.promise.future
-      }
-    }
-  }
-
-  /** An indexed sequence of all invokers in the current system. */
-  override def invokerHealth(): Future[IndexedSeq[InvokerHealth]] = {
-    invokerPool
-      .ask(GetStatus)(Timeout(5.seconds))
-      .mapTo[IndexedSeq[InvokerHealth]]
-  }
-
-  override def activeActivationsFor(namespace: UUID) = loadBalancerData.activationCountOn(namespace)
-
-  override def totalActiveActivations = loadBalancerData.totalActivationCount
-
-  override def clusterSize = config.controllerInstances.toInt
-
-  /**
-   * Tries to fill in the result slot (i.e., complete the promise) when a completion message arrives.
-   * The promise is removed form the map when the result arrives or upon timeout.
-   *
-   * @param msg is the kafka message payload as Json
-   */
-  private def processCompletion(response: Either[ActivationId, WhiskActivation],
-                                tid: TransactionId,
-                                forced: Boolean,
-                                invoker: InstanceId): Unit = {
-    val aid = response.fold(l => l, r => r.activationId)
-
-    // treat left as success (as it is the result of a message exceeding the bus limit)
-    val isSuccess = response.fold(l => true, r => !r.response.isWhiskError)
-
-    loadBalancerData.removeActivation(aid) match {
-      case Some(entry) =>
-        logging.info(this, s"${if (!forced) "received" else "forced"} active ack for '$aid'")(tid)
-        // Active acks that are received here are strictly from user actions - health actions are not part of
-        // the load balancer's activation map. Inform the invoker pool supervisor of the user action completion.
-        invokerPool ! InvocationFinishedMessage(invoker, isSuccess)
-        if (!forced) {
-          entry.timeoutHandler.cancel()
-          entry.promise.trySuccess(response)
-        } else {
-          entry.promise.tryFailure(new Throwable("no active ack received"))
-        }
-      case None if !forced =>
-        // the entry has already been removed but we receive an active ack for this activation Id.
-        // This happens for health actions, because they don't have an entry in Loadbalancerdata or
-        // for activations that already timed out.
-        invokerPool ! InvocationFinishedMessage(invoker, isSuccess)
-        logging.debug(this, s"received active ack for '$aid' which has no entry")(tid)
-      case None =>
-        // the entry has already been removed by an active ack. This part of the code is reached by the timeout.
-        // As the active ack is already processed we don't have to do anything here.
-        logging.debug(this, s"forced active ack for '$aid' which has no entry")(tid)
-    }
-  }
-
-  /**
-   * Creates an activation entry and insert into various maps.
-   */
-  private def setupActivation(action: ExecutableWhiskActionMetaData,
-                              activationId: ActivationId,
-                              namespaceId: UUID,
-                              invokerName: InstanceId,
-                              transid: TransactionId): ActivationEntry = {
-    val timeout = (action.limits.timeout.duration
-      .max(TimeLimit.STD_DURATION) * config.controllerInstances.toInt) + activeAckTimeoutGrace
-    // Install a timeout handler for the catastrophic case where an active ack is not received at all
-    // (because say an invoker is down completely, or the connection to the message bus is disrupted) or when
-    // the active ack is significantly delayed (possibly dues to long queues but the subject should not be penalized);
-    // in this case, if the activation handler is still registered, remove it and update the books.
-    // in case of missing synchronization between n controllers in HA configuration the invoker queue can be overloaded
-    // n-1 times and the maximal time for answering with active ack can be n times the action time (plus some overhead)
-    loadBalancerData.putActivation(
-      activationId, {
-        val timeoutHandler = actorSystem.scheduler.scheduleOnce(timeout) {
-          processCompletion(Left(activationId), transid, forced = true, invoker = invokerName)
-        }
-
-        // please note: timeoutHandler.cancel must be called on all non-timeout paths, e.g. Success
-        ActivationEntry(
-          activationId,
-          namespaceId,
-          invokerName,
-          timeoutHandler,
-          Promise[Either[ActivationId, WhiskActivation]]())
-      })
-  }
-
-  /** Gets a producer which can publish messages to the kafka bus. */
-  private val messagingProvider = SpiLoader.get[MessagingProvider]
-  private val messageProducer = messagingProvider.getProducer(config)
-
-  private def sendActivationToInvoker(producer: MessageProducer,
-                                      msg: ActivationMessage,
-                                      invoker: InstanceId): Future[RecordMetadata] = {
-    implicit val transid = msg.transid
-
-    MetricEmitter.emitCounterMetric(LoggingMarkers.LOADBALANCER_ACTIVATION_START)
-    val topic = s"invoker${invoker.toInt}"
-    val start = transid.started(
-      this,
-      LoggingMarkers.CONTROLLER_KAFKA,
-      s"posting topic '$topic' with activation id '${msg.activationId}'",
-      logLevel = InfoLevel)
-
-    producer.send(topic, msg).andThen {
-      case Success(status) =>
-        transid.finished(this, start, s"posted to ${status.topic()}[${status.partition()}][${status.offset()}]")
-      case Failure(e) => transid.failed(this, start, s"error on posting to topic $topic")
-    }
-  }
-
-  private val invokerPool = {
-    InvokerPool.prepare(controllerInstance, WhiskEntityStore.datastore())
-
-    actorSystem.actorOf(
-      InvokerPool.props(
-        (f, i) => f.actorOf(InvokerActor.props(i, controllerInstance)),
-        (m, i) => sendActivationToInvoker(messageProducer, m, i),
-        messagingProvider.getConsumer(config, s"health${controllerInstance.toInt}", "health", maxPeek = 128)))
-  }
-
-  /**
-   * Subscribes to active acks (completion messages from the invokers), and
-   * registers a handler for received active acks from invokers.
-   */
-  val activeAckTopic = s"completed${controllerInstance.toInt}"
-  val maxActiveAcksPerPoll = 128
-  val activeAckPollDuration = 1.second
-  private val activeAckConsumer =
-    messagingProvider.getConsumer(config, activeAckTopic, activeAckTopic, maxPeek = maxActiveAcksPerPoll)
-
-  val activationFeed = actorSystem.actorOf(Props {
-    new MessageFeed(
-      "activeack",
-      logging,
-      activeAckConsumer,
-      maxActiveAcksPerPoll,
-      activeAckPollDuration,
-      processActiveAck)
-  })
-
-  def processActiveAck(bytes: Array[Byte]): Future[Unit] = Future {
-    val raw = new String(bytes, StandardCharsets.UTF_8)
-    CompletionMessage.parse(raw) match {
-      case Success(m: CompletionMessage) =>
-        processCompletion(m.response, m.transid, forced = false, invoker = m.invoker)
-        activationFeed ! MessageFeed.Processed
-
-      case Failure(t) =>
-        activationFeed ! MessageFeed.Processed
-        logging.error(this, s"failed processing message: $raw with $t")
-    }
-  }
-
-  /** Compute the number of blackbox-dedicated invokers by applying a rounded down fraction of all invokers (but at least 1). */
-  private def numBlackbox(totalInvokers: Int) = Math.max(1, (totalInvokers.toDouble * blackboxFraction).toInt)
-
-  /** Return invokers dedicated to running blackbox actions. */
-  private def blackboxInvokers(invokers: IndexedSeq[InvokerHealth]): IndexedSeq[InvokerHealth] = {
-    val blackboxes = numBlackbox(invokers.size)
-    invokers.takeRight(blackboxes)
-  }
-
-  /**
-   * Return (at least one) invokers for running non black-box actions.
-   * This set can overlap with the blackbox set if there is only one invoker.
-   */
-  private def managedInvokers(invokers: IndexedSeq[InvokerHealth]): IndexedSeq[InvokerHealth] = {
-    val managed = Math.max(1, invokers.length - numBlackbox(invokers.length))
-    invokers.take(managed)
-  }
-
-  /** Determine which invoker this activation should go to. Due to dynamic conditions, it may return no invoker. */
-  private def chooseInvoker(user: Identity, action: ExecutableWhiskActionMetaData): Future[InstanceId] = {
-    val hash = generateHash(user.namespace, action)
-
-    loadBalancerData.activationCountPerInvoker.flatMap { currentActivations =>
-      invokerHealth().flatMap { invokers =>
-        val invokersToUse = if (action.exec.pull) blackboxInvokers(invokers) else managedInvokers(invokers)
-        val invokersWithUsage = invokersToUse.view.map {
-          // Using a view defers the comparably expensive lookup to actual access of the element
-          case invoker => (invoker.id, invoker.status, currentActivations.getOrElse(invoker.id.toString, 0))
-        }
-
-        ContainerPoolBalancer.schedule(invokersWithUsage, lbConfig.invokerBusyThreshold, hash) match {
-          case Some(invoker) => Future.successful(invoker)
-          case None =>
-            logging.error(this, s"all invokers down")(TransactionId.invokerHealth)
-            Future.failed(new LoadBalancerException("no invokers available"))
-        }
-      }
-    }
-  }
-
-  /** Generates a hash based on the string representation of namespace and action */
-  private def generateHash(namespace: EntityName, action: ExecutableWhiskActionMetaData): Int = {
-    (namespace.asString.hashCode() ^ action.fullyQualifiedName(false).asString.hashCode()).abs
-  }
-}
-
-object ContainerPoolBalancer extends LoadBalancerProvider {
-
-  override def loadBalancer(whiskConfig: WhiskConfig, instance: InstanceId)(
-    implicit actorSystem: ActorSystem,
-    logging: Logging,
-    materializer: ActorMaterializer): LoadBalancer = new ContainerPoolBalancer(whiskConfig, instance)
-
-  def requiredProperties =
-    kafkaHosts ++
-      Map(controllerLocalBookkeeping -> null, controllerSeedNodes -> null)
-
-  /** Memoizes the result of `f` for later use. */
-  def memoize[I, O](f: I => O): I => O = new scala.collection.mutable.HashMap[I, O]() {
-    override def apply(key: I) = getOrElseUpdate(key, f(key))
-  }
-
-  /** Euclidean algorithm to determine the greatest-common-divisor */
-  @tailrec
-  def gcd(a: Int, b: Int): Int = if (b == 0) a else gcd(b, a % b)
-
-  /** Returns pairwise coprime numbers until x. Result is memoized. */
-  val pairwiseCoprimeNumbersUntil: Int => IndexedSeq[Int] = ContainerPoolBalancer.memoize {
-    case x =>
-      (1 to x).foldLeft(IndexedSeq.empty[Int])((primes, cur) => {
-        if (gcd(cur, x) == 1 && primes.forall(i => gcd(i, cur) == 1)) {
-          primes :+ cur
-        } else primes
-      })
-  }
-
-  /**
-   * Scans through all invokers and searches for an invoker, that has a queue length
-   * below the defined threshold. The threshold is subject to a 3 times back off. Iff
-   * no "underloaded" invoker was found it will default to the first invoker in the
-   * step-defined progression that is healthy.
-   *
-   * @param invokers a list of available invokers to search in, including their state and usage
-   * @param invokerBusyThreshold defines when an invoker is considered overloaded
-   * @param hash stable identifier of the entity to be scheduled
-   * @return an invoker to schedule to or None of no invoker is available
-   */
-  def schedule(invokers: Seq[(InstanceId, InvokerState, Int)],
-               invokerBusyThreshold: Int,
-               hash: Int): Option[InstanceId] = {
-
-    val numInvokers = invokers.size
-    if (numInvokers > 0) {
-      val homeInvoker = hash % numInvokers
-      val stepSizes = ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(numInvokers)
-      val step = stepSizes(hash % stepSizes.size)
-
-      val invokerProgression = Stream
-        .from(0)
-        .take(numInvokers)
-        .map(i => (homeInvoker + i * step) % numInvokers)
-        .map(invokers)
-        .filter(_._2 == Healthy)
-
-      invokerProgression
-        .find(_._3 < invokerBusyThreshold)
-        .orElse(invokerProgression.find(_._3 < invokerBusyThreshold * 2))
-        .orElse(invokerProgression.find(_._3 < invokerBusyThreshold * 3))
-        .orElse(
-          if (invokerProgression.isEmpty)
-            None
-          else
-            Some(invokerProgression(ThreadLocalRandom.current().nextInt(invokerProgression.size))))
-        .map(_._1)
-    } else None
-  }
-
-}
-
-private case class LoadBalancerException(msg: String) extends Throwable(msg)
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/DistributedLoadBalancerData.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/DistributedLoadBalancerData.scala
deleted file mode 100644
index 34b5d67..0000000
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/DistributedLoadBalancerData.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.loadBalancer
-
-import akka.actor.ActorSystem
-import akka.util.Timeout
-import akka.pattern.ask
-import whisk.common.Logging
-import whisk.core.entity.{ActivationId, UUID}
-
-import scala.collection.concurrent.TrieMap
-import scala.concurrent.Future
-import scala.concurrent.duration._
-
-/**
- * Encapsulates data used for loadbalancer and active-ack bookkeeping.
- *
- * Note: The state keeping is backed by distributed akka actors. All CRUDs operations are done on local values, thus
- * a stale value might be read.
- */
-class DistributedLoadBalancerData(implicit actorSystem: ActorSystem, logging: Logging) extends LoadBalancerData {
-
-  implicit val timeout = Timeout(5.seconds)
-  implicit val executionContext = actorSystem.dispatcher
-  private val activationsById = TrieMap[ActivationId, ActivationEntry]()
-
-  private val sharedStateInvokers = actorSystem.actorOf(
-    SharedDataService.props("Invokers"),
-    name =
-      "SharedDataServiceInvokers" + UUID())
-  private val sharedStateNamespaces = actorSystem.actorOf(
-    SharedDataService.props("Namespaces"),
-    name =
-      "SharedDataServiceNamespaces" + UUID())
-
-  def totalActivationCount =
-    (sharedStateInvokers ? GetMap).mapTo[Map[String, BigInt]].map(_.values.sum.toInt)
-
-  def activationCountOn(namespace: UUID): Future[Int] = {
-    (sharedStateNamespaces ? GetMap)
-      .mapTo[Map[String, BigInt]]
-      .map(_.mapValues(_.toInt).getOrElse(namespace.toString, 0))
-  }
-
-  def activationCountPerInvoker: Future[Map[String, Int]] = {
-    (sharedStateInvokers ? GetMap).mapTo[Map[String, BigInt]].map(_.mapValues(_.toInt))
-  }
-
-  def activationById(activationId: ActivationId): Option[ActivationEntry] = {
-    activationsById.get(activationId)
-  }
-
-  def putActivation(id: ActivationId, update: => ActivationEntry): ActivationEntry = {
-    activationsById.getOrElseUpdate(id, {
-      val entry = update
-      sharedStateNamespaces ! IncreaseCounter(entry.namespaceId.asString, 1)
-      sharedStateInvokers ! IncreaseCounter(entry.invokerName.toString, 1)
-      logging.debug(this, "increased shared counters")
-      entry
-    })
-  }
-
-  def removeActivation(entry: ActivationEntry): Option[ActivationEntry] = {
-    activationsById.remove(entry.id).map { activationEntry =>
-      sharedStateInvokers ! DecreaseCounter(entry.invokerName.toString, 1)
-      sharedStateNamespaces ! DecreaseCounter(entry.namespaceId.asString, 1)
-      logging.debug(this, "decreased shared counters")
-      activationEntry
-    }
-  }
-
-  def removeActivation(aid: ActivationId): Option[ActivationEntry] = {
-    activationsById.get(aid).flatMap(removeActivation)
-  }
-}
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancer.scala
index ab8db1c..52ffd73 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancer.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancer.scala
@@ -86,3 +86,6 @@ trait LoadBalancerProvider extends Spi {
                                                                    logging: Logging,
                                                                    materializer: ActorMaterializer): LoadBalancer
 }
+
+/** Exception thrown by the loadbalancer */
+case class LoadBalancerException(msg: String) extends Throwable(msg)
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala
deleted file mode 100644
index 0018cbb..0000000
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.loadBalancer
-
-import whisk.core.entity.{ActivationId, InstanceId, UUID, WhiskActivation}
-
-import akka.actor.Cancellable
-import scala.concurrent.{Future, Promise}
-
-// please note: timeoutHandler.cancel must be called on all non-timeout paths, e.g. Success
-case class ActivationEntry(id: ActivationId,
-                           namespaceId: UUID,
-                           invokerName: InstanceId,
-                           timeoutHandler: Cancellable,
-                           promise: Promise[Either[ActivationId, WhiskActivation]])
-trait LoadBalancerData {
-
-  /** Get the number of activations across all namespaces. */
-  def totalActivationCount: Future[Int]
-
-  /**
-   * Get the number of activations for a specific namespace.
-   *
-   * @param namespace The namespace to get the activation count for
-   * @return a map (namespace -> number of activations in the system)
-   */
-  def activationCountOn(namespace: UUID): Future[Int]
-
-  /**
-   * Get the number of activations for each invoker.
-   *
-   * @return a map (invoker -> number of activations queued for the invoker)
-   */
-  def activationCountPerInvoker: Future[Map[String, Int]]
-
-  /**
-   * Get an activation entry for a given activation id.
-   *
-   * @param activationId activation id to get data for
-   * @return the respective activation or None if it doesn't exist
-   */
-  def activationById(activationId: ActivationId): Option[ActivationEntry]
-
-  /**
-   * Adds an activation entry.
-   *
-   * @param id     identifier to deduplicate the entry
-   * @param update block calculating the entry to add.
-   *               Note: This is evaluated iff the entry
-   *               didn't exist before.
-   * @return the entry calculated by the block or iff it did
-   *         exist before the entry from the state
-   */
-  def putActivation(id: ActivationId, update: => ActivationEntry): ActivationEntry
-
-  /**
-   * Removes the given entry.
-   *
-   * @param entry the entry to remove
-   * @return The deleted entry or None if nothing got deleted
-   */
-  def removeActivation(entry: ActivationEntry): Option[ActivationEntry]
-
-  /**
-   * Removes the activation identified by the given activation id.
-   *
-   * @param aid activation id to remove
-   * @return The deleted entry or None if nothing got deleted
-   */
-  def removeActivation(aid: ActivationId): Option[ActivationEntry]
-}
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LocalLoadBalancerData.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LocalLoadBalancerData.scala
deleted file mode 100644
index 92e3789..0000000
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LocalLoadBalancerData.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.loadBalancer
-
-import java.util.concurrent.atomic.AtomicInteger
-
-import scala.collection.concurrent.TrieMap
-import scala.concurrent.Future
-import whisk.core.entity.{ActivationId, UUID}
-
-/**
- * Loadbalancer bookkeeping data which are stored locally,
- * e.g. not shared with other controller instances.
- *
- * Note: The state keeping is backed by concurrent data-structures. As such,
- * concurrent reads can return stale values (especially the counters returned).
- */
-class LocalLoadBalancerData() extends LoadBalancerData {
-
-  private val activationByInvoker = TrieMap[String, AtomicInteger]()
-  private val activationByNamespaceId = TrieMap[UUID, AtomicInteger]()
-  private val activationsById = TrieMap[ActivationId, ActivationEntry]()
-  private val totalActivations = new AtomicInteger(0)
-
-  override def totalActivationCount: Future[Int] = Future.successful(totalActivations.get)
-
-  override def activationCountOn(namespace: UUID): Future[Int] = {
-    Future.successful(activationByNamespaceId.get(namespace).map(_.get).getOrElse(0))
-  }
-
-  override def activationCountPerInvoker: Future[Map[String, Int]] = {
-    Future.successful(activationByInvoker.toMap.mapValues(_.get))
-  }
-
-  override def activationById(activationId: ActivationId): Option[ActivationEntry] = {
-    activationsById.get(activationId)
-  }
-
-  override def putActivation(id: ActivationId, update: => ActivationEntry): ActivationEntry = {
-    activationsById.getOrElseUpdate(id, {
-      val entry = update
-      totalActivations.incrementAndGet()
-      activationByNamespaceId.getOrElseUpdate(entry.namespaceId, new AtomicInteger(0)).incrementAndGet()
-      activationByInvoker.getOrElseUpdate(entry.invokerName.toString, new AtomicInteger(0)).incrementAndGet()
-      entry
-    })
-  }
-
-  override def removeActivation(entry: ActivationEntry): Option[ActivationEntry] = {
-    activationsById.remove(entry.id).map { x =>
-      totalActivations.decrementAndGet()
-      activationByNamespaceId.getOrElseUpdate(entry.namespaceId, new AtomicInteger(0)).decrementAndGet()
-      activationByInvoker.getOrElseUpdate(entry.invokerName.toString, new AtomicInteger(0)).decrementAndGet()
-      x
-    }
-  }
-
-  override def removeActivation(aid: ActivationId): Option[ActivationEntry] = {
-    activationsById.get(aid).flatMap(removeActivation)
-  }
-}
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
index f6ce75d..718079e 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -21,7 +21,7 @@ import java.nio.charset.StandardCharsets
 import java.util.concurrent.ThreadLocalRandom
 import java.util.concurrent.atomic.LongAdder
 
-import akka.actor.{Actor, ActorSystem, Props}
+import akka.actor.{Actor, ActorSystem, Cancellable, Props}
 import akka.cluster.ClusterEvent._
 import akka.cluster.{Cluster, Member, MemberStatus}
 import akka.event.Logging.InfoLevel
@@ -309,10 +309,23 @@ object ShardingContainerPoolBalancer extends LoadBalancerProvider {
     kafkaHosts ++
       Map(controllerLocalBookkeeping -> null, controllerSeedNodes -> null)
 
+  /** Generates a hash based on the string representation of namespace and action */
   def generateHash(namespace: EntityName, action: FullyQualifiedEntityName): Int = {
     (namespace.asString.hashCode() ^ action.asString.hashCode()).abs
   }
 
+  /** Euclidean algorithm to determine the greatest-common-divisor */
+  @tailrec
+  def gcd(a: Int, b: Int): Int = if (b == 0) a else gcd(b, a % b)
+
+  /** Returns pairwise coprime numbers until x. Result is memoized. */
+  def pairwiseCoprimeNumbersUntil(x: Int): IndexedSeq[Int] =
+    (1 to x).foldLeft(IndexedSeq.empty[Int])((primes, cur) => {
+      if (gcd(cur, x) == 1 && primes.forall(i => gcd(i, cur) == 1)) {
+        primes :+ cur
+      } else primes
+    })
+
   /**
    * Scans through all invokers and searches for an invoker tries to get a free slot on an invoker. If no slot can be
    * obtained, randomly picks a healthy invoker.
@@ -374,8 +387,8 @@ case class ShardingContainerPoolBalancerState(
   private var _invokers: IndexedSeq[InvokerHealth] = IndexedSeq.empty[InvokerHealth],
   private var _managedInvokers: IndexedSeq[InvokerHealth] = IndexedSeq.empty[InvokerHealth],
   private var _blackboxInvokers: IndexedSeq[InvokerHealth] = IndexedSeq.empty[InvokerHealth],
-  private var _managedStepSizes: Seq[Int] = ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(0),
-  private var _blackboxStepSizes: Seq[Int] = ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(0),
+  private var _managedStepSizes: Seq[Int] = ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(0),
+  private var _blackboxStepSizes: Seq[Int] = ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(0),
   private var _invokerSlots: IndexedSeq[ForcableSemaphore] = IndexedSeq.empty[ForcableSemaphore],
   private var _clusterSize: Int = 1)(
   lbConfig: ShardingContainerPoolBalancerConfig =
@@ -419,8 +432,8 @@ case class ShardingContainerPoolBalancerState(
     _managedInvokers = _invokers.take(managed)
 
     if (oldSize != newSize) {
-      _managedStepSizes = ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(managed)
-      _blackboxStepSizes = ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(blackboxes)
+      _managedStepSizes = ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(managed)
+      _blackboxStepSizes = ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(blackboxes)
 
       if (oldSize < newSize) {
         // Keeps the existing state..
@@ -467,3 +480,18 @@ case class ShardingContainerPoolBalancerState(
  * @param invokerBusyThreshold how many slots an invoker has available in total
  */
 case class ShardingContainerPoolBalancerConfig(blackboxFraction: Double, invokerBusyThreshold: Int)
+
+/**
+ * State kept for each activation until completion.
+ *
+ * @param id id of the activation
+ * @param namespaceId namespace that invoked the action
+ * @param invokerName invoker the action is scheduled to
+ * @param timeoutHandler times out completion of this activation, should be canceled on good paths
+ * @param promise the promise to be completed by the activation
+ */
+case class ActivationEntry(id: ActivationId,
+                           namespaceId: UUID,
+                           invokerName: InstanceId,
+                           timeoutHandler: Cancellable,
+                           promise: Promise[Either[ActivationId, WhiskActivation]])
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/SharedDataService.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/SharedDataService.scala
deleted file mode 100644
index d0595d3..0000000
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/SharedDataService.scala
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.loadBalancer
-
-import akka.actor.{Actor, ActorLogging, ActorRef, Props}
-import akka.cluster.Cluster
-import akka.cluster.ClusterEvent._
-import akka.cluster.ddata.{DistributedData, PNCounterMap, PNCounterMapKey}
-import akka.cluster.ddata.Replicator._
-import whisk.common.AkkaLogging
-
-case class IncreaseCounter(key: String, value: Long)
-case class DecreaseCounter(key: String, value: Long)
-case class ReadCounter(key: String)
-case class RemoveCounter(key: String)
-case object GetMap
-
-/**
- * Companion object to specify actor properties from the outside, e.g. name of the shared map and cluster seed nodes
- */
-object SharedDataService {
-  def props(storageName: String): Props =
-    Props(new SharedDataService(storageName))
-}
-
-class SharedDataService(storageName: String) extends Actor with ActorLogging {
-
-  val replicator = DistributedData(context.system).replicator
-
-  val logging = new AkkaLogging(context.system.log)
-
-  val storage = PNCounterMapKey[String](storageName)
-
-  implicit val node = Cluster(context.system)
-
-  /**
-   * Subscribe this node for the changes in the Map, initialize the Map
-   */
-  override def preStart(): Unit = {
-    replicator ! Subscribe(storage, self)
-    node.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[MemberEvent], classOf[UnreachableMember])
-    replicator ! Update(storage, PNCounterMap.empty[String], writeLocal)(_.remove(node, "0"))
-  }
-  override def postStop(): Unit = node.unsubscribe(self)
-
-  /**
-   * CRUD operations on the counter, process cluster member events for logging
-   * @return
-   */
-  def receive = {
-
-    case (IncreaseCounter(key, increment)) =>
-      replicator ! Update(storage, PNCounterMap.empty[String], writeLocal)(_.increment(key, increment))
-
-    case (DecreaseCounter(key, decrement)) =>
-      replicator ! Update(storage, PNCounterMap[String], writeLocal)(_.decrement(key, decrement))
-
-    case GetMap =>
-      replicator ! Get(storage, readLocal, request = Some((sender())))
-
-    case MemberUp(member) =>
-      logging.info(this, "Member is Up: " + member.address)
-
-    case MemberRemoved(member, previousStatus) =>
-      logging.warn(this, s"Member is Removed: ${member.address} after $previousStatus")
-
-    case c @ Changed(_) =>
-      logging.debug(this, "Current elements: " + c.get(storage))
-
-    case g @ GetSuccess(_, Some((replyTo: ActorRef))) =>
-      val map = g.get(storage).entries
-      replyTo ! map
-
-    case g @ GetSuccess(_, Some((replyTo: ActorRef, key: String))) =>
-      if (g.get(storage).contains(key)) {
-        val response = g.get(storage).getValue(key).intValue()
-        replyTo ! response
-      } else
-        replyTo ! None
-
-    case _ => // ignore
-  }
-}
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/ContainerPoolBalancerObjectTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/ContainerPoolBalancerObjectTests.scala
deleted file mode 100644
index fd3252d..0000000
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/ContainerPoolBalancerObjectTests.scala
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.loadBalancer.test
-
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.FlatSpec
-import org.scalatest.Matchers
-import whisk.core.loadBalancer.ContainerPoolBalancer
-import whisk.core.loadBalancer.Healthy
-import whisk.core.loadBalancer.Offline
-import whisk.core.loadBalancer.UnHealthy
-import whisk.core.entity.InstanceId
-
-/**
- * Unit tests for the ContainerPool object.
- *
- * These tests test only the "static" methods "schedule" and "remove"
- * of the ContainerPool object.
- */
-@RunWith(classOf[JUnitRunner])
-class ContainerPoolBalancerObjectTests extends FlatSpec with Matchers {
-  behavior of "memoize"
-
-  it should "not recompute a value which was already given" in {
-    var calls = 0
-    val add1: Int => Int = ContainerPoolBalancer.memoize {
-      case second =>
-        calls += 1
-        1 + second
-    }
-
-    add1(1) shouldBe 2
-    calls shouldBe 1
-    add1(1) shouldBe 2
-    calls shouldBe 1
-    add1(2) shouldBe 3
-    calls shouldBe 2
-    add1(1) shouldBe 2
-    calls shouldBe 2
-  }
-
-  behavior of "pairwiseCoprimeNumbersUntil"
-
-  it should "return an empty set for malformed inputs" in {
-    ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(0) shouldBe Seq()
-    ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(-1) shouldBe Seq()
-  }
-
-  it should "return all coprime numbers until the number given" in {
-    ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(1) shouldBe Seq(1)
-    ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(2) shouldBe Seq(1)
-    ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(3) shouldBe Seq(1, 2)
-    ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(4) shouldBe Seq(1, 3)
-    ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(5) shouldBe Seq(1, 2, 3)
-    ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(9) shouldBe Seq(1, 2, 5, 7)
-    ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(10) shouldBe Seq(1, 3, 7)
-  }
-
-  behavior of "chooseInvoker"
-
-  def invokers(n: Int) = (0 until n).map(i => (InstanceId(i), Healthy, 0))
-  def hashInto[A](list: Seq[A], hash: Int) = list(hash % list.size)
-
-  it should "return None on an empty invokers list" in {
-    ContainerPoolBalancer.schedule(IndexedSeq(), 0, 1) shouldBe None
-  }
-
-  it should "return None on a list of offline/unhealthy invokers" in {
-    val invs = IndexedSeq((InstanceId(0), Offline, 0), (InstanceId(1), UnHealthy, 0))
-
-    ContainerPoolBalancer.schedule(invs, 0, 1) shouldBe None
-  }
-
-  it should "schedule to the home invoker" in {
-    val invs = invokers(10)
-    val hash = 2
-
-    ContainerPoolBalancer.schedule(invs, 1, hash) shouldBe Some(InstanceId(hash % invs.size))
-  }
-
-  it should "take the only online invoker" in {
-    ContainerPoolBalancer.schedule(
-      IndexedSeq((InstanceId(0), Offline, 0), (InstanceId(1), UnHealthy, 0), (InstanceId(2), Healthy, 0)),
-      0,
-      1) shouldBe Some(InstanceId(2))
-  }
-
-  it should "skip an offline/unhealthy invoker, even if its underloaded" in {
-    val hash = 0
-    val invs = IndexedSeq((InstanceId(0), Healthy, 10), (InstanceId(1), UnHealthy, 0), (InstanceId(2), Healthy, 0))
-
-    ContainerPoolBalancer.schedule(invs, 10, hash) shouldBe Some(InstanceId(2))
-  }
-
-  it should "jump to the next invoker determined by a hashed stepsize if the home invoker is overloaded" in {
-    val invokerCount = 10
-    val hash = 2
-    val targetInvoker = hash % invokerCount
-
-    val invs = invokers(invokerCount).updated(targetInvoker, (InstanceId(targetInvoker), Healthy, 1))
-    val step = hashInto(ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(invokerCount), hash)
-
-    ContainerPoolBalancer.schedule(invs, 1, hash) shouldBe Some(InstanceId((hash + step) % invs.size))
-  }
-
-  it should "wrap the search at the end of the invoker list" in {
-    val invokerCount = 3
-    val invs = IndexedSeq((InstanceId(0), Healthy, 1), (InstanceId(1), Healthy, 1), (InstanceId(2), Healthy, 0))
-    val hash = 1
-
-    val targetInvoker = hashInto(invs, hash) // will be invoker1
-    val step = hashInto(ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(invokerCount), hash) // will be 2
-    step shouldBe 2
-
-    // invoker1 is overloaded so it will step (2 steps) to the next one --> 1 2 0 --> invoker0 is next target
-    // invoker0 is overloaded so it will step to the next one --> 0 1 2 --> invoker2 is next target and underloaded
-    ContainerPoolBalancer.schedule(invs, 1, hash) shouldBe Some(InstanceId((hash + step + step) % invs.size))
-  }
-
-  it should "multiply its threshold in 3 iterations to find an invoker with a good warm-chance" in {
-    val invs = IndexedSeq((InstanceId(0), Healthy, 33), (InstanceId(1), Healthy, 36), (InstanceId(2), Healthy, 33))
-    val hash = 0 // home is 0, stepsize is 1
-
-    // even though invoker1 is not the home invoker in this case, it gets chosen over
-    // the others because it's the first one encountered by the iteration mechanism to be below
-    // the threshold of 3 * 16 invocations
-    ContainerPoolBalancer.schedule(invs, 16, hash) shouldBe Some(InstanceId(0))
-  }
-
-  it should "choose the random invoker if all invokers are overloaded even above the muliplied threshold" in {
-    val invs = IndexedSeq((InstanceId(0), Healthy, 33), (InstanceId(1), Healthy, 33), (InstanceId(2), Healthy, 33))
-    val invokerBusyThreshold = 11
-    val hash = 0
-    val bruteResult = (0 to 100) map { _ =>
-      ContainerPoolBalancer.schedule(invs, invokerBusyThreshold, hash).get.toInt
-    }
-    bruteResult should contain allOf (0, 1, 2)
-  }
-
-  it should "transparently work with partitioned sets of invokers" in {
-    val invs = IndexedSeq((InstanceId(3), Healthy, 0), (InstanceId(4), Healthy, 0), (InstanceId(5), Healthy, 0))
-
-    ContainerPoolBalancer.schedule(invs, 1, 0) shouldBe Some(InstanceId(3))
-    ContainerPoolBalancer.schedule(invs, 1, 1) shouldBe Some(InstanceId(4))
-    ContainerPoolBalancer.schedule(invs, 1, 2) shouldBe Some(InstanceId(5))
-    ContainerPoolBalancer.schedule(invs, 1, 3) shouldBe Some(InstanceId(3))
-  }
-}
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala
deleted file mode 100644
index 5a4edb6..0000000
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala
+++ /dev/null
@@ -1,295 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.loadBalancer.test
-
-import akka.actor.ActorSystem
-import akka.actor.Cancellable
-import com.typesafe.config.{ConfigFactory, ConfigValueFactory}
-import common.StreamLogging
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.{FlatSpec, Matchers}
-import whisk.core.entity.{ActivationId, UUID, WhiskActivation}
-import whisk.core.loadBalancer.{ActivationEntry, DistributedLoadBalancerData, LocalLoadBalancerData}
-
-import scala.concurrent.{Await, Future, Promise}
-import whisk.core.entity.InstanceId
-
-import scala.concurrent.duration._
-
-@RunWith(classOf[JUnitRunner])
-class LoadBalancerDataTests extends FlatSpec with Matchers with StreamLogging {
-  final val emptyCancellable: Cancellable = new Cancellable {
-    def isCancelled = false
-    def cancel() = true
-  }
-
-  val activationIdPromise = Promise[Either[ActivationId, WhiskActivation]]()
-  val firstEntry: ActivationEntry =
-    ActivationEntry(ActivationId.generate(), UUID(), InstanceId(0), emptyCancellable, activationIdPromise)
-  val secondEntry: ActivationEntry =
-    ActivationEntry(ActivationId.generate(), UUID(), InstanceId(1), emptyCancellable, activationIdPromise)
-
-  val port = 2552
-  val host = "127.0.0.1"
-  val config = ConfigFactory
-    .parseString(s"akka.remote.netty.tcp.hostname = $host")
-    .withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(port))
-    .withValue("akka.actor.provider", ConfigValueFactory.fromAnyRef("cluster"))
-    .withFallback(ConfigFactory.load())
-
-  val actorSystemName = "controller-actor-system"
-
-  implicit val actorSystem = ActorSystem(actorSystemName, config)
-
-  def await[A](f: Future[A], timeout: FiniteDuration = 1.second) = Await.result(f, timeout)
-
-  behavior of "LoadBalancerData"
-
-  it should "return the number of activations for a namespace" in {
-    val distributedLoadBalancerData = new DistributedLoadBalancerData()
-    val localLoadBalancerData = new LocalLoadBalancerData()
-//    test all implementations
-    val loadBalancerDataArray = Array(localLoadBalancerData, distributedLoadBalancerData)
-    loadBalancerDataArray.map { lbd =>
-      lbd.putActivation(firstEntry.id, firstEntry)
-      await(lbd.activationCountOn(firstEntry.namespaceId)) shouldBe 1
-      await(lbd.activationCountPerInvoker) shouldBe Map(firstEntry.invokerName.toString -> 1)
-      lbd.activationById(firstEntry.id) shouldBe Some(firstEntry)
-
-      // clean up after yourself
-      lbd.removeActivation(firstEntry.id)
-    }
-  }
-
-  it should "return the number of activations for each invoker" in {
-
-    val distributedLoadBalancerData = new DistributedLoadBalancerData()
-    val localLoadBalancerData = new LocalLoadBalancerData()
-
-    val loadBalancerDataArray = Array(localLoadBalancerData, distributedLoadBalancerData)
-    loadBalancerDataArray.map { lbd =>
-      lbd.putActivation(firstEntry.id, firstEntry)
-      lbd.putActivation(secondEntry.id, secondEntry)
-
-      val res = await(lbd.activationCountPerInvoker)
-
-      res.get(firstEntry.invokerName.toString()) shouldBe Some(1)
-      res.get(secondEntry.invokerName.toString()) shouldBe Some(1)
-
-      lbd.activationById(firstEntry.id) shouldBe Some(firstEntry)
-      lbd.activationById(secondEntry.id) shouldBe Some(secondEntry)
-
-      // clean up after yourself
-      lbd.removeActivation(firstEntry.id)
-      lbd.removeActivation(secondEntry.id)
-    }
-
-  }
-
-  it should "remove activations and reflect that accordingly" in {
-
-    val distributedLoadBalancerData = new DistributedLoadBalancerData()
-    val localLoadBalancerData = new LocalLoadBalancerData()
-
-    val loadBalancerDataArray = Array(localLoadBalancerData, distributedLoadBalancerData)
-    loadBalancerDataArray.map { lbd =>
-      lbd.putActivation(firstEntry.id, firstEntry)
-      val res = await(lbd.activationCountPerInvoker)
-      res.get(firstEntry.invokerName.toString()) shouldBe Some(1)
-
-      await(lbd.activationCountOn(firstEntry.namespaceId)) shouldBe 1
-
-      lbd.removeActivation(firstEntry)
-
-      val resAfterRemoval = await(lbd.activationCountPerInvoker)
-      resAfterRemoval.get(firstEntry.invokerName.toString()) shouldBe Some(0)
-
-      await(lbd.activationCountOn(firstEntry.namespaceId)) shouldBe 0
-      lbd.activationById(firstEntry.id) shouldBe None
-    }
-
-  }
-
-  it should "remove activations from all 3 maps by activation id" in {
-
-    val distributedLoadBalancerData = new DistributedLoadBalancerData()
-    val localLoadBalancerData = new LocalLoadBalancerData()
-
-    val loadBalancerDataArray = Array(localLoadBalancerData, distributedLoadBalancerData)
-    loadBalancerDataArray.map { lbd =>
-      lbd.putActivation(firstEntry.id, firstEntry)
-
-      val res = await(lbd.activationCountPerInvoker)
-      res.get(firstEntry.invokerName.toString()) shouldBe Some(1)
-
-      lbd.removeActivation(firstEntry.id)
-
-      val resAfterRemoval = await(lbd.activationCountPerInvoker)
-      resAfterRemoval.get(firstEntry.invokerName.toString()) shouldBe Some(0)
-    }
-
-  }
-
-  it should "return None if the entry doesn't exist when we remove it" in {
-    val distributedLoadBalancerData = new DistributedLoadBalancerData()
-    val localLoadBalancerData = new LocalLoadBalancerData()
-
-    val loadBalancerDataArray = Array(localLoadBalancerData, distributedLoadBalancerData)
-    loadBalancerDataArray.map { lbd =>
-      lbd.removeActivation(firstEntry) shouldBe None
-    }
-
-  }
-
-  it should "respond with different values accordingly" in {
-
-    val entry = ActivationEntry(ActivationId.generate(), UUID(), InstanceId(1), emptyCancellable, activationIdPromise)
-    val entrySameInvokerAndNamespace = entry.copy(id = ActivationId.generate())
-    val entrySameInvoker = entry.copy(id = ActivationId.generate(), namespaceId = UUID())
-
-    val distributedLoadBalancerData = new DistributedLoadBalancerData()
-    val localLoadBalancerData = new LocalLoadBalancerData()
-
-    val loadBalancerDataArray = Array(localLoadBalancerData, distributedLoadBalancerData)
-    loadBalancerDataArray.map { lbd =>
-      lbd.putActivation(entry.id, entry)
-
-      await(lbd.activationCountOn(entry.namespaceId)) shouldBe 1
-      var res = await(lbd.activationCountPerInvoker)
-      res.get(entry.invokerName.toString()) shouldBe Some(1)
-
-      lbd.putActivation(entrySameInvokerAndNamespace.id, entrySameInvokerAndNamespace)
-      await(lbd.activationCountOn(entry.namespaceId)) shouldBe 2
-      res = await(lbd.activationCountPerInvoker)
-      res.get(entry.invokerName.toString()) shouldBe Some(2)
-
-      lbd.putActivation(entrySameInvoker.id, entrySameInvoker)
-      await(lbd.activationCountOn(entry.namespaceId)) shouldBe 2
-      res = await(lbd.activationCountPerInvoker)
-      res.get(entry.invokerName.toString()) shouldBe Some(3)
-
-      lbd.removeActivation(entrySameInvokerAndNamespace)
-      await(lbd.activationCountOn(entry.namespaceId)) shouldBe 1
-      res = await(lbd.activationCountPerInvoker)
-      res.get(entry.invokerName.toString()) shouldBe Some(2)
-
-      // removing non existing entry doesn't mess up
-      lbd.removeActivation(entrySameInvokerAndNamespace)
-      await(lbd.activationCountOn(entry.namespaceId)) shouldBe 1
-      res = await(lbd.activationCountPerInvoker)
-      res.get(entry.invokerName.toString()) shouldBe Some(2)
-
-      // clean up
-      lbd.removeActivation(entry)
-      lbd.removeActivation(entrySameInvoker.id)
-    }
-
-  }
-
-  it should "not add the same entry more then once" in {
-
-    val distributedLoadBalancerData = new DistributedLoadBalancerData()
-    val localLoadBalancerData = new LocalLoadBalancerData()
-
-    val loadBalancerDataArray = Array(localLoadBalancerData, distributedLoadBalancerData)
-    loadBalancerDataArray.map { lbd =>
-      lbd.putActivation(firstEntry.id, firstEntry)
-      val res = await(lbd.activationCountPerInvoker)
-      res.get(firstEntry.invokerName.toString()) shouldBe Some(1)
-      await(lbd.activationCountOn(firstEntry.namespaceId)) shouldBe 1
-
-      lbd.putActivation(firstEntry.id, firstEntry)
-      val resAfterAddingTheSameEntry = await(lbd.activationCountPerInvoker)
-      resAfterAddingTheSameEntry.get(firstEntry.invokerName.toString()) shouldBe Some(1)
-      await(lbd.activationCountOn(firstEntry.namespaceId)) shouldBe 1
-
-      lbd.removeActivation(firstEntry)
-      lbd.removeActivation(firstEntry)
-    }
-
-  }
-
-  it should "not evaluate the given block if an entry already exists" in {
-
-    val distributedLoadBalancerData = new DistributedLoadBalancerData()
-    val localLoadBalancerData = new LocalLoadBalancerData()
-
-    val loadBalancerDataArray = Array(localLoadBalancerData, distributedLoadBalancerData)
-    loadBalancerDataArray.map { lbd =>
-      var called = 0
-
-      val entry = lbd.putActivation(firstEntry.id, {
-        called += 1
-        firstEntry
-      })
-
-      called shouldBe 1
-
-      // entry already exists, should not evaluate the block
-      val entryAfterSecond = lbd.putActivation(firstEntry.id, {
-        called += 1
-        firstEntry
-      })
-
-      called shouldBe 1
-      entry shouldBe entryAfterSecond
-
-      // clean up after yourself
-      lbd.removeActivation(firstEntry)
-    }
-
-  }
-
-  it should "not evaluate the given block even if an entry is different (but has the same id)" in {
-
-    val distributedLoadBalancerData = new DistributedLoadBalancerData()
-    val localLoadBalancerData = new LocalLoadBalancerData()
-
-    val loadBalancerDataArray = Array(localLoadBalancerData, distributedLoadBalancerData)
-    loadBalancerDataArray.map { lbd =>
-      var called = 0
-      val entrySameId = secondEntry.copy(id = firstEntry.id)
-
-      val entry = lbd.putActivation(firstEntry.id, {
-        called += 1
-        firstEntry
-      })
-
-      called shouldBe 1
-
-      val res = await(lbd.activationCountPerInvoker)
-      res.get(firstEntry.invokerName.toString()) shouldBe Some(1)
-      await(lbd.activationCountOn(firstEntry.namespaceId)) shouldBe 1
-
-      // entry already exists, should not evaluate the block and change the state
-      val entryAfterSecond = lbd.putActivation(entrySameId.id, {
-        called += 1
-        entrySameId
-      })
-
-      called shouldBe 1
-      entry shouldBe entryAfterSecond
-      val resAfterAddingTheSameEntry = await(lbd.activationCountPerInvoker)
-      resAfterAddingTheSameEntry.get(firstEntry.invokerName.toString()) shouldBe Some(1)
-      await(lbd.activationCountOn(firstEntry.namespaceId)) shouldBe 1
-    }
-
-  }
-
-}
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
index 50201a6..49712c2 100644
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
+++ b/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
@@ -176,4 +176,21 @@ class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers with Str
     bruteResult should contain allOf (0, 3)
     bruteResult should contain noneOf (1, 2)
   }
+
+  behavior of "pairwiseCoprimeNumbersUntil"
+
+  it should "return an empty set for malformed inputs" in {
+    ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(0) shouldBe Seq()
+    ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(-1) shouldBe Seq()
+  }
+
+  it should "return all coprime numbers until the number given" in {
+    ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(1) shouldBe Seq(1)
+    ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(2) shouldBe Seq(1)
+    ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(3) shouldBe Seq(1, 2)
+    ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(4) shouldBe Seq(1, 3)
+    ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(5) shouldBe Seq(1, 2, 3)
+    ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(9) shouldBe Seq(1, 2, 5, 7)
+    ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(10) shouldBe Seq(1, 3, 7)
+  }
 }
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/SharedDataServiceTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/SharedDataServiceTests.scala
deleted file mode 100644
index b3b37a7..0000000
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/SharedDataServiceTests.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.loadBalancer.test
-
-import akka.actor.ActorSystem
-import akka.testkit.{ImplicitSender, TestKit}
-import akka.util.Timeout
-import com.typesafe.config.{ConfigFactory, ConfigValueFactory}
-import org.junit.runner.RunWith
-import org.scalatest.{FlatSpecLike, _}
-import org.scalatest.junit.JUnitRunner
-import whisk.core.loadBalancer._
-
-import scala.concurrent.duration._
-
-// Define your test specific configuration here
-
-object TestKitConfig {
-  val config = ConfigFactory.empty
-    .withValue("akka.remote.netty.tcp.hostname", ConfigValueFactory.fromAnyRef("127.0.0.1"))
-    .withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef("2555"))
-    .withValue("akka.actor.provider", ConfigValueFactory.fromAnyRef("cluster"))
-}
-
-@RunWith(classOf[JUnitRunner])
-class SharedDataServiceTests()
-    extends TestKit(ActorSystem("ControllerCluster", TestKitConfig.config))
-    with ImplicitSender
-    with FlatSpecLike
-    with Matchers
-    with BeforeAndAfterAll {
-
-  override def afterAll {
-    TestKit.shutdownActorSystem(system)
-  }
-
-  behavior of "SharedDataService"
-
-  val sharedDataService = system.actorOf(SharedDataService.props("Candidates"), name = "busyMan")
-  implicit val timeout = Timeout(5.seconds)
-
-  it should "retrieve an empty map after initialization" in {
-    sharedDataService ! GetMap
-    val msg = Map()
-    expectMsg(msg)
-  }
-  it should "increase the counter" in {
-    sharedDataService ! IncreaseCounter("Donald", 1)
-    sharedDataService ! GetMap
-    val msg = Map("Donald" -> 1)
-    expectMsg(msg)
-  }
-  it should "decrease the counter" in {
-    sharedDataService ! IncreaseCounter("Donald", 2)
-    sharedDataService ! DecreaseCounter("Donald", 2)
-    sharedDataService ! GetMap
-    val msg = Map("Donald" -> 1)
-    expectMsg(msg)
-  }
-  it should "receive the map with all counters" in {
-    sharedDataService ! IncreaseCounter("Hilary", 1)
-    sharedDataService ! GetMap
-    val msg = Map("Hilary" -> 1, "Donald" -> 1)
-    expectMsg(msg)
-  }
-}

-- 
To stop receiving notification emails like this one, please contact
cbickel@apache.org.