You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/04/03 14:57:26 UTC

[GitHub] cbickel closed pull request #3413: Remove deprecated loadbalancer.

cbickel closed pull request #3413: Remove deprecated loadbalancer.
URL: https://github.com/apache/incubator-openwhisk/pull/3413
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
deleted file mode 100644
index 570281d658..0000000000
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
+++ /dev/null
@@ -1,356 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.loadBalancer
-
-import java.nio.charset.StandardCharsets
-import java.util.concurrent.ThreadLocalRandom
-
-import akka.actor.{ActorSystem, Props}
-import akka.cluster.Cluster
-import akka.pattern.ask
-import akka.stream.ActorMaterializer
-import akka.util.Timeout
-import org.apache.kafka.clients.producer.RecordMetadata
-import pureconfig._
-import whisk.common.{Logging, LoggingMarkers, MetricEmitter, TransactionId}
-import whisk.core.WhiskConfig._
-import whisk.core.connector._
-import whisk.core.entity._
-import whisk.core.{ConfigKeys, WhiskConfig}
-import whisk.spi.SpiLoader
-import akka.event.Logging.InfoLevel
-import pureconfig._
-
-import scala.annotation.tailrec
-import scala.concurrent.duration._
-import scala.concurrent.{ExecutionContext, Future, Promise}
-import scala.util.{Failure, Success}
-
-case class LoadbalancerConfig(blackboxFraction: Double, invokerBusyThreshold: Int)
-
-class ContainerPoolBalancer(config: WhiskConfig, controllerInstance: InstanceId)(implicit val actorSystem: ActorSystem,
-                                                                                 logging: Logging,
-                                                                                 materializer: ActorMaterializer)
-    extends LoadBalancer {
-
-  private val lbConfig = loadConfigOrThrow[LoadbalancerConfig](ConfigKeys.loadbalancer)
-
-  /** The execution context for futures */
-  private implicit val executionContext: ExecutionContext = actorSystem.dispatcher
-
-  private val activeAckTimeoutGrace = 1.minute
-
-  /** How many invokers are dedicated to blackbox images.  We range bound to something sensical regardless of configuration. */
-  private val blackboxFraction: Double = Math.max(0.0, Math.min(1.0, lbConfig.blackboxFraction))
-  logging.info(this, s"blackboxFraction = $blackboxFraction")(TransactionId.loadbalancer)
-
-  /** Feature switch for shared load balancer data **/
-  private val loadBalancerData = {
-    if (config.controllerLocalBookkeeping) {
-      new LocalLoadBalancerData()
-    } else {
-
-      /** Specify how seed nodes are generated */
-      val seedNodesProvider = new StaticSeedNodesProvider(config.controllerSeedNodes, actorSystem.name)
-      Cluster(actorSystem).joinSeedNodes(seedNodesProvider.getSeedNodes())
-      new DistributedLoadBalancerData()
-    }
-  }
-
-  override def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)(
-    implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] = {
-    chooseInvoker(msg.user, action).flatMap { invokerName =>
-      val entry = setupActivation(action, msg.activationId, msg.user.uuid, invokerName, transid)
-      sendActivationToInvoker(messageProducer, msg, invokerName).map { _ =>
-        entry.promise.future
-      }
-    }
-  }
-
-  /** An indexed sequence of all invokers in the current system. */
-  override def invokerHealth(): Future[IndexedSeq[InvokerHealth]] = {
-    invokerPool
-      .ask(GetStatus)(Timeout(5.seconds))
-      .mapTo[IndexedSeq[InvokerHealth]]
-  }
-
-  override def activeActivationsFor(namespace: UUID) = loadBalancerData.activationCountOn(namespace)
-
-  override def totalActiveActivations = loadBalancerData.totalActivationCount
-
-  override def clusterSize = config.controllerInstances.toInt
-
-  /**
-   * Tries to fill in the result slot (i.e., complete the promise) when a completion message arrives.
-   * The promise is removed from the map when the result arrives or upon timeout.
-   *
-   * @param msg is the kafka message payload as Json
-   */
-  private def processCompletion(response: Either[ActivationId, WhiskActivation],
-                                tid: TransactionId,
-                                forced: Boolean,
-                                invoker: InstanceId): Unit = {
-    val aid = response.fold(l => l, r => r.activationId)
-
-    // treat left as success (as it is the result of a message exceeding the bus limit)
-    val isSuccess = response.fold(l => true, r => !r.response.isWhiskError)
-
-    loadBalancerData.removeActivation(aid) match {
-      case Some(entry) =>
-        logging.info(this, s"${if (!forced) "received" else "forced"} active ack for '$aid'")(tid)
-        // Active acks that are received here are strictly from user actions - health actions are not part of
-        // the load balancer's activation map. Inform the invoker pool supervisor of the user action completion.
-        invokerPool ! InvocationFinishedMessage(invoker, isSuccess)
-        if (!forced) {
-          entry.timeoutHandler.cancel()
-          entry.promise.trySuccess(response)
-        } else {
-          entry.promise.tryFailure(new Throwable("no active ack received"))
-        }
-      case None if !forced =>
-        // the entry has already been removed but we receive an active ack for this activation Id.
-        // This happens for health actions, because they don't have an entry in Loadbalancerdata or
-        // for activations that already timed out.
-        invokerPool ! InvocationFinishedMessage(invoker, isSuccess)
-        logging.debug(this, s"received active ack for '$aid' which has no entry")(tid)
-      case None =>
-        // the entry has already been removed by an active ack. This part of the code is reached by the timeout.
-        // As the active ack is already processed we don't have to do anything here.
-        logging.debug(this, s"forced active ack for '$aid' which has no entry")(tid)
-    }
-  }
-
-  /**
-   * Creates an activation entry and insert into various maps.
-   */
-  private def setupActivation(action: ExecutableWhiskActionMetaData,
-                              activationId: ActivationId,
-                              namespaceId: UUID,
-                              invokerName: InstanceId,
-                              transid: TransactionId): ActivationEntry = {
-    val timeout = (action.limits.timeout.duration
-      .max(TimeLimit.STD_DURATION) * config.controllerInstances.toInt) + activeAckTimeoutGrace
-    // Install a timeout handler for the catastrophic case where an active ack is not received at all
-    // (because say an invoker is down completely, or the connection to the message bus is disrupted) or when
-    // the active ack is significantly delayed (possibly due to long queues but the subject should not be penalized);
-    // in this case, if the activation handler is still registered, remove it and update the books.
-    // in case of missing synchronization between n controllers in HA configuration the invoker queue can be overloaded
-    // n-1 times and the maximal time for answering with active ack can be n times the action time (plus some overhead)
-    loadBalancerData.putActivation(
-      activationId, {
-        val timeoutHandler = actorSystem.scheduler.scheduleOnce(timeout) {
-          processCompletion(Left(activationId), transid, forced = true, invoker = invokerName)
-        }
-
-        // please note: timeoutHandler.cancel must be called on all non-timeout paths, e.g. Success
-        ActivationEntry(
-          activationId,
-          namespaceId,
-          invokerName,
-          timeoutHandler,
-          Promise[Either[ActivationId, WhiskActivation]]())
-      })
-  }
-
-  /** Gets a producer which can publish messages to the kafka bus. */
-  private val messagingProvider = SpiLoader.get[MessagingProvider]
-  private val messageProducer = messagingProvider.getProducer(config)
-
-  private def sendActivationToInvoker(producer: MessageProducer,
-                                      msg: ActivationMessage,
-                                      invoker: InstanceId): Future[RecordMetadata] = {
-    implicit val transid = msg.transid
-
-    MetricEmitter.emitCounterMetric(LoggingMarkers.LOADBALANCER_ACTIVATION_START)
-    val topic = s"invoker${invoker.toInt}"
-    val start = transid.started(
-      this,
-      LoggingMarkers.CONTROLLER_KAFKA,
-      s"posting topic '$topic' with activation id '${msg.activationId}'",
-      logLevel = InfoLevel)
-
-    producer.send(topic, msg).andThen {
-      case Success(status) =>
-        transid.finished(this, start, s"posted to ${status.topic()}[${status.partition()}][${status.offset()}]")
-      case Failure(e) => transid.failed(this, start, s"error on posting to topic $topic")
-    }
-  }
-
-  private val invokerPool = {
-    InvokerPool.prepare(controllerInstance, WhiskEntityStore.datastore())
-
-    actorSystem.actorOf(
-      InvokerPool.props(
-        (f, i) => f.actorOf(InvokerActor.props(i, controllerInstance)),
-        (m, i) => sendActivationToInvoker(messageProducer, m, i),
-        messagingProvider.getConsumer(config, s"health${controllerInstance.toInt}", "health", maxPeek = 128)))
-  }
-
-  /**
-   * Subscribes to active acks (completion messages from the invokers), and
-   * registers a handler for received active acks from invokers.
-   */
-  val activeAckTopic = s"completed${controllerInstance.toInt}"
-  val maxActiveAcksPerPoll = 128
-  val activeAckPollDuration = 1.second
-  private val activeAckConsumer =
-    messagingProvider.getConsumer(config, activeAckTopic, activeAckTopic, maxPeek = maxActiveAcksPerPoll)
-
-  val activationFeed = actorSystem.actorOf(Props {
-    new MessageFeed(
-      "activeack",
-      logging,
-      activeAckConsumer,
-      maxActiveAcksPerPoll,
-      activeAckPollDuration,
-      processActiveAck)
-  })
-
-  def processActiveAck(bytes: Array[Byte]): Future[Unit] = Future {
-    val raw = new String(bytes, StandardCharsets.UTF_8)
-    CompletionMessage.parse(raw) match {
-      case Success(m: CompletionMessage) =>
-        processCompletion(m.response, m.transid, forced = false, invoker = m.invoker)
-        activationFeed ! MessageFeed.Processed
-
-      case Failure(t) =>
-        activationFeed ! MessageFeed.Processed
-        logging.error(this, s"failed processing message: $raw with $t")
-    }
-  }
-
-  /** Compute the number of blackbox-dedicated invokers by applying a rounded down fraction of all invokers (but at least 1). */
-  private def numBlackbox(totalInvokers: Int) = Math.max(1, (totalInvokers.toDouble * blackboxFraction).toInt)
-
-  /** Return invokers dedicated to running blackbox actions. */
-  private def blackboxInvokers(invokers: IndexedSeq[InvokerHealth]): IndexedSeq[InvokerHealth] = {
-    val blackboxes = numBlackbox(invokers.size)
-    invokers.takeRight(blackboxes)
-  }
-
-  /**
-   * Return (at least one) invokers for running non black-box actions.
-   * This set can overlap with the blackbox set if there is only one invoker.
-   */
-  private def managedInvokers(invokers: IndexedSeq[InvokerHealth]): IndexedSeq[InvokerHealth] = {
-    val managed = Math.max(1, invokers.length - numBlackbox(invokers.length))
-    invokers.take(managed)
-  }
-
-  /** Determine which invoker this activation should go to. Due to dynamic conditions, it may return no invoker. */
-  private def chooseInvoker(user: Identity, action: ExecutableWhiskActionMetaData): Future[InstanceId] = {
-    val hash = generateHash(user.namespace, action)
-
-    loadBalancerData.activationCountPerInvoker.flatMap { currentActivations =>
-      invokerHealth().flatMap { invokers =>
-        val invokersToUse = if (action.exec.pull) blackboxInvokers(invokers) else managedInvokers(invokers)
-        val invokersWithUsage = invokersToUse.view.map {
-          // Using a view defers the comparably expensive lookup to actual access of the element
-          case invoker => (invoker.id, invoker.status, currentActivations.getOrElse(invoker.id.toString, 0))
-        }
-
-        ContainerPoolBalancer.schedule(invokersWithUsage, lbConfig.invokerBusyThreshold, hash) match {
-          case Some(invoker) => Future.successful(invoker)
-          case None =>
-            logging.error(this, s"all invokers down")(TransactionId.invokerHealth)
-            Future.failed(new LoadBalancerException("no invokers available"))
-        }
-      }
-    }
-  }
-
-  /** Generates a hash based on the string representation of namespace and action */
-  private def generateHash(namespace: EntityName, action: ExecutableWhiskActionMetaData): Int = {
-    (namespace.asString.hashCode() ^ action.fullyQualifiedName(false).asString.hashCode()).abs
-  }
-}
-
-object ContainerPoolBalancer extends LoadBalancerProvider {
-
-  override def loadBalancer(whiskConfig: WhiskConfig, instance: InstanceId)(
-    implicit actorSystem: ActorSystem,
-    logging: Logging,
-    materializer: ActorMaterializer): LoadBalancer = new ContainerPoolBalancer(whiskConfig, instance)
-
-  def requiredProperties =
-    kafkaHosts ++
-      Map(controllerLocalBookkeeping -> null, controllerSeedNodes -> null)
-
-  /** Memoizes the result of `f` for later use. */
-  def memoize[I, O](f: I => O): I => O = new scala.collection.mutable.HashMap[I, O]() {
-    override def apply(key: I) = getOrElseUpdate(key, f(key))
-  }
-
-  /** Euclidean algorithm to determine the greatest-common-divisor */
-  @tailrec
-  def gcd(a: Int, b: Int): Int = if (b == 0) a else gcd(b, a % b)
-
-  /** Returns pairwise coprime numbers until x. Result is memoized. */
-  val pairwiseCoprimeNumbersUntil: Int => IndexedSeq[Int] = ContainerPoolBalancer.memoize {
-    case x =>
-      (1 to x).foldLeft(IndexedSeq.empty[Int])((primes, cur) => {
-        if (gcd(cur, x) == 1 && primes.forall(i => gcd(i, cur) == 1)) {
-          primes :+ cur
-        } else primes
-      })
-  }
-
-  /**
-   * Scans through all invokers and searches for an invoker, that has a queue length
-   * below the defined threshold. The threshold is subject to a 3 times back off. Iff
-   * no "underloaded" invoker was found it will default to the first invoker in the
-   * step-defined progression that is healthy.
-   *
-   * @param invokers a list of available invokers to search in, including their state and usage
-   * @param invokerBusyThreshold defines when an invoker is considered overloaded
-   * @param hash stable identifier of the entity to be scheduled
-   * @return an invoker to schedule to or None if no invoker is available
-   */
-  def schedule(invokers: Seq[(InstanceId, InvokerState, Int)],
-               invokerBusyThreshold: Int,
-               hash: Int): Option[InstanceId] = {
-
-    val numInvokers = invokers.size
-    if (numInvokers > 0) {
-      val homeInvoker = hash % numInvokers
-      val stepSizes = ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(numInvokers)
-      val step = stepSizes(hash % stepSizes.size)
-
-      val invokerProgression = Stream
-        .from(0)
-        .take(numInvokers)
-        .map(i => (homeInvoker + i * step) % numInvokers)
-        .map(invokers)
-        .filter(_._2 == Healthy)
-
-      invokerProgression
-        .find(_._3 < invokerBusyThreshold)
-        .orElse(invokerProgression.find(_._3 < invokerBusyThreshold * 2))
-        .orElse(invokerProgression.find(_._3 < invokerBusyThreshold * 3))
-        .orElse(
-          if (invokerProgression.isEmpty)
-            None
-          else
-            Some(invokerProgression(ThreadLocalRandom.current().nextInt(invokerProgression.size))))
-        .map(_._1)
-    } else None
-  }
-
-}
-
-private case class LoadBalancerException(msg: String) extends Throwable(msg)
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/DistributedLoadBalancerData.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/DistributedLoadBalancerData.scala
deleted file mode 100644
index 34b5d6708f..0000000000
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/DistributedLoadBalancerData.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.loadBalancer
-
-import akka.actor.ActorSystem
-import akka.util.Timeout
-import akka.pattern.ask
-import whisk.common.Logging
-import whisk.core.entity.{ActivationId, UUID}
-
-import scala.collection.concurrent.TrieMap
-import scala.concurrent.Future
-import scala.concurrent.duration._
-
-/**
- * Encapsulates data used for loadbalancer and active-ack bookkeeping.
- *
- * Note: The state keeping is backed by distributed akka actors. All CRUD operations are done on local values, thus
- * a stale value might be read.
- */
-class DistributedLoadBalancerData(implicit actorSystem: ActorSystem, logging: Logging) extends LoadBalancerData {
-
-  implicit val timeout = Timeout(5.seconds)
-  implicit val executionContext = actorSystem.dispatcher
-  private val activationsById = TrieMap[ActivationId, ActivationEntry]()
-
-  private val sharedStateInvokers = actorSystem.actorOf(
-    SharedDataService.props("Invokers"),
-    name =
-      "SharedDataServiceInvokers" + UUID())
-  private val sharedStateNamespaces = actorSystem.actorOf(
-    SharedDataService.props("Namespaces"),
-    name =
-      "SharedDataServiceNamespaces" + UUID())
-
-  def totalActivationCount =
-    (sharedStateInvokers ? GetMap).mapTo[Map[String, BigInt]].map(_.values.sum.toInt)
-
-  def activationCountOn(namespace: UUID): Future[Int] = {
-    (sharedStateNamespaces ? GetMap)
-      .mapTo[Map[String, BigInt]]
-      .map(_.mapValues(_.toInt).getOrElse(namespace.toString, 0))
-  }
-
-  def activationCountPerInvoker: Future[Map[String, Int]] = {
-    (sharedStateInvokers ? GetMap).mapTo[Map[String, BigInt]].map(_.mapValues(_.toInt))
-  }
-
-  def activationById(activationId: ActivationId): Option[ActivationEntry] = {
-    activationsById.get(activationId)
-  }
-
-  def putActivation(id: ActivationId, update: => ActivationEntry): ActivationEntry = {
-    activationsById.getOrElseUpdate(id, {
-      val entry = update
-      sharedStateNamespaces ! IncreaseCounter(entry.namespaceId.asString, 1)
-      sharedStateInvokers ! IncreaseCounter(entry.invokerName.toString, 1)
-      logging.debug(this, "increased shared counters")
-      entry
-    })
-  }
-
-  def removeActivation(entry: ActivationEntry): Option[ActivationEntry] = {
-    activationsById.remove(entry.id).map { activationEntry =>
-      sharedStateInvokers ! DecreaseCounter(entry.invokerName.toString, 1)
-      sharedStateNamespaces ! DecreaseCounter(entry.namespaceId.asString, 1)
-      logging.debug(this, "decreased shared counters")
-      activationEntry
-    }
-  }
-
-  def removeActivation(aid: ActivationId): Option[ActivationEntry] = {
-    activationsById.get(aid).flatMap(removeActivation)
-  }
-}
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancer.scala
index ab8db1c589..52ffd73d4e 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancer.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancer.scala
@@ -86,3 +86,6 @@ trait LoadBalancerProvider extends Spi {
                                                                    logging: Logging,
                                                                    materializer: ActorMaterializer): LoadBalancer
 }
