You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/02/26 18:39:48 UTC

[GitHub] tysonnorris closed pull request #2943: revert LoadBalancerData to be synchronous

tysonnorris closed pull request #2943: revert LoadBalancerData to be synchronous
URL: https://github.com/apache/incubator-openwhisk/pull/2943
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not display its
diff after the merge, so the diff is reproduced below:

diff --git a/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala b/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala
index 6256af2a5b..0f066ea702 100644
--- a/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala
+++ b/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala
@@ -23,7 +23,7 @@ import whisk.core.entity.Identity
 import whisk.core.loadBalancer.LoadBalancer
 import whisk.http.Messages
 
-import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.ExecutionContext
 
 /**
  * Determines user limits and activation counts as seen by the invoker and the loadbalancer
@@ -31,7 +31,7 @@ import scala.concurrent.{ExecutionContext, Future}
  * to calculate and determine whether the namespace currently invoking a new action should
  * be allowed to do so.
  *
- * @param loadbalancer contains active quotas
+ * @param loadBalancer contains active quotas
  * @param defaultConcurrencyLimit the default max allowed concurrent operations
  * @param systemOverloadLimit the limit when the system is considered overloaded
  */
@@ -45,27 +45,25 @@ class ActivationThrottler(loadBalancer: LoadBalancer, defaultConcurrencyLimit: I
   /**
    * Checks whether the operation should be allowed to proceed.
    */
-  def check(user: Identity)(implicit tid: TransactionId): Future[RateLimit] = {
-    loadBalancer.activeActivationsFor(user.uuid).map { concurrentActivations =>
-      val concurrencyLimit = user.limits.concurrentInvocations.getOrElse(defaultConcurrencyLimit)
-      logging.info(
-        this,
-        s"namespace = ${user.uuid.asString}, concurrent activations = $concurrentActivations, below limit = $concurrencyLimit")
-      ConcurrentRateLimit(concurrentActivations, concurrencyLimit)
-    }
+  def check(user: Identity)(implicit tid: TransactionId): RateLimit = {
+    val concurrentActivations = loadBalancer.activeActivationsFor(user.uuid)
+    val concurrencyLimit = user.limits.concurrentInvocations.getOrElse(defaultConcurrencyLimit)
+    logging.info(
+      this,
+      s"namespace = ${user.uuid.asString}, concurrent activations = $concurrentActivations, below limit = $concurrencyLimit")
+    ConcurrentRateLimit(concurrentActivations, concurrencyLimit)
+
   }
 
   /**
    * Checks whether the system is in a generally overloaded state.
    */
-  def isOverloaded()(implicit tid: TransactionId): Future[Boolean] = {
-    loadBalancer.totalActiveActivations.map { concurrentActivations =>
-      logging.info(
-        this,
-        s"concurrent activations in system = $concurrentActivations, below limit = $systemOverloadLimit")
-      concurrentActivations > systemOverloadLimit
-    }
+  def isOverloaded()(implicit tid: TransactionId): Boolean = {
+    val concurrentActivations = loadBalancer.totalActiveActivations
+    logging.info(this, s"concurrent activations in system = $concurrentActivations, below limit = $systemOverloadLimit")
+    concurrentActivations > systemOverloadLimit
   }
+
 }
 
 sealed trait RateLimit {
diff --git a/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala b/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala
index 68d57c42dc..1d378d3e30 100644
--- a/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala
+++ b/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala
@@ -162,7 +162,7 @@ protected[core] abstract class EntitlementProvider(config: WhiskConfig, loadBala
 
     logging.info(this, s"checking user '${user.subject}' has not exceeded activation quota")
     checkSystemOverload(ACTIVATE)
-      .flatMap(_ => checkThrottleOverload(Future.successful(invokeRateThrottler.check(user))))
+      .flatMap(_ => checkThrottleOverload(invokeRateThrottler.check(user)))
       .flatMap(_ => checkThrottleOverload(concurrentInvokeThrottler.check(user)))
   }
 
@@ -278,13 +278,11 @@ protected[core] abstract class EntitlementProvider(config: WhiskConfig, loadBala
    * @return future completing successfully if system is not overloaded else failing with a rejection
    */
   protected def checkSystemOverload(right: Privilege)(implicit transid: TransactionId): Future[Unit] = {
-    concurrentInvokeThrottler.isOverloaded.flatMap { isOverloaded =>
-      val systemOverload = right == ACTIVATE && isOverloaded
-      if (systemOverload) {
-        logging.error(this, "system is overloaded")
-        Future.failed(RejectRequest(TooManyRequests, systemOverloaded))
-      } else Future.successful(())
-    }
+    val systemOverload = right == ACTIVATE && concurrentInvokeThrottler.isOverloaded
+    if (systemOverload) {
+      logging.error(this, "system is overloaded")
+      Future.failed(RejectRequest(TooManyRequests, systemOverloaded))
+    } else Future.successful(())
   }
 
   /**
@@ -302,9 +300,9 @@ protected[core] abstract class EntitlementProvider(config: WhiskConfig, loadBala
     implicit transid: TransactionId): Future[Unit] = {
     if (right == ACTIVATE) {
       if (resources.exists(_.collection.path == Collection.ACTIONS)) {
-        checkThrottleOverload(Future.successful(invokeRateThrottler.check(user)))
+        checkThrottleOverload(invokeRateThrottler.check(user))
       } else if (resources.exists(_.collection.path == Collection.TRIGGERS)) {
-        checkThrottleOverload(Future.successful(triggerRateThrottler.check(user)))
+        checkThrottleOverload(triggerRateThrottler.check(user))
       } else Future.successful(())
     } else Future.successful(())
   }
@@ -327,13 +325,11 @@ protected[core] abstract class EntitlementProvider(config: WhiskConfig, loadBala
     } else Future.successful(())
   }
 
-  private def checkThrottleOverload(throttle: Future[RateLimit])(implicit transid: TransactionId): Future[Unit] = {
-    throttle.flatMap { limit =>
-      if (limit.ok) {
-        Future.successful(())
-      } else {
-        Future.failed(RejectRequest(TooManyRequests, limit.errorMsg))
-      }
+  private def checkThrottleOverload(throttle: RateLimit)(implicit transid: TransactionId): Future[Unit] = {
+    if (throttle.ok) {
+      Future.successful(())
+    } else {
+      Future.failed(RejectRequest(TooManyRequests, throttle.errorMsg))
     }
   }
 }
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/DistributedLoadBalancerData.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/DistributedLoadBalancerData.scala
index 34b5d6708f..c9202951c9 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/DistributedLoadBalancerData.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/DistributedLoadBalancerData.scala
@@ -17,15 +17,17 @@
 
 package whisk.core.loadBalancer
 
+import akka.actor.Actor
+import akka.actor.ActorRef
 import akka.actor.ActorSystem
+import akka.actor.Props
 import akka.util.Timeout
-import akka.pattern.ask
-import whisk.common.Logging
-import whisk.core.entity.{ActivationId, UUID}
-
+import java.util.concurrent.atomic.AtomicInteger
 import scala.collection.concurrent.TrieMap
-import scala.concurrent.Future
 import scala.concurrent.duration._
+import whisk.common.Logging
+import whisk.core.entity.ActivationId
+import whisk.core.entity.UUID
 
 /**
  * Encapsulates data used for loadbalancer and active-ack bookkeeping.
@@ -33,41 +35,47 @@ import scala.concurrent.duration._
  * Note: The state keeping is backed by distributed akka actors. All CRUDs operations are done on local values, thus
  * a stale value might be read.
  */
-class DistributedLoadBalancerData(implicit actorSystem: ActorSystem, logging: Logging) extends LoadBalancerData {
+class DistributedLoadBalancerData(monitor: Option[ActorRef] = None)(implicit actorSystem: ActorSystem, logging: Logging)
+    extends LocalLoadBalancerData {
 
   implicit val timeout = Timeout(5.seconds)
   implicit val executionContext = actorSystem.dispatcher
-  private val activationsById = TrieMap[ActivationId, ActivationEntry]()
+
+  private val updateMonitor = actorSystem.actorOf(Props(new Actor {
+    override def receive = {
+      case Updated(storageName, entries) =>
+        monitor.foreach(_ ! Updated(storageName, entries))
+        storageName match {
+          case "Invokers" =>
+            //reset the state with updates:
+            val builder = TrieMap.newBuilder[String, AtomicInteger]
+            entries.map(e => {
+              builder += ((e._1, new AtomicInteger(e._2)))
+            })
+            activationByInvoker = builder.result()
+          case "Namespaces" => //sharedDataNamespaces = entries
+            //reset the state with updates:
+            val builder = TrieMap.newBuilder[UUID, AtomicInteger]
+            entries.map(e => {
+              builder += ((UUID(e._1), new AtomicInteger(e._2)))
+            })
+            activationByNamespaceId = builder.result()
+        }
+    }
+  }))
 
   private val sharedStateInvokers = actorSystem.actorOf(
-    SharedDataService.props("Invokers"),
+    SharedDataService.props("Invokers", updateMonitor),
     name =
       "SharedDataServiceInvokers" + UUID())
   private val sharedStateNamespaces = actorSystem.actorOf(
-    SharedDataService.props("Namespaces"),
+    SharedDataService.props("Namespaces", updateMonitor),
     name =
       "SharedDataServiceNamespaces" + UUID())
 
-  def totalActivationCount =
-    (sharedStateInvokers ? GetMap).mapTo[Map[String, BigInt]].map(_.values.sum.toInt)
-
-  def activationCountOn(namespace: UUID): Future[Int] = {
-    (sharedStateNamespaces ? GetMap)
-      .mapTo[Map[String, BigInt]]
-      .map(_.mapValues(_.toInt).getOrElse(namespace.toString, 0))
-  }
-
-  def activationCountPerInvoker: Future[Map[String, Int]] = {
-    (sharedStateInvokers ? GetMap).mapTo[Map[String, BigInt]].map(_.mapValues(_.toInt))
-  }
-
-  def activationById(activationId: ActivationId): Option[ActivationEntry] = {
-    activationsById.get(activationId)
-  }
-
-  def putActivation(id: ActivationId, update: => ActivationEntry): ActivationEntry = {
+  override def putActivation(id: ActivationId, update: => ActivationEntry): ActivationEntry = {
     activationsById.getOrElseUpdate(id, {
-      val entry = update
+      val entry = super.putActivation(id, update)
       sharedStateNamespaces ! IncreaseCounter(entry.namespaceId.asString, 1)
       sharedStateInvokers ! IncreaseCounter(entry.invokerName.toString, 1)
       logging.debug(this, "increased shared counters")
@@ -75,16 +83,13 @@ class DistributedLoadBalancerData(implicit actorSystem: ActorSystem, logging: Lo
     })
   }
 
-  def removeActivation(entry: ActivationEntry): Option[ActivationEntry] = {
-    activationsById.remove(entry.id).map { activationEntry =>
-      sharedStateInvokers ! DecreaseCounter(entry.invokerName.toString, 1)
+  override def removeActivation(entry: ActivationEntry): Option[ActivationEntry] = {
+    super.removeActivation(entry).map { activationEntry =>
       sharedStateNamespaces ! DecreaseCounter(entry.namespaceId.asString, 1)
-      logging.debug(this, "decreased shared counters")
+      sharedStateInvokers ! DecreaseCounter(entry.invokerName.toString, 1)
+      logging.debug(this, s"decreased shared counters ")
       activationEntry
     }
   }
 
-  def removeActivation(aid: ActivationId): Option[ActivationEntry] = {
-    activationsById.get(aid).flatMap(removeActivation)
-  }
 }
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala
index 0018cbbd8b..a528d94acb 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala
@@ -20,7 +20,7 @@ package whisk.core.loadBalancer
 import whisk.core.entity.{ActivationId, InstanceId, UUID, WhiskActivation}
 
 import akka.actor.Cancellable
-import scala.concurrent.{Future, Promise}
+import scala.concurrent.Promise
 
 // please note: timeoutHandler.cancel must be called on all non-timeout paths, e.g. Success
 case class ActivationEntry(id: ActivationId,
@@ -31,7 +31,7 @@ case class ActivationEntry(id: ActivationId,
 trait LoadBalancerData {
 
   /** Get the number of activations across all namespaces. */
-  def totalActivationCount: Future[Int]
+  def totalActivationCount: Int
 
   /**
    * Get the number of activations for a specific namespace.
@@ -39,14 +39,14 @@ trait LoadBalancerData {
    * @param namespace The namespace to get the activation count for
    * @return a map (namespace -> number of activations in the system)
    */
-  def activationCountOn(namespace: UUID): Future[Int]
+  def activationCountOn(namespace: UUID): Int
 
   /**
    * Get the number of activations for each invoker.
    *
    * @return a map (invoker -> number of activations queued for the invoker)
    */
-  def activationCountPerInvoker: Future[Map[String, Int]]
+  def activationCountPerInvoker: Map[String, Int]
 
   /**
    * Get an activation entry for a given activation id.
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
index 29a169fe7a..50764286d3 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
@@ -63,10 +63,10 @@ trait LoadBalancer {
   val activeAckTimeoutGrace = 1.minute
 
   /** Gets the number of in-flight activations for a specific user. */
-  def activeActivationsFor(namespace: UUID): Future[Int]
+  def activeActivationsFor(namespace: UUID): Int
 
   /** Gets the number of in-flight activations in the system. */
-  def totalActiveActivations: Future[Int]
+  def totalActiveActivations: Int
 
   /**
    * Publishes activation message on internal bus for an invoker to pick up.
@@ -327,20 +327,19 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore
   private def chooseInvoker(user: Identity, action: ExecutableWhiskActionMetaData): Future[InstanceId] = {
     val hash = generateHash(user.namespace, action)
 
-    loadBalancerData.activationCountPerInvoker.flatMap { currentActivations =>
-      allInvokers.flatMap { invokers =>
-        val invokersToUse = if (action.exec.pull) blackboxInvokers(invokers) else managedInvokers(invokers)
-        val invokersWithUsage = invokersToUse.view.map {
-          // Using a view defers the comparably expensive lookup to actual access of the element
-          case (instance, state) => (instance, state, currentActivations.getOrElse(instance.toString, 0))
-        }
+    allInvokers.flatMap { invokers =>
+      val invokersToUse = if (action.exec.pull) blackboxInvokers(invokers) else managedInvokers(invokers)
+      val invokerUsage = loadBalancerData.activationCountPerInvoker
+      val invokersWithUsage = invokersToUse.view.map {
+        // Using a view defers the comparably expensive lookup to actual access of the element
+        case (instance, state) => (instance, state, invokerUsage.getOrElse(instance.toString, 0))
+      }
 
-        LoadBalancerService.schedule(invokersWithUsage, lbConfig.invokerBusyThreshold, hash) match {
-          case Some(invoker) => Future.successful(invoker)
-          case None =>
-            logging.error(this, s"all invokers down")(TransactionId.invokerHealth)
-            Future.failed(new LoadBalancerException("no invokers available"))
-        }
+      LoadBalancerService.schedule(invokersWithUsage, lbConfig.invokerBusyThreshold, hash) match {
+        case Some(invoker) => Future.successful(invoker)
+        case None =>
+          logging.error(this, s"all invokers down")(TransactionId.invokerHealth)
+          Future.failed(new LoadBalancerException("no invokers available"))
       }
     }
   }
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LocalLoadBalancerData.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LocalLoadBalancerData.scala
index 92e3789e76..6c190f6c1e 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LocalLoadBalancerData.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LocalLoadBalancerData.scala
@@ -20,7 +20,6 @@ package whisk.core.loadBalancer
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.concurrent.TrieMap
-import scala.concurrent.Future
 import whisk.core.entity.{ActivationId, UUID}
 
 /**
@@ -32,19 +31,19 @@ import whisk.core.entity.{ActivationId, UUID}
  */
 class LocalLoadBalancerData() extends LoadBalancerData {
 
-  private val activationByInvoker = TrieMap[String, AtomicInteger]()
-  private val activationByNamespaceId = TrieMap[UUID, AtomicInteger]()
-  private val activationsById = TrieMap[ActivationId, ActivationEntry]()
+  protected var activationByInvoker = TrieMap[String, AtomicInteger]()
+  protected var activationByNamespaceId = TrieMap[UUID, AtomicInteger]()
+  protected var activationsById = TrieMap[ActivationId, ActivationEntry]()
   private val totalActivations = new AtomicInteger(0)
 
-  override def totalActivationCount: Future[Int] = Future.successful(totalActivations.get)
+  override def totalActivationCount: Int = totalActivations.get
 
-  override def activationCountOn(namespace: UUID): Future[Int] = {
-    Future.successful(activationByNamespaceId.get(namespace).map(_.get).getOrElse(0))
+  override def activationCountOn(namespace: UUID): Int = {
+    activationByNamespaceId.get(namespace).map(_.get).getOrElse(0)
   }
 
-  override def activationCountPerInvoker: Future[Map[String, Int]] = {
-    Future.successful(activationByInvoker.toMap.mapValues(_.get))
+  override def activationCountPerInvoker: Map[String, Int] = {
+    activationByInvoker.toMap.mapValues(_.get)
   }
 
   override def activationById(activationId: ActivationId): Option[ActivationEntry] = {
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/SharedDataService.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/SharedDataService.scala
index d0595d3ece..4ba7d39791 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/SharedDataService.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/SharedDataService.scala
@@ -28,17 +28,19 @@ case class IncreaseCounter(key: String, value: Long)
 case class DecreaseCounter(key: String, value: Long)
 case class ReadCounter(key: String)
 case class RemoveCounter(key: String)
+case class Updated(storageName: String, entries: Map[String, Int])
+
 case object GetMap
 
 /**
  * Companion object to specify actor properties from the outside, e.g. name of the shared map and cluster seed nodes
  */
 object SharedDataService {
-  def props(storageName: String): Props =
-    Props(new SharedDataService(storageName))
+  def props(storageName: String, monitor: ActorRef): Props =
+    Props(new SharedDataService(storageName, monitor))
 }
 
-class SharedDataService(storageName: String) extends Actor with ActorLogging {
+class SharedDataService(storageName: String, monitor: ActorRef) extends Actor with ActorLogging {
 
   val replicator = DistributedData(context.system).replicator
 
@@ -81,7 +83,13 @@ class SharedDataService(storageName: String) extends Actor with ActorLogging {
 
     case c @ Changed(_) =>
       logging.debug(this, "Current elements: " + c.get(storage))
-
+      val res = c.get(storage).entries.mapValues(_.toInt)
+      if (res.nonEmpty) {
+        res.values.foreach(i => {
+          require(i >= 0, s"values cannot be less than 0 ${res}")
+        })
+        monitor ! Updated(storageName, res)
+      }
     case g @ GetSuccess(_, Some((replyTo: ActorRef))) =>
       val map = g.get(storage).entries
       replyTo ! map
diff --git a/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala b/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
index e7af616a91..cba829ba6c 100644
--- a/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
@@ -181,8 +181,8 @@ class DegenerateLoadBalancerService(config: WhiskConfig)(implicit ec: ExecutionC
   // unit tests that need an activation via active ack/fast path should set this to value expected
   var whiskActivationStub: Option[(FiniteDuration, WhiskActivation)] = None
 
-  override def totalActiveActivations = Future.successful(0)
-  override def activeActivationsFor(namespace: UUID) = Future.successful(0)
+  override def totalActiveActivations = 0
+  override def activeActivationsFor(namespace: UUID) = 0
 
   override def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)(
     implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] =
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala
index 9afa67f575..6a0bb5d3d0 100644
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala
+++ b/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala
@@ -21,16 +21,16 @@ import akka.actor.ActorSystem
 import akka.actor.Cancellable
 import com.typesafe.config.{ConfigFactory, ConfigValueFactory}
 import common.StreamLogging
+import org.scalatest.concurrent.Eventually
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.{FlatSpec, Matchers}
 import whisk.core.entity.{ActivationId, UUID, WhiskActivation}
 import whisk.core.loadBalancer.{ActivationEntry, DistributedLoadBalancerData, LocalLoadBalancerData}
-
 import scala.concurrent.{Await, Future, Promise}
 import whisk.core.entity.InstanceId
-
 import scala.concurrent.duration._
 
-class LoadBalancerDataTests extends FlatSpec with Matchers with StreamLogging {
+class LoadBalancerDataTests extends FlatSpec with Matchers with StreamLogging with Eventually {
   final val emptyCancellable: Cancellable = new Cancellable {
     def isCancelled = false
     def cancel() = true
@@ -53,11 +53,47 @@ class LoadBalancerDataTests extends FlatSpec with Matchers with StreamLogging {
   val actorSystemName = "controller-actor-system"
 
   implicit val actorSystem = ActorSystem(actorSystemName, config)
+  implicit override val patienceConfig = PatienceConfig(timeout = 500.milliseconds)
 
   def await[A](f: Future[A], timeout: FiniteDuration = 1.second) = Await.result(f, timeout)
 
   behavior of "LoadBalancerData"
 
+  it should "quickly execute many activations" in {
+    implicit val executionContext = actorSystem.dispatcher
+    val distributedLoadBalancerData = new DistributedLoadBalancerData()
+    val localLoadBalancerData = new LocalLoadBalancerData()
+    val loadBalancerDataArray = Array(localLoadBalancerData, distributedLoadBalancerData)
+    val activationCount = 100000
+    loadBalancerDataArray.foreach { lbd =>
+      val start = System.currentTimeMillis()
+      (1 to activationCount).foreach(_ => {
+        Future {
+          //get the totalActivationCount
+          lbd.totalActivationCount
+          //get the activation count per invoker
+          val counts = lbd.activationCountPerInvoker
+          //get the activation count for this ns
+          val nscounts = lbd.activationCountOn(firstEntry.namespaceId)
+
+          //add the activation entry
+          val id = ActivationId()
+          lbd.putActivation(id, firstEntry)
+
+          //remove the activation entry
+          lbd.removeActivation(id)
+        }
+      })
+      eventually(Timeout(2.seconds)) {
+        lbd.totalActivationCount shouldBe activationCount
+        lbd.activationCountOn(firstEntry.namespaceId) shouldBe activationCount
+        lbd.activationCountPerInvoker shouldBe Map(firstEntry.invokerName.toString -> activationCount)
+      }
+      println(s"completed for ${lbd.getClass} in ${System.currentTimeMillis() - start} ms")
+
+    }
+  }
+
   it should "return the number of activations for a namespace" in {
     val distributedLoadBalancerData = new DistributedLoadBalancerData()
     val localLoadBalancerData = new LocalLoadBalancerData()
@@ -65,8 +101,8 @@ class LoadBalancerDataTests extends FlatSpec with Matchers with StreamLogging {
     val loadBalancerDataArray = Array(localLoadBalancerData, distributedLoadBalancerData)
     loadBalancerDataArray.map { lbd =>
       lbd.putActivation(firstEntry.id, firstEntry)
-      await(lbd.activationCountOn(firstEntry.namespaceId)) shouldBe 1
-      await(lbd.activationCountPerInvoker) shouldBe Map(firstEntry.invokerName.toString -> 1)
+      lbd.activationCountOn(firstEntry.namespaceId) shouldBe 1
+      lbd.activationCountPerInvoker shouldBe Map(firstEntry.invokerName.toString -> 1)
       lbd.activationById(firstEntry.id) shouldBe Some(firstEntry)
 
       // clean up after yourself
@@ -83,8 +119,7 @@ class LoadBalancerDataTests extends FlatSpec with Matchers with StreamLogging {
     loadBalancerDataArray.map { lbd =>
       lbd.putActivation(firstEntry.id, firstEntry)
       lbd.putActivation(secondEntry.id, secondEntry)
-
-      val res = await(lbd.activationCountPerInvoker)
+      val res = lbd.activationCountPerInvoker
 
       res.get(firstEntry.invokerName.toString()) shouldBe Some(1)
       res.get(secondEntry.invokerName.toString()) shouldBe Some(1)
@@ -107,17 +142,17 @@ class LoadBalancerDataTests extends FlatSpec with Matchers with StreamLogging {
     val loadBalancerDataArray = Array(localLoadBalancerData, distributedLoadBalancerData)
     loadBalancerDataArray.map { lbd =>
       lbd.putActivation(firstEntry.id, firstEntry)
-      val res = await(lbd.activationCountPerInvoker)
+      val res = lbd.activationCountPerInvoker
       res.get(firstEntry.invokerName.toString()) shouldBe Some(1)
 
-      await(lbd.activationCountOn(firstEntry.namespaceId)) shouldBe 1
+      lbd.activationCountOn(firstEntry.namespaceId) shouldBe 1
 
       lbd.removeActivation(firstEntry)
 
-      val resAfterRemoval = await(lbd.activationCountPerInvoker)
+      val resAfterRemoval = lbd.activationCountPerInvoker
       resAfterRemoval.get(firstEntry.invokerName.toString()) shouldBe Some(0)
 
-      await(lbd.activationCountOn(firstEntry.namespaceId)) shouldBe 0
+      lbd.activationCountOn(firstEntry.namespaceId) shouldBe 0
       lbd.activationById(firstEntry.id) shouldBe None
     }
 
@@ -132,12 +167,12 @@ class LoadBalancerDataTests extends FlatSpec with Matchers with StreamLogging {
     loadBalancerDataArray.map { lbd =>
       lbd.putActivation(firstEntry.id, firstEntry)
 
-      val res = await(lbd.activationCountPerInvoker)
+      val res = lbd.activationCountPerInvoker
       res.get(firstEntry.invokerName.toString()) shouldBe Some(1)
 
       lbd.removeActivation(firstEntry.id)
 
-      val resAfterRemoval = await(lbd.activationCountPerInvoker)
+      val resAfterRemoval = lbd.activationCountPerInvoker
       resAfterRemoval.get(firstEntry.invokerName.toString()) shouldBe Some(0)
     }
 
@@ -167,29 +202,29 @@ class LoadBalancerDataTests extends FlatSpec with Matchers with StreamLogging {
     loadBalancerDataArray.map { lbd =>
       lbd.putActivation(entry.id, entry)
 
-      await(lbd.activationCountOn(entry.namespaceId)) shouldBe 1
-      var res = await(lbd.activationCountPerInvoker)
+      lbd.activationCountOn(entry.namespaceId) shouldBe 1
+      var res = lbd.activationCountPerInvoker
       res.get(entry.invokerName.toString()) shouldBe Some(1)
 
       lbd.putActivation(entrySameInvokerAndNamespace.id, entrySameInvokerAndNamespace)
-      await(lbd.activationCountOn(entry.namespaceId)) shouldBe 2
-      res = await(lbd.activationCountPerInvoker)
+      lbd.activationCountOn(entry.namespaceId) shouldBe 2
+      res = lbd.activationCountPerInvoker
       res.get(entry.invokerName.toString()) shouldBe Some(2)
 
       lbd.putActivation(entrySameInvoker.id, entrySameInvoker)
-      await(lbd.activationCountOn(entry.namespaceId)) shouldBe 2
-      res = await(lbd.activationCountPerInvoker)
+      lbd.activationCountOn(entry.namespaceId) shouldBe 2
+      res = lbd.activationCountPerInvoker
       res.get(entry.invokerName.toString()) shouldBe Some(3)
 
       lbd.removeActivation(entrySameInvokerAndNamespace)
-      await(lbd.activationCountOn(entry.namespaceId)) shouldBe 1
-      res = await(lbd.activationCountPerInvoker)
+      lbd.activationCountOn(entry.namespaceId) shouldBe 1
+      res = lbd.activationCountPerInvoker
       res.get(entry.invokerName.toString()) shouldBe Some(2)
 
       // removing non existing entry doesn't mess up
       lbd.removeActivation(entrySameInvokerAndNamespace)
-      await(lbd.activationCountOn(entry.namespaceId)) shouldBe 1
-      res = await(lbd.activationCountPerInvoker)
+      lbd.activationCountOn(entry.namespaceId) shouldBe 1
+      res = lbd.activationCountPerInvoker
       res.get(entry.invokerName.toString()) shouldBe Some(2)
 
       // clean up
@@ -207,14 +242,14 @@ class LoadBalancerDataTests extends FlatSpec with Matchers with StreamLogging {
     val loadBalancerDataArray = Array(localLoadBalancerData, distributedLoadBalancerData)
     loadBalancerDataArray.map { lbd =>
       lbd.putActivation(firstEntry.id, firstEntry)
-      val res = await(lbd.activationCountPerInvoker)
+      val res = lbd.activationCountPerInvoker
       res.get(firstEntry.invokerName.toString()) shouldBe Some(1)
-      await(lbd.activationCountOn(firstEntry.namespaceId)) shouldBe 1
+      lbd.activationCountOn(firstEntry.namespaceId) shouldBe 1
 
       lbd.putActivation(firstEntry.id, firstEntry)
-      val resAfterAddingTheSameEntry = await(lbd.activationCountPerInvoker)
+      val resAfterAddingTheSameEntry = lbd.activationCountPerInvoker
       resAfterAddingTheSameEntry.get(firstEntry.invokerName.toString()) shouldBe Some(1)
-      await(lbd.activationCountOn(firstEntry.namespaceId)) shouldBe 1
+      lbd.activationCountOn(firstEntry.namespaceId) shouldBe 1
 
       lbd.removeActivation(firstEntry)
       lbd.removeActivation(firstEntry)
@@ -270,9 +305,9 @@ class LoadBalancerDataTests extends FlatSpec with Matchers with StreamLogging {
 
       called shouldBe 1
 
-      val res = await(lbd.activationCountPerInvoker)
+      val res = lbd.activationCountPerInvoker
       res.get(firstEntry.invokerName.toString()) shouldBe Some(1)
-      await(lbd.activationCountOn(firstEntry.namespaceId)) shouldBe 1
+      lbd.activationCountOn(firstEntry.namespaceId) shouldBe 1
 
       // entry already exists, should not evaluate the block and change the state
       val entryAfterSecond = lbd.putActivation(entrySameId.id, {
@@ -282,9 +317,9 @@ class LoadBalancerDataTests extends FlatSpec with Matchers with StreamLogging {
 
       called shouldBe 1
       entry shouldBe entryAfterSecond
-      val resAfterAddingTheSameEntry = await(lbd.activationCountPerInvoker)
+      val resAfterAddingTheSameEntry = lbd.activationCountPerInvoker
       resAfterAddingTheSameEntry.get(firstEntry.invokerName.toString()) shouldBe Some(1)
-      await(lbd.activationCountOn(firstEntry.namespaceId)) shouldBe 1
+      lbd.activationCountOn(firstEntry.namespaceId) shouldBe 1
     }
 
   }
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/SharedDataServiceTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/SharedDataServiceTests.scala
index 3961e539ca..cc757371f2 100644
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/SharedDataServiceTests.scala
+++ b/tests/src/test/scala/whisk/core/loadBalancer/test/SharedDataServiceTests.scala
@@ -25,8 +25,8 @@ import com.typesafe.config.ConfigFactory
 import org.scalatest._
 import whisk.core.loadBalancer._
 import org.scalatest.FlatSpecLike
-
 import scala.concurrent.duration._
+import whisk.core.entity.InstanceId
 
 // Define your test specific configuration here
 
@@ -61,7 +61,8 @@ class SharedDataServiceTests()
     .withFallback(ConfigFactory.load())
 
   val s = ActorSystem("controller-actor-system", config)
-  val sharedDataService = s.actorOf(SharedDataService.props("Candidates"), name = "busyMan")
+  val sharedDataService = s.actorOf(SharedDataService.props("Candidates", testActor), name = "busyMan")
+  val controllerInstance = InstanceId(0)
   implicit val timeout = Timeout(5.seconds)
 
   it should "retrieve an empty map after initialization" in {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services