You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ma...@apache.org on 2018/04/13 08:10:48 UTC
[incubator-openwhisk] branch master updated: Implement
ContainerFactory.cpuShare to calculate per container share. (#3211)
This is an automated email from the ASF dual-hosted git repository.
markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new d7c9fd6 Implement ContainerFactory.cpuShare to calculate per container share. (#3211)
d7c9fd6 is described below
commit d7c9fd661a8a5b339f20b3f549bfa4fa47124706
Author: tysonnorris <ty...@gmail.com>
AuthorDate: Fri Apr 13 01:10:44 2018 -0700
Implement ContainerFactory.cpuShare to calculate per container share. (#3211)
Fixes #3110.
---
ansible/roles/invoker/tasks/deploy.yml | 6 +-
.../src/main/scala/whisk/core/WhiskConfig.scala | 8 +-
.../core/containerpool/ContainerFactory.scala | 20 +++-
.../whisk/core/mesos/MesosContainerFactory.scala | 5 +-
core/invoker/src/main/resources/application.conf | 12 +-
.../whisk/core/containerpool/ContainerPool.scala | 19 ++--
.../whisk/core/containerpool/ContainerProxy.scala | 14 ++-
.../docker/DockerContainerFactory.scala | 46 +++++---
.../kubernetes/KubernetesContainerFactory.scala | 3 +-
.../main/scala/whisk/core/invoker/Invoker.scala | 6 +-
.../scala/whisk/core/invoker/InvokerReactive.scala | 19 ++--
tests/src/test/scala/common/LoggedFunction.scala | 10 ++
.../docker/test/DockerContainerFactoryTests.scala | 122 +++++++++++++++++++++
.../mesos/test/MesosContainerFactoryTest.scala | 38 +++++--
.../test/ContainerArgsConfigTest.scala | 6 +-
.../containerpool/test/ContainerPoolTests.scala | 29 +++--
.../containerpool/test/ContainerProxyTests.scala | 54 ++++++---
17 files changed, 315 insertions(+), 102 deletions(-)
diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml
index e9302eb..a1aca12 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -192,9 +192,9 @@
{% for item in (invoker_container_network_dns_servers | default()).split(' ') %}
-e CONFIG_whisk_containerFactory_containerArgs_dnsServers_{{loop.index0}}={{ item }}
{% endfor %}
- -e INVOKER_NUMCORE='{{ invoker.numcore }}'
- -e INVOKER_CORESHARE='{{ invoker.coreshare }}'
- -e INVOKER_USE_RUNC='{{ invoker.useRunc }}'
+ -e CONFIG_whisk_containerPool_numCore='{{ invoker.numcore }}'
+ -e CONFIG_whisk_containerPool_coreShare='{{ invoker.coreshare }}'
+ -e CONFIG_whisk_docker_containerFactory_useRunc='{{ invoker.useRunc }}'
-e INVOKER_NAME='{{ groups['invokers'].index(inventory_hostname) }}'
-e WHISK_LOGS_DIR='{{ whisk_logs_dir }}'
-e METRICS_KAMON='{{ metrics.kamon.enabled }}'
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index 8779323..eb22e9d 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -58,9 +58,6 @@ class WhiskConfig(requiredProperties: Map[String, String],
val dockerImagePrefix = this(WhiskConfig.dockerImagePrefix)
val dockerImageTag = this(WhiskConfig.dockerImageTag)
- val invokerNumCore = this(WhiskConfig.invokerNumCore)
- val invokerCoreShare = this(WhiskConfig.invokerCoreShare)
- val invokerUseRunc = this.getAsBoolean(WhiskConfig.invokerUseRunc, true)
val invokerName = this(WhiskConfig.invokerName)
val wskApiHost = this(WhiskConfig.wskApiProtocol) + "://" + this(WhiskConfig.wskApiHostname) + ":" + this(
@@ -170,9 +167,6 @@ object WhiskConfig {
val dockerImagePrefix = "docker.image.prefix"
val dockerImageTag = "docker.image.tag"
- val invokerNumCore = "invoker.numcore"
- val invokerCoreShare = "invoker.coreshare"
- val invokerUseRunc = "invoker.use.runc"
val invokerName = "invoker.name"
val wskApiProtocol = "whisk.api.host.proto"
@@ -233,10 +227,12 @@ object ConfigKeys {
val docker = "whisk.docker"
val dockerTimeouts = s"$docker.timeouts"
+ val dockerContainerFactory = s"${docker}.container-factory"
val runc = "whisk.runc"
val runcTimeouts = s"$runc.timeouts"
val containerFactory = "whisk.container-factory"
val containerArgs = s"$containerFactory.container-args"
+ val containerPool = "whisk.container-pool"
val blacklist = "whisk.blacklist"
val kubernetes = "whisk.kubernetes"
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala b/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala
index 17860c0..35d3b8b 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala
@@ -31,6 +31,23 @@ case class ContainerArgsConfig(network: String,
dnsServers: Seq[String] = Seq.empty,
extraArgs: Map[String, Set[String]] = Map.empty)
+case class ContainerPoolConfig(numCore: Int, coreShare: Int) {
+
+ /**
+ * The maximum number of containers: the number of cores multiplied by coreShare (containers sharing one core).
+ */
+ def maxActiveContainers = numCore * coreShare
+
+ private val totalShare = 1024.0 // The full share as pre-defined by docker; not a value chosen by this code.
+
+ /**
+ * The docker cpu share (option -c) given to each container: the full share of 1024 divided evenly among
+ * maxActiveContainers and truncated to an Int, e.g. numCore 4 with coreShare 2 gives 1024 / 8 = 128.
+ * On an idle/underloaded system, a container will still get to use underutilized CPU shares.
+ */
+ def cpuShare = (totalShare / maxActiveContainers).toInt
+}
+
/**
* An abstraction for Container creation
*/
@@ -41,7 +58,8 @@ trait ContainerFactory {
name: String,
actionImage: ExecManifest.ImageName,
userProvidedImage: Boolean,
- memory: ByteSize)(implicit config: WhiskConfig, logging: Logging): Future[Container]
+ memory: ByteSize,
+ cpuShares: Int)(implicit config: WhiskConfig, logging: Logging): Future[Container]
/** perform any initialization */
def init(): Unit
diff --git a/common/scala/src/main/scala/whisk/core/mesos/MesosContainerFactory.scala b/common/scala/src/main/scala/whisk/core/mesos/MesosContainerFactory.scala
index 9d6204b..c9e634e 100644
--- a/common/scala/src/main/scala/whisk/core/mesos/MesosContainerFactory.scala
+++ b/common/scala/src/main/scala/whisk/core/mesos/MesosContainerFactory.scala
@@ -100,7 +100,8 @@ class MesosContainerFactory(config: WhiskConfig,
name: String,
actionImage: ExecManifest.ImageName,
userProvidedImage: Boolean,
- memory: ByteSize)(implicit config: WhiskConfig, logging: Logging): Future[Container] = {
+ memory: ByteSize,
+ cpuShares: Int)(implicit config: WhiskConfig, logging: Logging): Future[Container] = {
implicit val transid = tid
val image = if (userProvidedImage) {
actionImage.publicImageName
@@ -117,7 +118,7 @@ class MesosContainerFactory(config: WhiskConfig,
image = image,
userProvidedImage = userProvidedImage,
memory = memory,
- cpuShares = config.invokerCoreShare.toInt,
+ cpuShares = cpuShares,
environment = Map("__OW_API_HOST" -> config.wskApiHost),
network = containerArgs.network,
dnsServers = containerArgs.dnsServers,
diff --git a/core/invoker/src/main/resources/application.conf b/core/invoker/src/main/resources/application.conf
index 3ee046d..aaff48f 100644
--- a/core/invoker/src/main/resources/application.conf
+++ b/core/invoker/src/main/resources/application.conf
@@ -18,6 +18,16 @@ whisk {
unpause: 10 seconds
}
+ docker.container-factory {
+ # Use runc (docker-runc) for pause/resume functionality in DockerContainerFactory
+ use-runc: true
+ }
+
+ container-pool {
+ num-core: 4 # number of cores; used for computing --cpu-shares, and max number of containers allowed
+ core-share: 2 # containers sharing one core; used for computing --cpu-shares, and max number of containers allowed
+ }
+
kubernetes {
# Timeouts for k8s commands. Set to "Inf" to disable timeout.
timeouts {
@@ -42,7 +52,7 @@ whisk {
container-factory.container-args {
network: bridge
dns-servers: []
- extra-args: {}
+ extra-args: {} # to pass additional args to 'docker run'; format is `{key1: [v1, v2], key2: [v1, v2]}`
}
container-proxy {
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
index 801fc09..1835569 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
@@ -54,16 +54,14 @@ case class WorkerData(data: ContainerData, state: WorkerState)
* (kind, memory) and there is space in the pool.
*
* @param childFactory method to create new container proxy actor
- * @param maxActiveContainers maximum amount of containers doing work
- * @param maxPoolSize maximum size of containers allowed in the pool
* @param feed actor to request more work from
* @param prewarmConfig optional settings for container prewarming
+ * @param poolConfig config for the ContainerPool
*/
class ContainerPool(childFactory: ActorRefFactory => ActorRef,
- maxActiveContainers: Int,
- maxPoolSize: Int,
feed: ActorRef,
- prewarmConfig: Option[PrewarmingConfig] = None)
+ prewarmConfig: Option[PrewarmingConfig] = None,
+ poolConfig: ContainerPoolConfig)
extends Actor {
implicit val logging = new AkkaLogging(context.system.log)
@@ -98,7 +96,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
// their requests and send them back to the pool for rescheduling (this may happen if "docker" operations
// fail for example, or a container has aged and was destroying itself when a new request was assigned)
case r: Run =>
- val createdContainer = if (busyPool.size < maxActiveContainers) {
+ val createdContainer = if (busyPool.size < poolConfig.maxActiveContainers) {
// Schedule a job to a warm container
ContainerPool
@@ -107,7 +105,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
(container, "warm")
})
.orElse {
- if (busyPool.size + freePool.size < maxPoolSize) {
+ if (busyPool.size + freePool.size < poolConfig.maxActiveContainers) {
takePrewarmContainer(r.action)
.map(container => {
(container, "prewarmed")
@@ -147,7 +145,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
logging.error(
this,
s"Rescheduling Run message, too many message in the pool, freePoolSize: ${freePool.size}, " +
- s"busyPoolSize: ${busyPool.size}, maxActiveContainers $maxActiveContainers, " +
+ s"busyPoolSize: ${busyPool.size}, maxActiveContainers ${poolConfig.maxActiveContainers}, " +
s"userNamespace: ${r.msg.user.namespace}, action: ${r.action}")(r.msg.transid)
Some(logMessageInterval.fromNow)
} else {
@@ -282,11 +280,10 @@ object ContainerPool {
}
def props(factory: ActorRefFactory => ActorRef,
- maxActive: Int,
- size: Int,
+ poolConfig: ContainerPoolConfig,
feed: ActorRef,
prewarmConfig: Option[PrewarmingConfig] = None) =
- Props(new ContainerPool(factory, maxActive, size, feed, prewarmConfig))
+ Props(new ContainerPool(factory, feed, prewarmConfig, poolConfig))
}
/** Contains settings needed to perform container prewarming */
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
index 99557e6..dd9885e 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
@@ -94,11 +94,12 @@ case object RescheduleJob // job is sent back to parent and could not be process
* @param pauseGrace time to wait for new work before pausing the container
*/
class ContainerProxy(
- factory: (TransactionId, String, ImageName, Boolean, ByteSize) => Future[Container],
+ factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) => Future[Container],
sendActiveAck: (TransactionId, WhiskActivation, Boolean, InstanceId) => Future[Any],
storeActivation: (TransactionId, WhiskActivation) => Future[Any],
collectLogs: (TransactionId, Identity, WhiskActivation, Container, ExecutableWhiskAction) => Future[ActivationLogs],
instance: InstanceId,
+ poolConfig: ContainerPoolConfig,
unusedTimeout: FiniteDuration,
pauseGrace: FiniteDuration)
extends FSM[ContainerState, ContainerData]
@@ -117,7 +118,8 @@ class ContainerProxy(
ContainerProxy.containerName(instance, "prewarm", job.exec.kind),
job.exec.image,
job.exec.pull,
- job.memoryLimit)
+ job.memoryLimit,
+ poolConfig.cpuShare)
.map(container => PreWarmedData(container, job.exec.kind, job.memoryLimit))
.pipeTo(self)
@@ -133,7 +135,8 @@ class ContainerProxy(
ContainerProxy.containerName(instance, job.msg.user.namespace.name, job.action.name.name),
job.action.exec.image,
job.action.exec.pull,
- job.action.limits.memory.megabytes.MB)
+ job.action.limits.memory.megabytes.MB,
+ poolConfig.cpuShare)
// container factory will either yield a new container ready to execute the action, or
// starting up the container failed; for the latter, it's either an internal error starting
@@ -413,14 +416,15 @@ final case class ContainerProxyTimeoutConfig(idleContainer: FiniteDuration, paus
object ContainerProxy {
def props(
- factory: (TransactionId, String, ImageName, Boolean, ByteSize) => Future[Container],
+ factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) => Future[Container],
ack: (TransactionId, WhiskActivation, Boolean, InstanceId) => Future[Any],
store: (TransactionId, WhiskActivation) => Future[Any],
collectLogs: (TransactionId, Identity, WhiskActivation, Container, ExecutableWhiskAction) => Future[ActivationLogs],
instance: InstanceId,
+ poolConfig: ContainerPoolConfig,
unusedTimeout: FiniteDuration = timeouts.idleContainer,
pauseGrace: FiniteDuration = timeouts.pauseGrace) =
- Props(new ContainerProxy(factory, ack, store, collectLogs, instance, unusedTimeout, pauseGrace))
+ Props(new ContainerProxy(factory, ack, store, collectLogs, instance, poolConfig, unusedTimeout, pauseGrace))
// Needs to be thread-safe as it's used by multiple proxies concurrently.
private val containerCount = new Counter
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainerFactory.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainerFactory.scala
index ff0dfb5..3f2ee86 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainerFactory.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainerFactory.scala
@@ -36,26 +36,28 @@ import pureconfig._
import whisk.core.ConfigKeys
import whisk.core.containerpool.ContainerArgsConfig
-class DockerContainerFactory(config: WhiskConfig,
- instance: InstanceId,
+case class DockerContainerFactoryConfig(useRunc: Boolean) // useRunc: resume containers via runc rather than docker unpause
+
+class DockerContainerFactory(instance: InstanceId,
parameters: Map[String, Set[String]],
- containerArgs: ContainerArgsConfig =
- loadConfigOrThrow[ContainerArgsConfig](ConfigKeys.containerArgs))(
+ containerArgsConfig: ContainerArgsConfig =
+ loadConfigOrThrow[ContainerArgsConfig](ConfigKeys.containerArgs),
+ dockerContainerFactoryConfig: DockerContainerFactoryConfig =
+ loadConfigOrThrow[DockerContainerFactoryConfig](ConfigKeys.dockerContainerFactory))(
implicit actorSystem: ActorSystem,
ec: ExecutionContext,
- logging: Logging)
+ logging: Logging,
+ docker: DockerApiWithFileAccess,
+ runc: RuncApi)
extends ContainerFactory {
- /** Initialize container clients */
- implicit val docker = new DockerClientWithFileAccess()(ec)
- implicit val runc = new RuncClient()(ec)
-
/** Create a container using docker cli */
override def createContainer(tid: TransactionId,
name: String,
actionImage: ExecManifest.ImageName,
userProvidedImage: Boolean,
- memory: ByteSize)(implicit config: WhiskConfig, logging: Logging): Future[Container] = {
+ memory: ByteSize,
+ cpuShares: Int)(implicit config: WhiskConfig, logging: Logging): Future[Container] = {
val image = if (userProvidedImage) {
actionImage.publicImageName
} else {
@@ -67,13 +69,13 @@ class DockerContainerFactory(config: WhiskConfig,
image = image,
userProvidedImage = userProvidedImage,
memory = memory,
- cpuShares = config.invokerCoreShare.toInt,
+ cpuShares = cpuShares,
environment = Map("__OW_API_HOST" -> config.wskApiHost),
- network = containerArgs.network,
- dnsServers = containerArgs.dnsServers,
+ network = containerArgsConfig.network,
+ dnsServers = containerArgsConfig.dnsServers,
name = Some(name),
- useRunc = config.invokerUseRunc,
- parameters ++ containerArgs.extraArgs)
+ useRunc = dockerContainerFactoryConfig.useRunc,
+ parameters ++ containerArgsConfig.extraArgs.map { case (k, v) => ("--" + k, v) })
}
/** Perform cleanup on init */
@@ -111,7 +113,7 @@ class DockerContainerFactory(config: WhiskConfig,
containers =>
logging.info(this, s"removing ${containers.size} action containers.")
val removals = containers.map { id =>
- (if (config.invokerUseRunc) {
+ (if (dockerContainerFactoryConfig.useRunc) {
runc.resume(id)
} else {
docker.unpause(id)
@@ -135,6 +137,14 @@ object DockerContainerFactoryProvider extends ContainerFactoryProvider {
logging: Logging,
config: WhiskConfig,
instanceId: InstanceId,
- parameters: Map[String, Set[String]]): ContainerFactory =
- new DockerContainerFactory(config, instanceId, parameters)(actorSystem, actorSystem.dispatcher, logging)
+ parameters: Map[String, Set[String]]): ContainerFactory = {
+
+ new DockerContainerFactory(instanceId, parameters)(
+ actorSystem,
+ actorSystem.dispatcher,
+ logging,
+ new DockerClientWithFileAccess()(actorSystem.dispatcher)(logging, actorSystem),
+ new RuncClient()(actorSystem.dispatcher)(logging, actorSystem))
+ }
+
}
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala
index 8b2d918..e332b84 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala
@@ -64,7 +64,8 @@ class KubernetesContainerFactory(label: String, config: WhiskConfig)(implicit ac
name: String,
actionImage: ImageName,
userProvidedImage: Boolean,
- memory: ByteSize)(implicit config: WhiskConfig, logging: Logging): Future[Container] = {
+ memory: ByteSize,
+ cpuShares: Int)(implicit config: WhiskConfig, logging: Logging): Future[Container] = {
val image = if (userProvidedImage) {
actionImage.publicImageName
} else {
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
index e8e1dae..58a4d1d 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
@@ -56,11 +56,7 @@ object Invoker {
ExecManifest.requiredProperties ++
kafkaHosts ++
zookeeperHosts ++
- wskApiHost ++ Map(
- dockerImageTag -> "latest",
- invokerNumCore -> "4",
- invokerCoreShare -> "2",
- invokerUseRunc -> "true") ++
+ wskApiHost ++ Map(dockerImageTag -> "latest") ++
Map(invokerName -> "")
def main(args: Array[String]): Unit = {
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index 0729103..8601e4c 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -41,7 +41,11 @@ import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}
-class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: MessageProducer)(
+class InvokerReactive(
+ config: WhiskConfig,
+ instance: InstanceId,
+ producer: MessageProducer,
+ poolConfig: ContainerPoolConfig = loadConfigOrThrow[ContainerPoolConfig](ConfigKeys.containerPool))(
implicit actorSystem: ActorSystem,
logging: Logging) {
@@ -91,7 +95,7 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa
/** Initialize message consumers */
private val topic = s"invoker${instance.toInt}"
- private val maximumContainers = config.invokerNumCore.toInt * config.invokerCoreShare.toInt
+ private val maximumContainers = poolConfig.maxActiveContainers
private val msgProvider = SpiLoader.get[MessagingProvider]
private val consumer = msgProvider.getConsumer(
config,
@@ -140,7 +144,9 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa
/** Creates a ContainerProxy Actor when being called. */
private val childFactory = (f: ActorRefFactory) =>
- f.actorOf(ContainerProxy.props(containerFactory.createContainer, ack, store, logsProvider.collectLogs, instance))
+ f.actorOf(
+ ContainerProxy
+ .props(containerFactory.createContainer, ack, store, logsProvider.collectLogs, instance, poolConfig))
private val prewarmKind = "nodejs:6"
private val prewarmExec = ExecManifest.runtimesManifest
@@ -149,12 +155,7 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa
.get
private val pool = actorSystem.actorOf(
- ContainerPool.props(
- childFactory,
- maximumContainers,
- maximumContainers,
- activationFeed,
- Some(PrewarmingConfig(2, prewarmExec, 256.MB))))
+ ContainerPool.props(childFactory, poolConfig, activationFeed, Some(PrewarmingConfig(2, prewarmExec, 256.MB))))
/** Is called when an ActivationMessage is read from Kafka */
def processActivationMessage(bytes: Array[Byte]): Future[Unit] = {
diff --git a/tests/src/test/scala/common/LoggedFunction.scala b/tests/src/test/scala/common/LoggedFunction.scala
index 3789de7..e8b7261 100644
--- a/tests/src/test/scala/common/LoggedFunction.scala
+++ b/tests/src/test/scala/common/LoggedFunction.scala
@@ -72,6 +72,15 @@ class LoggedFunction5[A1, A2, A3, A4, A5, B](body: (A1, A2, A3, A4, A5) => B) ex
body(v1, v2, v3, v4, v5)
}
}
+class LoggedFunction6[A1, A2, A3, A4, A5, A6, B](body: (A1, A2, A3, A4, A5, A6) => B)
+ extends Function6[A1, A2, A3, A4, A5, A6, B] {
+ val calls = mutable.Buffer[(A1, A2, A3, A4, A5, A6)]() // argument tuples of every invocation, in call order
+
+ override def apply(v1: A1, v2: A2, v3: A3, v4: A4, v5: A5, v6: A6): B = {
+ calls += ((v1, v2, v3, v4, v5, v6)) // record the arguments before delegating to the wrapped function
+ body(v1, v2, v3, v4, v5, v6)
+ }
+}
object LoggedFunction {
def apply[A1, B](body: (A1) => B) = new LoggedFunction1(body)
@@ -79,4 +88,5 @@ object LoggedFunction {
def apply[A1, A2, A3, B](body: (A1, A2, A3) => B) = new LoggedFunction3(body)
def apply[A1, A2, A3, A4, B](body: (A1, A2, A3, A4) => B) = new LoggedFunction4(body)
def apply[A1, A2, A3, A4, A5, B](body: (A1, A2, A3, A4, A5) => B) = new LoggedFunction5(body)
+ def apply[A1, A2, A3, A4, A5, A6, B](body: (A1, A2, A3, A4, A5, A6) => B) = new LoggedFunction6(body)
}
diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerFactoryTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerFactoryTests.scala
new file mode 100644
index 0000000..31139dc
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerFactoryTests.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool.docker.test
+
+import common.StreamLogging
+import common.TimingHelpers
+import common.WskActorSystem
+import org.junit.runner.RunWith
+import org.scalamock.scalatest.MockFactory
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.FlatSpec
+import org.scalatest.Matchers
+import org.scalatest.junit.JUnitRunner
+import scala.concurrent.Await
+import scala.concurrent.Future
+import scala.concurrent.duration._
+import whisk.common.TransactionId
+import whisk.core.WhiskConfig
+import whisk.core.WhiskConfig._
+import whisk.core.containerpool.ContainerAddress
+import whisk.core.containerpool.ContainerArgsConfig
+import whisk.core.containerpool.ContainerId
+import whisk.core.containerpool.docker.DockerApiWithFileAccess
+import whisk.core.containerpool.docker.DockerContainerFactory
+import whisk.core.containerpool.docker.DockerContainerFactoryConfig
+import whisk.core.containerpool.docker.RuncApi
+import whisk.core.entity.ExecManifest
+import whisk.core.entity.InstanceId
+import whisk.core.entity.size._
+
+@RunWith(classOf[JUnitRunner])
+class DockerContainerFactoryTests
+ extends FlatSpec
+ with Matchers
+ with MockFactory
+ with StreamLogging
+ with BeforeAndAfterEach
+ with WskActorSystem
+ with TimingHelpers {
+
+ implicit val config = new WhiskConfig(
+ ExecManifest.requiredProperties ++ Map(dockerImagePrefix -> "testing", dockerImageTag -> "testtag"))
+ ExecManifest.initialize(config) should be a 'success
+
+ behavior of "DockerContainerFactory"
+
+ it should "set the docker run args based on ContainerArgsConfig" in {
+
+ val image = ExecManifest.runtimesManifest.manifests("nodejs").image
+
+ implicit val tid = TransactionId.testing
+ val dockerApiStub = mock[DockerApiWithFileAccess]
+ // set up the run expectation: the local image name plus the exact list of docker run args
+ (dockerApiStub
+ .run(_: String, _: Seq[String])(_: TransactionId))
+ .expects(
+ image.localImageName(config.dockerRegistry, config.dockerImagePrefix, Some(config.dockerImageTag)),
+ List(
+ "--cpu-shares",
+ "32", // the cpuShares literal passed to createContainer below; the invoker passes ContainerPoolConfig.cpuShare
+ "--memory",
+ "10m",
+ "--memory-swap",
+ "10m",
+ "--network",
+ "net1",
+ "-e",
+ "__OW_API_HOST=://:",
+ "--dns",
+ "dns1",
+ "--dns",
+ "dns2",
+ "--name",
+ "testContainer",
+ "--env", // extraArgs key "env" from the ContainerArgsConfig below, with the "--" prefix added by the factory
+ "e1",
+ "--env",
+ "e2"),
+ *)
+ .returning(Future.successful { ContainerId("fakecontainerid") })
+ // set up the inspect expectation for the container id returned by run, on network "net1"
+ (dockerApiStub
+ .inspectIPAddress(_: ContainerId, _: String)(_: TransactionId))
+ .expects(ContainerId("fakecontainerid"), "net1", *)
+ .returning(Future.successful { ContainerAddress("1.2.3.4", 1234) })
+ // set up the rm expectation, triggered by c.destroy() at the end of the test
+ (dockerApiStub
+ .rm(_: ContainerId)(_: TransactionId))
+ .expects(ContainerId("fakecontainerid"), *)
+ .returning(Future.successful(Unit)) // NOTE(review): `Unit` is the companion object, not the value `()` - confirm intended
+
+ val factory =
+ new DockerContainerFactory(
+ InstanceId(0),
+ Map(),
+ ContainerArgsConfig("net1", Seq("dns1", "dns2"), Map("env" -> Set("e1", "e2"))),
+ DockerContainerFactoryConfig(true))(actorSystem, executionContext, logging, dockerApiStub, mock[RuncApi])
+
+ val cf = factory.createContainer(tid, "testContainer", image, false, 10.MB, 32)
+
+ val c = Await.result(cf, 5000.milliseconds)
+
+ Await.result(c.destroy(), 500.milliseconds)
+
+ }
+
+}
diff --git a/tests/src/test/scala/whisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala b/tests/src/test/scala/whisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala
index deb23ff..9a46509 100644
--- a/tests/src/test/scala/whisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala
@@ -49,6 +49,7 @@ import whisk.common.TransactionId
import whisk.core.WhiskConfig
import whisk.core.WhiskConfig._
import whisk.core.containerpool.ContainerArgsConfig
+import whisk.core.containerpool.ContainerPoolConfig
import whisk.core.containerpool.logging.DockerToActivationLogStore
import whisk.core.entity.ExecManifest.ImageName
import whisk.core.entity.size._
@@ -66,7 +67,7 @@ class MesosContainerFactoryTest
def await[A](f: Future[A], timeout: FiniteDuration = 500.milliseconds) = Await.result[A](f, timeout)
implicit val wskConfig =
- new WhiskConfig(Map(invokerCoreShare -> "2", dockerImageTag -> "latest", wskApiHostname -> "apihost") ++ wskApiHost)
+ new WhiskConfig(Map(dockerImageTag -> "latest", wskApiHostname -> "apihost") ++ wskApiHost)
var count = 0
var lastTaskId: String = null
def testTaskId() = {
@@ -75,8 +76,9 @@ class MesosContainerFactoryTest
lastTaskId
}
- //TODO: adjust this once the invokerCoreShare issue is fixed see #3110
- def cpus() = wskConfig.invokerCoreShare.toInt / 1024.0 //
+ val poolConfig = ContainerPoolConfig(8, 10)
+ val dockerCpuShares = poolConfig.cpuShare
+ val mesosCpus = poolConfig.cpuShare / 1024.0
val containerArgsConfig =
new ContainerArgsConfig("net1", Seq("dns1", "dns2"), Map("extra1" -> Set("e1", "e2"), "extra2" -> Set("e3", "e4")))
@@ -116,14 +118,20 @@ class MesosContainerFactoryTest
testTaskId)
expectMsg(Subscribe)
- factory.createContainer(TransactionId.testing, "mesosContainer", ImageName("fakeImage"), false, 1.MB)
+ factory.createContainer(
+ TransactionId.testing,
+ "mesosContainer",
+ ImageName("fakeImage"),
+ false,
+ 1.MB,
+ poolConfig.cpuShare)
expectMsg(
SubmitTask(TaskDef(
lastTaskId,
"mesosContainer",
"fakeImage:" + wskConfig.dockerImageTag,
- cpus,
+ mesosCpus,
1,
List(8080),
Some(0),
@@ -159,13 +167,19 @@ class MesosContainerFactoryTest
probe.reply(new SubscribeComplete)
//create the container
- val c = factory.createContainer(TransactionId.testing, "mesosContainer", ImageName("fakeImage"), false, 1.MB)
+ val c = factory.createContainer(
+ TransactionId.testing,
+ "mesosContainer",
+ ImageName("fakeImage"),
+ false,
+ 1.MB,
+ poolConfig.cpuShare)
probe.expectMsg(
SubmitTask(TaskDef(
lastTaskId,
"mesosContainer",
"fakeImage:" + wskConfig.dockerImageTag,
- cpus,
+ mesosCpus,
1,
List(8080),
Some(0),
@@ -228,14 +242,20 @@ class MesosContainerFactoryTest
probe.reply(new SubscribeComplete)
//create the container
- val c = factory.createContainer(TransactionId.testing, "mesosContainer", ImageName("fakeImage"), false, 1.MB)
+ val c = factory.createContainer(
+ TransactionId.testing,
+ "mesosContainer",
+ ImageName("fakeImage"),
+ false,
+ 1.MB,
+ poolConfig.cpuShare)
probe.expectMsg(
SubmitTask(TaskDef(
lastTaskId,
"mesosContainer",
"fakeImage:" + wskConfig.dockerImageTag,
- cpus,
+ mesosCpus,
1,
List(8080),
Some(0),
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerArgsConfigTest.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerArgsConfigTest.scala
index 9990c15..82ad231 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerArgsConfigTest.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerArgsConfigTest.scala
@@ -41,8 +41,8 @@ class ContainerArgsConfigTest extends FlatSpec with Matchers {
System.setProperty("whisk.container-factory.container-args.extra-args.label.0", "l1")
System.setProperty("whisk.container-factory.container-args.extra-args.label.1", "l2")
System.setProperty("whisk.container-factory.container-args.extra-args.label.3", "l3")
- System.setProperty("whisk.container-factory.container-args.extra-args.environment.0", "e1")
- System.setProperty("whisk.container-factory.container-args.extra-args.environment.1", "e2")
+ System.setProperty("whisk.container-factory.container-args.extra-args.env.0", "e1")
+ System.setProperty("whisk.container-factory.container-args.extra-args.env.1", "e2")
System.setProperty("whisk.container-factory.container-args.dns-servers.0", "google.com")
System.setProperty("whisk.container-factory.container-args.dns-servers.1", "1.2.3.4")
@@ -52,7 +52,7 @@ class ContainerArgsConfigTest extends FlatSpec with Matchers {
config.dnsServers shouldBe Seq[String]("google.com", "1.2.3.4")
//check map parsing of extra-args config
config.extraArgs.get("label") shouldBe Some(Set("l1", "l2", "l3"))
- config.extraArgs.get("environment") shouldBe Some(Set("e1", "e2"))
+ config.extraArgs.get("env") shouldBe Some(Set("e1", "e2"))
}
}
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
index ff6d49e..e41cea6 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
@@ -123,7 +123,7 @@ class ContainerPoolTests
it should "reuse a warm container" in within(timeout) {
val (containers, factory) = testContainers(2)
val feed = TestProbe()
- val pool = system.actorOf(ContainerPool.props(factory, 2, 2, feed.ref))
+ val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(2, 2), feed.ref))
pool ! runMessage
containers(0).expectMsg(runMessage)
@@ -137,7 +137,7 @@ class ContainerPoolTests
it should "reuse a warm container when action is the same even if revision changes" in within(timeout) {
val (containers, factory) = testContainers(2)
val feed = TestProbe()
- val pool = system.actorOf(ContainerPool.props(factory, 2, 2, feed.ref))
+ val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(2, 2), feed.ref))
pool ! runMessage
containers(0).expectMsg(runMessage)
@@ -152,7 +152,7 @@ class ContainerPoolTests
val (containers, factory) = testContainers(2)
val feed = TestProbe()
- val pool = system.actorOf(ContainerPool.props(factory, 2, 2, feed.ref))
+ val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(2, 2), feed.ref))
pool ! runMessage
containers(0).expectMsg(runMessage)
// Note that the container doesn't respond, thus it's not free to take work
@@ -166,7 +166,7 @@ class ContainerPoolTests
val feed = TestProbe()
// a pool with only 1 slot
- val pool = system.actorOf(ContainerPool.props(factory, 1, 1, feed.ref))
+ val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(1, 1), feed.ref))
pool ! runMessage
containers(0).expectMsg(runMessage)
containers(0).send(pool, NeedWork(warmedData()))
@@ -181,7 +181,7 @@ class ContainerPoolTests
val feed = TestProbe()
// a pool with only 1 active slot but 2 slots in total
- val pool = system.actorOf(ContainerPool.props(factory, 1, 2, feed.ref))
+ val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(1, 2), feed.ref))
// Run the first container
pool ! runMessage
@@ -207,7 +207,7 @@ class ContainerPoolTests
val feed = TestProbe()
// a pool with only 1 slot
- val pool = system.actorOf(ContainerPool.props(factory, 1, 1, feed.ref))
+ val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(1, 1), feed.ref))
pool ! runMessage
containers(0).expectMsg(runMessage)
containers(0).send(pool, NeedWork(warmedData()))
@@ -222,7 +222,7 @@ class ContainerPoolTests
val feed = TestProbe()
// a pool with only 1 slot
- val pool = system.actorOf(ContainerPool.props(factory, 1, 1, feed.ref))
+ val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(1, 1), feed.ref))
pool ! runMessage
containers(0).expectMsg(runMessage)
containers(0).send(pool, RescheduleJob) // emulate container failure ...
@@ -239,7 +239,8 @@ class ContainerPoolTests
val feed = TestProbe()
val pool =
- system.actorOf(ContainerPool.props(factory, 0, 0, feed.ref, Some(PrewarmingConfig(1, exec, memoryLimit))))
+ system.actorOf(
+ ContainerPool.props(factory, ContainerPoolConfig(0, 0), feed.ref, Some(PrewarmingConfig(1, exec, memoryLimit))))
containers(0).expectMsg(Start(exec, memoryLimit))
}
@@ -248,7 +249,8 @@ class ContainerPoolTests
val feed = TestProbe()
val pool =
- system.actorOf(ContainerPool.props(factory, 1, 1, feed.ref, Some(PrewarmingConfig(1, exec, memoryLimit))))
+ system.actorOf(
+ ContainerPool.props(factory, ContainerPoolConfig(1, 1), feed.ref, Some(PrewarmingConfig(1, exec, memoryLimit))))
containers(0).expectMsg(Start(exec, memoryLimit))
containers(0).send(pool, NeedWork(preWarmedData(exec.kind)))
pool ! runMessage
@@ -262,7 +264,8 @@ class ContainerPoolTests
val alternativeExec = CodeExecAsString(RuntimeManifest("anotherKind", ImageName("testImage")), "testCode", None)
val pool = system.actorOf(
- ContainerPool.props(factory, 1, 1, feed.ref, Some(PrewarmingConfig(1, alternativeExec, memoryLimit))))
+ ContainerPool
+ .props(factory, ContainerPoolConfig(1, 1), feed.ref, Some(PrewarmingConfig(1, alternativeExec, memoryLimit))))
containers(0).expectMsg(Start(alternativeExec, memoryLimit)) // container0 was prewarmed
containers(0).send(pool, NeedWork(preWarmedData(alternativeExec.kind)))
pool ! runMessage
@@ -276,7 +279,9 @@ class ContainerPoolTests
val alternativeLimit = 128.MB
val pool =
- system.actorOf(ContainerPool.props(factory, 1, 1, feed.ref, Some(PrewarmingConfig(1, exec, alternativeLimit))))
+ system.actorOf(
+ ContainerPool
+ .props(factory, ContainerPoolConfig(1, 1), feed.ref, Some(PrewarmingConfig(1, exec, alternativeLimit))))
containers(0).expectMsg(Start(exec, alternativeLimit)) // container0 was prewarmed
containers(0).send(pool, NeedWork(preWarmedData(exec.kind, alternativeLimit)))
pool ! runMessage
@@ -290,7 +295,7 @@ class ContainerPoolTests
val (containers, factory) = testContainers(2)
val feed = TestProbe()
- val pool = system.actorOf(ContainerPool.props(factory, 2, 2, feed.ref))
+ val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(2, 2), feed.ref))
// container0 is created and used
pool ! runMessage
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
index 5a13084..7f2e553 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
@@ -158,7 +158,7 @@ class ContainerProxyTests
/** Creates an inspectable factory */
def createFactory(response: Future[Container]) = LoggedFunction {
- (_: TransactionId, _: String, _: ImageName, _: Boolean, _: ByteSize) =>
+ (_: TransactionId, _: String, _: ImageName, _: Boolean, _: ByteSize, _: Int) =>
response
}
@@ -176,6 +176,8 @@ class ContainerProxyTests
Future.successful(())
}
+ val poolConfig = ContainerPoolConfig(1, 2)
+
behavior of "ContainerProxy"
/*
@@ -188,12 +190,19 @@ class ContainerProxyTests
val machine =
childActorOf(
ContainerProxy
- .props(factory, createAcker, store, createCollector(), InstanceId(0, Some("myname")), pauseGrace = timeout))
+ .props(
+ factory,
+ createAcker,
+ store,
+ createCollector(),
+ InstanceId(0, Some("myname")),
+ poolConfig,
+ pauseGrace = timeout))
registerCallback(machine)
preWarm(machine)
factory.calls should have size 1
- val (tid, name, _, _, memory) = factory.calls(0)
+ val (tid, name, _, _, memory, cpuShares) = factory.calls(0)
tid shouldBe TransactionId.invokerWarmup
name should fullyMatch regex """wskmyname\d+_\d+_prewarm_actionKind"""
memory shouldBe memoryLimit
@@ -208,7 +217,8 @@ class ContainerProxyTests
val collector = createCollector()
val machine =
- childActorOf(ContainerProxy.props(factory, acker, store, collector, InstanceId(0), pauseGrace = timeout))
+ childActorOf(
+ ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
registerCallback(machine)
preWarm(machine)
@@ -243,7 +253,8 @@ class ContainerProxyTests
val collector = createCollector()
val machine =
- childActorOf(ContainerProxy.props(factory, acker, store, collector, InstanceId(0), pauseGrace = timeout))
+ childActorOf(
+ ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
registerCallback(machine)
preWarm(machine)
@@ -289,7 +300,8 @@ class ContainerProxyTests
val collector = createCollector()
val machine =
- childActorOf(ContainerProxy.props(factory, acker, store, collector, InstanceId(0), pauseGrace = timeout))
+ childActorOf(
+ ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
registerCallback(machine)
preWarm(machine)
@@ -326,7 +338,8 @@ class ContainerProxyTests
val collector = createCollector()
val machine =
- childActorOf(ContainerProxy.props(factory, acker, store, collector, InstanceId(0), pauseGrace = timeout))
+ childActorOf(
+ ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
registerCallback(machine)
run(machine, Uninitialized)
@@ -358,7 +371,8 @@ class ContainerProxyTests
val collector = createCollector()
val machine =
- childActorOf(ContainerProxy.props(factory, acker, store, collector, InstanceId(0), pauseGrace = timeout))
+ childActorOf(
+ ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
registerCallback(machine)
machine ! Run(action, message)
expectMsg(Transition(machine, Uninitialized, Running))
@@ -392,7 +406,8 @@ class ContainerProxyTests
val collector = createCollector()
val machine =
- childActorOf(ContainerProxy.props(factory, acker, store, collector, InstanceId(0), pauseGrace = timeout))
+ childActorOf(
+ ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
registerCallback(machine)
machine ! Run(action, message)
expectMsg(Transition(machine, Uninitialized, Running))
@@ -430,7 +445,8 @@ class ContainerProxyTests
val collector = createCollector()
val machine =
- childActorOf(ContainerProxy.props(factory, acker, store, collector, InstanceId(0), pauseGrace = timeout))
+ childActorOf(
+ ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
registerCallback(machine)
machine ! Run(action, message)
expectMsg(Transition(machine, Uninitialized, Running))
@@ -459,7 +475,8 @@ class ContainerProxyTests
createCollector(Future.failed(LogCollectingException(ActivationLogs(partialLogs))))
val machine =
- childActorOf(ContainerProxy.props(factory, acker, store, collector, InstanceId(0), pauseGrace = timeout))
+ childActorOf(
+ ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
registerCallback(machine)
machine ! Run(action, message)
expectMsg(Transition(machine, Uninitialized, Running))
@@ -487,7 +504,8 @@ class ContainerProxyTests
val collector = createCollector(Future.failed(new Exception))
val machine =
- childActorOf(ContainerProxy.props(factory, acker, store, collector, InstanceId(0), pauseGrace = timeout))
+ childActorOf(
+ ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
registerCallback(machine)
machine ! Run(action, message)
expectMsg(Transition(machine, Uninitialized, Running))
@@ -519,7 +537,8 @@ class ContainerProxyTests
val store = createStore
val machine =
- childActorOf(ContainerProxy.props(factory, acker, store, createCollector(), InstanceId(0), pauseGrace = timeout))
+ childActorOf(
+ ContainerProxy.props(factory, acker, store, createCollector(), InstanceId(0), poolConfig, pauseGrace = timeout))
registerCallback(machine)
run(machine, Uninitialized) // first run an activation
timeout(machine) // times out Ready state so container suspends
@@ -553,7 +572,8 @@ class ContainerProxyTests
val store = createStore
val machine =
- childActorOf(ContainerProxy.props(factory, acker, store, createCollector(), InstanceId(0), pauseGrace = timeout))
+ childActorOf(
+ ContainerProxy.props(factory, acker, store, createCollector(), InstanceId(0), poolConfig, pauseGrace = timeout))
registerCallback(machine)
run(machine, Uninitialized)
timeout(machine) // times out Ready state so container suspends
@@ -588,7 +608,8 @@ class ContainerProxyTests
val collector = createCollector()
val machine =
- childActorOf(ContainerProxy.props(factory, acker, store, collector, InstanceId(0), pauseGrace = timeout))
+ childActorOf(
+ ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
registerCallback(machine)
// Start running the action
@@ -638,7 +659,8 @@ class ContainerProxyTests
val collector = createCollector()
val machine =
- childActorOf(ContainerProxy.props(factory, acker, store, collector, InstanceId(0), pauseGrace = timeout))
+ childActorOf(
+ ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
registerCallback(machine)
run(machine, Uninitialized)
timeout(machine)
--
To stop receiving notification emails like this one, please contact
markusthoemmes@apache.org.