Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/10/09 08:21:04 UTC

[GitHub] cbickel closed pull request #4011: Customize invoker user memory for memory-based load balancing

cbickel closed pull request #4011: Customize invoker user memory for memory-based load balancing
URL: https://github.com/apache/incubator-openwhisk/pull/4011
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the
original diff once such a pull request is merged, it is reproduced below
for the sake of provenance:

diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml
index de2d71f704..a4cc8369b5 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -214,8 +214,6 @@
         "{{ controller.ssl.storeFlavor }}"
       "CONFIG_whisk_controller_https_clientAuth":
         "{{ controller.ssl.clientAuth }}"
-      "CONFIG_whisk_loadbalancer_invokerUserMemory":
-        "{{ invoker.userMemory }}"
       "CONFIG_whisk_loadbalancer_blackboxFraction":
         "{{ controller.blackboxFraction }}"
       "CONFIG_whisk_loadbalancer_timeoutFactor":
diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml
index 145febf2a8..6712c38e60 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -217,7 +217,7 @@
       "CONFIG_whisk_runtimes_localImagePrefix": "{{ runtimes_local_image_prefix | default() }}"
       "CONFIG_whisk_containerFactory_containerArgs_network": "{{ invoker_container_network_name | default('bridge') }}"
       "INVOKER_CONTAINER_POLICY": "{{ invoker_container_policy_name | default()}}"
-      "CONFIG_whisk_containerPool_userMemory": "{{ invoker.userMemory }}"
+      "CONFIG_whisk_containerPool_userMemory": "{{ hostvars[groups['invokers'][invoker_index | int]].user_memory | default(invoker.userMemory) }}"
       "CONFIG_whisk_docker_client_parallelRuns": "{{ invoker_parallel_runs | default() }}"
       "CONFIG_whisk_docker_containerFactory_useRunc": "{{ invoker.useRunc }}"
       "WHISK_LOGS_DIR": "{{ whisk_logs_dir }}"