+
+/** Exception thrown by the loadbalancer */
+case class LoadBalancerException(msg: String) extends Throwable(msg)
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala
deleted file mode 100644
index 0018cbbd8b..0000000000
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.loadBalancer
-
-import whisk.core.entity.{ActivationId, InstanceId, UUID, WhiskActivation}
-
-import akka.actor.Cancellable
-import scala.concurrent.{Future, Promise}
-
-// please note: timeoutHandler.cancel must be called on all non-timeout paths, e.g. Success
-case class ActivationEntry(id: ActivationId,
-                           namespaceId: UUID,
-                           invokerName: InstanceId,
-                           timeoutHandler: Cancellable,
-                           promise: Promise[Either[ActivationId, WhiskActivation]])
-trait LoadBalancerData {
-
-  /** Get the number of activations across all namespaces. */
-  def totalActivationCount: Future[Int]
-
-  /**
-   * Get the number of activations for a specific namespace.
-   *
-   * @param namespace The namespace to get the activation count for
-   * @return the number of activations in the system for the given namespace
-   */
-  def activationCountOn(namespace: UUID): Future[Int]
-
-  /**
-   * Get the number of activations for each invoker.
-   *
-   * @return a map (invoker -> number of activations queued for the invoker)
-   */
-  def activationCountPerInvoker: Future[Map[String, Int]]
-
-  /**
-   * Get an activation entry for a given activation id.
-   *
-   * @param activationId activation id to get data for
-   * @return the respective activation or None if it doesn't exist
-   */
-  def activationById(activationId: ActivationId): Option[ActivationEntry]
-
-  /**
-   * Adds an activation entry.
-   *
-   * @param id     identifier to deduplicate the entry
-   * @param update block calculating the entry to add.
-   *               Note: This is evaluated iff the entry
-   *               didn't exist before.
-   * @return the entry calculated by the block or iff it did
-   *         exist before the entry from the state
-   */
-  def putActivation(id: ActivationId, update: => ActivationEntry): ActivationEntry
-
-  /**
-   * Removes the given entry.
-   *
-   * @param entry the entry to remove
-   * @return The deleted entry or None if nothing got deleted
-   */
-  def removeActivation(entry: ActivationEntry): Option[ActivationEntry]
-
-  /**
-   * Removes the activation identified by the given activation id.
-   *
-   * @param aid activation id to remove
-   * @return The deleted entry or None if nothing got deleted
-   */
-  def removeActivation(aid: ActivationId): Option[ActivationEntry]
-}
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LocalLoadBalancerData.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LocalLoadBalancerData.scala
deleted file mode 100644
index 92e3789e76..0000000000
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LocalLoadBalancerData.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.loadBalancer
-
-import java.util.concurrent.atomic.AtomicInteger
-
-import scala.collection.concurrent.TrieMap
-import scala.concurrent.Future
-import whisk.core.entity.{ActivationId, UUID}
-
-/**
- * Loadbalancer bookkeeping data which are stored locally,
- * i.e. not shared with other controller instances.
- *
- * Note: The state keeping is backed by concurrent data-structures. As such,
- * concurrent reads can return stale values (especially the counters returned).
- */
-class LocalLoadBalancerData() extends LoadBalancerData {
-
-  private val activationByInvoker = TrieMap[String, AtomicInteger]()
-  private val activationByNamespaceId = TrieMap[UUID, AtomicInteger]()
-  private val activationsById = TrieMap[ActivationId, ActivationEntry]()
-  private val totalActivations = new AtomicInteger(0)
-
-  override def totalActivationCount: Future[Int] = Future.successful(totalActivations.get)
-
-  override def activationCountOn(namespace: UUID): Future[Int] = {
-    Future.successful(activationByNamespaceId.get(namespace).map(_.get).getOrElse(0))
-  }
-
-  override def activationCountPerInvoker: Future[Map[String, Int]] = {
-    Future.successful(activationByInvoker.toMap.mapValues(_.get))
-  }
-
-  override def activationById(activationId: ActivationId): Option[ActivationEntry] = {
-    activationsById.get(activationId)
-  }
-
-  override def putActivation(id: ActivationId, update: => ActivationEntry): ActivationEntry = {
-    activationsById.getOrElseUpdate(id, {
-      val entry = update
-      totalActivations.incrementAndGet()
-      activationByNamespaceId.getOrElseUpdate(entry.namespaceId, new AtomicInteger(0)).incrementAndGet()
-      activationByInvoker.getOrElseUpdate(entry.invokerName.toString, new AtomicInteger(0)).incrementAndGet()
-      entry
-    })
-  }
-
-  override def removeActivation(entry: ActivationEntry): Option[ActivationEntry] = {
-    activationsById.remove(entry.id).map { x =>
-      totalActivations.decrementAndGet()
-      activationByNamespaceId.getOrElseUpdate(entry.namespaceId, new AtomicInteger(0)).decrementAndGet()
-      activationByInvoker.getOrElseUpdate(entry.invokerName.toString, new AtomicInteger(0)).decrementAndGet()
-      x
-    }
-  }
-
-  override def removeActivation(aid: ActivationId): Option[ActivationEntry] = {
-    activationsById.get(aid).flatMap(removeActivation)
-  }
-}
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
index f6ce75db79..718079ead6 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -21,7 +21,7 @@ import java.nio.charset.StandardCharsets
 import java.util.concurrent.ThreadLocalRandom
 import java.util.concurrent.atomic.LongAdder
 
