You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/10/09 08:21:04 UTC
[GitHub] cbickel closed pull request #4011: Customize invoker use memory for memory based loadbalancing
cbickel closed pull request #4011: Customize invoker use memory for memory based loadbalancing
URL: https://github.com/apache/incubator-openwhisk/pull/4011
This is a PR merged from a forked repository.
As this is a foreign pull request (from a fork), GitHub hides the original
diff once it is merged, so the diff is reproduced below for the sake of
provenance:
diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml
index de2d71f704..a4cc8369b5 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -214,8 +214,6 @@
"{{ controller.ssl.storeFlavor }}"
"CONFIG_whisk_controller_https_clientAuth":
"{{ controller.ssl.clientAuth }}"
- "CONFIG_whisk_loadbalancer_invokerUserMemory":
- "{{ invoker.userMemory }}"
"CONFIG_whisk_loadbalancer_blackboxFraction":
"{{ controller.blackboxFraction }}"
"CONFIG_whisk_loadbalancer_timeoutFactor":
diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml
index 145febf2a8..6712c38e60 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -217,7 +217,7 @@
"CONFIG_whisk_runtimes_localImagePrefix": "{{ runtimes_local_image_prefix | default() }}"
"CONFIG_whisk_containerFactory_containerArgs_network": "{{ invoker_container_network_name | default('bridge') }}"
"INVOKER_CONTAINER_POLICY": "{{ invoker_container_policy_name | default()}}"
- "CONFIG_whisk_containerPool_userMemory": "{{ invoker.userMemory }}"
+ "CONFIG_whisk_containerPool_userMemory": "{{ hostvars[groups['invokers'][invoker_index | int]].user_memory | default(invoker.userMemory) }}"
"CONFIG_whisk_docker_client_parallelRuns": "{{ invoker_parallel_runs | default() }}"
"CONFIG_whisk_docker_containerFactory_useRunc": "{{ invoker.useRunc }}"
"WHISK_LOGS_DIR": "{{ whisk_logs_dir }}"
diff --git a/common/scala/src/main/scala/whisk/core/entity/InstanceId.scala b/common/scala/src/main/scala/whisk/core/entity/InstanceId.scala
index 5122980e06..0d477b44f4 100644
--- a/common/scala/src/main/scala/whisk/core/entity/InstanceId.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/InstanceId.scala
@@ -30,7 +30,8 @@ import whisk.core.entity.ControllerInstanceId.MAX_NAME_LENGTH
*/
case class InvokerInstanceId(val instance: Int,
uniqueName: Option[String] = None,
- displayedName: Option[String] = None) {
+ displayedName: Option[String] = None,
+ val userMemory: ByteSize) {
def toInt: Int = instance
override def toString: String = (Seq("invoker" + instance) ++ uniqueName ++ displayedName).mkString("/")
@@ -43,7 +44,8 @@ case class ControllerInstanceId(val asString: String) {
}
object InvokerInstanceId extends DefaultJsonProtocol {
- implicit val serdes = jsonFormat3(InvokerInstanceId.apply)
+ import whisk.core.entity.size.{serdes => xserds}
+ implicit val serdes = jsonFormat4(InvokerInstanceId.apply)
}
object ControllerInstanceId extends DefaultJsonProtocol {
diff --git a/common/scala/src/main/scala/whisk/core/entity/Size.scala b/common/scala/src/main/scala/whisk/core/entity/Size.scala
index 34f5bc6ef0..9af4128350 100644
--- a/common/scala/src/main/scala/whisk/core/entity/Size.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/Size.scala
@@ -109,7 +109,7 @@ object ByteSize {
def fromString(sizeString: String): ByteSize = {
val matcher = regex.matcher(sizeString)
if (matcher.matches()) {
- val size = matcher.group(1).toInt
+ val size = matcher.group(1).toLong
val unit = matcher.group(2).charAt(0).toUpper match {
case 'B' => SizeUnits.BYTE
case 'K' => SizeUnits.KB
diff --git a/core/controller/src/main/resources/reference.conf b/core/controller/src/main/resources/reference.conf
index a88ea6fae3..3f76e0c995 100644
--- a/core/controller/src/main/resources/reference.conf
+++ b/core/controller/src/main/resources/reference.conf
@@ -6,7 +6,6 @@ whisk {
use-cluster-bootstrap: false
}
loadbalancer {
- invoker-user-memory: 1024 m
blackbox-fraction: 10%
# factor to increase the timeout for forced active acks
# timeout = time-limit.std * timeoutfactor + 1m
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
index 7e76f016ac..305386b7fb 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
@@ -194,7 +194,10 @@ class InvokerPool(childFactory: (ActorRefFactory, InvokerInstanceId) => ActorRef
// Grow the underlying status sequence to the size needed to contain the incoming ping. Dummy values are created
// to represent invokers, where ping messages haven't arrived yet
- status = padToIndexed(status, instanceId.toInt + 1, i => new InvokerHealth(InvokerInstanceId(i), Offline))
+ status = padToIndexed(
+ status,
+ instanceId.toInt + 1,
+ i => new InvokerHealth(InvokerInstanceId(i, userMemory = instanceId.userMemory), Offline))
status = status.updated(instanceId.toInt, new InvokerHealth(instanceId, Offline))
val ref = childFactory(context, instanceId)
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
index 4d6bbf8dd2..66f66a2fe1 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -525,9 +525,6 @@ case class ShardingContainerPoolBalancerState(
lbConfig: ShardingContainerPoolBalancerConfig =
loadConfigOrThrow[ShardingContainerPoolBalancerConfig](ConfigKeys.loadbalancer))(implicit logging: Logging) {
- private val totalInvokerThreshold = lbConfig.invokerUserMemory
- private var currentInvokerThreshold = totalInvokerThreshold
-
private val blackboxFraction: Double = Math.max(0.0, Math.min(1.0, lbConfig.blackboxFraction))
logging.info(this, s"blackboxFraction = $blackboxFraction")(TransactionId.loadbalancer)
@@ -540,6 +537,23 @@ case class ShardingContainerPoolBalancerState(
def invokerSlots: IndexedSeq[ForcibleSemaphore] = _invokerSlots
def clusterSize: Int = _clusterSize
+ /**
+ * @param memory
+ * @return calculated invoker slot
+ */
+ private def getInvokerSlot(memory: ByteSize): ByteSize = {
+ val newTreshold = if (memory / _clusterSize < MemoryLimit.minMemory) {
+ logging.warn(
+ this,
+ s"registered controllers: ${_clusterSize}: the slots per invoker fall below the min memory of one action.")(
+ TransactionId.loadbalancer)
+ MemoryLimit.minMemory
+ } else {
+ memory / _clusterSize
+ }
+ newTreshold
+ }
+
/**
* Updates the scheduling state with the new invokers.
*
@@ -570,8 +584,8 @@ case class ShardingContainerPoolBalancerState(
if (oldSize < newSize) {
// Keeps the existing state..
- _invokerSlots = _invokerSlots ++ IndexedSeq.fill(newSize - oldSize) {
- new ForcibleSemaphore(currentInvokerThreshold.toMB.toInt)
+ _invokerSlots = _invokerSlots ++ _invokers.drop(_invokerSlots.length).map { invoker =>
+ new ForcibleSemaphore(getInvokerSlot(invoker.id.userMemory).toMB.toInt)
}
}
}
@@ -594,22 +608,10 @@ case class ShardingContainerPoolBalancerState(
val actualSize = newSize max 1 // if a cluster size < 1 is reported, falls back to a size of 1 (alone)
if (_clusterSize != actualSize) {
_clusterSize = actualSize
- val newTreshold = if (totalInvokerThreshold / actualSize < MemoryLimit.minMemory) {
- logging.warn(
- this,
- s"registered controllers: ${_clusterSize}: the slots per invoker fall below the min memory of one action.")(
- TransactionId.loadbalancer)
- MemoryLimit.minMemory // letting this fall below minMemory doesn't make sense
- } else {
- totalInvokerThreshold / actualSize
+ _invokerSlots = _invokers.map { invoker =>
+ new ForcibleSemaphore(getInvokerSlot(invoker.id.userMemory).toMB.toInt)
}
- currentInvokerThreshold = newTreshold
- _invokerSlots = _invokerSlots.map(_ => new ForcibleSemaphore(currentInvokerThreshold.toMB.toInt))
-
- logging.info(
- this,
- s"loadbalancer cluster size changed to $actualSize active nodes. invokerThreshold = $currentInvokerThreshold")(
- TransactionId.loadbalancer)
+ logging.info(this, s"loadbalancer cluster size changed to $actualSize active nodes.")(TransactionId.loadbalancer)
}
}
}
@@ -625,12 +627,9 @@ case class ClusterConfig(useClusterBootstrap: Boolean)
* Configuration for the sharding container pool balancer.
*
* @param blackboxFraction the fraction of all invokers to use exclusively for blackboxes
- * @param invokerUserMemory how many Bytes of memory an invoker has available in total for user containers
* @param timeoutFactor factor to influence the timeout period for forced active acks (time-limit.std * timeoutFactor + 1m)
*/
-case class ShardingContainerPoolBalancerConfig(blackboxFraction: Double,
- invokerUserMemory: ByteSize,
- timeoutFactor: Int)
+case class ShardingContainerPoolBalancerConfig(blackboxFraction: Double, timeoutFactor: Int)
/**
* State kept for each activation until completion.
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
index 8cec373087..95eea6eb62 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
@@ -25,11 +25,13 @@ import kamon.Kamon
import pureconfig.loadConfigOrThrow
import whisk.common.Https.HttpsConfig
import whisk.common._
-import whisk.core.WhiskConfig
+import whisk.core.{ConfigKeys, WhiskConfig}
import whisk.core.WhiskConfig._
import whisk.core.connector.{MessagingProvider, PingMessage}
+import whisk.core.containerpool.ContainerPoolConfig
import whisk.core.entity.{ExecManifest, InvokerInstanceId}
import whisk.core.entity.ActivationEntityLimit
+import whisk.core.entity.size._
import whisk.http.{BasicHttpService, BasicRasService}
import whisk.spi.SpiLoader
import whisk.utils.ExecutionContextFactory
@@ -68,6 +70,7 @@ object Invoker {
implicit val actorSystem: ActorSystem =
ActorSystem(name = "invoker-actor-system", defaultExecutionContext = Some(ec))
implicit val logger = new AkkaLogging(akka.event.Logging.getLogger(actorSystem, this))
+ val poolConfig: ContainerPoolConfig = loadConfigOrThrow[ContainerPoolConfig](ConfigKeys.containerPool)
// Prepare Kamon shutdown
CoordinatedShutdown(actorSystem).addTask(CoordinatedShutdown.PhaseActorSystemTerminate, "shutdownKamon") { () =>
@@ -143,8 +146,11 @@ object Invoker {
val topicBaseName = "invoker"
val topicName = topicBaseName + assignedInvokerId
+
val maxMessageBytes = Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)
- val invokerInstance = InvokerInstanceId(assignedInvokerId, cmdLineArgs.uniqueName, cmdLineArgs.displayedName)
+ val invokerInstance =
+ InvokerInstanceId(assignedInvokerId, cmdLineArgs.uniqueName, cmdLineArgs.displayedName, poolConfig.userMemory)
+
val msgProvider = SpiLoader.get[MessagingProvider]
if (msgProvider
.ensureTopic(config, topic = topicName, topicConfig = topicBaseName, maxMessageBytes = maxMessageBytes)
@@ -153,7 +159,7 @@ object Invoker {
}
val producer = msgProvider.getProducer(config, Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
val invoker = try {
- new InvokerReactive(config, invokerInstance, producer)
+ new InvokerReactive(config, invokerInstance, producer, poolConfig)
} catch {
case e: Exception => abort(s"Failed to initialize reactive invoker: ${e.getMessage}")
}
diff --git a/tests/src/test/scala/whisk/core/connector/tests/CompletionMessageTests.scala b/tests/src/test/scala/whisk/core/connector/tests/CompletionMessageTests.scala
index ff4419d453..ed1ac37199 100644
--- a/tests/src/test/scala/whisk/core/connector/tests/CompletionMessageTests.scala
+++ b/tests/src/test/scala/whisk/core/connector/tests/CompletionMessageTests.scala
@@ -41,6 +41,7 @@ class CompletionMessageTests extends FlatSpec with Matchers {
behavior of "completion message"
+ val defaultUserMemory: ByteSize = 1024.MB
val activation = WhiskActivation(
namespace = EntityPath("ns"),
name = EntityName("a"),
@@ -53,7 +54,10 @@ class CompletionMessageTests extends FlatSpec with Matchers {
duration = Some(123))
it should "serialize a left completion message" in {
- val m = CompletionMessage(TransactionId.testing, Left(ActivationId.generate()), InvokerInstanceId(0))
+ val m = CompletionMessage(
+ TransactionId.testing,
+ Left(ActivationId.generate()),
+ InvokerInstanceId(0, userMemory = defaultUserMemory))
m.serialize shouldBe JsObject(
"transid" -> m.transid.toJson,
"response" -> m.response.left.get.toJson,
@@ -61,7 +65,8 @@ class CompletionMessageTests extends FlatSpec with Matchers {
}
it should "serialize a right completion message" in {
- val m = CompletionMessage(TransactionId.testing, Right(activation), InvokerInstanceId(0))
+ val m =
+ CompletionMessage(TransactionId.testing, Right(activation), InvokerInstanceId(0, userMemory = defaultUserMemory))
m.serialize shouldBe JsObject(
"transid" -> m.transid.toJson,
"response" -> m.response.right.get.toJson,
@@ -69,12 +74,16 @@ class CompletionMessageTests extends FlatSpec with Matchers {
}
it should "deserialize a left completion message" in {
- val m = CompletionMessage(TransactionId.testing, Left(ActivationId.generate()), InvokerInstanceId(0))
+ val m = CompletionMessage(
+ TransactionId.testing,
+ Left(ActivationId.generate()),
+ InvokerInstanceId(0, userMemory = defaultUserMemory))
CompletionMessage.parse(m.serialize) shouldBe Success(m)
}
it should "deserialize a right completion message" in {
- val m = CompletionMessage(TransactionId.testing, Right(activation), InvokerInstanceId(0))
+ val m =
+ CompletionMessage(TransactionId.testing, Right(activation), InvokerInstanceId(0, userMemory = defaultUserMemory))
CompletionMessage.parse(m.serialize) shouldBe Success(m)
}
}
diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerFactoryTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerFactoryTests.scala
index 306b94605c..f32d01801b 100644
--- a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerFactoryTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerFactoryTests.scala
@@ -38,8 +38,7 @@ import whisk.core.containerpool.docker.DockerApiWithFileAccess
import whisk.core.containerpool.docker.DockerContainerFactory
import whisk.core.containerpool.docker.DockerContainerFactoryConfig
import whisk.core.containerpool.docker.RuncApi
-import whisk.core.entity.ExecManifest
-import whisk.core.entity.InvokerInstanceId
+import whisk.core.entity.{ByteSize, ExecManifest, InvokerInstanceId}
import whisk.core.entity.size._
@RunWith(classOf[JUnitRunner])
@@ -57,6 +56,8 @@ class DockerContainerFactoryTests
behavior of "DockerContainerFactory"
+ val defaultUserMemory: ByteSize = 1024.MB
+
it should "set the docker run args based on ContainerArgsConfig" in {
val image = ExecManifest.runtimesManifest.manifests("nodejs").image
@@ -104,7 +105,7 @@ class DockerContainerFactoryTests
val factory =
new DockerContainerFactory(
- InvokerInstanceId(0),
+ InvokerInstanceId(0, userMemory = defaultUserMemory),
Map.empty,
ContainerArgsConfig("net1", Seq("dns1", "dns2"), Map("env" -> Set("e1", "e2"))),
DockerContainerFactoryConfig(true))(actorSystem, executionContext, logging, dockerApiStub, mock[RuncApi])
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
index 904daebd05..920d5358b6 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
@@ -56,6 +56,7 @@ class ContainerProxyTests
val timeout = 5.seconds
val log = logging
+ val defaultUserMemory: ByteSize = 1024.MB
// Common entities to pass to the tests. We don't really care what's inside
// those for the behavior testing here, as none of the contents will really
@@ -192,7 +193,7 @@ class ContainerProxyTests
createAcker(),
store,
createCollector(),
- InvokerInstanceId(0, Some("myname")),
+ InvokerInstanceId(0, Some("myname"), userMemory = defaultUserMemory),
poolConfig,
pauseGrace = timeout))
registerCallback(machine)
@@ -216,7 +217,14 @@ class ContainerProxyTests
val machine =
childActorOf(
ContainerProxy
- .props(factory, acker, store, collector, InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
+ .props(
+ factory,
+ acker,
+ store,
+ collector,
+ InvokerInstanceId(0, userMemory = defaultUserMemory),
+ poolConfig,
+ pauseGrace = timeout))
registerCallback(machine)
preWarm(machine)
@@ -253,7 +261,14 @@ class ContainerProxyTests
val machine =
childActorOf(
ContainerProxy
- .props(factory, acker, store, collector, InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
+ .props(
+ factory,
+ acker,
+ store,
+ collector,
+ InvokerInstanceId(0, userMemory = defaultUserMemory),
+ poolConfig,
+ pauseGrace = timeout))
registerCallback(machine)
preWarm(machine)
@@ -301,7 +316,14 @@ class ContainerProxyTests
val machine =
childActorOf(
ContainerProxy
- .props(factory, acker, store, collector, InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
+ .props(
+ factory,
+ acker,
+ store,
+ collector,
+ InvokerInstanceId(0, userMemory = defaultUserMemory),
+ poolConfig,
+ pauseGrace = timeout))
registerCallback(machine)
preWarm(machine)
@@ -340,7 +362,14 @@ class ContainerProxyTests
val machine =
childActorOf(
ContainerProxy
- .props(factory, acker, store, collector, InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
+ .props(
+ factory,
+ acker,
+ store,
+ collector,
+ InvokerInstanceId(0, userMemory = defaultUserMemory),
+ poolConfig,
+ pauseGrace = timeout))
registerCallback(machine)
run(machine, Uninitialized)
@@ -373,7 +402,14 @@ class ContainerProxyTests
val machine =
childActorOf(
ContainerProxy
- .props(factory, acker, store, collector, InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
+ .props(
+ factory,
+ acker,
+ store,
+ collector,
+ InvokerInstanceId(0, userMemory = defaultUserMemory),
+ poolConfig,
+ pauseGrace = timeout))
registerCallback(machine)
machine ! Run(noLogsAction, message)
@@ -413,7 +449,14 @@ class ContainerProxyTests
val machine =
childActorOf(
ContainerProxy
- .props(factory, acker, store, collector, InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
+ .props(
+ factory,
+ acker,
+ store,
+ collector,
+ InvokerInstanceId(0, userMemory = defaultUserMemory),
+ poolConfig,
+ pauseGrace = timeout))
registerCallback(machine)
preWarm(machine)
@@ -471,7 +514,14 @@ class ContainerProxyTests
val machine =
childActorOf(
ContainerProxy
- .props(factory, acker, store, collector, InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
+ .props(
+ factory,
+ acker,
+ store,
+ collector,
+ InvokerInstanceId(0, userMemory = defaultUserMemory),
+ poolConfig,
+ pauseGrace = timeout))
registerCallback(machine)
machine ! Run(action, message)
expectMsg(Transition(machine, Uninitialized, Running))
@@ -507,7 +557,14 @@ class ContainerProxyTests
val machine =
childActorOf(
ContainerProxy
- .props(factory, acker, store, collector, InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
+ .props(
+ factory,
+ acker,
+ store,
+ collector,
+ InvokerInstanceId(0, userMemory = defaultUserMemory),
+ poolConfig,
+ pauseGrace = timeout))
registerCallback(machine)
machine ! Run(action, message)
expectMsg(Transition(machine, Uninitialized, Running))
@@ -548,7 +605,14 @@ class ContainerProxyTests
val machine =
childActorOf(
ContainerProxy
- .props(factory, acker, store, collector, InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
+ .props(
+ factory,
+ acker,
+ store,
+ collector,
+ InvokerInstanceId(0, userMemory = defaultUserMemory),
+ poolConfig,
+ pauseGrace = timeout))
registerCallback(machine)
machine ! Run(action, message)
expectMsg(Transition(machine, Uninitialized, Running))
@@ -579,7 +643,14 @@ class ContainerProxyTests
val machine =
childActorOf(
ContainerProxy
- .props(factory, acker, store, collector, InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
+ .props(
+ factory,
+ acker,
+ store,
+ collector,
+ InvokerInstanceId(0, userMemory = defaultUserMemory),
+ poolConfig,
+ pauseGrace = timeout))
registerCallback(machine)
machine ! Run(action, message)
expectMsg(Transition(machine, Uninitialized, Running))
@@ -609,7 +680,14 @@ class ContainerProxyTests
val machine =
childActorOf(
ContainerProxy
- .props(factory, acker, store, collector, InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
+ .props(
+ factory,
+ acker,
+ store,
+ collector,
+ InvokerInstanceId(0, userMemory = defaultUserMemory),
+ poolConfig,
+ pauseGrace = timeout))
registerCallback(machine)
machine ! Run(action, message)
expectMsg(Transition(machine, Uninitialized, Running))
@@ -643,7 +721,14 @@ class ContainerProxyTests
val machine =
childActorOf(
ContainerProxy
- .props(factory, acker, store, createCollector(), InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
+ .props(
+ factory,
+ acker,
+ store,
+ createCollector(),
+ InvokerInstanceId(0, userMemory = defaultUserMemory),
+ poolConfig,
+ pauseGrace = timeout))
registerCallback(machine)
run(machine, Uninitialized) // first run an activation
timeout(machine) // times out Ready state so container suspends
@@ -679,7 +764,14 @@ class ContainerProxyTests
val machine =
childActorOf(
ContainerProxy
- .props(factory, acker, store, createCollector(), InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
+ .props(
+ factory,
+ acker,
+ store,
+ createCollector(),
+ InvokerInstanceId(0, userMemory = defaultUserMemory),
+ poolConfig,
+ pauseGrace = timeout))
registerCallback(machine)
run(machine, Uninitialized)
timeout(machine) // times out Ready state so container suspends
@@ -716,7 +808,14 @@ class ContainerProxyTests
val machine =
childActorOf(
ContainerProxy
- .props(factory, acker, store, collector, InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
+ .props(
+ factory,
+ acker,
+ store,
+ collector,
+ InvokerInstanceId(0, userMemory = defaultUserMemory),
+ poolConfig,
+ pauseGrace = timeout))
registerCallback(machine)
// Start running the action
@@ -768,7 +867,14 @@ class ContainerProxyTests
val machine =
childActorOf(
ContainerProxy
- .props(factory, acker, store, collector, InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
+ .props(
+ factory,
+ acker,
+ store,
+ collector,
+ InvokerInstanceId(0, userMemory = defaultUserMemory),
+ poolConfig,
+ pauseGrace = timeout))
registerCallback(machine)
run(machine, Uninitialized)
timeout(machine)
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
index 9cff1df96e..587e3d4431 100644
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
+++ b/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
@@ -49,6 +49,7 @@ import whisk.core.connector.ActivationMessage
import whisk.core.connector.PingMessage
import whisk.core.entity.ActivationId.ActivationIdGenerator
import whisk.core.entity._
+import whisk.core.entity.size._
import whisk.core.loadBalancer.ActivationRequest
import whisk.core.loadBalancer.GetStatus
import whisk.core.loadBalancer.InvokerState._
@@ -74,6 +75,7 @@ class InvokerSupervisionTests
with StreamLogging {
val config = new WhiskConfig(ExecManifest.requiredProperties)
+ val defaultUserMemory: ByteSize = 1024.MB
ExecManifest.initialize(config)
@@ -92,7 +94,7 @@ class InvokerSupervisionTests
/** Helper to generate a list of (InstanceId, InvokerState) */
def zipWithInstance(list: IndexedSeq[InvokerState]) = list.zipWithIndex.map {
- case (state, index) => new InvokerHealth(InvokerInstanceId(index), state)
+ case (state, index) => new InvokerHealth(InvokerInstanceId(index, userMemory = defaultUserMemory), state)
}
val pC = new TestConnector("pingFeedTtest", 4, false) {}
@@ -103,8 +105,8 @@ class InvokerSupervisionTests
val invoker5 = TestProbe()
val invoker2 = TestProbe()
- val invoker5Instance = InvokerInstanceId(5)
- val invoker2Instance = InvokerInstanceId(2)
+ val invoker5Instance = InvokerInstanceId(5, userMemory = defaultUserMemory)
+ val invoker2Instance = InvokerInstanceId(2, userMemory = defaultUserMemory)
val children = mutable.Queue(invoker5.ref, invoker2.ref)
val childFactory = (f: ActorRefFactory, instance: InvokerInstanceId) => children.dequeue()
@@ -145,7 +147,7 @@ class InvokerSupervisionTests
it should "forward the ActivationResult to the appropriate invoker" in {
val invoker = TestProbe()
- val invokerInstance = InvokerInstanceId(0)
+ val invokerInstance = InvokerInstanceId(0, userMemory = defaultUserMemory)
val invokerName = s"invoker${invokerInstance.toInt}"
val childFactory = (f: ActorRefFactory, instance: InvokerInstanceId) => invoker.ref
val sendActivationToInvoker = stubFunction[ActivationMessage, InvokerInstanceId, Future[RecordMetadata]]
@@ -170,7 +172,7 @@ class InvokerSupervisionTests
it should "forward an ActivationMessage to the sendActivation-Method" in {
val invoker = TestProbe()
- val invokerInstance = InvokerInstanceId(0)
+ val invokerInstance = InvokerInstanceId(0, userMemory = defaultUserMemory)
val invokerName = s"invoker${invokerInstance.toInt}"
val childFactory = (f: ActorRefFactory, instance: InvokerInstanceId) => invoker.ref
@@ -209,7 +211,9 @@ class InvokerSupervisionTests
// offline -> unhealthy
it should "start unhealthy, go offline if the state times out and goes unhealthy on a successful ping again" in {
val pool = TestProbe()
- val invoker = pool.system.actorOf(InvokerActor.props(InvokerInstanceId(0), ControllerInstanceId("0")))
+ val invoker =
+ pool.system.actorOf(
+ InvokerActor.props(InvokerInstanceId(0, userMemory = defaultUserMemory), ControllerInstanceId("0")))
within(timeout.duration) {
pool.send(invoker, SubscribeTransitionCallBack(pool.ref))
@@ -217,7 +221,7 @@ class InvokerSupervisionTests
timeout(invoker)
pool.expectMsg(Transition(invoker, Unhealthy, Offline))
- invoker ! PingMessage(InvokerInstanceId(0))
+ invoker ! PingMessage(InvokerInstanceId(0, userMemory = defaultUserMemory))
pool.expectMsg(Transition(invoker, Offline, Unhealthy))
}
}
@@ -225,26 +229,34 @@ class InvokerSupervisionTests
// unhealthy -> healthy -> unhealthy -> healthy
it should "goto healthy again, if unhealthy and error buffer has enough successful invocations" in {
val pool = TestProbe()
- val invoker = pool.system.actorOf(InvokerActor.props(InvokerInstanceId(0), ControllerInstanceId("0")))
+ val invoker =
+ pool.system.actorOf(
+ InvokerActor.props(InvokerInstanceId(0, userMemory = defaultUserMemory), ControllerInstanceId("0")))
within(timeout.duration) {
pool.send(invoker, SubscribeTransitionCallBack(pool.ref))
pool.expectMsg(CurrentState(invoker, Unhealthy))
(1 to InvokerActor.bufferSize).foreach { _ =>
- invoker ! InvocationFinishedMessage(InvokerInstanceId(0), InvocationFinishedResult.Success)
+ invoker ! InvocationFinishedMessage(
+ InvokerInstanceId(0, userMemory = defaultUserMemory),
+ InvocationFinishedResult.Success)
}
pool.expectMsg(Transition(invoker, Unhealthy, Healthy))
// Fill buffer with errors
(1 to InvokerActor.bufferSize).foreach { _ =>
- invoker ! InvocationFinishedMessage(InvokerInstanceId(0), InvocationFinishedResult.SystemError)
+ invoker ! InvocationFinishedMessage(
+ InvokerInstanceId(0, userMemory = defaultUserMemory),
+ InvocationFinishedResult.SystemError)
}
pool.expectMsg(Transition(invoker, Healthy, Unhealthy))
// Fill buffer with successful invocations to become healthy again (one below errorTolerance)
(1 to InvokerActor.bufferSize - InvokerActor.bufferErrorTolerance).foreach { _ =>
- invoker ! InvocationFinishedMessage(InvokerInstanceId(0), InvocationFinishedResult.Success)
+ invoker ! InvocationFinishedMessage(
+ InvokerInstanceId(0, userMemory = defaultUserMemory),
+ InvocationFinishedResult.Success)
}
pool.expectMsg(Transition(invoker, Unhealthy, Healthy))
}
@@ -253,26 +265,34 @@ class InvokerSupervisionTests
// unhealthy -> healthy -> overloaded -> healthy
it should "goto healthy again, if overloaded and error buffer has enough successful invocations" in {
val pool = TestProbe()
- val invoker = pool.system.actorOf(InvokerActor.props(InvokerInstanceId(0), ControllerInstanceId("0")))
+ val invoker =
+ pool.system.actorOf(
+ InvokerActor.props(InvokerInstanceId(0, userMemory = defaultUserMemory), ControllerInstanceId("0")))
within(timeout.duration) {
pool.send(invoker, SubscribeTransitionCallBack(pool.ref))
pool.expectMsg(CurrentState(invoker, Unhealthy))
(1 to InvokerActor.bufferSize).foreach { _ =>
- invoker ! InvocationFinishedMessage(InvokerInstanceId(0), InvocationFinishedResult.Success)
+ invoker ! InvocationFinishedMessage(
+ InvokerInstanceId(0, userMemory = defaultUserMemory),
+ InvocationFinishedResult.Success)
}
pool.expectMsg(Transition(invoker, Unhealthy, Healthy))
// Fill buffer with timeouts
(1 to InvokerActor.bufferSize).foreach { _ =>
- invoker ! InvocationFinishedMessage(InvokerInstanceId(0), InvocationFinishedResult.Timeout)
+ invoker ! InvocationFinishedMessage(
+ InvokerInstanceId(0, userMemory = defaultUserMemory),
+ InvocationFinishedResult.Timeout)
}
pool.expectMsg(Transition(invoker, Healthy, Unresponsive))
// Fill buffer with successful invocations to become healthy again (one below errorTolerance)
(1 to InvokerActor.bufferSize - InvokerActor.bufferErrorTolerance).foreach { _ =>
- invoker ! InvocationFinishedMessage(InvokerInstanceId(0), InvocationFinishedResult.Success)
+ invoker ! InvocationFinishedMessage(
+ InvokerInstanceId(0, userMemory = defaultUserMemory),
+ InvocationFinishedResult.Success)
}
pool.expectMsg(Transition(invoker, Unresponsive, Healthy))
}
@@ -282,7 +302,9 @@ class InvokerSupervisionTests
// offline -> unhealthy
it should "go offline when unhealthy, if the state times out and go unhealthy on a successful ping again" in {
val pool = TestProbe()
- val invoker = pool.system.actorOf(InvokerActor.props(InvokerInstanceId(0), ControllerInstanceId("0")))
+ val invoker =
+ pool.system.actorOf(
+ InvokerActor.props(InvokerInstanceId(0, userMemory = defaultUserMemory), ControllerInstanceId("0")))
within(timeout.duration) {
pool.send(invoker, SubscribeTransitionCallBack(pool.ref))
@@ -291,20 +313,23 @@ class InvokerSupervisionTests
timeout(invoker)
pool.expectMsg(Transition(invoker, Unhealthy, Offline))
- invoker ! PingMessage(InvokerInstanceId(0))
+ invoker ! PingMessage(InvokerInstanceId(0, userMemory = defaultUserMemory))
pool.expectMsg(Transition(invoker, Offline, Unhealthy))
}
}
it should "start timer to send testactions when unhealthy" in {
- val invoker = TestFSMRef(new InvokerActor(InvokerInstanceId(0), ControllerInstanceId("0")))
+ val invoker =
+ TestFSMRef(new InvokerActor(InvokerInstanceId(0, userMemory = defaultUserMemory), ControllerInstanceId("0")))
invoker.stateName shouldBe Unhealthy
invoker.isTimerActive(InvokerActor.timerName) shouldBe true
// Fill buffer with successful invocations to become healthy again (one below errorTolerance)
(1 to InvokerActor.bufferSize - InvokerActor.bufferErrorTolerance).foreach { _ =>
- invoker ! InvocationFinishedMessage(InvokerInstanceId(0), InvocationFinishedResult.Success)
+ invoker ! InvocationFinishedMessage(
+ InvokerInstanceId(0, userMemory = defaultUserMemory),
+ InvocationFinishedResult.Success)
}
invoker.stateName shouldBe Healthy
@@ -320,7 +345,7 @@ class InvokerSupervisionTests
val sendActivationToInvoker = stubFunction[ActivationMessage, InvokerInstanceId, Future[RecordMetadata]]
val supervisor = system.actorOf(InvokerPool.props(childFactory, sendActivationToInvoker, pC))
- val invokerInstance = InvokerInstanceId(0, Some("10.x.x.x"), Some("invoker-xyz"))
+ val invokerInstance = InvokerInstanceId(0, Some("10.x.x.x"), Some("invoker-xyz"), userMemory = defaultUserMemory)
within(timeout.duration) {
@@ -343,9 +368,10 @@ class InvokerSupervisionTests
val sendActivationToInvoker = stubFunction[ActivationMessage, InvokerInstanceId, Future[RecordMetadata]]
val supervisor = system.actorOf(InvokerPool.props(childFactory, sendActivationToInvoker, pC))
- val invokerInstance = InvokerInstanceId(0, Some("10.x.x.x"), Some("invoker-xyz"))
+ val invokerInstance = InvokerInstanceId(0, Some("10.x.x.x"), Some("invoker-xyz"), userMemory = defaultUserMemory)
- val invokerAfterRestart = InvokerInstanceId(0, Some("10.x.x.x"), Some("invoker-zyx"))
+ val invokerAfterRestart =
+ InvokerInstanceId(0, Some("10.x.x.x"), Some("invoker-zyx"), userMemory = defaultUserMemory)
within(timeout.duration) {
val ping = PingMessage(invokerInstance)
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
index 211268f367..be0dc007fb 100644
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
+++ b/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
@@ -23,6 +23,7 @@ import org.scalatest.junit.JUnitRunner
import org.scalatest.{FlatSpec, Matchers}
import whisk.common.{ForcibleSemaphore, TransactionId}
import whisk.core.entity.{ByteSize, InvokerInstanceId, MemoryLimit}
+import whisk.core.entity.size._
import whisk.core.loadBalancer.InvokerState._
import whisk.core.loadBalancer._
@@ -36,22 +37,25 @@ import whisk.core.loadBalancer._
class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers with StreamLogging {
behavior of "ShardingContainerPoolBalancerState"
- def healthy(i: Int) = new InvokerHealth(InvokerInstanceId(i), Healthy)
- def unhealthy(i: Int) = new InvokerHealth(InvokerInstanceId(i), Unhealthy)
- def offline(i: Int) = new InvokerHealth(InvokerInstanceId(i), Offline)
+ val defaultUserMemory: ByteSize = 1024.MB
+
+ def healthy(i: Int, memory: ByteSize = defaultUserMemory) =
+ new InvokerHealth(InvokerInstanceId(i, userMemory = memory), Healthy)
+ def unhealthy(i: Int) = new InvokerHealth(InvokerInstanceId(i, userMemory = defaultUserMemory), Unhealthy)
+ def offline(i: Int) = new InvokerHealth(InvokerInstanceId(i, userMemory = defaultUserMemory), Offline)
def semaphores(count: Int, max: Int): IndexedSeq[ForcibleSemaphore] =
IndexedSeq.fill(count)(new ForcibleSemaphore(max))
- def lbConfig(blackboxFraction: Double, invokerBusyThreshold: ByteSize) =
- ShardingContainerPoolBalancerConfig(blackboxFraction, invokerBusyThreshold, 1)
+ def lbConfig(blackboxFraction: Double) =
+ ShardingContainerPoolBalancerConfig(blackboxFraction, 1)
it should "update invoker's state, growing the slots data and keeping valid old data" in {
// start empty
val slots = 10
val memoryPerSlot = MemoryLimit.minMemory
val memory = memoryPerSlot * slots
- val state = ShardingContainerPoolBalancerState()(lbConfig(0.5, memory))
+ val state = ShardingContainerPoolBalancerState()(lbConfig(0.5))
state.invokers shouldBe 'empty
state.blackboxInvokers shouldBe 'empty
state.managedInvokers shouldBe 'empty
@@ -60,7 +64,7 @@ class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers with Str
state.blackboxStepSizes shouldBe Seq.empty
// apply one update, verify everything is updated accordingly
- val update1 = IndexedSeq(healthy(0))
+ val update1 = IndexedSeq(healthy(0, memory))
state.updateInvokers(update1)
state.invokers shouldBe update1
@@ -76,7 +80,8 @@ class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers with Str
state.invokerSlots.head.availablePermits shouldBe (memory - memoryPerSlot).toMB.toInt
// apply second update, growing the state
- val update2 = IndexedSeq(healthy(0), healthy(1))
+ val update2 =
+ IndexedSeq(healthy(0, memory), healthy(1, memory * 2))
state.updateInvokers(update2)
state.invokers shouldBe update2
@@ -84,17 +89,18 @@ class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers with Str
state.blackboxInvokers shouldBe IndexedSeq(update2.last)
state.invokerSlots should have size update2.size
state.invokerSlots.head.availablePermits shouldBe (memory - memoryPerSlot).toMB.toInt
- state.invokerSlots(1).availablePermits shouldBe memory.toMB
+ state.invokerSlots(1).tryAcquire(memoryPerSlot.toMB.toInt)
+ state.invokerSlots(1).availablePermits shouldBe memory.toMB * 2 - memoryPerSlot.toMB
state.managedStepSizes shouldBe Seq(1)
state.blackboxStepSizes shouldBe Seq(1)
}
it should "allow managed partition to overlap with blackbox for small N" in {
Seq(0.1, 0.2, 0.3, 0.4, 0.5).foreach { bf =>
- val state = ShardingContainerPoolBalancerState()(lbConfig(bf, MemoryLimit.stdMemory))
+ val state = ShardingContainerPoolBalancerState()(lbConfig(bf))
(1 to 100).toSeq.foreach { i =>
- state.updateInvokers((1 to i).map(_ => healthy(1)))
+ state.updateInvokers((1 to i).map(_ => healthy(1, MemoryLimit.stdMemory)))
withClue(s"invoker count $bf $i:") {
state.managedInvokers.length should be <= i
@@ -120,22 +126,26 @@ class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers with Str
val slots = 10
val memoryPerSlot = MemoryLimit.minMemory
val memory = memoryPerSlot * slots
- val state = ShardingContainerPoolBalancerState()(lbConfig(0.5, memory))
- state.updateInvokers(IndexedSeq(healthy(0)))
+ val state = ShardingContainerPoolBalancerState()(lbConfig(0.5))
+ state.updateInvokers(IndexedSeq(healthy(0, memory), healthy(1, memory * 2)))
state.invokerSlots.head.tryAcquire(memoryPerSlot.toMB.toInt)
state.invokerSlots.head.availablePermits shouldBe (memory - memoryPerSlot).toMB
+ state.invokerSlots(1).tryAcquire(memoryPerSlot.toMB.toInt)
+ state.invokerSlots(1).availablePermits shouldBe memory.toMB * 2 - memoryPerSlot.toMB
+
state.updateCluster(2)
state.invokerSlots.head.availablePermits shouldBe memory.toMB / 2 // state reset + divided by 2
+ state.invokerSlots(1).availablePermits shouldBe memory.toMB
}
it should "fallback to a size of 1 (alone) if cluster size is < 1" in {
val slots = 10
val memoryPerSlot = MemoryLimit.minMemory
val memory = memoryPerSlot * slots
- val state = ShardingContainerPoolBalancerState()(lbConfig(0.5, memory))
- state.updateInvokers(IndexedSeq(healthy(0)))
+ val state = ShardingContainerPoolBalancerState()(lbConfig(0.5))
+ state.updateInvokers(IndexedSeq(healthy(0, memory)))
state.invokerSlots.head.availablePermits shouldBe memory.toMB
@@ -153,8 +163,8 @@ class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers with Str
val slots = 10
val memoryPerSlot = MemoryLimit.minMemory
val memory = memoryPerSlot * slots
- val state = ShardingContainerPoolBalancerState()(lbConfig(0.5, memory))
- state.updateInvokers(IndexedSeq(healthy(0)))
+ val state = ShardingContainerPoolBalancerState()(lbConfig(0.5))
+ state.updateInvokers(IndexedSeq(healthy(0, memory)))
state.invokerSlots.head.availablePermits shouldBe memory.toMB
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services