You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by cb...@apache.org on 2018/04/03 14:57:27 UTC
[incubator-openwhisk] branch master updated: Remove deprecated
loadbalancer. (#3413)
This is an automated email from the ASF dual-hosted git repository.
cbickel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new f647224 Remove deprecated loadbalancer. (#3413)
f647224 is described below
commit f64722498bc2f5eadc356b3ae26ebb85de1bcdbf
Author: Markus Thömmes <ma...@me.com>
AuthorDate: Tue Apr 3 16:57:24 2018 +0200
Remove deprecated loadbalancer. (#3413)
---
.../core/loadBalancer/ContainerPoolBalancer.scala | 356 ---------------------
.../loadBalancer/DistributedLoadBalancerData.scala | 90 ------
.../whisk/core/loadBalancer/LoadBalancer.scala | 3 +
.../whisk/core/loadBalancer/LoadBalancerData.scala | 86 -----
.../core/loadBalancer/LocalLoadBalancerData.scala | 76 -----
.../ShardingContainerPoolBalancer.scala | 38 ++-
.../core/loadBalancer/SharedDataService.scala | 98 ------
.../test/ContainerPoolBalancerObjectTests.scala | 164 ----------
.../loadBalancer/test/LoadBalancerDataTests.scala | 295 -----------------
.../test/ShardingContainerPoolBalancerTests.scala | 17 +
.../loadBalancer/test/SharedDataServiceTests.scala | 81 -----
11 files changed, 53 insertions(+), 1251 deletions(-)
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
deleted file mode 100644
index 570281d..0000000
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
+++ /dev/null
@@ -1,356 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.loadBalancer
-
-import java.nio.charset.StandardCharsets
-import java.util.concurrent.ThreadLocalRandom
-
-import akka.actor.{ActorSystem, Props}
-import akka.cluster.Cluster
-import akka.pattern.ask
-import akka.stream.ActorMaterializer
-import akka.util.Timeout
-import org.apache.kafka.clients.producer.RecordMetadata
-import pureconfig._
-import whisk.common.{Logging, LoggingMarkers, MetricEmitter, TransactionId}
-import whisk.core.WhiskConfig._
-import whisk.core.connector._
-import whisk.core.entity._
-import whisk.core.{ConfigKeys, WhiskConfig}
-import whisk.spi.SpiLoader
-import akka.event.Logging.InfoLevel
-import pureconfig._
-
-import scala.annotation.tailrec
-import scala.concurrent.duration._
-import scala.concurrent.{ExecutionContext, Future, Promise}
-import scala.util.{Failure, Success}
-
-case class LoadbalancerConfig(blackboxFraction: Double, invokerBusyThreshold: Int)
-
-class ContainerPoolBalancer(config: WhiskConfig, controllerInstance: InstanceId)(implicit val actorSystem: ActorSystem,
- logging: Logging,
- materializer: ActorMaterializer)
- extends LoadBalancer {
-
- private val lbConfig = loadConfigOrThrow[LoadbalancerConfig](ConfigKeys.loadbalancer)
-
- /** The execution context for futures */
- private implicit val executionContext: ExecutionContext = actorSystem.dispatcher
-
- private val activeAckTimeoutGrace = 1.minute
-
- /** How many invokers are dedicated to blackbox images. We range bound to something sensical regardless of configuration. */
- private val blackboxFraction: Double = Math.max(0.0, Math.min(1.0, lbConfig.blackboxFraction))
- logging.info(this, s"blackboxFraction = $blackboxFraction")(TransactionId.loadbalancer)
-
- /** Feature switch for shared load balancer data **/
- private val loadBalancerData = {
- if (config.controllerLocalBookkeeping) {
- new LocalLoadBalancerData()
- } else {
-
- /** Specify how seed nodes are generated */
- val seedNodesProvider = new StaticSeedNodesProvider(config.controllerSeedNodes, actorSystem.name)
- Cluster(actorSystem).joinSeedNodes(seedNodesProvider.getSeedNodes())
- new DistributedLoadBalancerData()
- }
- }
-
- override def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)(
- implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] = {
- chooseInvoker(msg.user, action).flatMap { invokerName =>
- val entry = setupActivation(action, msg.activationId, msg.user.uuid, invokerName, transid)
- sendActivationToInvoker(messageProducer, msg, invokerName).map { _ =>
- entry.promise.future
- }
- }
- }
-
- /** An indexed sequence of all invokers in the current system. */
- override def invokerHealth(): Future[IndexedSeq[InvokerHealth]] = {
- invokerPool
- .ask(GetStatus)(Timeout(5.seconds))
- .mapTo[IndexedSeq[InvokerHealth]]
- }
-
- override def activeActivationsFor(namespace: UUID) = loadBalancerData.activationCountOn(namespace)
-
- override def totalActiveActivations = loadBalancerData.totalActivationCount
-
- override def clusterSize = config.controllerInstances.toInt
-
- /**
- * Tries to fill in the result slot (i.e., complete the promise) when a completion message arrives.
- * The promise is removed form the map when the result arrives or upon timeout.
- *
- * @param msg is the kafka message payload as Json
- */
- private def processCompletion(response: Either[ActivationId, WhiskActivation],
- tid: TransactionId,
- forced: Boolean,
- invoker: InstanceId): Unit = {
- val aid = response.fold(l => l, r => r.activationId)
-
- // treat left as success (as it is the result of a message exceeding the bus limit)
- val isSuccess = response.fold(l => true, r => !r.response.isWhiskError)
-
- loadBalancerData.removeActivation(aid) match {
- case Some(entry) =>
- logging.info(this, s"${if (!forced) "received" else "forced"} active ack for '$aid'")(tid)
- // Active acks that are received here are strictly from user actions - health actions are not part of
- // the load balancer's activation map. Inform the invoker pool supervisor of the user action completion.
- invokerPool ! InvocationFinishedMessage(invoker, isSuccess)
- if (!forced) {
- entry.timeoutHandler.cancel()
- entry.promise.trySuccess(response)
- } else {
- entry.promise.tryFailure(new Throwable("no active ack received"))
- }
- case None if !forced =>
- // the entry has already been removed but we receive an active ack for this activation Id.
- // This happens for health actions, because they don't have an entry in Loadbalancerdata or
- // for activations that already timed out.
- invokerPool ! InvocationFinishedMessage(invoker, isSuccess)
- logging.debug(this, s"received active ack for '$aid' which has no entry")(tid)
- case None =>
- // the entry has already been removed by an active ack. This part of the code is reached by the timeout.
- // As the active ack is already processed we don't have to do anything here.
- logging.debug(this, s"forced active ack for '$aid' which has no entry")(tid)
- }
- }
-
- /**
- * Creates an activation entry and insert into various maps.
- */
- private def setupActivation(action: ExecutableWhiskActionMetaData,
- activationId: ActivationId,
- namespaceId: UUID,
- invokerName: InstanceId,
- transid: TransactionId): ActivationEntry = {
- val timeout = (action.limits.timeout.duration
- .max(TimeLimit.STD_DURATION) * config.controllerInstances.toInt) + activeAckTimeoutGrace
- // Install a timeout handler for the catastrophic case where an active ack is not received at all
- // (because say an invoker is down completely, or the connection to the message bus is disrupted) or when
- // the active ack is significantly delayed (possibly dues to long queues but the subject should not be penalized);
- // in this case, if the activation handler is still registered, remove it and update the books.
- // in case of missing synchronization between n controllers in HA configuration the invoker queue can be overloaded
- // n-1 times and the maximal time for answering with active ack can be n times the action time (plus some overhead)
- loadBalancerData.putActivation(
- activationId, {
- val timeoutHandler = actorSystem.scheduler.scheduleOnce(timeout) {
- processCompletion(Left(activationId), transid, forced = true, invoker = invokerName)
- }
-
- // please note: timeoutHandler.cancel must be called on all non-timeout paths, e.g. Success
- ActivationEntry(
- activationId,
- namespaceId,
- invokerName,
- timeoutHandler,
- Promise[Either[ActivationId, WhiskActivation]]())
- })
- }
-
- /** Gets a producer which can publish messages to the kafka bus. */
- private val messagingProvider = SpiLoader.get[MessagingProvider]
- private val messageProducer = messagingProvider.getProducer(config)
-
- private def sendActivationToInvoker(producer: MessageProducer,
- msg: ActivationMessage,
- invoker: InstanceId): Future[RecordMetadata] = {
- implicit val transid = msg.transid
-
- MetricEmitter.emitCounterMetric(LoggingMarkers.LOADBALANCER_ACTIVATION_START)
- val topic = s"invoker${invoker.toInt}"
- val start = transid.started(
- this,
- LoggingMarkers.CONTROLLER_KAFKA,
- s"posting topic '$topic' with activation id '${msg.activationId}'",
- logLevel = InfoLevel)
-
- producer.send(topic, msg).andThen {
- case Success(status) =>
- transid.finished(this, start, s"posted to ${status.topic()}[${status.partition()}][${status.offset()}]")
- case Failure(e) => transid.failed(this, start, s"error on posting to topic $topic")
- }
- }
-
- private val invokerPool = {
- InvokerPool.prepare(controllerInstance, WhiskEntityStore.datastore())
-
- actorSystem.actorOf(
- InvokerPool.props(
- (f, i) => f.actorOf(InvokerActor.props(i, controllerInstance)),
- (m, i) => sendActivationToInvoker(messageProducer, m, i),
- messagingProvider.getConsumer(config, s"health${controllerInstance.toInt}", "health", maxPeek = 128)))
- }
-
- /**
- * Subscribes to active acks (completion messages from the invokers), and
- * registers a handler for received active acks from invokers.
- */
- val activeAckTopic = s"completed${controllerInstance.toInt}"
- val maxActiveAcksPerPoll = 128
- val activeAckPollDuration = 1.second
- private val activeAckConsumer =
- messagingProvider.getConsumer(config, activeAckTopic, activeAckTopic, maxPeek = maxActiveAcksPerPoll)
-
- val activationFeed = actorSystem.actorOf(Props {
- new MessageFeed(
- "activeack",
- logging,
- activeAckConsumer,
- maxActiveAcksPerPoll,
- activeAckPollDuration,
- processActiveAck)
- })
-
- def processActiveAck(bytes: Array[Byte]): Future[Unit] = Future {
- val raw = new String(bytes, StandardCharsets.UTF_8)
- CompletionMessage.parse(raw) match {
- case Success(m: CompletionMessage) =>
- processCompletion(m.response, m.transid, forced = false, invoker = m.invoker)
- activationFeed ! MessageFeed.Processed
-
- case Failure(t) =>
- activationFeed ! MessageFeed.Processed
- logging.error(this, s"failed processing message: $raw with $t")
- }
- }
-
- /** Compute the number of blackbox-dedicated invokers by applying a rounded down fraction of all invokers (but at least 1). */
- private def numBlackbox(totalInvokers: Int) = Math.max(1, (totalInvokers.toDouble * blackboxFraction).toInt)
-
- /** Return invokers dedicated to running blackbox actions. */
- private def blackboxInvokers(invokers: IndexedSeq[InvokerHealth]): IndexedSeq[InvokerHealth] = {
- val blackboxes = numBlackbox(invokers.size)
- invokers.takeRight(blackboxes)
- }
-
- /**
- * Return (at least one) invokers for running non black-box actions.
- * This set can overlap with the blackbox set if there is only one invoker.
- */
- private def managedInvokers(invokers: IndexedSeq[InvokerHealth]): IndexedSeq[InvokerHealth] = {
- val managed = Math.max(1, invokers.length - numBlackbox(invokers.length))
- invokers.take(managed)
- }
-
- /** Determine which invoker this activation should go to. Due to dynamic conditions, it may return no invoker. */
- private def chooseInvoker(user: Identity, action: ExecutableWhiskActionMetaData): Future[InstanceId] = {
- val hash = generateHash(user.namespace, action)
-
- loadBalancerData.activationCountPerInvoker.flatMap { currentActivations =>
- invokerHealth().flatMap { invokers =>
- val invokersToUse = if (action.exec.pull) blackboxInvokers(invokers) else managedInvokers(invokers)
- val invokersWithUsage = invokersToUse.view.map {
- // Using a view defers the comparably expensive lookup to actual access of the element
- case invoker => (invoker.id, invoker.status, currentActivations.getOrElse(invoker.id.toString, 0))
- }
-
- ContainerPoolBalancer.schedule(invokersWithUsage, lbConfig.invokerBusyThreshold, hash) match {
- case Some(invoker) => Future.successful(invoker)
- case None =>
- logging.error(this, s"all invokers down")(TransactionId.invokerHealth)
- Future.failed(new LoadBalancerException("no invokers available"))
- }
- }
- }
- }
-
- /** Generates a hash based on the string representation of namespace and action */
- private def generateHash(namespace: EntityName, action: ExecutableWhiskActionMetaData): Int = {
- (namespace.asString.hashCode() ^ action.fullyQualifiedName(false).asString.hashCode()).abs
- }
-}
-
-object ContainerPoolBalancer extends LoadBalancerProvider {
-
- override def loadBalancer(whiskConfig: WhiskConfig, instance: InstanceId)(
- implicit actorSystem: ActorSystem,
- logging: Logging,
- materializer: ActorMaterializer): LoadBalancer = new ContainerPoolBalancer(whiskConfig, instance)
-
- def requiredProperties =
- kafkaHosts ++
- Map(controllerLocalBookkeeping -> null, controllerSeedNodes -> null)
-
- /** Memoizes the result of `f` for later use. */
- def memoize[I, O](f: I => O): I => O = new scala.collection.mutable.HashMap[I, O]() {
- override def apply(key: I) = getOrElseUpdate(key, f(key))
- }
-
- /** Euclidean algorithm to determine the greatest-common-divisor */
- @tailrec
- def gcd(a: Int, b: Int): Int = if (b == 0) a else gcd(b, a % b)
-
- /** Returns pairwise coprime numbers until x. Result is memoized. */
- val pairwiseCoprimeNumbersUntil: Int => IndexedSeq[Int] = ContainerPoolBalancer.memoize {
- case x =>
- (1 to x).foldLeft(IndexedSeq.empty[Int])((primes, cur) => {
- if (gcd(cur, x) == 1 && primes.forall(i => gcd(i, cur) == 1)) {
- primes :+ cur
- } else primes
- })
- }
-
- /**
- * Scans through all invokers and searches for an invoker, that has a queue length
- * below the defined threshold. The threshold is subject to a 3 times back off. Iff
- * no "underloaded" invoker was found it will default to the first invoker in the
- * step-defined progression that is healthy.
- *
- * @param invokers a list of available invokers to search in, including their state and usage
- * @param invokerBusyThreshold defines when an invoker is considered overloaded
- * @param hash stable identifier of the entity to be scheduled
- * @return an invoker to schedule to or None of no invoker is available
- */
- def schedule(invokers: Seq[(InstanceId, InvokerState, Int)],
- invokerBusyThreshold: Int,
- hash: Int): Option[InstanceId] = {
-
- val numInvokers = invokers.size
- if (numInvokers > 0) {
- val homeInvoker = hash % numInvokers
- val stepSizes = ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(numInvokers)
- val step = stepSizes(hash % stepSizes.size)
-
- val invokerProgression = Stream
- .from(0)
- .take(numInvokers)
- .map(i => (homeInvoker + i * step) % numInvokers)
- .map(invokers)
- .filter(_._2 == Healthy)
-
- invokerProgression
- .find(_._3 < invokerBusyThreshold)
- .orElse(invokerProgression.find(_._3 < invokerBusyThreshold * 2))
- .orElse(invokerProgression.find(_._3 < invokerBusyThreshold * 3))
- .orElse(
- if (invokerProgression.isEmpty)
- None
- else
- Some(invokerProgression(ThreadLocalRandom.current().nextInt(invokerProgression.size))))
- .map(_._1)
- } else None
- }
-
-}
-
-private case class LoadBalancerException(msg: String) extends Throwable(msg)
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/DistributedLoadBalancerData.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/DistributedLoadBalancerData.scala
deleted file mode 100644
index 34b5d67..0000000
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/DistributedLoadBalancerData.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.loadBalancer
-
-import akka.actor.ActorSystem
-import akka.util.Timeout
-import akka.pattern.ask
-import whisk.common.Logging
-import whisk.core.entity.{ActivationId, UUID}
-
-import scala.collection.concurrent.TrieMap
-import scala.concurrent.Future
-import scala.concurrent.duration._
-
-/**
- * Encapsulates data used for loadbalancer and active-ack bookkeeping.
- *
- * Note: The state keeping is backed by distributed akka actors. All CRUDs operations are done on local values, thus
- * a stale value might be read.
- */
-class DistributedLoadBalancerData(implicit actorSystem: ActorSystem, logging: Logging) extends LoadBalancerData {
-
- implicit val timeout = Timeout(5.seconds)
- implicit val executionContext = actorSystem.dispatcher
- private val activationsById = TrieMap[ActivationId, ActivationEntry]()
-
- private val sharedStateInvokers = actorSystem.actorOf(
- SharedDataService.props("Invokers"),
- name =
- "SharedDataServiceInvokers" + UUID())
- private val sharedStateNamespaces = actorSystem.actorOf(
- SharedDataService.props("Namespaces"),
- name =
- "SharedDataServiceNamespaces" + UUID())
-
- def totalActivationCount =
- (sharedStateInvokers ? GetMap).mapTo[Map[String, BigInt]].map(_.values.sum.toInt)
-
- def activationCountOn(namespace: UUID): Future[Int] = {
- (sharedStateNamespaces ? GetMap)
- .mapTo[Map[String, BigInt]]
- .map(_.mapValues(_.toInt).getOrElse(namespace.toString, 0))
- }
-
- def activationCountPerInvoker: Future[Map[String, Int]] = {
- (sharedStateInvokers ? GetMap).mapTo[Map[String, BigInt]].map(_.mapValues(_.toInt))
- }
-
- def activationById(activationId: ActivationId): Option[ActivationEntry] = {
- activationsById.get(activationId)
- }
-
- def putActivation(id: ActivationId, update: => ActivationEntry): ActivationEntry = {
- activationsById.getOrElseUpdate(id, {
- val entry = update
- sharedStateNamespaces ! IncreaseCounter(entry.namespaceId.asString, 1)
- sharedStateInvokers ! IncreaseCounter(entry.invokerName.toString, 1)
- logging.debug(this, "increased shared counters")
- entry
- })
- }
-
- def removeActivation(entry: ActivationEntry): Option[ActivationEntry] = {
- activationsById.remove(entry.id).map { activationEntry =>
- sharedStateInvokers ! DecreaseCounter(entry.invokerName.toString, 1)
- sharedStateNamespaces ! DecreaseCounter(entry.namespaceId.asString, 1)
- logging.debug(this, "decreased shared counters")
- activationEntry
- }
- }
-
- def removeActivation(aid: ActivationId): Option[ActivationEntry] = {
- activationsById.get(aid).flatMap(removeActivation)
- }
-}
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancer.scala
index ab8db1c..52ffd73 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancer.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancer.scala
@@ -86,3 +86,6 @@ trait LoadBalancerProvider extends Spi {
logging: Logging,
materializer: ActorMaterializer): LoadBalancer
}
+
+/** Exception thrown by the loadbalancer */
+case class LoadBalancerException(msg: String) extends Throwable(msg)
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala
deleted file mode 100644
index 0018cbb..0000000
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.loadBalancer
-
-import whisk.core.entity.{ActivationId, InstanceId, UUID, WhiskActivation}
-
-import akka.actor.Cancellable
-import scala.concurrent.{Future, Promise}
-
-// please note: timeoutHandler.cancel must be called on all non-timeout paths, e.g. Success
-case class ActivationEntry(id: ActivationId,
- namespaceId: UUID,
- invokerName: InstanceId,
- timeoutHandler: Cancellable,
- promise: Promise[Either[ActivationId, WhiskActivation]])
-trait LoadBalancerData {
-
- /** Get the number of activations across all namespaces. */
- def totalActivationCount: Future[Int]
-
- /**
- * Get the number of activations for a specific namespace.
- *
- * @param namespace The namespace to get the activation count for
- * @return a map (namespace -> number of activations in the system)
- */
- def activationCountOn(namespace: UUID): Future[Int]
-
- /**
- * Get the number of activations for each invoker.
- *
- * @return a map (invoker -> number of activations queued for the invoker)
- */
- def activationCountPerInvoker: Future[Map[String, Int]]
-
- /**
- * Get an activation entry for a given activation id.
- *
- * @param activationId activation id to get data for
- * @return the respective activation or None if it doesn't exist
- */
- def activationById(activationId: ActivationId): Option[ActivationEntry]
-
- /**
- * Adds an activation entry.
- *
- * @param id identifier to deduplicate the entry
- * @param update block calculating the entry to add.
- * Note: This is evaluated iff the entry
- * didn't exist before.
- * @return the entry calculated by the block or iff it did
- * exist before the entry from the state
- */
- def putActivation(id: ActivationId, update: => ActivationEntry): ActivationEntry
-
- /**
- * Removes the given entry.
- *
- * @param entry the entry to remove
- * @return The deleted entry or None if nothing got deleted
- */
- def removeActivation(entry: ActivationEntry): Option[ActivationEntry]
-
- /**
- * Removes the activation identified by the given activation id.
- *
- * @param aid activation id to remove
- * @return The deleted entry or None if nothing got deleted
- */
- def removeActivation(aid: ActivationId): Option[ActivationEntry]
-}
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LocalLoadBalancerData.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LocalLoadBalancerData.scala
deleted file mode 100644
index 92e3789..0000000
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LocalLoadBalancerData.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.loadBalancer
-
-import java.util.concurrent.atomic.AtomicInteger
-
-import scala.collection.concurrent.TrieMap
-import scala.concurrent.Future
-import whisk.core.entity.{ActivationId, UUID}
-
-/**
- * Loadbalancer bookkeeping data which are stored locally,
- * e.g. not shared with other controller instances.
- *
- * Note: The state keeping is backed by concurrent data-structures. As such,
- * concurrent reads can return stale values (especially the counters returned).
- */
-class LocalLoadBalancerData() extends LoadBalancerData {
-
- private val activationByInvoker = TrieMap[String, AtomicInteger]()
- private val activationByNamespaceId = TrieMap[UUID, AtomicInteger]()
- private val activationsById = TrieMap[ActivationId, ActivationEntry]()
- private val totalActivations = new AtomicInteger(0)
-
- override def totalActivationCount: Future[Int] = Future.successful(totalActivations.get)
-
- override def activationCountOn(namespace: UUID): Future[Int] = {
- Future.successful(activationByNamespaceId.get(namespace).map(_.get).getOrElse(0))
- }
-
- override def activationCountPerInvoker: Future[Map[String, Int]] = {
- Future.successful(activationByInvoker.toMap.mapValues(_.get))
- }
-
- override def activationById(activationId: ActivationId): Option[ActivationEntry] = {
- activationsById.get(activationId)
- }
-
- override def putActivation(id: ActivationId, update: => ActivationEntry): ActivationEntry = {
- activationsById.getOrElseUpdate(id, {
- val entry = update
- totalActivations.incrementAndGet()
- activationByNamespaceId.getOrElseUpdate(entry.namespaceId, new AtomicInteger(0)).incrementAndGet()
- activationByInvoker.getOrElseUpdate(entry.invokerName.toString, new AtomicInteger(0)).incrementAndGet()
- entry
- })
- }
-
- override def removeActivation(entry: ActivationEntry): Option[ActivationEntry] = {
- activationsById.remove(entry.id).map { x =>
- totalActivations.decrementAndGet()
- activationByNamespaceId.getOrElseUpdate(entry.namespaceId, new AtomicInteger(0)).decrementAndGet()
- activationByInvoker.getOrElseUpdate(entry.invokerName.toString, new AtomicInteger(0)).decrementAndGet()
- x
- }
- }
-
- override def removeActivation(aid: ActivationId): Option[ActivationEntry] = {
- activationsById.get(aid).flatMap(removeActivation)
- }
-}
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
index f6ce75d..718079e 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -21,7 +21,7 @@ import java.nio.charset.StandardCharsets
import java.util.concurrent.ThreadLocalRandom
import java.util.concurrent.atomic.LongAdder
-import akka.actor.{Actor, ActorSystem, Props}
+import akka.actor.{Actor, ActorSystem, Cancellable, Props}
import akka.cluster.ClusterEvent._
import akka.cluster.{Cluster, Member, MemberStatus}
import akka.event.Logging.InfoLevel
@@ -309,10 +309,23 @@ object ShardingContainerPoolBalancer extends LoadBalancerProvider {
kafkaHosts ++
Map(controllerLocalBookkeeping -> null, controllerSeedNodes -> null)
+ /** Generates a hash based on the string representation of namespace and action */
def generateHash(namespace: EntityName, action: FullyQualifiedEntityName): Int = {
(namespace.asString.hashCode() ^ action.asString.hashCode()).abs
}
+ /** Euclidean algorithm to determine the greatest-common-divisor */
+ @tailrec
+ def gcd(a: Int, b: Int): Int = if (b == 0) a else gcd(b, a % b)
+
+ /** Returns pairwise coprime numbers until x. Result is memoized. */
+ def pairwiseCoprimeNumbersUntil(x: Int): IndexedSeq[Int] =
+ (1 to x).foldLeft(IndexedSeq.empty[Int])((primes, cur) => {
+ if (gcd(cur, x) == 1 && primes.forall(i => gcd(i, cur) == 1)) {
+ primes :+ cur
+ } else primes
+ })
+
/**
* Scans through all invokers and searches for an invoker tries to get a free slot on an invoker. If no slot can be
* obtained, randomly picks a healthy invoker.
@@ -374,8 +387,8 @@ case class ShardingContainerPoolBalancerState(
private var _invokers: IndexedSeq[InvokerHealth] = IndexedSeq.empty[InvokerHealth],
private var _managedInvokers: IndexedSeq[InvokerHealth] = IndexedSeq.empty[InvokerHealth],
private var _blackboxInvokers: IndexedSeq[InvokerHealth] = IndexedSeq.empty[InvokerHealth],
- private var _managedStepSizes: Seq[Int] = ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(0),
- private var _blackboxStepSizes: Seq[Int] = ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(0),
+ private var _managedStepSizes: Seq[Int] = ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(0),
+ private var _blackboxStepSizes: Seq[Int] = ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(0),
private var _invokerSlots: IndexedSeq[ForcableSemaphore] = IndexedSeq.empty[ForcableSemaphore],
private var _clusterSize: Int = 1)(
lbConfig: ShardingContainerPoolBalancerConfig =
@@ -419,8 +432,8 @@ case class ShardingContainerPoolBalancerState(
_managedInvokers = _invokers.take(managed)
if (oldSize != newSize) {
- _managedStepSizes = ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(managed)
- _blackboxStepSizes = ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(blackboxes)
+ _managedStepSizes = ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(managed)
+ _blackboxStepSizes = ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(blackboxes)
if (oldSize < newSize) {
// Keeps the existing state..
@@ -467,3 +480,18 @@ case class ShardingContainerPoolBalancerState(
* @param invokerBusyThreshold how many slots an invoker has available in total
*/
case class ShardingContainerPoolBalancerConfig(blackboxFraction: Double, invokerBusyThreshold: Int)
+
+/**
+ * State kept for each activation until completion.
+ *
+ * @param id id of the activation
+ * @param namespaceId namespace that invoked the action
+ * @param invokerName invoker the action is scheduled to
+ * @param timeoutHandler times out completion of this activation, should be canceled on good paths
+ * @param promise the promise to be completed by the activation
+ */
+case class ActivationEntry(id: ActivationId,
+ namespaceId: UUID,
+ invokerName: InstanceId,
+ timeoutHandler: Cancellable,
+ promise: Promise[Either[ActivationId, WhiskActivation]])
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/SharedDataService.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/SharedDataService.scala
deleted file mode 100644
index d0595d3..0000000
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/SharedDataService.scala
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.loadBalancer
-
-import akka.actor.{Actor, ActorLogging, ActorRef, Props}
-import akka.cluster.Cluster
-import akka.cluster.ClusterEvent._
-import akka.cluster.ddata.{DistributedData, PNCounterMap, PNCounterMapKey}
-import akka.cluster.ddata.Replicator._
-import whisk.common.AkkaLogging
-
-case class IncreaseCounter(key: String, value: Long)
-case class DecreaseCounter(key: String, value: Long)
-case class ReadCounter(key: String)
-case class RemoveCounter(key: String)
-case object GetMap
-
-/**
- * Companion object to specify actor properties from the outside, e.g. name of the shared map and cluster seed nodes
- */
-object SharedDataService {
- def props(storageName: String): Props =
- Props(new SharedDataService(storageName))
-}
-
-class SharedDataService(storageName: String) extends Actor with ActorLogging {
-
- val replicator = DistributedData(context.system).replicator
-
- val logging = new AkkaLogging(context.system.log)
-
- val storage = PNCounterMapKey[String](storageName)
-
- implicit val node = Cluster(context.system)
-
- /**
- * Subscribe this node for the changes in the Map, initialize the Map
- */
- override def preStart(): Unit = {
- replicator ! Subscribe(storage, self)
- node.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[MemberEvent], classOf[UnreachableMember])
- replicator ! Update(storage, PNCounterMap.empty[String], writeLocal)(_.remove(node, "0"))
- }
- override def postStop(): Unit = node.unsubscribe(self)
-
- /**
- * CRUD operations on the counter, process cluster member events for logging
- * @return
- */
- def receive = {
-
- case (IncreaseCounter(key, increment)) =>
- replicator ! Update(storage, PNCounterMap.empty[String], writeLocal)(_.increment(key, increment))
-
- case (DecreaseCounter(key, decrement)) =>
- replicator ! Update(storage, PNCounterMap[String], writeLocal)(_.decrement(key, decrement))
-
- case GetMap =>
- replicator ! Get(storage, readLocal, request = Some((sender())))
-
- case MemberUp(member) =>
- logging.info(this, "Member is Up: " + member.address)
-
- case MemberRemoved(member, previousStatus) =>
- logging.warn(this, s"Member is Removed: ${member.address} after $previousStatus")
-
- case c @ Changed(_) =>
- logging.debug(this, "Current elements: " + c.get(storage))
-
- case g @ GetSuccess(_, Some((replyTo: ActorRef))) =>
- val map = g.get(storage).entries
- replyTo ! map
-
- case g @ GetSuccess(_, Some((replyTo: ActorRef, key: String))) =>
- if (g.get(storage).contains(key)) {
- val response = g.get(storage).getValue(key).intValue()
- replyTo ! response
- } else
- replyTo ! None
-
- case _ => // ignore
- }
-}
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/ContainerPoolBalancerObjectTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/ContainerPoolBalancerObjectTests.scala
deleted file mode 100644
index fd3252d..0000000
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/ContainerPoolBalancerObjectTests.scala
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.loadBalancer.test
-
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.FlatSpec
-import org.scalatest.Matchers
-import whisk.core.loadBalancer.ContainerPoolBalancer
-import whisk.core.loadBalancer.Healthy
-import whisk.core.loadBalancer.Offline
-import whisk.core.loadBalancer.UnHealthy
-import whisk.core.entity.InstanceId
-
-/**
- * Unit tests for the ContainerPool object.
- *
- * These tests test only the "static" methods "schedule" and "remove"
- * of the ContainerPool object.
- */
-@RunWith(classOf[JUnitRunner])
-class ContainerPoolBalancerObjectTests extends FlatSpec with Matchers {
- behavior of "memoize"
-
- it should "not recompute a value which was already given" in {
- var calls = 0
- val add1: Int => Int = ContainerPoolBalancer.memoize {
- case second =>
- calls += 1
- 1 + second
- }
-
- add1(1) shouldBe 2
- calls shouldBe 1
- add1(1) shouldBe 2
- calls shouldBe 1
- add1(2) shouldBe 3
- calls shouldBe 2
- add1(1) shouldBe 2
- calls shouldBe 2
- }
-
- behavior of "pairwiseCoprimeNumbersUntil"
-
- it should "return an empty set for malformed inputs" in {
- ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(0) shouldBe Seq()
- ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(-1) shouldBe Seq()
- }
-
- it should "return all coprime numbers until the number given" in {
- ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(1) shouldBe Seq(1)
- ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(2) shouldBe Seq(1)
- ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(3) shouldBe Seq(1, 2)
- ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(4) shouldBe Seq(1, 3)
- ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(5) shouldBe Seq(1, 2, 3)
- ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(9) shouldBe Seq(1, 2, 5, 7)
- ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(10) shouldBe Seq(1, 3, 7)
- }
-
- behavior of "chooseInvoker"
-
- def invokers(n: Int) = (0 until n).map(i => (InstanceId(i), Healthy, 0))
- def hashInto[A](list: Seq[A], hash: Int) = list(hash % list.size)
-
- it should "return None on an empty invokers list" in {
- ContainerPoolBalancer.schedule(IndexedSeq(), 0, 1) shouldBe None
- }
-
- it should "return None on a list of offline/unhealthy invokers" in {
- val invs = IndexedSeq((InstanceId(0), Offline, 0), (InstanceId(1), UnHealthy, 0))
-
- ContainerPoolBalancer.schedule(invs, 0, 1) shouldBe None
- }
-
- it should "schedule to the home invoker" in {
- val invs = invokers(10)
- val hash = 2
-
- ContainerPoolBalancer.schedule(invs, 1, hash) shouldBe Some(InstanceId(hash % invs.size))
- }
-
- it should "take the only online invoker" in {
- ContainerPoolBalancer.schedule(
- IndexedSeq((InstanceId(0), Offline, 0), (InstanceId(1), UnHealthy, 0), (InstanceId(2), Healthy, 0)),
- 0,
- 1) shouldBe Some(InstanceId(2))
- }
-
- it should "skip an offline/unhealthy invoker, even if its underloaded" in {
- val hash = 0
- val invs = IndexedSeq((InstanceId(0), Healthy, 10), (InstanceId(1), UnHealthy, 0), (InstanceId(2), Healthy, 0))
-
- ContainerPoolBalancer.schedule(invs, 10, hash) shouldBe Some(InstanceId(2))
- }
-
- it should "jump to the next invoker determined by a hashed stepsize if the home invoker is overloaded" in {
- val invokerCount = 10
- val hash = 2
- val targetInvoker = hash % invokerCount
-
- val invs = invokers(invokerCount).updated(targetInvoker, (InstanceId(targetInvoker), Healthy, 1))
- val step = hashInto(ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(invokerCount), hash)
-
- ContainerPoolBalancer.schedule(invs, 1, hash) shouldBe Some(InstanceId((hash + step) % invs.size))
- }
-
- it should "wrap the search at the end of the invoker list" in {
- val invokerCount = 3
- val invs = IndexedSeq((InstanceId(0), Healthy, 1), (InstanceId(1), Healthy, 1), (InstanceId(2), Healthy, 0))
- val hash = 1
-
- val targetInvoker = hashInto(invs, hash) // will be invoker1
- val step = hashInto(ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(invokerCount), hash) // will be 2
- step shouldBe 2
-
- // invoker1 is overloaded so it will step (2 steps) to the next one --> 1 2 0 --> invoker0 is next target
- // invoker0 is overloaded so it will step to the next one --> 0 1 2 --> invoker2 is next target and underloaded
- ContainerPoolBalancer.schedule(invs, 1, hash) shouldBe Some(InstanceId((hash + step + step) % invs.size))
- }
-
- it should "multiply its threshold in 3 iterations to find an invoker with a good warm-chance" in {
- val invs = IndexedSeq((InstanceId(0), Healthy, 33), (InstanceId(1), Healthy, 36), (InstanceId(2), Healthy, 33))
- val hash = 0 // home is 0, stepsize is 1
-
- // even though invoker1 is not the home invoker in this case, it gets chosen over
- // the others because it's the first one encountered by the iteration mechanism to be below
- // the threshold of 3 * 16 invocations
- ContainerPoolBalancer.schedule(invs, 16, hash) shouldBe Some(InstanceId(0))
- }
-
- it should "choose the random invoker if all invokers are overloaded even above the muliplied threshold" in {
- val invs = IndexedSeq((InstanceId(0), Healthy, 33), (InstanceId(1), Healthy, 33), (InstanceId(2), Healthy, 33))
- val invokerBusyThreshold = 11
- val hash = 0
- val bruteResult = (0 to 100) map { _ =>
- ContainerPoolBalancer.schedule(invs, invokerBusyThreshold, hash).get.toInt
- }
- bruteResult should contain allOf (0, 1, 2)
- }
-
- it should "transparently work with partitioned sets of invokers" in {
- val invs = IndexedSeq((InstanceId(3), Healthy, 0), (InstanceId(4), Healthy, 0), (InstanceId(5), Healthy, 0))
-
- ContainerPoolBalancer.schedule(invs, 1, 0) shouldBe Some(InstanceId(3))
- ContainerPoolBalancer.schedule(invs, 1, 1) shouldBe Some(InstanceId(4))
- ContainerPoolBalancer.schedule(invs, 1, 2) shouldBe Some(InstanceId(5))
- ContainerPoolBalancer.schedule(invs, 1, 3) shouldBe Some(InstanceId(3))
- }
-}
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala
deleted file mode 100644
index 5a4edb6..0000000
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala
+++ /dev/null
@@ -1,295 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.loadBalancer.test
-
-import akka.actor.ActorSystem
-import akka.actor.Cancellable
-import com.typesafe.config.{ConfigFactory, ConfigValueFactory}
-import common.StreamLogging
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.{FlatSpec, Matchers}
-import whisk.core.entity.{ActivationId, UUID, WhiskActivation}
-import whisk.core.loadBalancer.{ActivationEntry, DistributedLoadBalancerData, LocalLoadBalancerData}
-
-import scala.concurrent.{Await, Future, Promise}
-import whisk.core.entity.InstanceId
-
-import scala.concurrent.duration._
-
-@RunWith(classOf[JUnitRunner])
-class LoadBalancerDataTests extends FlatSpec with Matchers with StreamLogging {
- final val emptyCancellable: Cancellable = new Cancellable {
- def isCancelled = false
- def cancel() = true
- }
-
- val activationIdPromise = Promise[Either[ActivationId, WhiskActivation]]()
- val firstEntry: ActivationEntry =
- ActivationEntry(ActivationId.generate(), UUID(), InstanceId(0), emptyCancellable, activationIdPromise)
- val secondEntry: ActivationEntry =
- ActivationEntry(ActivationId.generate(), UUID(), InstanceId(1), emptyCancellable, activationIdPromise)
-
- val port = 2552
- val host = "127.0.0.1"
- val config = ConfigFactory
- .parseString(s"akka.remote.netty.tcp.hostname = $host")
- .withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(port))
- .withValue("akka.actor.provider", ConfigValueFactory.fromAnyRef("cluster"))
- .withFallback(ConfigFactory.load())
-
- val actorSystemName = "controller-actor-system"
-
- implicit val actorSystem = ActorSystem(actorSystemName, config)
-
- def await[A](f: Future[A], timeout: FiniteDuration = 1.second) = Await.result(f, timeout)
-
- behavior of "LoadBalancerData"
-
- it should "return the number of activations for a namespace" in {
- val distributedLoadBalancerData = new DistributedLoadBalancerData()
- val localLoadBalancerData = new LocalLoadBalancerData()
-// test all implementations
- val loadBalancerDataArray = Array(localLoadBalancerData, distributedLoadBalancerData)
- loadBalancerDataArray.map { lbd =>
- lbd.putActivation(firstEntry.id, firstEntry)
- await(lbd.activationCountOn(firstEntry.namespaceId)) shouldBe 1
- await(lbd.activationCountPerInvoker) shouldBe Map(firstEntry.invokerName.toString -> 1)
- lbd.activationById(firstEntry.id) shouldBe Some(firstEntry)
-
- // clean up after yourself
- lbd.removeActivation(firstEntry.id)
- }
- }
-
- it should "return the number of activations for each invoker" in {
-
- val distributedLoadBalancerData = new DistributedLoadBalancerData()
- val localLoadBalancerData = new LocalLoadBalancerData()
-
- val loadBalancerDataArray = Array(localLoadBalancerData, distributedLoadBalancerData)
- loadBalancerDataArray.map { lbd =>
- lbd.putActivation(firstEntry.id, firstEntry)
- lbd.putActivation(secondEntry.id, secondEntry)
-
- val res = await(lbd.activationCountPerInvoker)
-
- res.get(firstEntry.invokerName.toString()) shouldBe Some(1)
- res.get(secondEntry.invokerName.toString()) shouldBe Some(1)
-
- lbd.activationById(firstEntry.id) shouldBe Some(firstEntry)
- lbd.activationById(secondEntry.id) shouldBe Some(secondEntry)
-
- // clean up after yourself
- lbd.removeActivation(firstEntry.id)
- lbd.removeActivation(secondEntry.id)
- }
-
- }
-
- it should "remove activations and reflect that accordingly" in {
-
- val distributedLoadBalancerData = new DistributedLoadBalancerData()
- val localLoadBalancerData = new LocalLoadBalancerData()
-
- val loadBalancerDataArray = Array(localLoadBalancerData, distributedLoadBalancerData)
- loadBalancerDataArray.map { lbd =>
- lbd.putActivation(firstEntry.id, firstEntry)
- val res = await(lbd.activationCountPerInvoker)
- res.get(firstEntry.invokerName.toString()) shouldBe Some(1)
-
- await(lbd.activationCountOn(firstEntry.namespaceId)) shouldBe 1
-
- lbd.removeActivation(firstEntry)
-
- val resAfterRemoval = await(lbd.activationCountPerInvoker)
- resAfterRemoval.get(firstEntry.invokerName.toString()) shouldBe Some(0)
-
- await(lbd.activationCountOn(firstEntry.namespaceId)) shouldBe 0
- lbd.activationById(firstEntry.id) shouldBe None
- }
-
- }
-
- it should "remove activations from all 3 maps by activation id" in {
-
- val distributedLoadBalancerData = new DistributedLoadBalancerData()
- val localLoadBalancerData = new LocalLoadBalancerData()
-
- val loadBalancerDataArray = Array(localLoadBalancerData, distributedLoadBalancerData)
- loadBalancerDataArray.map { lbd =>
- lbd.putActivation(firstEntry.id, firstEntry)
-
- val res = await(lbd.activationCountPerInvoker)
- res.get(firstEntry.invokerName.toString()) shouldBe Some(1)
-
- lbd.removeActivation(firstEntry.id)
-
- val resAfterRemoval = await(lbd.activationCountPerInvoker)
- resAfterRemoval.get(firstEntry.invokerName.toString()) shouldBe Some(0)
- }
-
- }
-
- it should "return None if the entry doesn't exist when we remove it" in {
- val distributedLoadBalancerData = new DistributedLoadBalancerData()
- val localLoadBalancerData = new LocalLoadBalancerData()
-
- val loadBalancerDataArray = Array(localLoadBalancerData, distributedLoadBalancerData)
- loadBalancerDataArray.map { lbd =>
- lbd.removeActivation(firstEntry) shouldBe None
- }
-
- }
-
- it should "respond with different values accordingly" in {
-
- val entry = ActivationEntry(ActivationId.generate(), UUID(), InstanceId(1), emptyCancellable, activationIdPromise)
- val entrySameInvokerAndNamespace = entry.copy(id = ActivationId.generate())
- val entrySameInvoker = entry.copy(id = ActivationId.generate(), namespaceId = UUID())
-
- val distributedLoadBalancerData = new DistributedLoadBalancerData()
- val localLoadBalancerData = new LocalLoadBalancerData()
-
- val loadBalancerDataArray = Array(localLoadBalancerData, distributedLoadBalancerData)
- loadBalancerDataArray.map { lbd =>
- lbd.putActivation(entry.id, entry)
-
- await(lbd.activationCountOn(entry.namespaceId)) shouldBe 1
- var res = await(lbd.activationCountPerInvoker)
- res.get(entry.invokerName.toString()) shouldBe Some(1)
-
- lbd.putActivation(entrySameInvokerAndNamespace.id, entrySameInvokerAndNamespace)
- await(lbd.activationCountOn(entry.namespaceId)) shouldBe 2
- res = await(lbd.activationCountPerInvoker)
- res.get(entry.invokerName.toString()) shouldBe Some(2)
-
- lbd.putActivation(entrySameInvoker.id, entrySameInvoker)
- await(lbd.activationCountOn(entry.namespaceId)) shouldBe 2
- res = await(lbd.activationCountPerInvoker)
- res.get(entry.invokerName.toString()) shouldBe Some(3)
-
- lbd.removeActivation(entrySameInvokerAndNamespace)
- await(lbd.activationCountOn(entry.namespaceId)) shouldBe 1
- res = await(lbd.activationCountPerInvoker)
- res.get(entry.invokerName.toString()) shouldBe Some(2)
-
- // removing non existing entry doesn't mess up
- lbd.removeActivation(entrySameInvokerAndNamespace)
- await(lbd.activationCountOn(entry.namespaceId)) shouldBe 1
- res = await(lbd.activationCountPerInvoker)
- res.get(entry.invokerName.toString()) shouldBe Some(2)
-
- // clean up
- lbd.removeActivation(entry)
- lbd.removeActivation(entrySameInvoker.id)
- }
-
- }
-
- it should "not add the same entry more then once" in {
-
- val distributedLoadBalancerData = new DistributedLoadBalancerData()
- val localLoadBalancerData = new LocalLoadBalancerData()
-
- val loadBalancerDataArray = Array(localLoadBalancerData, distributedLoadBalancerData)
- loadBalancerDataArray.map { lbd =>
- lbd.putActivation(firstEntry.id, firstEntry)
- val res = await(lbd.activationCountPerInvoker)
- res.get(firstEntry.invokerName.toString()) shouldBe Some(1)
- await(lbd.activationCountOn(firstEntry.namespaceId)) shouldBe 1
-
- lbd.putActivation(firstEntry.id, firstEntry)
- val resAfterAddingTheSameEntry = await(lbd.activationCountPerInvoker)
- resAfterAddingTheSameEntry.get(firstEntry.invokerName.toString()) shouldBe Some(1)
- await(lbd.activationCountOn(firstEntry.namespaceId)) shouldBe 1
-
- lbd.removeActivation(firstEntry)
- lbd.removeActivation(firstEntry)
- }
-
- }
-
- it should "not evaluate the given block if an entry already exists" in {
-
- val distributedLoadBalancerData = new DistributedLoadBalancerData()
- val localLoadBalancerData = new LocalLoadBalancerData()
-
- val loadBalancerDataArray = Array(localLoadBalancerData, distributedLoadBalancerData)
- loadBalancerDataArray.map { lbd =>
- var called = 0
-
- val entry = lbd.putActivation(firstEntry.id, {
- called += 1
- firstEntry
- })
-
- called shouldBe 1
-
- // entry already exists, should not evaluate the block
- val entryAfterSecond = lbd.putActivation(firstEntry.id, {
- called += 1
- firstEntry
- })
-
- called shouldBe 1
- entry shouldBe entryAfterSecond
-
- // clean up after yourself
- lbd.removeActivation(firstEntry)
- }
-
- }
-
- it should "not evaluate the given block even if an entry is different (but has the same id)" in {
-
- val distributedLoadBalancerData = new DistributedLoadBalancerData()
- val localLoadBalancerData = new LocalLoadBalancerData()
-
- val loadBalancerDataArray = Array(localLoadBalancerData, distributedLoadBalancerData)
- loadBalancerDataArray.map { lbd =>
- var called = 0
- val entrySameId = secondEntry.copy(id = firstEntry.id)
-
- val entry = lbd.putActivation(firstEntry.id, {
- called += 1
- firstEntry
- })
-
- called shouldBe 1
-
- val res = await(lbd.activationCountPerInvoker)
- res.get(firstEntry.invokerName.toString()) shouldBe Some(1)
- await(lbd.activationCountOn(firstEntry.namespaceId)) shouldBe 1
-
- // entry already exists, should not evaluate the block and change the state
- val entryAfterSecond = lbd.putActivation(entrySameId.id, {
- called += 1
- entrySameId
- })
-
- called shouldBe 1
- entry shouldBe entryAfterSecond
- val resAfterAddingTheSameEntry = await(lbd.activationCountPerInvoker)
- resAfterAddingTheSameEntry.get(firstEntry.invokerName.toString()) shouldBe Some(1)
- await(lbd.activationCountOn(firstEntry.namespaceId)) shouldBe 1
- }
-
- }
-
-}
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
index 50201a6..49712c2 100644
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
+++ b/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
@@ -176,4 +176,21 @@ class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers with Str
bruteResult should contain allOf (0, 3)
bruteResult should contain noneOf (1, 2)
}
+
+ behavior of "pairwiseCoprimeNumbersUntil"
+
+ it should "return an empty set for malformed inputs" in {
+ ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(0) shouldBe Seq()
+ ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(-1) shouldBe Seq()
+ }
+
+ it should "return all coprime numbers until the number given" in {
+ ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(1) shouldBe Seq(1)
+ ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(2) shouldBe Seq(1)
+ ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(3) shouldBe Seq(1, 2)
+ ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(4) shouldBe Seq(1, 3)
+ ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(5) shouldBe Seq(1, 2, 3)
+ ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(9) shouldBe Seq(1, 2, 5, 7)
+ ShardingContainerPoolBalancer.pairwiseCoprimeNumbersUntil(10) shouldBe Seq(1, 3, 7)
+ }
}
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/SharedDataServiceTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/SharedDataServiceTests.scala
deleted file mode 100644
index b3b37a7..0000000
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/SharedDataServiceTests.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.loadBalancer.test
-
-import akka.actor.ActorSystem
-import akka.testkit.{ImplicitSender, TestKit}
-import akka.util.Timeout
-import com.typesafe.config.{ConfigFactory, ConfigValueFactory}
-import org.junit.runner.RunWith
-import org.scalatest.{FlatSpecLike, _}
-import org.scalatest.junit.JUnitRunner
-import whisk.core.loadBalancer._
-
-import scala.concurrent.duration._
-
-// Define your test specific configuration here
-
-object TestKitConfig {
- val config = ConfigFactory.empty
- .withValue("akka.remote.netty.tcp.hostname", ConfigValueFactory.fromAnyRef("127.0.0.1"))
- .withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef("2555"))
- .withValue("akka.actor.provider", ConfigValueFactory.fromAnyRef("cluster"))
-}
-
-@RunWith(classOf[JUnitRunner])
-class SharedDataServiceTests()
- extends TestKit(ActorSystem("ControllerCluster", TestKitConfig.config))
- with ImplicitSender
- with FlatSpecLike
- with Matchers
- with BeforeAndAfterAll {
-
- override def afterAll {
- TestKit.shutdownActorSystem(system)
- }
-
- behavior of "SharedDataService"
-
- val sharedDataService = system.actorOf(SharedDataService.props("Candidates"), name = "busyMan")
- implicit val timeout = Timeout(5.seconds)
-
- it should "retrieve an empty map after initialization" in {
- sharedDataService ! GetMap
- val msg = Map()
- expectMsg(msg)
- }
- it should "increase the counter" in {
- sharedDataService ! IncreaseCounter("Donald", 1)
- sharedDataService ! GetMap
- val msg = Map("Donald" -> 1)
- expectMsg(msg)
- }
- it should "decrease the counter" in {
- sharedDataService ! IncreaseCounter("Donald", 2)
- sharedDataService ! DecreaseCounter("Donald", 2)
- sharedDataService ! GetMap
- val msg = Map("Donald" -> 1)
- expectMsg(msg)
- }
- it should "receive the map with all counters" in {
- sharedDataService ! IncreaseCounter("Hilary", 1)
- sharedDataService ! GetMap
- val msg = Map("Hilary" -> 1, "Donald" -> 1)
- expectMsg(msg)
- }
-}
--
To stop receiving notification emails like this one, please contact
cbickel@apache.org.