-import akka.actor.{Actor, ActorSystem, Props}
+import akka.actor.{Actor, ActorSystem, Cancellable, Props}
 import akka.cluster.ClusterEvent._
 import akka.cluster.{Cluster, Member, MemberStatus}
 import akka.event.Logging.InfoLevel
@@ -309,10 +309,23 @@ object ShardingContainerPoolBalancer extends LoadBalancerProvider {
     kafkaHosts ++
       Map(controllerLocalBookkeeping -> null, controllerSeedNodes -> null)
 
+  /** Generates a hash based on the string representation of namespace and action */
   def generateHash(namespace: EntityName, action: FullyQualifiedEntityName): Int = {
     (namespace.asString.hashCode() ^ action.asString.hashCode()).abs
   }
 
+  /** Euclidean algorithm to determine the greatest-common-divisor */
+  @tailrec
+  def gcd(a: Int, b: Int): Int = if (b == 0) a else gcd(b, a % b)
+
+  /** Returns pairwise coprime numbers until x. Recomputed on every call (no memoization). */
+  def pairwiseCoprimeNumbersUntil(x: Int): IndexedSeq[Int] =
+    (1 to x).foldLeft(IndexedSeq.empty[Int])((primes, cur) => {
+      if (gcd(cur, x) == 1 && primes.forall(i => gcd(i, cur) == 1)) {
+        primes :+ cur
+      } else primes
+    })
+
   /**
    * Scans through all invokers and searches for an invoker tries to get a free slot on an invoker. If no slot can be
    * obtained, randomly picks a healthy invoker.
@@ -374,8 +387,8 @@ case class ShardingContainerPoolBalancerState(
   private var _invokers: IndexedSeq[InvokerHealth] = IndexedSeq.empty[InvokerHealth],
   private var _managedInvokers: IndexedSeq[InvokerHealth] = IndexedSeq.empty[InvokerHealth],
   private var _blackboxInvokers: IndexedSeq[InvokerHealth] = IndexedSeq.empty[InvokerHealth],
-  private var _managedStepSizes: Seq[Int] = ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(0),
-  private var _blackboxStepSizes: Seq[Int] = ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(0),
+  private var _managedStepSizes: Seq[Int] = ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(0),
+  private var _blackboxStepSizes: Seq[Int] = ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(0),
   private var _invokerSlots: IndexedSeq[ForcableSemaphore] = IndexedSeq.empty[ForcableSemaphore],
   private var _clusterSize: Int = 1)(
   lbConfig: ShardingContainerPoolBalancerConfig =
@@ -419,8 +432,8 @@ case class ShardingContainerPoolBalancerState(
     _managedInvokers = _invokers.take(managed)
 
     if (oldSize != newSize) {
-      _managedStepSizes = ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(managed)
-      _blackboxStepSizes = ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(blackboxes)
+      _managedStepSizes = ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(managed)
+      _blackboxStepSizes = ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(blackboxes)
 
       if (oldSize < newSize) {
         // Keeps the existing state..
@@ -467,3 +480,18 @@ case class ShardingContainerPoolBalancerState(
  * @param invokerBusyThreshold how many slots an invoker has available in total
  */
 case class ShardingContainerPoolBalancerConfig(blackboxFraction: Double, invokerBusyThreshold: Int)
+
+/**
+ * State kept for each activation until completion.
+ *
+ * @param id id of the activation
+ * @param namespaceId namespace that invoked the action
+ * @param invokerName invoker the action is scheduled to
+ * @param timeoutHandler times out completion of this activation, should be canceled on good paths
+ * @param promise the promise to be completed by the activation
+ */
+case class ActivationEntry(id: ActivationId,
+                           namespaceId: UUID,
+                           invokerName: InstanceId,
+                           timeoutHandler: Cancellable,
+                           promise: Promise[Either[ActivationId, WhiskActivation]])
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/SharedDataService.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/SharedDataService.scala
deleted file mode 100644
index d0595d3ece..0000000000
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/SharedDataService.scala
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.loadBalancer
-
-import akka.actor.{Actor, ActorLogging, ActorRef, Props}
-import akka.cluster.Cluster
-import akka.cluster.ClusterEvent._
-import akka.cluster.ddata.{DistributedData, PNCounterMap, PNCounterMapKey}
-import akka.cluster.ddata.Replicator._
-import whisk.common.AkkaLogging
-
-case class IncreaseCounter(key: String, value: Long)
-case class DecreaseCounter(key: String, value: Long)
-case class ReadCounter(key: String)
-case class RemoveCounter(key: String)
-case object GetMap
-
-/**
- * Companion object to specify actor properties from the outside, e.g. name of the shared map and cluster seed nodes
- */
-object SharedDataService {
-  def props(storageName: String): Props =
-    Props(new SharedDataService(storageName))
-}
-
-class SharedDataService(storageName: String) extends Actor with ActorLogging {
-
-  val replicator = DistributedData(context.system).replicator
-
-  val logging = new AkkaLogging(context.system.log)
-
-  val storage = PNCounterMapKey[String](storageName)
-
-  implicit val node = Cluster(context.system)
-
-  /**
-   * Subscribe this node for the changes in the Map, initialize the Map
-   */
-  override def preStart(): Unit = {
-    replicator ! Subscribe(storage, self)
-    node.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[MemberEvent], classOf[UnreachableMember])
-    replicator ! Update(storage, PNCounterMap.empty[String], writeLocal)(_.remove(node, "0"))
-  }
-  override def postStop(): Unit = node.unsubscribe(self)
-
-  /**
-   * CRUD operations on the counter, process cluster member events for logging
-   * @return
-   */
-  def receive = {
-
-    case (IncreaseCounter(key, increment)) =>
-      replicator ! Update(storage, PNCounterMap.empty[String], writeLocal)(_.increment(key, increment))
-
-    case (DecreaseCounter(key, decrement)) =>
-      replicator ! Update(storage, PNCounterMap[String], writeLocal)(_.decrement(key, decrement))
-
-    case GetMap =>
-      replicator ! Get(storage, readLocal, request = Some((sender())))
-
-    case MemberUp(member) =>
-      logging.info(this, "Member is Up: " + member.address)
-
-    case MemberRemoved(member, previousStatus) =>
-      logging.warn(this, s"Member is Removed: ${member.address} after $previousStatus")
-
-    case c @ Changed(_) =>
-      logging.debug(this, "Current elements: " + c.get(storage))
-
-    case g @ GetSuccess(_, Some((replyTo: ActorRef))) =>
-      val map = g.get(storage).entries
-      replyTo ! map
-
-    case g @ GetSuccess(_, Some((replyTo: ActorRef, key: String))) =>
-      if (g.get(storage).contains(key)) {
-        val response = g.get(storage).getValue(key).intValue()
-        replyTo ! response
-      } else
-        replyTo ! None
-
-    case _ => // ignore
-  }
-}
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/ContainerPoolBalancerObjectTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/ContainerPoolBalancerObjectTests.scala
deleted file mode 100644
index fd3252d1c8..0000000000
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/ContainerPoolBalancerObjectTests.scala
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.loadBalancer.test
-
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.FlatSpec
-import org.scalatest.Matchers
-import whisk.core.loadBalancer.ContainerPoolBalancer
-import whisk.core.loadBalancer.Healthy
-import whisk.core.loadBalancer.Offline
-import whisk.core.loadBalancer.UnHealthy
-import whisk.core.entity.InstanceId
-
-/**
- * Unit tests for the ContainerPool object.
- *
- * These tests test only the "static" methods "schedule" and "remove"
- * of the ContainerPool object.
- */
-@RunWith(classOf[JUnitRunner])
-class ContainerPoolBalancerObjectTests extends FlatSpec with Matchers {
-  behavior of "memoize"
-
-  it should "not recompute a value which was already given" in {
-    var calls = 0
-    val add1: Int => Int = ContainerPoolBalancer.memoize {
-      case second =>
-        calls += 1
-        1 + second
-    }
-
-    add1(1) shouldBe 2
-    calls shouldBe 1
-    add1(1) shouldBe 2
-    calls shouldBe 1
-    add1(2) shouldBe 3
-    calls shouldBe 2
-    add1(1) shouldBe 2
-    calls shouldBe 2
-  }
-
-  behavior of "pairwiseCoprimeNumbersUntil"
-
-  it should "return an empty set for malformed inputs" in {
-    ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(0) shouldBe Seq()
-    ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(-1) shouldBe Seq()
-  }
-
-  it should "return all coprime numbers until the number given" in {
-    ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(1) shouldBe Seq(1)
-    ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(2) shouldBe Seq(1)
-    ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(3) shouldBe Seq(1, 2)
-    ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(4) shouldBe Seq(1, 3)
-    ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(5) shouldBe Seq(1, 2, 3)
-    ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(9) shouldBe Seq(1, 2, 5, 7)
-    ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(10) shouldBe Seq(1, 3, 7)
-  }
-
-  behavior of "chooseInvoker"
-
-  def invokers(n: Int) = (0 until n).map(i => (InstanceId(i), Healthy, 0))
-  def hashInto[A](list: Seq[A], hash: Int) = list(hash % list.size)
-
-  it should "return None on an empty invokers list" in {
-    ContainerPoolBalancer.schedule(IndexedSeq(), 0, 1) shouldBe None
-  }
-
-  it should "return None on a list of offline/unhealthy invokers" in {
-    val invs = IndexedSeq((InstanceId(0), Offline, 0), (InstanceId(1), UnHealthy, 0))
-
-    ContainerPoolBalancer.schedule(invs, 0, 1) shouldBe None
-  }
-
-  it should "schedule to the home invoker" in {
-    val invs = invokers(10)
-    val hash = 2
-
-    ContainerPoolBalancer.schedule(invs, 1, hash) shouldBe Some(InstanceId(hash % invs.size))
-  }
-
-  it should "take the only online invoker" in {
-    ContainerPoolBalancer.schedule(
-      IndexedSeq((InstanceId(0), Offline, 0), (InstanceId(1), UnHealthy, 0), (InstanceId(2), Healthy, 0)),
-      0,
-      1) shouldBe Some(InstanceId(2))
-  }
-
-  it should "skip an offline/unhealthy invoker, even if its underloaded" in {
-    val hash = 0
-    val invs = IndexedSeq((InstanceId(0), Healthy, 10), (InstanceId(1), UnHealthy, 0), (InstanceId(2), Healthy, 0))
-
-    ContainerPoolBalancer.schedule(invs, 10, hash) shouldBe Some(InstanceId(2))
-  }
-
-  it should "jump to the next invoker determined by a hashed stepsize if the home invoker is overloaded" in {
-    val invokerCount = 10
-    val hash = 2
-    val targetInvoker = hash % invokerCount
-
-    val invs = invokers(invokerCount).updated(targetInvoker, (InstanceId(targetInvoker), Healthy, 1))
-    val step = hashInto(ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(invokerCount), hash)
-
-    ContainerPoolBalancer.schedule(invs, 1, hash) shouldBe Some(InstanceId((hash + step) % invs.size))
-  }
-
-  it should "wrap the search at the end of the invoker list" in {
-    val invokerCount = 3
-    val invs = IndexedSeq((InstanceId(0), Healthy, 1), (InstanceId(1), Healthy, 1), (InstanceId(2), Healthy, 0))
-    val hash = 1
-
-    val targetInvoker = hashInto(invs, hash) // will be invoker1
-    val step = hashInto(ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(invokerCount), hash) // will be 2
-    step shouldBe 2
-
-    // invoker1 is overloaded so it will step (2 steps) to the next one --> 1 2 0 --> invoker0 is next target
-    // invoker0 is overloaded so it will step to the next one --> 0 1 2 --> invoker2 is next target and underloaded
-    ContainerPoolBalancer.schedule(invs, 1, hash) shouldBe Some(InstanceId((hash + step + step) % invs.size))
-  }
-
-  it should "multiply its threshold in 3 iterations to find an invoker with a good warm-chance" in {
-    val invs = IndexedSeq((InstanceId(0), Healthy, 33), (InstanceId(1), Healthy, 36), (InstanceId(2), Healthy, 33))
-    val hash = 0 // home is 0, stepsize is 1
-
-    // even though invoker1 is not the home invoker in this case, it gets chosen over
-    // the others because it's the first one encountered by the iteration mechanism to be below
-    // the threshold of 3 * 16 invocations
-    ContainerPoolBalancer.schedule(invs, 16, hash) shouldBe Some(InstanceId(0))
-  }
-
-  it should "choose the random invoker if all invokers are overloaded even above the muliplied threshold" in {
-    val invs = IndexedSeq((InstanceId(0), Healthy, 33), (InstanceId(1), Healthy, 33), (InstanceId(2), Healthy, 33))
-    val invokerBusyThreshold = 11
-    val hash = 0
-    val bruteResult = (0 to 100) map { _ =>
-      ContainerPoolBalancer.schedule(invs, invokerBusyThreshold, hash).get.toInt
-    }
-    bruteResult should contain allOf (0, 1, 2)
-  }
-
-  it should "transparently work with partitioned sets of invokers" in {
-    val invs = IndexedSeq((InstanceId(3), Healthy, 0), (InstanceId(4), Healthy, 0), (InstanceId(5), Healthy, 0))
-
-    ContainerPoolBalancer.schedule(invs, 1, 0) shouldBe Some(InstanceId(3))
-    ContainerPoolBalancer.schedule(invs, 1, 1) shouldBe Some(InstanceId(4))
-    ContainerPoolBalancer.schedule(invs, 1, 2) shouldBe Some(InstanceId(5))
-    ContainerPoolBalancer.schedule(invs, 1, 3) shouldBe Some(InstanceId(3))
-  }
-}
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala
deleted file mode 100644
index 5a4edb64d7..0000000000
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala
+++ /dev/null
@@ -1,295 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.loadBalancer.test
-
-import akka.actor.ActorSystem
-import akka.actor.Cancellable
-import com.typesafe.config.{ConfigFactory, ConfigValueFactory}
-import common.StreamLogging
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.{FlatSpec, Matchers}
-import whisk.core.entity.{ActivationId, UUID, WhiskActivation}
-import whisk.core.loadBalancer.{ActivationEntry, DistributedLoadBalancerData, LocalLoadBalancerData}
-
-import scala.concurrent.{Await, Future, Promise}
-import whisk.core.entity.InstanceId
-
-import scala.concurrent.duration._
-
-@RunWith(classOf[JUnitRunner])
-class LoadBalancerDataTests extends FlatSpec with Matchers with StreamLogging {
-  final val emptyCancellable: Cancellable = new Cancellable {
-    def isCancelled = false
-    def cancel() = true
-  }
-
-  val activationIdPromise = Promise[Either[ActivationId, WhiskActivation]]()
-  val firstEntry: ActivationEntry =
-    ActivationEntry(ActivationId.generate(), UUID(), InstanceId(0), emptyCancellable, activationIdPromise)
-  val secondEntry: ActivationEntry =
-    ActivationEntry(ActivationId.generate(), UUID(), InstanceId(1), emptyCancellable, activationIdPromise)
-
-  val port = 2552
-  val host = "127.0.0.1"
-  val config = ConfigFactory
-    .parseString(s"akka.remote.netty.tcp.hostname = $host")
-    .withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(port))
-    .withValue("akka.actor.provider", ConfigValueFactory.fromAnyRef("cluster"))
-    .withFallback(ConfigFactory.load())
-
-  val actorSystemName = "controller-actor-system"
-
-  implicit val actorSystem = ActorSystem(actorSystemName, config)
-
-  def await[A](f: Future[A], timeout: FiniteDuration = 1.second) = Await.result(f, timeout)
-
-  behavior of "LoadBalancerData"
-
-  it should "return the number of activations for a namespace" in {
-    val distributedLoadBalancerData = new DistributedLoadBalancerData()
-    val localLoadBalancerData = new LocalLoadBalancerData()
-//    test all implementations
-    val loadBalancerDataArray = Array(localLoadBalancerData, distributedLoadBalancerData)
-    loadBalancerDataArray.map { lbd =>
-      lbd.putActivation(firstEntry.id, firstEntry)
-      await(lbd.activationCountOn(firstEntry.namespaceId)) shouldBe 1
-      await(lbd.activationCountPerInvoker) shouldBe Map(firstEntry.invokerName.toString -> 1)
-      lbd.activationById(firstEntry.id) shouldBe Some(firstEntry)
-
-      // clean up after yourself
-      lbd.removeActivation(firstEntry.id)
-    }
-  }
-
-  it should "return the number of activations for each invoker" in {
-
-    val distributedLoadBalancerData = new DistributedLoadBalancerData()
-    val localLoadBalancerData = new LocalLoadBalancerData()
-
-    val loadBalancerDataArray = Array(localLoadBalancerData, distributedLoadBalancerData)
-    loadBalancerDataArray.map { lbd =>
-      lbd.putActivation(firstEntry.id, firstEntry)
-      lbd.putActivation(secondEntry.id, secondEntry)
-
-      val res = await(lbd.activationCountPerInvoker)
-
-      res.get(firstEntry.invokerName.toString()) shouldBe Some(1)
-      res.get(secondEntry.invokerName.toString()) shouldBe Some(1)
-
-      lbd.activationById(firstEntry.id) shouldBe Some(firstEntry)
-      lbd.activationById(secondEntry.id) shouldBe Some(secondEntry)
-
-      // clean up after yourself
-      lbd.removeActivation(firstEntry.id)
-      lbd.removeActivation(secondEntry.id)
-    }
-
-  }
-
-  it should "remove activations and reflect that accordingly" in {
-
-    val distributedLoadBalancerData = new DistributedLoadBalancerData()
-    val localLoadBalancerData = new LocalLoadBalancerData()
-
-    val loadBalancerDataArray = Array(localLoadBalancerData, distributedLoadBalancerData)
-    loadBalancerDataArray.map { lbd =>
-      lbd.putActivation(firstEntry.id, firstEntry)
-      val res = await(lbd.activationCountPerInvoker)
-      res.get(firstEntry.invokerName.toString()) shouldBe Some(1)
-
-      await(lbd.activationCountOn(firstEntry.namespaceId)) shouldBe 1
-
-      lbd.removeActivation(firstEntry)
-
-      val resAfterRemoval = await(lbd.activationCountPerInvoker)
-      resAfterRemoval.get(firstEntry.invokerName.toString()) shouldBe Some(0)
-
-      await(lbd.activationCountOn(firstEntry.namespaceId)) shouldBe 0
-      lbd.activationById(firstEntry.id) shouldBe None
-    }
-
-  }
-
-  it should "remove activations from all 3 maps by activation id" in {
-
-    val distributedLoadBalancerData = new DistributedLoadBalancerData()
-    val localLoadBalancerData = new LocalLoadBalancerData()
-
-    val loadBalancerDataArray = Array(localLoadBalancerData, distributedLoadBalancerData)
-    loadBalancerDataArray.map { lbd =>
-      lbd.putActivation(firstEntry.id, firstEntry)
-
-      val res = await(lbd.activationCountPerInvoker)
-      res.get(firstEntry.invokerName.toString()) shouldBe Some(1)
-
-      lbd.removeActivation(firstEntry.id)
-
-      val resAfterRemoval = await(lbd.activationCountPerInvoker)
-      resAfterRemoval.get(firstEntry.invokerName.toString()) shouldBe Some(0)
-    }
-
-  }
-
-  it should "return None if the entry doesn't exist when we remove it" in {
-    val distributedLoadBalancerData = new DistributedLoadBalancerData()
-    val localLoadBalancerData = new LocalLoadBalancerData()
-
-    val loadBalancerDataArray = Array(localLoadBalancerData, distributedLoadBalancerData)
-    loadBalancerDataArray.map { lbd =>
-      lbd.removeActivation(firstEntry) shouldBe None
-    }
-
-  }
-
-  it should "respond with different values accordingly" in {
-
-    val entry = ActivationEntry(ActivationId.generate(), UUID(), InstanceId(1), emptyCancellable, activationIdPromise)
-    val entrySameInvokerAndNamespace = entry.copy(id = ActivationId.generate())
-    val entrySameInvoker = entry.copy(id = ActivationId.generate(), namespaceId = UUID())
-
-    val distributedLoadBalancerData = new DistributedLoadBalancerData()
-    val localLoadBalancerData = new LocalLoadBalancerData()
-
-    val loadBalancerDataArray = Array(localLoadBalancerData, distributedLoadBalancerData)
-    loadBalancerDataArray.map { lbd =>
-      lbd.putActivation(entry.id, entry)
-
-      await(lbd.activationCountOn(entry.namespaceId)) shouldBe 1
-      var res = await(lbd.activationCountPerInvoker)
-      res.get(entry.invokerName.toString()) shouldBe Some(1)
-
-      lbd.putActivation(entrySameInvokerAndNamespace.id, entrySameInvokerAndNamespace)
-      await(lbd.activationCountOn(entry.namespaceId)) shouldBe 2
-      res = await(lbd.activationCountPerInvoker)
-      res.get(entry.invokerName.toString()) shouldBe Some(2)
-
-      lbd.putActivation(entrySameInvoker.id, entrySameInvoker)
-      await(lbd.activationCountOn(entry.namespaceId)) shouldBe 2
-      res = await(lbd.activationCountPerInvoker)
-      res.get(entry.invokerName.toString()) shouldBe Some(3)
-
-      lbd.removeActivation(entrySameInvokerAndNamespace)
-      await(lbd.activationCountOn(entry.namespaceId)) shouldBe 1
-      res = await(lbd.activationCountPerInvoker)
-      res.get(entry.invokerName.toString()) shouldBe Some(2)
-
-      // removing non existing entry doesn't mess up
-      lbd.removeActivation(entrySameInvokerAndNamespace)
-      await(lbd.activationCountOn(entry.namespaceId)) shouldBe 1
-      res = await(lbd.activationCountPerInvoker)
-      res.get(entry.invokerName.toString()) shouldBe Some(2)
-
-      // clean up
-      lbd.removeActivation(entry)
-      lbd.removeActivation(entrySameInvoker.id)
-    }
-
-  }
-
-  it should "not add the same entry more then once" in {
-
-    val distributedLoadBalancerData = new DistributedLoadBalancerData()
-    val localLoadBalancerData = new LocalLoadBalancerData()
-
-    val loadBalancerDataArray = Array(localLoadBalancerData, distributedLoadBalancerData)
-    loadBalancerDataArray.map { lbd =>
-      lbd.putActivation(firstEntry.id, firstEntry)
-      val res = await(lbd.activationCountPerInvoker)
-      res.get(firstEntry.invokerName.toString()) shouldBe Some(1)
-      await(lbd.activationCountOn(firstEntry.namespaceId)) shouldBe 1
-
-      lbd.putActivation(firstEntry.id, firstEntry)
-      val resAfterAddingTheSameEntry = await(lbd.activationCountPerInvoker)
-      resAfterAddingTheSameEntry.get(firstEntry.invokerName.toString()) shouldBe Some(1)
-      await(lbd.activationCountOn(firstEntry.namespaceId)) shouldBe 1
-
-      lbd.removeActivation(firstEntry)
-      lbd.removeActivation(firstEntry)
-    }
-
-  }
-
-  it should "not evaluate the given block if an entry already exists" in {
-
-    val distributedLoadBalancerData = new DistributedLoadBalancerData()
-    val localLoadBalancerData = new LocalLoadBalancerData()
-
-    val loadBalancerDataArray = Array(localLoadBalancerData, distributedLoadBalancerData)
-    loadBalancerDataArray.map { lbd =>
-      var called = 0
-
-      val entry = lbd.putActivation(firstEntry.id, {
-        called += 1
-        firstEntry
-      })
-
-      called shouldBe 1
-
-      // entry already exists, should not evaluate the block
-      val entryAfterSecond = lbd.putActivation(firstEntry.id, {
-        called += 1
-        firstEntry
-      })
-
-      called shouldBe 1
-      entry shouldBe entryAfterSecond
-
-      // clean up after yourself
-      lbd.removeActivation(firstEntry)
-    }
-
-  }
-
-  it should "not evaluate the given block even if an entry is different (but has the same id)" in {
-
-    val distributedLoadBalancerData = new DistributedLoadBalancerData()
-    val localLoadBalancerData = new LocalLoadBalancerData()
-
-    val loadBalancerDataArray = Array(localLoadBalancerData, distributedLoadBalancerData)
-    loadBalancerDataArray.map { lbd =>
-      var called = 0
-      val entrySameId = secondEntry.copy(id = firstEntry.id)
-
-      val entry = lbd.putActivation(firstEntry.id, {
-        called += 1
-        firstEntry
-      })
-
-      called shouldBe 1
-
-      val res = await(lbd.activationCountPerInvoker)
-      res.get(firstEntry.invokerName.toString()) shouldBe Some(1)
-      await(lbd.activationCountOn(firstEntry.namespaceId)) shouldBe 1
-
-      // entry already exists, should not evaluate the block and change the state
-      val entryAfterSecond = lbd.putActivation(entrySameId.id, {
-        called += 1
-        entrySameId
-      })
-
-      called shouldBe 1
-      entry shouldBe entryAfterSecond
-      val resAfterAddingTheSameEntry = await(lbd.activationCountPerInvoker)
-      resAfterAddingTheSameEntry.get(firstEntry.invokerName.toString()) shouldBe Some(1)
-      await(lbd.activationCountOn(firstEntry.namespaceId)) shouldBe 1
-    }
-
-  }
-
-}
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
index 50201a605e..49712c237e 100644
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
+++ b/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
@@ -176,4 +176,21 @@ class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers with Str
     bruteResult should contain allOf (0, 3)
     bruteResult should contain noneOf (1, 2)
   }