diff --git a/common/scala/src/main/scala/whisk/core/entity/InstanceId.scala b/common/scala/src/main/scala/whisk/core/entity/InstanceId.scala
index 5122980e06..0d477b44f4 100644
--- a/common/scala/src/main/scala/whisk/core/entity/InstanceId.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/InstanceId.scala
@@ -30,7 +30,8 @@ import whisk.core.entity.ControllerInstanceId.MAX_NAME_LENGTH
  */
 case class InvokerInstanceId(val instance: Int,
                              uniqueName: Option[String] = None,
-                             displayedName: Option[String] = None) {
+                             displayedName: Option[String] = None,
+                             val userMemory: ByteSize) {
   def toInt: Int = instance
 
   override def toString: String = (Seq("invoker" + instance) ++ uniqueName ++ displayedName).mkString("/")
@@ -43,7 +44,8 @@ case class ControllerInstanceId(val asString: String) {
 }
 
 object InvokerInstanceId extends DefaultJsonProtocol {
-  implicit val serdes = jsonFormat3(InvokerInstanceId.apply)
+  import whisk.core.entity.size.{serdes => xserds}
+  implicit val serdes = jsonFormat4(InvokerInstanceId.apply)
 }
 
 object ControllerInstanceId extends DefaultJsonProtocol {
diff --git a/common/scala/src/main/scala/whisk/core/entity/Size.scala b/common/scala/src/main/scala/whisk/core/entity/Size.scala
index 34f5bc6ef0..9af4128350 100644
--- a/common/scala/src/main/scala/whisk/core/entity/Size.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/Size.scala
@@ -109,7 +109,7 @@ object ByteSize {
   def fromString(sizeString: String): ByteSize = {
     val matcher = regex.matcher(sizeString)
     if (matcher.matches()) {
-      val size = matcher.group(1).toInt
+      val size = matcher.group(1).toLong
       val unit = matcher.group(2).charAt(0).toUpper match {
         case 'B' => SizeUnits.BYTE
         case 'K' => SizeUnits.KB
diff --git a/core/controller/src/main/resources/reference.conf b/core/controller/src/main/resources/reference.conf
index a88ea6fae3..3f76e0c995 100644
--- a/core/controller/src/main/resources/reference.conf
+++ b/core/controller/src/main/resources/reference.conf
@@ -6,7 +6,6 @@ whisk {
     use-cluster-bootstrap: false
   }
   loadbalancer {
-    invoker-user-memory: 1024 m
     blackbox-fraction: 10%
     # factor to increase the timeout for forced active acks
     # timeout = time-limit.std * timeoutfactor + 1m
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
index 7e76f016ac..305386b7fb 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
@@ -194,7 +194,10 @@ class InvokerPool(childFactory: (ActorRefFactory, InvokerInstanceId) => ActorRef
 
     // Grow the underlying status sequence to the size needed to contain the incoming ping. Dummy values are created
     // to represent invokers, where ping messages haven't arrived yet
-    status = padToIndexed(status, instanceId.toInt + 1, i => new InvokerHealth(InvokerInstanceId(i), Offline))
+    status = padToIndexed(
+      status,
+      instanceId.toInt + 1,
+      i => new InvokerHealth(InvokerInstanceId(i, userMemory = instanceId.userMemory), Offline))
     status = status.updated(instanceId.toInt, new InvokerHealth(instanceId, Offline))
 
     val ref = childFactory(context, instanceId)
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
index 4d6bbf8dd2..66f66a2fe1 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -525,9 +525,6 @@ case class ShardingContainerPoolBalancerState(
   lbConfig: ShardingContainerPoolBalancerConfig =
     loadConfigOrThrow[ShardingContainerPoolBalancerConfig](ConfigKeys.loadbalancer))(implicit logging: Logging) {
 
-  private val totalInvokerThreshold = lbConfig.invokerUserMemory
-  private var currentInvokerThreshold = totalInvokerThreshold
-
   private val blackboxFraction: Double = Math.max(0.0, Math.min(1.0, lbConfig.blackboxFraction))
   logging.info(this, s"blackboxFraction = $blackboxFraction")(TransactionId.loadbalancer)
 
@@ -540,6 +537,23 @@ case class ShardingContainerPoolBalancerState(
   def invokerSlots: IndexedSeq[ForcibleSemaphore] = _invokerSlots
   def clusterSize: Int = _clusterSize
 
+  /**
+   * @param memory
+   * @return calculated invoker slot
+   */
+  private def getInvokerSlot(memory: ByteSize): ByteSize = {
+    val newTreshold = if (memory / _clusterSize < MemoryLimit.minMemory) {
+      logging.warn(
+        this,
+        s"registered controllers: ${_clusterSize}: the slots per invoker fall below the min memory of one action.")(
+        TransactionId.loadbalancer)
+      MemoryLimit.minMemory
+    } else {
+      memory / _clusterSize
+    }
+    newTreshold
+  }
+
   /**
    * Updates the scheduling state with the new invokers.
    *
@@ -570,8 +584,8 @@ case class ShardingContainerPoolBalancerState(
 
       if (oldSize < newSize) {
         // Keeps the existing state..
-        _invokerSlots = _invokerSlots ++ IndexedSeq.fill(newSize - oldSize) {
-          new ForcibleSemaphore(currentInvokerThreshold.toMB.toInt)
+        _invokerSlots = _invokerSlots ++ _invokers.drop(_invokerSlots.length).map { invoker =>
+          new ForcibleSemaphore(getInvokerSlot(invoker.id.userMemory).toMB.toInt)
         }
       }
     }
@@ -594,22 +608,10 @@ case class ShardingContainerPoolBalancerState(
     val actualSize = newSize max 1 // if a cluster size < 1 is reported, falls back to a size of 1 (alone)
     if (_clusterSize != actualSize) {
       _clusterSize = actualSize
-      val newTreshold = if (totalInvokerThreshold / actualSize < MemoryLimit.minMemory) {
-        logging.warn(
-          this,
-          s"registered controllers: ${_clusterSize}: the slots per invoker fall below the min memory of one action.")(
-          TransactionId.loadbalancer)
-        MemoryLimit.minMemory // letting this fall below minMemory doesn't make sense
-      } else {
-        totalInvokerThreshold / actualSize
+      _invokerSlots = _invokers.map { invoker =>
+        new ForcibleSemaphore(getInvokerSlot(invoker.id.userMemory).toMB.toInt)
       }
-      currentInvokerThreshold = newTreshold
-      _invokerSlots = _invokerSlots.map(_ => new ForcibleSemaphore(currentInvokerThreshold.toMB.toInt))
-
-      logging.info(
-        this,
-        s"loadbalancer cluster size changed to $actualSize active nodes. invokerThreshold = $currentInvokerThreshold")(
-        TransactionId.loadbalancer)
+      logging.info(this, s"loadbalancer cluster size changed to $actualSize active nodes.")(TransactionId.loadbalancer)
     }
   }
 }
@@ -625,12 +627,9 @@ case class ClusterConfig(useClusterBootstrap: Boolean)
  * Configuration for the sharding container pool balancer.
  *
  * @param blackboxFraction the fraction of all invokers to use exclusively for blackboxes
- * @param invokerUserMemory how many Bytes of memory an invoker has available in total for user containers
  * @param timeoutFactor factor to influence the timeout period for forced active acks (time-limit.std * timeoutFactor + 1m)
  */
-case class ShardingContainerPoolBalancerConfig(blackboxFraction: Double,
-                                               invokerUserMemory: ByteSize,
-                                               timeoutFactor: Int)
+case class ShardingContainerPoolBalancerConfig(blackboxFraction: Double, timeoutFactor: Int)
 
 /**
  * State kept for each activation until completion.
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
index 8cec373087..95eea6eb62 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
@@ -25,11 +25,13 @@ import kamon.Kamon
 import pureconfig.loadConfigOrThrow
 import whisk.common.Https.HttpsConfig
 import whisk.common._
-import whisk.core.WhiskConfig
+import whisk.core.{ConfigKeys, WhiskConfig}
 import whisk.core.WhiskConfig._
 import whisk.core.connector.{MessagingProvider, PingMessage}
+import whisk.core.containerpool.ContainerPoolConfig
 import whisk.core.entity.{ExecManifest, InvokerInstanceId}
 import whisk.core.entity.ActivationEntityLimit
+import whisk.core.entity.size._
 import whisk.http.{BasicHttpService, BasicRasService}
 import whisk.spi.SpiLoader
 import whisk.utils.ExecutionContextFactory
@@ -68,6 +70,7 @@ object Invoker {
     implicit val actorSystem: ActorSystem =
       ActorSystem(name = "invoker-actor-system", defaultExecutionContext = Some(ec))
     implicit val logger = new AkkaLogging(akka.event.Logging.getLogger(actorSystem, this))
+    val poolConfig: ContainerPoolConfig = loadConfigOrThrow[ContainerPoolConfig](ConfigKeys.containerPool)
 
     // Prepare Kamon shutdown
     CoordinatedShutdown(actorSystem).addTask(CoordinatedShutdown.PhaseActorSystemTerminate, "shutdownKamon") { () =>
@@ -143,8 +146,11 @@ object Invoker {
 
     val topicBaseName = "invoker"
     val topicName = topicBaseName + assignedInvokerId
+
     val maxMessageBytes = Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)
-    val invokerInstance = InvokerInstanceId(assignedInvokerId, cmdLineArgs.uniqueName, cmdLineArgs.displayedName)
+    val invokerInstance =
+      InvokerInstanceId(assignedInvokerId, cmdLineArgs.uniqueName, cmdLineArgs.displayedName, poolConfig.userMemory)
+
     val msgProvider = SpiLoader.get[MessagingProvider]
     if (msgProvider
           .ensureTopic(config, topic = topicName, topicConfig = topicBaseName, maxMessageBytes = maxMessageBytes)
@@ -153,7 +159,7 @@ object Invoker {
     }
     val producer = msgProvider.getProducer(config, Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
     val invoker = try {
-      new InvokerReactive(config, invokerInstance, producer)
+      new InvokerReactive(config, invokerInstance, producer, poolConfig)
     } catch {
       case e: Exception => abort(s"Failed to initialize reactive invoker: ${e.getMessage}")
     }
diff --git a/tests/src/test/scala/whisk/core/connector/tests/CompletionMessageTests.scala b/tests/src/test/scala/whisk/core/connector/tests/CompletionMessageTests.scala
index ff4419d453..ed1ac37199 100644
--- a/tests/src/test/scala/whisk/core/connector/tests/CompletionMessageTests.scala
+++ b/tests/src/test/scala/whisk/core/connector/tests/CompletionMessageTests.scala
@@ -41,6 +41,7 @@ class CompletionMessageTests extends FlatSpec with Matchers {
 
   behavior of "completion message"
 
+  val defaultUserMemory: ByteSize = 1024.MB
   val activation = WhiskActivation(
     namespace = EntityPath("ns"),
     name = EntityName("a"),
@@ -53,7 +54,10 @@ class CompletionMessageTests extends FlatSpec with Matchers {
     duration = Some(123))
 
   it should "serialize a left completion message" in {
-    val m = CompletionMessage(TransactionId.testing, Left(ActivationId.generate()), InvokerInstanceId(0))
+    val m = CompletionMessage(
+      TransactionId.testing,
+      Left(ActivationId.generate()),
+      InvokerInstanceId(0, userMemory = defaultUserMemory))
     m.serialize shouldBe JsObject(
       "transid" -> m.transid.toJson,
       "response" -> m.response.left.get.toJson,
@@ -61,7 +65,8 @@ class CompletionMessageTests extends FlatSpec with Matchers {
   }
 
   it should "serialize a right completion message" in {
-    val m = CompletionMessage(TransactionId.testing, Right(activation), InvokerInstanceId(0))
+    val m =
+      CompletionMessage(TransactionId.testing, Right(activation), InvokerInstanceId(0, userMemory = defaultUserMemory))
     m.serialize shouldBe JsObject(
       "transid" -> m.transid.toJson,
       "response" -> m.response.right.get.toJson,
@@ -69,12 +74,16 @@ class CompletionMessageTests extends FlatSpec with Matchers {
   }
 
   it should "deserialize a left completion message" in {
-    val m = CompletionMessage(TransactionId.testing, Left(ActivationId.generate()), InvokerInstanceId(0))
+    val m = CompletionMessage(
+      TransactionId.testing,
+      Left(ActivationId.generate()),
+      InvokerInstanceId(0, userMemory = defaultUserMemory))
     CompletionMessage.parse(m.serialize) shouldBe Success(m)
   }
 
   it should "deserialize a right completion message" in {
-    val m = CompletionMessage(TransactionId.testing, Right(activation), InvokerInstanceId(0))
+    val m =
+      CompletionMessage(TransactionId.testing, Right(activation), InvokerInstanceId(0, userMemory = defaultUserMemory))
     CompletionMessage.parse(m.serialize) shouldBe Success(m)
   }
 }
diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerFactoryTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerFactoryTests.scala
index 306b94605c..f32d01801b 100644
--- a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerFactoryTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerFactoryTests.scala
@@ -38,8 +38,7 @@ import whisk.core.containerpool.docker.DockerApiWithFileAccess
 import whisk.core.containerpool.docker.DockerContainerFactory
 import whisk.core.containerpool.docker.DockerContainerFactoryConfig
 import whisk.core.containerpool.docker.RuncApi
-import whisk.core.entity.ExecManifest
-import whisk.core.entity.InvokerInstanceId
+import whisk.core.entity.{ByteSize, ExecManifest, InvokerInstanceId}
 import whisk.core.entity.size._
 
 @RunWith(classOf[JUnitRunner])
@@ -57,6 +56,8 @@ class DockerContainerFactoryTests
 
   behavior of "DockerContainerFactory"
 
+  val defaultUserMemory: ByteSize = 1024.MB
+
   it should "set the docker run args based on ContainerArgsConfig" in {
 
     val image = ExecManifest.runtimesManifest.manifests("nodejs").image
@@ -104,7 +105,7 @@ class DockerContainerFactoryTests
 
     val factory =
       new DockerContainerFactory(
-        InvokerInstanceId(0),
+        InvokerInstanceId(0, userMemory = defaultUserMemory),
         Map.empty,
         ContainerArgsConfig("net1", Seq("dns1", "dns2"), Map("env" -> Set("e1", "e2"))),
         DockerContainerFactoryConfig(true))(actorSystem, executionContext, logging, dockerApiStub, mock[RuncApi])
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
index 904daebd05..920d5358b6 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
@@ -56,6 +56,7 @@ class ContainerProxyTests
 
   val timeout = 5.seconds
   val log = logging
+  val defaultUserMemory: ByteSize = 1024.MB
 
   // Common entities to pass to the tests. We don't really care what's inside
   // those for the behavior testing here, as none of the contents will really
@@ -192,7 +193,7 @@ class ContainerProxyTests
             createAcker(),
             store,
             createCollector(),
-            InvokerInstanceId(0, Some("myname")),
+            InvokerInstanceId(0, Some("myname"), userMemory = defaultUserMemory),
             poolConfig,
             pauseGrace = timeout))
     registerCallback(machine)
@@ -216,7 +217,14 @@ class ContainerProxyTests
     val machine =
       childActorOf(
         ContainerProxy
-          .props(factory, acker, store, collector, InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
+          .props(
+            factory,
+            acker,
+            store,
+            collector,
+            InvokerInstanceId(0, userMemory = defaultUserMemory),
+            poolConfig,
+            pauseGrace = timeout))
     registerCallback(machine)
 
     preWarm(machine)
@@ -253,7 +261,14 @@ class ContainerProxyTests
     val machine =
       childActorOf(
         ContainerProxy
-          .props(factory, acker, store, collector, InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
+          .props(
+            factory,
+            acker,
+            store,
+            collector,
+            InvokerInstanceId(0, userMemory = defaultUserMemory),
+            poolConfig,
+            pauseGrace = timeout))
     registerCallback(machine)
     preWarm(machine)
 
@@ -301,7 +316,14 @@ class ContainerProxyTests
     val machine =
       childActorOf(
         ContainerProxy
-          .props(factory, acker, store, collector, InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
+          .props(
+            factory,
+            acker,
+            store,
+            collector,
+            InvokerInstanceId(0, userMemory = defaultUserMemory),
+            poolConfig,
+            pauseGrace = timeout))
     registerCallback(machine)
     preWarm(machine)
 
@@ -340,7 +362,14 @@ class ContainerProxyTests
     val machine =
       childActorOf(
         ContainerProxy
-          .props(factory, acker, store, collector, InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
+          .props(
+            factory,
+            acker,
+            store,
+            collector,
+            InvokerInstanceId(0, userMemory = defaultUserMemory),
+            poolConfig,
+            pauseGrace = timeout))
     registerCallback(machine)
     run(machine, Uninitialized)
 
@@ -373,7 +402,14 @@ class ContainerProxyTests
     val machine =
       childActorOf(
         ContainerProxy
-          .props(factory, acker, store, collector, InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
+          .props(
+            factory,
+            acker,
+            store,
+            collector,
+            InvokerInstanceId(0, userMemory = defaultUserMemory),
+            poolConfig,
+            pauseGrace = timeout))
     registerCallback(machine)
 
     machine ! Run(noLogsAction, message)
@@ -413,7 +449,14 @@ class ContainerProxyTests
     val machine =
       childActorOf(
         ContainerProxy
-          .props(factory, acker, store, collector, InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
+          .props(
+            factory,
+            acker,
+            store,
+            collector,
+            InvokerInstanceId(0, userMemory = defaultUserMemory),
+            poolConfig,
+            pauseGrace = timeout))
     registerCallback(machine)
     preWarm(machine)
 
@@ -471,7 +514,14 @@ class ContainerProxyTests
     val machine =
       childActorOf(
         ContainerProxy
-          .props(factory, acker, store, collector, InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
+          .props(
+            factory,
+            acker,
+            store,
+            collector,
+            InvokerInstanceId(0, userMemory = defaultUserMemory),
+            poolConfig,
+            pauseGrace = timeout))
     registerCallback(machine)
     machine ! Run(action, message)
     expectMsg(Transition(machine, Uninitialized, Running))
@@ -507,7 +557,14 @@ class ContainerProxyTests
     val machine =
       childActorOf(
         ContainerProxy
-          .props(factory, acker, store, collector, InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
+          .props(
+            factory,
+            acker,
+            store,
+            collector,
+            InvokerInstanceId(0, userMemory = defaultUserMemory),
+            poolConfig,
+            pauseGrace = timeout))
     registerCallback(machine)
     machine ! Run(action, message)
     expectMsg(Transition(machine, Uninitialized, Running))
@@ -548,7 +605,14 @@ class ContainerProxyTests
     val machine =
       childActorOf(
         ContainerProxy
-          .props(factory, acker, store, collector, InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
+          .props(
+            factory,
+            acker,
+            store,
+            collector,
+            InvokerInstanceId(0, userMemory = defaultUserMemory),
+            poolConfig,
+            pauseGrace = timeout))
     registerCallback(machine)
     machine ! Run(action, message)
     expectMsg(Transition(machine, Uninitialized, Running))
@@ -579,7 +643,14 @@ class ContainerProxyTests
     val machine =
       childActorOf(
         ContainerProxy
-          .props(factory, acker, store, collector, InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
+          .props(
+            factory,
+            acker,
+            store,
+            collector,
+            InvokerInstanceId(0, userMemory = defaultUserMemory),
+            poolConfig,
+            pauseGrace = timeout))
     registerCallback(machine)
     machine ! Run(action, message)
     expectMsg(Transition(machine, Uninitialized, Running))
@@ -609,7 +680,14 @@ class ContainerProxyTests
     val machine =
       childActorOf(
         ContainerProxy
-          .props(factory, acker, store, collector, InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
+          .props(
+            factory,
+            acker,
+            store,
+            collector,
+            InvokerInstanceId(0, userMemory = defaultUserMemory),
+            poolConfig,
+            pauseGrace = timeout))
     registerCallback(machine)
     machine ! Run(action, message)
     expectMsg(Transition(machine, Uninitialized, Running))
@@ -643,7 +721,14 @@ class ContainerProxyTests
     val machine =
       childActorOf(
         ContainerProxy
-          .props(factory, acker, store, createCollector(), InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
+          .props(
+            factory,
+            acker,
+            store,
+            createCollector(),
+            InvokerInstanceId(0, userMemory = defaultUserMemory),
+            poolConfig,
+            pauseGrace = timeout))
     registerCallback(machine)
     run(machine, Uninitialized) // first run an activation
     timeout(machine) // times out Ready state so container suspends
@@ -679,7 +764,14 @@ class ContainerProxyTests
     val machine =
       childActorOf(
         ContainerProxy
-          .props(factory, acker, store, createCollector(), InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
+          .props(
+            factory,
+            acker,
+            store,
+            createCollector(),
+            InvokerInstanceId(0, userMemory = defaultUserMemory),
+            poolConfig,
+            pauseGrace = timeout))
     registerCallback(machine)
     run(machine, Uninitialized)
     timeout(machine) // times out Ready state so container suspends
@@ -716,7 +808,14 @@ class ContainerProxyTests
     val machine =
       childActorOf(
         ContainerProxy
-          .props(factory, acker, store, collector, InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
+          .props(
+            factory,
+            acker,
+            store,
+            collector,
+            InvokerInstanceId(0, userMemory = defaultUserMemory),
+            poolConfig,
+            pauseGrace = timeout))
     registerCallback(machine)
 
     // Start running the action
@@ -768,7 +867,14 @@ class ContainerProxyTests
     val machine =
       childActorOf(
         ContainerProxy
-          .props(factory, acker, store, collector, InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
+          .props(
+            factory,
+            acker,
+            store,
+            collector,
+            InvokerInstanceId(0, userMemory = defaultUserMemory),
+            poolConfig,
+            pauseGrace = timeout))
     registerCallback(machine)
     run(machine, Uninitialized)
     timeout(machine)
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
index 9cff1df96e..587e3d4431 100644
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
+++ b/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
@@ -49,6 +49,7 @@ import whisk.core.connector.ActivationMessage
 import whisk.core.connector.PingMessage
 import whisk.core.entity.ActivationId.ActivationIdGenerator
 import whisk.core.entity._
+import whisk.core.entity.size._
 import whisk.core.loadBalancer.ActivationRequest
 import whisk.core.loadBalancer.GetStatus
 import whisk.core.loadBalancer.InvokerState._
@@ -74,6 +75,7 @@ class InvokerSupervisionTests
     with StreamLogging {
 
   val config = new WhiskConfig(ExecManifest.requiredProperties)
+  val defaultUserMemory: ByteSize = 1024.MB
 
   ExecManifest.initialize(config)
 
@@ -92,7 +94,7 @@ class InvokerSupervisionTests
 
   /** Helper to generate a list of (InstanceId, InvokerState) */
   def zipWithInstance(list: IndexedSeq[InvokerState]) = list.zipWithIndex.map {
-    case (state, index) => new InvokerHealth(InvokerInstanceId(index), state)
+    case (state, index) => new InvokerHealth(InvokerInstanceId(index, userMemory = defaultUserMemory), state)
   }
 
   val pC = new TestConnector("pingFeedTtest", 4, false) {}
@@ -103,8 +105,8 @@ class InvokerSupervisionTests
     val invoker5 = TestProbe()
     val invoker2 = TestProbe()
 
-    val invoker5Instance = InvokerInstanceId(5)
-    val invoker2Instance = InvokerInstanceId(2)
+    val invoker5Instance = InvokerInstanceId(5, userMemory = defaultUserMemory)
+    val invoker2Instance = InvokerInstanceId(2, userMemory = defaultUserMemory)
 
     val children = mutable.Queue(invoker5.ref, invoker2.ref)
     val childFactory = (f: ActorRefFactory, instance: InvokerInstanceId) => children.dequeue()
@@ -145,7 +147,7 @@ class InvokerSupervisionTests
 
   it should "forward the ActivationResult to the appropriate invoker" in {
     val invoker = TestProbe()
-    val invokerInstance = InvokerInstanceId(0)
+    val invokerInstance = InvokerInstanceId(0, userMemory = defaultUserMemory)
     val invokerName = s"invoker${invokerInstance.toInt}"
     val childFactory = (f: ActorRefFactory, instance: InvokerInstanceId) => invoker.ref
     val sendActivationToInvoker = stubFunction[ActivationMessage, InvokerInstanceId, Future[RecordMetadata]]
@@ -170,7 +172,7 @@ class InvokerSupervisionTests
 
   it should "forward an ActivationMessage to the sendActivation-Method" in {
     val invoker = TestProbe()
-    val invokerInstance = InvokerInstanceId(0)
+    val invokerInstance = InvokerInstanceId(0, userMemory = defaultUserMemory)
     val invokerName = s"invoker${invokerInstance.toInt}"
     val childFactory = (f: ActorRefFactory, instance: InvokerInstanceId) => invoker.ref
 
@@ -209,7 +211,9 @@ class InvokerSupervisionTests
   // offline -> unhealthy
   it should "start unhealthy, go offline if the state times out and goes unhealthy on a successful ping again" in {
     val pool = TestProbe()
-    val invoker = pool.system.actorOf(InvokerActor.props(InvokerInstanceId(0), ControllerInstanceId("0")))
+    val invoker =
+      pool.system.actorOf(
+        InvokerActor.props(InvokerInstanceId(0, userMemory = defaultUserMemory), ControllerInstanceId("0")))
 
     within(timeout.duration) {
       pool.send(invoker, SubscribeTransitionCallBack(pool.ref))
@@ -217,7 +221,7 @@ class InvokerSupervisionTests
       timeout(invoker)
       pool.expectMsg(Transition(invoker, Unhealthy, Offline))
 
-      invoker ! PingMessage(InvokerInstanceId(0))
+      invoker ! PingMessage(InvokerInstanceId(0, userMemory = defaultUserMemory))
       pool.expectMsg(Transition(invoker, Offline, Unhealthy))
     }
   }
@@ -225,26 +229,34 @@ class InvokerSupervisionTests
   // unhealthy -> healthy -> unhealthy -> healthy
   it should "goto healthy again, if unhealthy and error buffer has enough successful invocations" in {
     val pool = TestProbe()
-    val invoker = pool.system.actorOf(InvokerActor.props(InvokerInstanceId(0), ControllerInstanceId("0")))
+    val invoker =
+      pool.system.actorOf(
+        InvokerActor.props(InvokerInstanceId(0, userMemory = defaultUserMemory), ControllerInstanceId("0")))
 
     within(timeout.duration) {
       pool.send(invoker, SubscribeTransitionCallBack(pool.ref))
       pool.expectMsg(CurrentState(invoker, Unhealthy))
 
       (1 to InvokerActor.bufferSize).foreach { _ =>
-        invoker ! InvocationFinishedMessage(InvokerInstanceId(0), InvocationFinishedResult.Success)
+        invoker ! InvocationFinishedMessage(
+          InvokerInstanceId(0, userMemory = defaultUserMemory),
+          InvocationFinishedResult.Success)
       }
       pool.expectMsg(Transition(invoker, Unhealthy, Healthy))
 
       // Fill buffer with errors
       (1 to InvokerActor.bufferSize).foreach { _ =>
-        invoker ! InvocationFinishedMessage(InvokerInstanceId(0), InvocationFinishedResult.SystemError)
+        invoker ! InvocationFinishedMessage(
+          InvokerInstanceId(0, userMemory = defaultUserMemory),
+          InvocationFinishedResult.SystemError)
       }
       pool.expectMsg(Transition(invoker, Healthy, Unhealthy))
 
       // Fill buffer with successful invocations to become healthy again (one below errorTolerance)
       (1 to InvokerActor.bufferSize - InvokerActor.bufferErrorTolerance).foreach { _ =>
-        invoker ! InvocationFinishedMessage(InvokerInstanceId(0), InvocationFinishedResult.Success)
+        invoker ! InvocationFinishedMessage(
+          InvokerInstanceId(0, userMemory = defaultUserMemory),
+          InvocationFinishedResult.Success)
       }
       pool.expectMsg(Transition(invoker, Unhealthy, Healthy))
     }
@@ -253,26 +265,34 @@ class InvokerSupervisionTests
   // unhealthy -> healthy -> overloaded -> healthy
   it should "goto healthy again, if overloaded and error buffer has enough successful invocations" in {
     val pool = TestProbe()
-    val invoker = pool.system.actorOf(InvokerActor.props(InvokerInstanceId(0), ControllerInstanceId("0")))
+    val invoker =
+      pool.system.actorOf(
+        InvokerActor.props(InvokerInstanceId(0, userMemory = defaultUserMemory), ControllerInstanceId("0")))
 
     within(timeout.duration) {
       pool.send(invoker, SubscribeTransitionCallBack(pool.ref))
       pool.expectMsg(CurrentState(invoker, Unhealthy))
 
       (1 to InvokerActor.bufferSize).foreach { _ =>
-        invoker ! InvocationFinishedMessage(InvokerInstanceId(0), InvocationFinishedResult.Success)
+        invoker ! InvocationFinishedMessage(
+          InvokerInstanceId(0, userMemory = defaultUserMemory),
+          InvocationFinishedResult.Success)
       }
       pool.expectMsg(Transition(invoker, Unhealthy, Healthy))
 
       // Fill buffer with timeouts
       (1 to InvokerActor.bufferSize).foreach { _ =>
-        invoker ! InvocationFinishedMessage(InvokerInstanceId(0), InvocationFinishedResult.Timeout)
+        invoker ! InvocationFinishedMessage(
+          InvokerInstanceId(0, userMemory = defaultUserMemory),
+          InvocationFinishedResult.Timeout)
       }
       pool.expectMsg(Transition(invoker, Healthy, Unresponsive))
 
       // Fill buffer with successful invocations to become healthy again (one below errorTolerance)
       (1 to InvokerActor.bufferSize - InvokerActor.bufferErrorTolerance).foreach { _ =>
-        invoker ! InvocationFinishedMessage(InvokerInstanceId(0), InvocationFinishedResult.Success)
+        invoker ! InvocationFinishedMessage(
+          InvokerInstanceId(0, userMemory = defaultUserMemory),
+          InvocationFinishedResult.Success)
       }
       pool.expectMsg(Transition(invoker, Unresponsive, Healthy))
     }
@@ -282,7 +302,9 @@ class InvokerSupervisionTests
   // offline -> unhealthy
   it should "go offline when unhealthy, if the state times out and go unhealthy on a successful ping again" in {
     val pool = TestProbe()
-    val invoker = pool.system.actorOf(InvokerActor.props(InvokerInstanceId(0), ControllerInstanceId("0")))
+    val invoker =
+      pool.system.actorOf(
+        InvokerActor.props(InvokerInstanceId(0, userMemory = defaultUserMemory), ControllerInstanceId("0")))
 
     within(timeout.duration) {
       pool.send(invoker, SubscribeTransitionCallBack(pool.ref))
@@ -291,20 +313,23 @@ class InvokerSupervisionTests
       timeout(invoker)
       pool.expectMsg(Transition(invoker, Unhealthy, Offline))
 
-      invoker ! PingMessage(InvokerInstanceId(0))
+      invoker ! PingMessage(InvokerInstanceId(0, userMemory = defaultUserMemory))
       pool.expectMsg(Transition(invoker, Offline, Unhealthy))
     }
   }
 
   it should "start timer to send testactions when unhealthy" in {
-    val invoker = TestFSMRef(new InvokerActor(InvokerInstanceId(0), ControllerInstanceId("0")))
+    val invoker =
+      TestFSMRef(new InvokerActor(InvokerInstanceId(0, userMemory = defaultUserMemory), ControllerInstanceId("0")))
     invoker.stateName shouldBe Unhealthy
 
     invoker.isTimerActive(InvokerActor.timerName) shouldBe true
 
     // Fill buffer with successful invocations to become healthy again (one below errorTolerance)
     (1 to InvokerActor.bufferSize - InvokerActor.bufferErrorTolerance).foreach { _ =>
-      invoker ! InvocationFinishedMessage(InvokerInstanceId(0), InvocationFinishedResult.Success)
+      invoker ! InvocationFinishedMessage(
+        InvokerInstanceId(0, userMemory = defaultUserMemory),
+        InvocationFinishedResult.Success)
     }
     invoker.stateName shouldBe Healthy
 
@@ -320,7 +345,7 @@ class InvokerSupervisionTests
     val sendActivationToInvoker = stubFunction[ActivationMessage, InvokerInstanceId, Future[RecordMetadata]]
     val supervisor = system.actorOf(InvokerPool.props(childFactory, sendActivationToInvoker, pC))
 
-    val invokerInstance = InvokerInstanceId(0, Some("10.x.x.x"), Some("invoker-xyz"))
+    val invokerInstance = InvokerInstanceId(0, Some("10.x.x.x"), Some("invoker-xyz"), userMemory = defaultUserMemory)
 
     within(timeout.duration) {
 
@@ -343,9 +368,10 @@ class InvokerSupervisionTests
     val sendActivationToInvoker = stubFunction[ActivationMessage, InvokerInstanceId, Future[RecordMetadata]]
     val supervisor = system.actorOf(InvokerPool.props(childFactory, sendActivationToInvoker, pC))
 
-    val invokerInstance = InvokerInstanceId(0, Some("10.x.x.x"), Some("invoker-xyz"))
+    val invokerInstance = InvokerInstanceId(0, Some("10.x.x.x"), Some("invoker-xyz"), userMemory = defaultUserMemory)
 
-    val invokerAfterRestart = InvokerInstanceId(0, Some("10.x.x.x"), Some("invoker-zyx"))
+    val invokerAfterRestart =
+      InvokerInstanceId(0, Some("10.x.x.x"), Some("invoker-zyx"), userMemory = defaultUserMemory)
 
     within(timeout.duration) {
       val ping = PingMessage(invokerInstance)
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
index 211268f367..be0dc007fb 100644
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
+++ b/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
@@ -23,6 +23,7 @@ import org.scalatest.junit.JUnitRunner
 import org.scalatest.{FlatSpec, Matchers}
 import whisk.common.{ForcibleSemaphore, TransactionId}
 import whisk.core.entity.{ByteSize, InvokerInstanceId, MemoryLimit}
+import whisk.core.entity.size._
 import whisk.core.loadBalancer.InvokerState._
 import whisk.core.loadBalancer._
 
@@ -36,22 +37,25 @@ import whisk.core.loadBalancer._
 class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers with StreamLogging {
   behavior of "ShardingContainerPoolBalancerState"
 
-  def healthy(i: Int) = new InvokerHealth(InvokerInstanceId(i), Healthy)
-  def unhealthy(i: Int) = new InvokerHealth(InvokerInstanceId(i), Unhealthy)
-  def offline(i: Int) = new InvokerHealth(InvokerInstanceId(i), Offline)
+  val defaultUserMemory: ByteSize = 1024.MB
+
+  def healthy(i: Int, memory: ByteSize = defaultUserMemory) =
+    new InvokerHealth(InvokerInstanceId(i, userMemory = memory), Healthy)
+  def unhealthy(i: Int) = new InvokerHealth(InvokerInstanceId(i, userMemory = defaultUserMemory), Unhealthy)
+  def offline(i: Int) = new InvokerHealth(InvokerInstanceId(i, userMemory = defaultUserMemory), Offline)
 
   def semaphores(count: Int, max: Int): IndexedSeq[ForcibleSemaphore] =
     IndexedSeq.fill(count)(new ForcibleSemaphore(max))
 
-  def lbConfig(blackboxFraction: Double, invokerBusyThreshold: ByteSize) =
-    ShardingContainerPoolBalancerConfig(blackboxFraction, invokerBusyThreshold, 1)
+  def lbConfig(blackboxFraction: Double) =
+    ShardingContainerPoolBalancerConfig(blackboxFraction, 1)
 
   it should "update invoker's state, growing the slots data and keeping valid old data" in {
     // start empty
     val slots = 10
     val memoryPerSlot = MemoryLimit.minMemory
     val memory = memoryPerSlot * slots
-    val state = ShardingContainerPoolBalancerState()(lbConfig(0.5, memory))
+    val state = ShardingContainerPoolBalancerState()(lbConfig(0.5))
     state.invokers shouldBe 'empty
     state.blackboxInvokers shouldBe 'empty
     state.managedInvokers shouldBe 'empty
@@ -60,7 +64,7 @@ class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers with Str
     state.blackboxStepSizes shouldBe Seq.empty
 
     // apply one update, verify everything is updated accordingly
-    val update1 = IndexedSeq(healthy(0))
+    val update1 = IndexedSeq(healthy(0, memory))
     state.updateInvokers(update1)
 
     state.invokers shouldBe update1
@@ -76,7 +80,8 @@ class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers with Str
     state.invokerSlots.head.availablePermits shouldBe (memory - memoryPerSlot).toMB.toInt
 
     // apply second update, growing the state
-    val update2 = IndexedSeq(healthy(0), healthy(1))
+    val update2 =
+      IndexedSeq(healthy(0, memory), healthy(1, memory * 2))
     state.updateInvokers(update2)
 
     state.invokers shouldBe update2
@@ -84,17 +89,18 @@ class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers with Str
     state.blackboxInvokers shouldBe IndexedSeq(update2.last)
     state.invokerSlots should have size update2.size
     state.invokerSlots.head.availablePermits shouldBe (memory - memoryPerSlot).toMB.toInt
-    state.invokerSlots(1).availablePermits shouldBe memory.toMB
+    state.invokerSlots(1).tryAcquire(memoryPerSlot.toMB.toInt)
+    state.invokerSlots(1).availablePermits shouldBe memory.toMB * 2 - memoryPerSlot.toMB
     state.managedStepSizes shouldBe Seq(1)
     state.blackboxStepSizes shouldBe Seq(1)
   }
 
   it should "allow managed partition to overlap with blackbox for small N" in {
     Seq(0.1, 0.2, 0.3, 0.4, 0.5).foreach { bf =>
-      val state = ShardingContainerPoolBalancerState()(lbConfig(bf, MemoryLimit.stdMemory))
+      val state = ShardingContainerPoolBalancerState()(lbConfig(bf))
 
       (1 to 100).toSeq.foreach { i =>
-        state.updateInvokers((1 to i).map(_ => healthy(1)))
+        state.updateInvokers((1 to i).map(_ => healthy(1, MemoryLimit.stdMemory)))
 
         withClue(s"invoker count $bf $i:") {
           state.managedInvokers.length should be <= i
@@ -120,22 +126,26 @@ class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers with Str
     val slots = 10
     val memoryPerSlot = MemoryLimit.minMemory
     val memory = memoryPerSlot * slots
-    val state = ShardingContainerPoolBalancerState()(lbConfig(0.5, memory))
-    state.updateInvokers(IndexedSeq(healthy(0)))
+    val state = ShardingContainerPoolBalancerState()(lbConfig(0.5))
+    state.updateInvokers(IndexedSeq(healthy(0, memory), healthy(1, memory * 2)))
 
     state.invokerSlots.head.tryAcquire(memoryPerSlot.toMB.toInt)
     state.invokerSlots.head.availablePermits shouldBe (memory - memoryPerSlot).toMB
 
+    state.invokerSlots(1).tryAcquire(memoryPerSlot.toMB.toInt)
+    state.invokerSlots(1).availablePermits shouldBe memory.toMB * 2 - memoryPerSlot.toMB
+
     state.updateCluster(2)
     state.invokerSlots.head.availablePermits shouldBe memory.toMB / 2 // state reset + divided by 2
+    state.invokerSlots(1).availablePermits shouldBe memory.toMB
   }
 
   it should "fallback to a size of 1 (alone) if cluster size is < 1" in {
     val slots = 10
     val memoryPerSlot = MemoryLimit.minMemory
     val memory = memoryPerSlot * slots
-    val state = ShardingContainerPoolBalancerState()(lbConfig(0.5, memory))
-    state.updateInvokers(IndexedSeq(healthy(0)))
+    val state = ShardingContainerPoolBalancerState()(lbConfig(0.5))
+    state.updateInvokers(IndexedSeq(healthy(0, memory)))
 
     state.invokerSlots.head.availablePermits shouldBe memory.toMB
 
@@ -153,8 +163,8 @@ class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers with Str
     val slots = 10
     val memoryPerSlot = MemoryLimit.minMemory
     val memory = memoryPerSlot * slots
-    val state = ShardingContainerPoolBalancerState()(lbConfig(0.5, memory))
-    state.updateInvokers(IndexedSeq(healthy(0)))
+    val state = ShardingContainerPoolBalancerState()(lbConfig(0.5))
+    state.updateInvokers(IndexedSeq(healthy(0, memory)))
 
     state.invokerSlots.head.availablePermits shouldBe memory.toMB
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services