+
+  behavior of "pairwiseCoprimeNumbersUntil"
+
+  it should "return an empty set for malformed inputs" in {
+    ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(0) shouldBe Seq()
+    ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(-1) shouldBe Seq()
+  }
+
+  it should "return all coprime numbers until the number given" in {
+    ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(1) shouldBe Seq(1)
+    ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(2) shouldBe Seq(1)
+    ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(3) shouldBe Seq(1, 2)
+    ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(4) shouldBe Seq(1, 3)
+    ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(5) shouldBe Seq(1, 2, 3)
+    ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(9) shouldBe Seq(1, 2, 5, 7)
+    ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(10) shouldBe Seq(1, 3, 7)
+  }
 }
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/SharedDataServiceTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/SharedDataServiceTests.scala
deleted file mode 100644
index b3b37a716c..0000000000
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/SharedDataServiceTests.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.loadBalancer.test
-
-import akka.actor.ActorSystem
-import akka.testkit.{ImplicitSender, TestKit}
-import akka.util.Timeout
-import com.typesafe.config.{ConfigFactory, ConfigValueFactory}
-import org.junit.runner.RunWith
-import org.scalatest.{FlatSpecLike, _}
-import org.scalatest.junit.JUnitRunner
-import whisk.core.loadBalancer._
-
-import scala.concurrent.duration._
-
-// Define your test specific configuration here
-
-object TestKitConfig {
-  val config = ConfigFactory.empty
-    .withValue("akka.remote.netty.tcp.hostname", ConfigValueFactory.fromAnyRef("127.0.0.1"))
-    .withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef("2555"))
-    .withValue("akka.actor.provider", ConfigValueFactory.fromAnyRef("cluster"))
-}
-
-@RunWith(classOf[JUnitRunner])
-class SharedDataServiceTests()
-    extends TestKit(ActorSystem("ControllerCluster", TestKitConfig.config))
-    with ImplicitSender
-    with FlatSpecLike
-    with Matchers
-    with BeforeAndAfterAll {
-
-  override def afterAll {
-    TestKit.shutdownActorSystem(system)
-  }
-
-  behavior of "SharedDataService"
-
-  val sharedDataService = system.actorOf(SharedDataService.props("Candidates"), name = "busyMan")
-  implicit val timeout = Timeout(5.seconds)
-
-  it should "retrieve an empty map after initialization" in {
-    sharedDataService ! GetMap
-    val msg = Map()
-    expectMsg(msg)
-  }
-  it should "increase the counter" in {
-    sharedDataService ! IncreaseCounter("Donald", 1)
-    sharedDataService ! GetMap
-    val msg = Map("Donald" -> 1)
-    expectMsg(msg)
-  }
-  it should "decrease the counter" in {
-    sharedDataService ! IncreaseCounter("Donald", 2)
-    sharedDataService ! DecreaseCounter("Donald", 2)
-    sharedDataService ! GetMap
-    val msg = Map("Donald" -> 1)
-    expectMsg(msg)
-  }
-  it should "receive the map with all counters" in {
-    sharedDataService ! IncreaseCounter("Hilary", 1)
-    sharedDataService ! GetMap
-    val msg = Map("Hilary" -> 1, "Donald" -> 1)
-    expectMsg(msg)
-  }
-}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services